From d4125e537b07795deeb8330df96d2299259bca22 Mon Sep 17 00:00:00 2001
From: Muhamad Awad
Date: Wed, 25 Sep 2024 12:49:14 +0200
Subject: [PATCH] scaffolding appender

---
 Cargo.lock                                        |   1 +
 crates/bifrost/Cargo.toml                         |   1 +
 crates/bifrost/src/loglet/util.rs                 |  17 +
 .../replication/spread_selector.rs                |   8 +
 .../replicated_loglet/sequencer/append.rs         | 429 ++++++++++++++++++
 .../replicated_loglet/sequencer/mod.rs            |  61 +++
 .../replicated_loglet/sequencer/node.rs           |   2 +-
 crates/types/src/config/bifrost.rs                |  37 +-
 crates/types/src/time.rs                          |  12 +-
 9 files changed, 564 insertions(+), 4 deletions(-)
 create mode 100644 crates/bifrost/src/providers/replicated_loglet/sequencer/append.rs

diff --git a/Cargo.lock b/Cargo.lock
index 367e21c7b..c56e65a35 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5506,6 +5506,7 @@ dependencies = [
  "pin-project",
  "rand",
  "restate-core",
+ "restate-log-server",
  "restate-metadata-store",
  "restate-rocksdb",
  "restate-test-util",
diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml
index cc28df2d5..11e40fd8c 100644
--- a/crates/bifrost/Cargo.toml
+++ b/crates/bifrost/Cargo.toml
@@ -43,6 +43,7 @@ tracing = { workspace = true }
 
 [dev-dependencies]
 restate-core = { workspace = true, features = ["test-util"] }
+restate-log-server = { workspace = true }
 restate-metadata-store = { workspace = true }
 restate-test-util = { workspace = true }
 restate-types = { workspace = true, features = ["test-util"] }
diff --git a/crates/bifrost/src/loglet/util.rs b/crates/bifrost/src/loglet/util.rs
index 77bae725a..93c2fdeed 100644
--- a/crates/bifrost/src/loglet/util.rs
+++ b/crates/bifrost/src/loglet/util.rs
@@ -63,6 +63,23 @@ impl TailOffsetWatch {
         Ok(())
     }
 
+    pub async fn wait_for_offset_or_seal(
+        &self,
+        offset: LogletOffset,
+    ) -> Result<TailState<LogletOffset>, ShutdownError> {
+        let mut receiver = self.sender.subscribe();
+        receiver.mark_changed();
+        receiver
+            .wait_for(|tail_state| match tail_state {
+                TailState::Sealed(_) => true,
+                TailState::Open(tail) if *tail >= offset => true,
+                _ => false,
+            })
+            .await
+            .map(|m| *m)
+            .map_err(|_| ShutdownError)
+    }
+
     /// The first yielded value is the latest known tail
     pub fn to_stream(&self) -> WatchStream<TailState<LogletOffset>> {
         let mut receiver = self.sender.subscribe();
diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs b/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs
index 2bf22ab21..14761b85f 100644
--- a/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs
@@ -57,6 +57,14 @@ impl SpreadSelector {
         }
     }
 
+    pub fn nodeset(&self) -> &NodeSet {
+        &self.nodeset
+    }
+
+    pub fn replication_property(&self) -> &ReplicationProperty {
+        &self.replication_property
+    }
+
     /// Generates a spread or fails if it's not possible to generate a spread out of
     /// the nodeset modulo the non-writeable nodes in the nodes configuration and after excluding
     /// the set of nodes passed in `exclude_nodes`.
diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/append.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/append.rs
new file mode 100644
index 000000000..ecd82259a
--- /dev/null
+++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/append.rs
@@ -0,0 +1,429 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::{cmp::Ordering, sync::Arc, time::Duration};
+
+use futures::{stream::FuturesUnordered, StreamExt};
+use tokio::{sync::OwnedSemaphorePermit, time::timeout};
+
+use restate_core::{
+    network::{
+        rpc_router::{RpcError, RpcRouter},
+        Incoming, NetworkError, Outgoing, TransportConnect,
+    },
+    Metadata,
+};
+use restate_types::{
+    config::Configuration,
+    live::Live,
+    logs::{LogletOffset, Record, SequenceNumber, TailState},
+    net::log_server::{Status, Store, StoreFlags, Stored},
+    replicated_loglet::NodeSet,
+};
+
+use super::{
+    node::{RemoteLogServer, RemoteLogServerManager},
+    BatchExt, SequencerSharedState,
+};
+use crate::{
+    loglet::LogletCommitResolver, providers::replicated_loglet::replication::NodeSetChecker,
+};
+
+const DEFAULT_BACKOFF_TIME: Duration = Duration::from_millis(1000);
+
+enum AppenderState {
+    Wave {
+        // nodes that should be avoided by the spread selector
+        graylist: NodeSet,
+    },
+    Done,
+    Backoff,
+}
+
+/// Appender makes sure a batch of records runs to completion
+pub(crate) struct Appender {
+    sequencer_shared_state: Arc<SequencerSharedState>,
+    log_server_manager: RemoteLogServerManager,
+    store_router: RpcRouter<Store>,
+    metadata: Metadata,
+    first_offset: LogletOffset,
+    records: Arc<[Record]>,
+    // permit is held during the entire life
+    // of the batch to limit the number of
+    // inflight batches
+    permit: Option<OwnedSemaphorePermit>,
+    commit_resolver: Option<LogletCommitResolver>,
+    configuration: Live<Configuration>,
+}
+
+impl Appender {
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        sequencer_shared_state: Arc<SequencerSharedState>,
+        log_server_manager: RemoteLogServerManager,
+        store_router: RpcRouter<Store>,
+        metadata: Metadata,
+        first_offset: LogletOffset,
+        records: Arc<[Record]>,
+        permit: OwnedSemaphorePermit,
+        commit_resolver: LogletCommitResolver,
+    ) -> Self {
+        Self {
+            sequencer_shared_state,
+            log_server_manager,
+            store_router,
+            metadata,
+            first_offset,
+            records,
+            permit: Some(permit),
+            commit_resolver: Some(commit_resolver),
+            configuration: Configuration::updateable(),
+        }
+    }
+
+    pub async fn run(mut self) -> anyhow::Result<()> {
+        // the initial wave has 0 replicated and 0 gray-listed nodes
+        let mut state = AppenderState::Wave {
+            graylist: NodeSet::empty(),
+        };
+
+        let retry_policy = self
+            .configuration
+            .live_load()
+            .bifrost
+            .replicated_loglet
+            .sequencer_backoff_strategy
+            .clone();
+
+        let mut retry = retry_policy.iter();
+
+        loop {
+            state = match state {
+                AppenderState::Done => return Ok(()),
+                AppenderState::Wave {
+                    graylist: gray_list,
+                } => self.wave(gray_list).await,
+                AppenderState::Backoff => {
+                    // since the backoff policy can be None, or run out of iterations,
+                    // but the appender should never give up, we fall back to a fixed backoff
+                    let delay = retry.next().unwrap_or(DEFAULT_BACKOFF_TIME);
+                    tokio::time::sleep(delay).await;
+
+                    AppenderState::Wave {
+                        // todo: introduce some backoff strategy
+                        graylist: NodeSet::empty(),
+                    }
+                }
+            }
+        }
+    }
+
+    async fn wave(&mut self, mut gray_list: NodeSet) -> AppenderState {
+        // select the spread
+        let spread = match self.sequencer_shared_state.selector.select(
+            &mut rand::thread_rng(),
+            &self.metadata.nodes_config_ref(),
+            &gray_list,
+        ) {
+            Ok(spread) => spread,
+            Err(_) => {
+                if gray_list.is_empty() {
+                    // the gray list was empty during spread selection,
+                    // yet we couldn't find a spread; there is
+                    // no reason to retry immediately.
+                    return AppenderState::Backoff;
+                }
+                // otherwise, we retry without a gray list.
+                return AppenderState::Wave {
+                    graylist: NodeSet::empty(),
+                };
+            }
+        };
+
+        let mut gray = false;
+        let mut servers = Vec::with_capacity(spread.len());
+        for id in spread {
+            // at this stage, if we fail to get a connection to this server it must be
+            // the first time we use it. We can safely assume it has to be gray-listed
+            let server = match self.log_server_manager.get(id).await {
+                Ok(server) => server,
+                Err(err) => {
+                    tracing::error!("failed to connect to {}: {}", id, err);
+                    gray = true;
+                    gray_list.insert(id);
+                    continue;
+                }
+            };
+
+            servers.push(server);
+        }
+
+        if gray {
+            // Some nodes have been gray-listed (they weren't in the original gray list)
+            // todo(azmy): we should check if the remaining set of nodes can still achieve
+            // write quorum
+
+            // we basically try again with the updated gray list
+            return AppenderState::Wave {
+                graylist: gray_list,
+            };
+        }
+
+        // otherwise, we try to send the wave.
+        self.send_wave(servers).await
+    }
+
+    async fn send_wave(&mut self, spread_servers: Vec<RemoteLogServer>) -> AppenderState {
+        let next_global_committed_tail =
+            self.records.last_offset(self.first_offset).unwrap().next();
+
+        let mut checker = NodeSetChecker::new(
+            self.sequencer_shared_state.selector.nodeset(),
+            &self.metadata.nodes_config_snapshot(),
+            self.sequencer_shared_state.selector.replication_property(),
+        );
+
+        // track the in-flight server ids
+        let mut pending_servers = NodeSet::empty();
+
+        let mut store_tasks = FuturesUnordered::new();
+
+        for server in spread_servers {
+            // it is possible that we have already visited this server
+            // in a previous wave, so we can short-circuit here
+            // and just skip it
+            if server.local_tail().latest_offset() >= next_global_committed_tail {
+                checker.set_attribute(server.node_id(), true);
+                continue;
+            }
+
+            pending_servers.insert(server.node_id());
+
+            let task = LogServerStoreTask {
+                sequencer_shared_state: Arc::clone(&self.sequencer_shared_state),
+                server_manager: self.log_server_manager.clone(),
+                server,
+                first_offset: self.first_offset,
+                records: Arc::clone(&self.records),
+                rpc_router: self.store_router.clone(),
+            };
+
+            store_tasks.push(task.run());
+        }
+
+        loop {
+            let store_result = match timeout(
+                self.configuration
+                    .live_load()
+                    .bifrost
+                    .replicated_loglet
+                    .log_server_timeout,
+                store_tasks.next(),
+            )
+            .await
+            {
+                Ok(Some(result)) => result,
+                Ok(None) => break, // no more tasks
+                Err(_err) => {
+                    // timed out!
+                    // none of the pending tasks have finished in time! we will assume all
+                    // pending servers are gray-listed and try again
+                    return AppenderState::Wave {
+                        graylist: pending_servers,
+                    };
+                }
+            };
+
+            let LogServerStoreTaskResult { server, status } = store_result;
+
+            let node_id = server.node_id();
+            let response = match status {
+                StoreTaskStatus::Error(err) => {
+                    // couldn't send the store command to the remote server
+                    tracing::error!(node_id=%server.node_id(), "failed to send batch to node {}", err);
+                    continue;
+                }
+                StoreTaskStatus::Sealed(_) => {
+                    tracing::trace!(node_id=%server.node_id(), "node is sealed");
+                    continue;
+                }
+                StoreTaskStatus::AdvancedLocalTail(_) => {
+                    // the node's local tail is already ahead of the batch's first offset
+                    // question(azmy): we assume the batch has already been replicated to this node?
+                    checker.set_attribute(node_id, true);
+                    pending_servers.remove(&node_id);
+                    continue;
+                }
+                StoreTaskStatus::Stored(stored) => stored,
+            };
+
+            // we got a response from this node; decide what to do based on its status
+            match response.status {
+                Status::Ok => {
+                    // only if the status is okay do we remove this node
+                    // from the gray list and move it to the replicated set
+                    checker.set_attribute(node_id, true);
+                    pending_servers.remove(&node_id);
+                }
+                _ => {
+                    // todo(azmy): handle other statuses
+                    // note: we don't remove the node from the gray list
+                }
+            }
+
+            if checker.check_write_quorum(|attr| *attr) {
+                // resolve the commit if not resolved yet
+                if let Some(resolver) = self.commit_resolver.take() {
+                    self.sequencer_shared_state
+                        .global_committed_tail()
+                        .notify_offset_update(next_global_committed_tail);
+                    resolver.offset(next_global_committed_tail);
+                }
+
+                // drop the permit
+                self.permit.take();
+            }
+        }
+
+        if checker.check_write_quorum(|attr| *attr) {
+            AppenderState::Done
+        } else {
+            AppenderState::Wave {
+                graylist: pending_servers,
+            }
+        }
+    }
+}
+
+enum StoreTaskStatus {
+    Sealed(LogletOffset),
+    AdvancedLocalTail(LogletOffset),
+    Stored(Stored),
+    Error(NetworkError),
+}
+
+impl From<Result<StoreTaskStatus, NetworkError>> for StoreTaskStatus {
+    fn from(value: Result<StoreTaskStatus, NetworkError>) -> Self {
+        match value {
+            Ok(result) => result,
+            Err(err) => Self::Error(err),
+        }
+    }
+}
+
+struct LogServerStoreTaskResult {
+    pub server: RemoteLogServer,
+    pub status: StoreTaskStatus,
+}
+
+/// The LogServerStoreTask takes care of running a [`Store`] request to completion.
+///
+/// The task will retry to connect to the remote server if the connection
+/// was lost.
+struct LogServerStoreTask {
+    sequencer_shared_state: Arc<SequencerSharedState>,
+    server_manager: RemoteLogServerManager,
+    server: RemoteLogServer,
+    first_offset: LogletOffset,
+    records: Arc<[Record]>,
+    rpc_router: RpcRouter<Store>,
+}
+
+impl LogServerStoreTask {
+    async fn run(mut self) -> LogServerStoreTaskResult {
+        let result = self.send().await;
+        LogServerStoreTaskResult {
+            server: self.server,
+            status: result.into(),
+        }
+    }
+
+    async fn send(&mut self) -> Result<StoreTaskStatus, NetworkError> {
+        let server_local_tail = self
+            .server
+            .local_tail()
+            .wait_for_offset_or_seal(self.first_offset)
+            .await?;
+
+        match server_local_tail {
+            TailState::Sealed(offset) => return Ok(StoreTaskStatus::Sealed(offset)),
+            TailState::Open(offset) => {
+                match offset.cmp(&self.first_offset) {
+                    Ordering::Equal => {
+                        // we are ready to send our write
+                    }
+                    Ordering::Less => {
+                        // this should never happen since we waited
+                        // for the local tail!
+                        unreachable!()
+                    }
+                    Ordering::Greater => {
+                        return Ok(StoreTaskStatus::AdvancedLocalTail(offset));
+                    }
+                };
+            }
+        }
+
+        let incoming = match self.try_send().await {
+            Ok(incoming) => incoming,
+            Err(err) => {
+                return Err(err);
+            }
+        };
+
+        self.server
+            .local_tail()
+            .notify_offset_update(incoming.body().local_tail);
+
+        match incoming.body().status {
+            Status::Sealing | Status::Sealed => {
+                self.server.local_tail().notify_seal();
+            }
+            _ => {}
+        }
+
+        Ok(StoreTaskStatus::Stored(incoming.into_body()))
+    }
+
+    async fn try_send(&mut self) -> Result<Incoming<Stored>, NetworkError> {
+        let store = Store {
+            first_offset: self.first_offset,
+            flags: StoreFlags::empty(),
+            known_archived: LogletOffset::INVALID,
+            known_global_tail: self.sequencer_shared_state.committed_tail.latest_offset(),
+            loglet_id: self.server.loglet_id(),
+            payloads: Vec::from_iter(self.records.iter().cloned()),
+            sequencer: self.sequencer_shared_state.node_id,
+            timeout_at: None,
+        };
+
+        let mut msg = Outgoing::new(self.server.node_id(), store);
+
+        loop {
+            let with_connection = msg.assign_connection(self.server.connection().clone());
+            match self.rpc_router.call_on_connection(with_connection).await {
+                Ok(incoming) => return Ok(incoming),
+                Err(RpcError::Shutdown(shutdown)) => return Err(NetworkError::Shutdown(shutdown)),
+                Err(RpcError::SendError(err)) => {
+                    msg = err.original.forget_connection();
+
+                    match err.source {
+                        NetworkError::ConnectionClosed
+                        | NetworkError::ConnectError(_)
+                        | NetworkError::Timeout(_) => {
+                            self.server_manager.renew(&mut self.server).await?
+                        }
+                        _ => return Err(err.source),
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
index 6b9e9058d..a8a76396b 100644
--- a/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
@@ -8,4 +8,65 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+mod append;
 mod node;
+
+use std::sync::Arc;
+
+use restate_core::ShutdownError;
+use restate_types::{
+    logs::{LogletOffset, Record},
+    replicated_loglet::ReplicatedLogletId,
+    GenerationalNodeId,
+};
+
+use super::replication::spread_selector::SpreadSelector;
+use crate::loglet::util::TailOffsetWatch;
+
+#[derive(thiserror::Error, Debug)]
+pub enum SequencerError {
+    #[error("loglet offset exhausted")]
+    LogletOffsetExhausted,
+    #[error("batch exceeds possible length")]
+    InvalidBatchLength,
+    #[error(transparent)]
+    Shutdown(#[from] ShutdownError),
+}
+
+/// Sequencer shared state
+pub struct SequencerSharedState {
+    node_id: GenerationalNodeId,
+    loglet_id: ReplicatedLogletId,
+    committed_tail: TailOffsetWatch,
+    selector: SpreadSelector,
+}
+
+impl SequencerSharedState {
+    pub fn node_id(&self) -> &GenerationalNodeId {
+        &self.node_id
+    }
+
+    pub fn loglet_id(&self) -> &ReplicatedLogletId {
+        &self.loglet_id
+    }
+
+    pub fn global_committed_tail(&self) -> &TailOffsetWatch {
+        &self.committed_tail
+    }
+}
+
+trait BatchExt {
+    /// Computes the offset of the last record in this batch, given its first offset
+    fn last_offset(&self, first_offset: LogletOffset) -> Result<LogletOffset, SequencerError>;
+}
+
+impl BatchExt for Arc<[Record]> {
+    fn last_offset(&self, first_offset: LogletOffset) -> Result<LogletOffset, SequencerError> {
+        let len = u32::try_from(self.len()).map_err(|_| SequencerError::InvalidBatchLength)?;
+
+        first_offset
+            .checked_add(len - 1)
+            .map(LogletOffset::from)
+            .ok_or(SequencerError::LogletOffsetExhausted)
+    }
+}
diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs
index 27eb37cee..a66ec43d3 100644
--- a/crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs
@@ -42,7 +42,7 @@ impl RemoteLogServer {
         self.loglet_id
     }
 
-    pub fn tail(&self) -> &TailOffsetWatch {
+    pub fn local_tail(&self) -> &TailOffsetWatch {
         &self.tail
     }
 
diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs
index ea7c2da24..215835da9 100644
--- a/crates/types/src/config/bifrost.rs
+++ b/crates/types/src/config/bifrost.rs
@@ -208,9 +208,42 @@ impl Default for LocalLogletOptions {
 
 #[cfg(feature = "replicated-loglet")]
 #[serde_as]
-#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_builder::Builder)]
+#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
 #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
 #[cfg_attr(feature = "schemars", schemars(rename = "ReplicatedLoglet", default))]
 #[serde(rename_all = "kebab-case")]
 #[builder(default)]
-pub struct ReplicatedLogletOptions {}
+pub struct ReplicatedLogletOptions {
+    /// Maximum number of inflight batches in the sequencer
+    ///
+    /// Once this maximum is hit, the sequencer will induce back pressure
+    /// on clients
+    pub maximum_inflight_batches: NonZeroUsize,
+
+    /// Sequencer backoff strategy
+    ///
+    /// Backoff introduced when the sequencer fails to find a suitable spread
+    /// of log servers
+    pub sequencer_backoff_strategy: RetryPolicy,
+
+    /// Log server timeout
+    ///
+    /// Timeout while waiting for a log server response
+    pub log_server_timeout: Duration,
+}
+
+impl Default for ReplicatedLogletOptions {
+    fn default() -> Self {
+        Self {
+            maximum_inflight_batches: NonZeroUsize::new(128).unwrap(),
+
+            sequencer_backoff_strategy: RetryPolicy::exponential(
+                Duration::from_millis(100),
+                0.1,
+                None,
+                Some(Duration::from_millis(2000)),
+            ),
+            log_server_timeout: Duration::from_millis(500),
+        }
+    }
+}
diff --git a/crates/types/src/time.rs b/crates/types/src/time.rs
index 487425f68..45546f468 100644
--- a/crates/types/src/time.rs
+++ b/crates/types/src/time.rs
@@ -15,7 +15,17 @@ use std::time::{Duration, SystemTime};
 
 /// Milliseconds since the unix epoch
 #[derive(
-    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
+    Debug,
+    Clone,
+    Copy,
+    PartialEq,
+    Eq,
+    PartialOrd,
+    Ord,
+    Hash,
+    serde::Serialize,
+    serde::Deserialize,
+    derive_more::Into,
 )]
 #[serde(transparent)]
 #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]