diff --git a/Cargo.lock b/Cargo.lock
index 7cc67e268..da6443aa4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5500,7 +5500,9 @@ dependencies = [
  "parking_lot",
  "paste",
  "pin-project",
+ "rand",
  "restate-core",
+ "restate-log-server",
  "restate-metadata-store",
  "restate-rocksdb",
  "restate-test-util",
diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml
index 3d36b2e8e..46ff0bb8f 100644
--- a/crates/bifrost/Cargo.toml
+++ b/crates/bifrost/Cargo.toml
@@ -28,6 +28,7 @@ futures = { workspace = true }
 metrics = { workspace = true }
 parking_lot = { workspace = true }
 pin-project = { workspace = true }
+rand = { workspace = true }
 rocksdb = { workspace = true }
 serde = { workspace = true }
 smallvec = { workspace = true }
@@ -40,6 +41,7 @@ tracing = { workspace = true }
 
 [dev-dependencies]
 restate-core = { workspace = true, features = ["test-util"] }
+restate-log-server = { workspace = true }
 restate-metadata-store = { workspace = true }
 restate-test-util = { workspace = true }
 restate-types = { workspace = true, features = ["test-util"] }
diff --git a/crates/bifrost/src/providers/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs
index 55e281a88..0104b3b63 100644
--- a/crates/bifrost/src/providers/replicated_loglet/mod.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/mod.rs
@@ -10,5 +10,7 @@
 
 pub(crate) mod metric_definitions;
 mod provider;
+#[allow(dead_code)]
+mod sequencer;
 
 pub use provider::Factory;
diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
new file mode 100644
index 000000000..3027ae4f8
--- /dev/null
+++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
@@ -0,0 +1,136 @@
+use std::{
+    collections::{BTreeMap, VecDeque},
+    sync::{
+        atomic::{self, Ordering},
+        Arc,
+    },
+    time::Duration,
+};
+
+use tokio::sync::{mpsc, oneshot};
+use worker::{Payload, WorkerEvent};
+
+use restate_core::{
+    cancellation_token, network::NetworkError, ShutdownError, TaskCenter, TaskKind,
+};
+
+use restate_types::{
+    logs::{LogletOffset, Lsn, Record, SequenceNumber, TailState},
+    net::log_server::{LogletInfo, Status, Store, Stored},
+    replicated_loglet::ReplicatedLogletId,
+    time::MillisSinceEpoch,
+    GenerationalNodeId, PlainNodeId,
+};
+
+use crate::loglet::{LogletCommit, Resolver};
+
+mod worker;
+
+#[derive(Debug, Default)]
+pub struct NodeStatus {
+    // todo: this should be monotonic
+    last_response_time: atomic::AtomicU64,
+}
+
+impl NodeStatus {
+    pub(crate) fn touch(&self) {
+        // update the value with the latest timestamp
+        self.last_response_time
+            .store(MillisSinceEpoch::now().into(), Ordering::Relaxed);
+    }
+
+    pub fn last_response_time(&self) -> MillisSinceEpoch {
+        self.last_response_time.load(Ordering::Relaxed).into()
+    }
+
+    pub fn duration_since_last_response(&self) -> Duration {
+        // last_response_time should be monotonic
+        self.last_response_time().elapsed()
+    }
+}
+
+/// The `NodeClient` trait abstracts the log-server loglet interface. One possible implementation
+/// is a gRPC client to a running log server.
+#[async_trait::async_trait]
+pub trait NodeClient {
+    async fn enqueue_store(&self, msg: Store) -> Result<(), NetworkError>;
+    async fn enqueue_get_loglet_info(&self) -> Result<(), NetworkError>;
+}
+
+struct NodeInner<C> {
+    client: C,
+    state: NodeStatus,
+}
+
+pub struct Node<C> {
+    inner: Arc<NodeInner<C>>,
+}
+
+impl<C> Clone for Node<C> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: Arc::clone(&self.inner),
+        }
+    }
+}
+
+impl<C> Node<C> {
+    fn new(client: C) -> Self {
+        Self {
+            inner: Arc::new(NodeInner {
+                client,
+                state: NodeStatus::default(),
+            }),
+        }
+    }
+
+    pub fn client(&self) -> &C {
+        &self.inner.client
+    }
+
+    pub fn status(&self) -> &NodeStatus {
+        &self.inner.state
+    }
+}
+
+/// Part of the sequencer state that is shared between multiple appenders.
+#[derive(Debug)]
+pub(crate) struct SequencerGlobalState {
+    node_id: GenerationalNodeId,
+    loglet_id: ReplicatedLogletId,
+    global_committed_tail: atomic::AtomicU32,
+}
+
+impl SequencerGlobalState {
+    pub fn node_id(&self) -> &GenerationalNodeId {
+        &self.node_id
+    }
+
+    pub fn loglet_id(&self) -> &ReplicatedLogletId {
+        &self.loglet_id
+    }
+
+    pub fn committed_tail(&self) -> LogletOffset {
+        LogletOffset::new(self.global_committed_tail.load(Ordering::Acquire))
+    }
+
+    pub(crate) fn set_committed_tail(&self, tail: LogletOffset) {
+        self.global_committed_tail
+            .fetch_max(tail.into(), Ordering::Release);
+    }
+}
+
+// todo: improve error names and descriptions
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error("cannot satisfy spread")]
+    CannotSatisfySpread,
+    #[error("malformed batch")]
+    MalformedBatch,
+    #[error("invalid node set")]
+    InvalidNodeSet,
+    #[error("node {0} queue is full")]
+    TemporaryUnavailable(PlainNodeId),
+    #[error(transparent)]
+    Shutdown(#[from] ShutdownError),
+}
diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/worker.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/worker.rs
new file mode 100644
index 000000000..94317d4b8
--- /dev/null
+++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/worker.rs
@@ -0,0 +1,251 @@
+use std::{
+    collections::VecDeque,
+    sync::{Arc, Weak},
+    time::Duration,
+};
+
+use restate_core::{cancellation_token, ShutdownError, TaskCenter, TaskKind};
+use tokio::sync::mpsc;
+
+use restate_types::{
+    logs::{LogletOffset, Record, SequenceNumber},
+    net::log_server::{Store, StoreFlags, Stored},
+};
+use tokio_util::sync::CancellationToken;
+
+use super::{Node, NodeClient, SequencerGlobalState};
+
+#[derive(Debug)]
+pub(crate) struct Payload {
+    pub first_offset: LogletOffset,
+    pub records: Arc<[Record]>,
+}
+
+impl Payload {
+    pub fn inflight_tail(&self) -> Option<LogletOffset> {
+        let len = u32::try_from(self.records.len()).ok()?;
+        self.first_offset.checked_add(len).map(Into::into)
+    }
+}
+
+pub struct SendPermit<'a> {
+    inner: mpsc::Permit<'a, Weak<Payload>>,
+}
+
+impl<'a> SendPermit<'a> {
+    pub(crate) fn send(self, payload: Weak<Payload>) {
+        self.inner.send(payload)
+    }
+}
+
+pub(crate) enum WorkerEvent {
+    Stored(Stored),
+}
+
+#[derive(Clone, Debug)]
+pub struct NodeWorkerHandle {
+    batch_tx: mpsc::Sender<Weak<Payload>>,
+    event_tx: mpsc::Sender<WorkerEvent>,
+}
+
+impl NodeWorkerHandle {
+    /// reserve a send slot on the worker queue
+    pub fn reserve(&self) -> Result<SendPermit<'_>, mpsc::error::TrySendError<()>> {
+        Ok(SendPermit {
+            inner: self.batch_tx.try_reserve()?,
+        })
+    }
+
+    pub(crate) async fn notify(&self, event: WorkerEvent) {
+        let _ = self.event_tx.send(event).await;
+    }
+}
+
+enum NodeWorkerState {
+    Clear,
+    Congested,
+    Shutdown,
+}
+
+pub(crate) struct NodeWorker<C> {
+    batch_rx: mpsc::Receiver<Weak<Payload>>,
+    event_rx: mpsc::Receiver<WorkerEvent>,
+    node: Node<C>,
+    global: Arc<SequencerGlobalState>,
+    buffer: VecDeque<Weak<Payload>>,
+    state: NodeWorkerState,
+}
+
+impl<C> NodeWorker<C>
+where
+    C: NodeClient + Send + Sync + 'static,
+{
+    pub fn start(
+        tc: &TaskCenter,
+        node: Node<C>,
+        queue_size: usize,
+        global: Arc<SequencerGlobalState>,
+    ) -> Result<NodeWorkerHandle, ShutdownError> {
+        // we create the channel at 10% of the full buffer capacity
+        // since pending batches will be queued in a VecDeque
+        let (batch_tx, batch_rx) = mpsc::channel(std::cmp::max(1, queue_size / 10));
+        let (event_tx, event_rx) = mpsc::channel(1);
+        let handle = NodeWorkerHandle { batch_tx, event_tx };
+
+        let buffer = VecDeque::with_capacity(queue_size);
+        let worker = NodeWorker {
+            batch_rx,
+            event_rx,
+            node,
+            global,
+            buffer,
+            state: NodeWorkerState::Clear,
+        };
+
+        tc.spawn_unmanaged(TaskKind::Disposable, "appender", None, worker.run())?;
+
+        Ok(handle)
+    }
+
+    async fn worker_clear(&mut self, token: &CancellationToken) -> NodeWorkerState {
+        loop {
+            tokio::select! {
+                biased;
+                _ = token.cancelled() => {
+                    return NodeWorkerState::Shutdown;
+                }
+                Some(event) = self.event_rx.recv() => {
+                    self.process_event(event);
+                }
+                Some(batch) = self.batch_rx.recv() => {
+                    self.process_batch(batch).await;
+                }
+            }
+
+            // there is a chance that the buffer has filled up with pending batches
+            // that have never received a `stored` event.
+            // Hence we need to break out of this loop and stop accepting more batches
+            // to write.
+            //
+            // note: this should be an == comparison, but >= just in case
+            if self.buffer.len() >= self.buffer.capacity() {
+                return NodeWorkerState::Congested;
+            }
+        }
+    }
+
+    async fn worker_congested(&mut self, token: &CancellationToken) -> NodeWorkerState {
+        // we can only reach here if we stopped receiving `stored` events.
+        // In that case we stop accepting more batches and only wait
+        // for stored events, retrying periodically.
+        let mut timer = tokio::time::interval(Duration::from_millis(250));
+        loop {
+            // in this loop we only handle events (so we can eventually drain the buffer)
+            // but we don't accept any more batches. This applies backpressure,
+            // since the replication policy will not be able to reserve this
+            // node anymore.
+            tokio::select! {
+                _ = token.cancelled() => {
+                    return NodeWorkerState::Shutdown;
+                }
+                Some(event) = self.event_rx.recv() => {
+                    self.process_event(event);
+                }
+                _ = timer.tick() => {
+                    self.retry().await;
+                }
+            }
+
+            if self.buffer.len() < self.buffer.capacity() {
+                // we made progress and we can break out of this inner
+                // loop.
+                return NodeWorkerState::Clear;
+            }
+        }
+    }
+
+    async fn run(mut self) {
+        let token = cancellation_token();
+
+        loop {
+            self.state = match self.state {
+                NodeWorkerState::Clear => self.worker_clear(&token).await,
+                NodeWorkerState::Congested => self.worker_congested(&token).await,
+                NodeWorkerState::Shutdown => return,
+            }
+        }
+    }
+
+    async fn retry(&self) {
+        // retry sending all pending batches in the buffer
+        for batch in self.buffer.iter() {
+            self.process_once(batch).await;
+        }
+    }
+
+    fn process_event(&mut self, event: WorkerEvent) {
+        match event {
+            WorkerEvent::Stored(stored) => {
+                self.drain(stored);
+            }
+        }
+    }
+
+    fn drain(&mut self, event: Stored) {
+        let mut trim = 0;
+        for (i, batch) in self.buffer.iter().enumerate() {
+            let Some(batch) = batch.upgrade() else {
+                // batch has been resolved externally and we can ignore it
+                trim = i + 1;
+                continue;
+            };
+
+            if batch.inflight_tail().unwrap() > event.local_tail {
+                // no confirmation for this batch yet.
+                break;
+            }
+            trim = i + 1;
+        }
+
+        self.buffer.drain(..trim);
+    }
+
+    async fn process_batch(&mut self, batch: Weak<Payload>) {
+        if self.process_once(&batch).await {
+            self.buffer.push_back(batch);
+        }
+    }
+
+    async fn process_once(&self, batch: &Weak<Payload>) -> bool {
+        let batch = match batch.upgrade() {
+            Some(batch) => batch,
+            None => return false,
+        };
+
+        let inflight_tail = batch.inflight_tail().expect("valid inflight tail");
+        if inflight_tail <= self.global.committed_tail() {
+            // todo: (question) batch is already committed and we can safely ignore it?
+            return false;
+        }
+
+        let store = Store {
+            first_offset: batch.first_offset,
+            flags: StoreFlags::empty(),
+            known_archived: LogletOffset::INVALID,
+            known_global_tail: self.global.committed_tail(),
+            loglet_id: self.global.loglet_id,
+            sequencer: self.global.node_id,
+            timeout_at: None,
+            // todo: (question) better way to do this?
+            payloads: Vec::from_iter(batch.records.iter().cloned()),
+        };
+
+        if let Err(err) = self.node.client().enqueue_store(store).await {
+            // todo: retry
+            tracing::error!(error = %err, "failed to send store to node");
+        }
+
+        // the batch was sent, but there is a chance that we will need to retry
+        true
+    }
+}
diff --git a/crates/types/src/time.rs b/crates/types/src/time.rs
index 487425f68..45546f468 100644
--- a/crates/types/src/time.rs
+++ b/crates/types/src/time.rs
@@ -15,7 +15,17 @@ use std::time::{Duration, SystemTime};
 
 /// Milliseconds since the unix epoch
 #[derive(
-    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
+    Debug,
+    Clone,
+    Copy,
+    PartialEq,
+    Eq,
+    PartialOrd,
+    Ord,
+    Hash,
+    serde::Serialize,
+    serde::Deserialize,
+    derive_more::Into,
 )]
 #[serde(transparent)]
 #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
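Reviewer note (not part of the patch): a minimal sketch of how the `NodeClient` trait introduced in `sequencer/mod.rs` could be implemented as a test double, assuming it lives in a test module inside `sequencer` so the trait is in scope. `MockClient` and its field names are invented for illustration; a real client would forward the `Store` message over the network to a log server and later surface the corresponding ack through `NodeWorkerHandle::notify`.

    use restate_core::network::NetworkError;
    use restate_types::net::log_server::Store;

    use super::NodeClient;

    /// Hypothetical test double: records `Store` messages instead of sending them.
    #[derive(Default)]
    struct MockClient {
        stores: std::sync::Mutex<Vec<Store>>,
    }

    #[async_trait::async_trait]
    impl NodeClient for MockClient {
        async fn enqueue_store(&self, msg: Store) -> Result<(), NetworkError> {
            // keep the message around so a test can assert on what was "sent"
            self.stores.lock().unwrap().push(msg);
            Ok(())
        }

        async fn enqueue_get_loglet_info(&self) -> Result<(), NetworkError> {
            Ok(())
        }
    }

With such a client, an appender-side test could build a `Node`, start a `NodeWorker` to obtain a `NodeWorkerHandle`, `reserve()` a permit (mapping a full queue to `Error::TemporaryUnavailable`), `send()` a `Weak<Payload>`, and finally `notify(WorkerEvent::Stored(..))` so the worker drains the acknowledged batches from its buffer.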