diff --git a/Cargo.lock b/Cargo.lock
index d75323a1e..69556cbd6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5504,6 +5504,7 @@ dependencies = [
  "pin-project",
  "rand",
  "restate-core",
+ "restate-log-server",
  "restate-metadata-store",
  "restate-rocksdb",
  "restate-test-util",
diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml
index 65b0b4143..716c14053 100644
--- a/crates/bifrost/Cargo.toml
+++ b/crates/bifrost/Cargo.toml
@@ -41,6 +41,7 @@ tracing = { workspace = true }
 
 [dev-dependencies]
 restate-core = { workspace = true, features = ["test-util"] }
+restate-log-server = { workspace = true }
 restate-metadata-store = { workspace = true }
 restate-test-util = { workspace = true }
 restate-types = { workspace = true, features = ["test-util"] }
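The new append.rs below leans on `TailOffsetWatch` (`committed_tail().to_stream()`, `notify_offset_update()`, `notify_seal()`), which is not part of this patch. As a rough mental model only, it can be read as a watch channel over the tail offset; the sketch below shows that pattern with plain `tokio::sync::watch` and made-up offsets, not the actual `TailOffsetWatch` API.

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // the committed tail as a plainly watched offset
    let (tail_tx, mut tail_rx) = watch::channel(1u32);

    // a waiter that parks until the tail reaches a target offset,
    // similar in spirit to Sender::send() waiting for first_offset
    let waiter = tokio::spawn(async move {
        while *tail_rx.borrow_and_update() < 5 {
            tail_rx.changed().await.expect("sender alive");
        }
        println!("tail reached 5");
    });

    // the appender advancing the committed tail after successful waves
    for offset in 2..=5u32 {
        tail_tx.send(offset).expect("receiver alive");
    }

    waiter.await.unwrap();
}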
diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/append.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/append.rs
new file mode 100644
index 000000000..249479a75
--- /dev/null
+++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/append.rs
@@ -0,0 +1,321 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::sync::Arc;
+
+use futures::StreamExt;
+use tokio::task::JoinSet;
+
+use restate_core::{
+    network::{rpc_router::ResponseTracker, Incoming, NetworkError, Outgoing},
+    Metadata,
+};
+use restate_types::{
+    logs::{LogletOffset, Record, SequenceNumber, TailState},
+    net::log_server::{Status, Store, StoreFlags, Stored},
+    replicated_loglet::NodeSet,
+};
+
+use super::{
+    node::{RemoteLogServer, RemoteLogServerManager},
+    SequencerGlobalState,
+};
+use crate::providers::replicated_loglet::replication::spread_selector::SpreadSelector;
+
+enum AppenderState {
+    Wave {
+        replicated: NodeSet,
+        gray_list: NodeSet,
+    },
+    Done,
+}
+
+/// Appender makes sure a batch of records runs to completion
+pub(crate) struct Appender {
+    global: Arc<SequencerGlobalState>,
+    log_server_manager: RemoteLogServerManager,
+    response_tracker: ResponseTracker,
+    selector: SpreadSelector,
+    metadata: Metadata,
+    first_offset: LogletOffset,
+    records: Arc<[Record]>,
+}
+
+impl Appender {
+    pub fn new(
+        global: Arc<SequencerGlobalState>,
+        log_server_manager: RemoteLogServerManager,
+        response_tracker: ResponseTracker,
+        selector: SpreadSelector,
+        metadata: Metadata,
+        first_offset: LogletOffset,
+        records: Arc<[Record]>,
+    ) -> Self {
+        Self {
+            global,
+            log_server_manager,
+            response_tracker,
+            selector,
+            metadata,
+            first_offset,
+            records,
+        }
+    }
+
+    pub async fn run(mut self) {
+        let mut state = AppenderState::Wave {
+            replicated: NodeSet::empty(),
+            gray_list: NodeSet::empty(),
+        };
+
+        // note: this should be verified by the sequencer, so this should be safe!
+        let tail = self
+            .first_offset
+            .checked_add(self.records.len() as u32)
+            .expect("possible tail");
+
+        loop {
+            state = match state {
+                AppenderState::Wave {
+                    replicated,
+                    gray_list,
+                } => self.wave(replicated, gray_list).await,
+                AppenderState::Done => {
+                    self.global
+                        .committed_tail()
+                        .notify_offset_update(LogletOffset::from(tail));
+                    break;
+                }
+            }
+        }
+    }
+
+    async fn wave(&mut self, replicated: NodeSet, mut gray_list: NodeSet) -> AppenderState {
+        let spread = match self.selector.select(
+            &mut rand::thread_rng(),
+            &self.metadata.nodes_config_ref(),
+            &gray_list,
+        ) {
+            Ok(spread) => spread,
+            Err(_) => {
+                // todo(azmy): retry without a gray list!
+                return AppenderState::Wave {
+                    replicated,
+                    gray_list: NodeSet::empty(),
+                };
+            }
+        };
+
+        let mut gray = false;
+        let mut servers = Vec::with_capacity(spread.len());
+        for id in spread {
+            // at this stage, if we fail to get a connection to this server it must be
+            // the first time we use it. We can safely assume it has to be gray-listed
+            let server = match self.log_server_manager.get(id).await {
+                Ok(server) => server,
+                Err(err) => {
+                    tracing::error!("failed to connect to {}: {}", id, err);
+                    gray = true;
+                    gray_list.insert(id);
+                    continue;
+                }
+            };
+
+            servers.push(server);
+        }
+
+        if gray {
+            // Some nodes have been gray-listed (they weren't in the original gray list).
+            // todo(azmy): is it possible that we can still get a write quorum on the remaining
+            // set of nodes? The selector should probably provide an interface to validate that!
+
+            // we basically try again with a new gray list
+            return AppenderState::Wave {
+                replicated,
+                gray_list,
+            };
+        }
+
+        // otherwise, we try to send the wave.
+        return self.send_wave(servers).await;
+    }
+
+    async fn send_wave(&mut self, spread: Vec<RemoteLogServer>) -> AppenderState {
+        let mut waiters = JoinSet::new();
+
+        let replication_factor = spread.len();
+        let mut replicated = NodeSet::empty();
+
+        for server in spread {
+            // it is possible that we have already visited this server
+            // in a previous wave, so we can short-circuit here
+            // and just skip it
+            if server.tail().latest_offset() > self.first_offset {
+                replicated.insert(*server.node());
+                continue;
+            }
+
+            let sender = Sender {
+                global: Arc::clone(&self.global),
+                server_manager: self.log_server_manager.clone(),
+                server,
+                first_offset: self.first_offset,
+                records: Arc::clone(&self.records),
+                tracker: self.response_tracker.clone(),
+            };
+
+            waiters.spawn(sender.send());
+        }
+
+        let mut gray_list = NodeSet::empty();
+
+        // todo(azmy): join_next should time out if nodes are taking too long to respond!
+        while let Some(Ok((server, result))) = waiters.join_next().await {
+            let status = match result {
+                Ok(status) => status,
+                Err(_err) => {
+                    // todo(azmy): handle errors differently
+                    gray_list.insert(*server.node());
+                    continue;
+                }
+            };
+
+            // we got a response from this node, so decide what to do based on its status
+            match status {
+                Status::Ok => {
+                    replicated.insert(*server.node());
+                }
+                Status::Sealed | Status::Sealing => {
+                    server.tail().notify_seal();
+                }
+                _ => {
+                    // todo(azmy): handle other statuses
+                    gray_list.insert(*server.node());
+                }
+            }
+        }
+
+        if gray_list.is_empty() && replicated.len() == replication_factor {
+            AppenderState::Done
+        } else {
+            AppenderState::Wave {
+                replicated,
+                gray_list,
+            }
+        }
+    }
+}
+
+struct Sender {
+    global: Arc<SequencerGlobalState>,
+    server_manager: RemoteLogServerManager,
+    server: RemoteLogServer,
+    first_offset: LogletOffset,
+    records: Arc<[Record]>,
+    tracker: ResponseTracker,
+}
+
+impl Sender {
+    async fn send(mut self) -> (RemoteLogServer, Result<Status, NetworkError>) {
+        let mut global_tail = self.global.committed_tail().to_stream();
+        let mut local_tail = self.server.tail().to_stream();
+
+        loop {
+            let tail = tokio::select! {
+                Some(tail) = global_tail.next() => {
+                    tail
+                },
+                Some(tail) = local_tail.next() => {
+                    tail
+                }
+            };
+
+            match tail {
+                TailState::Sealed(_) => {
+                    // either the local or the global tail is sealed
+                    return (self.server, Ok(Status::Sealed));
+                }
+                TailState::Open(offset) => {
+                    if offset > self.first_offset {
+                        // somehow the global (or local) offset has already moved
+                        // past our first offset!
+                        // for now we assume we don't need to make the
+                        // write!
+                        return (self.server, Ok(Status::Ok));
+                    } else if offset == self.first_offset {
+                        break;
+                    }
+                }
+            }
+        }
+
+        let incoming = match self.try_send().await {
+            Ok(incoming) => incoming,
+            Err(err) => {
+                return (self.server, Err(err));
+            }
+        };
+
+        // quick actions
+        match incoming.status {
+            Status::Ok => {
+                self.server.tail().notify_offset_update(incoming.local_tail);
+            }
+            _ => {}
+        }
+
+        (self.server, Ok(incoming.status))
+    }
+
+    async fn try_send(&mut self) -> Result<Incoming<Stored>, NetworkError> {
+        // if we got here, either we are at the global committed tail or the node's tail
+        // is at first_offset. In either case, we can try to send our Store message!
+        let store = Store {
+            first_offset: self.first_offset,
+            flags: StoreFlags::empty(),
+            known_archived: LogletOffset::INVALID,
+            known_global_tail: self.global.committed_tail.latest_offset(),
+            loglet_id: self.server.loglet_id(),
+            payloads: Vec::from_iter(self.records.iter().cloned()),
+            sequencer: self.global.node_id,
+            timeout_at: None,
+        };
+
+        let mut msg = Outgoing::new(*self.server.node(), store);
+        let token = self
+            .tracker
+            .new_token(msg.msg_id())
+            .expect("unique message id");
+
+        loop {
+            match self.server.sender().send(msg).await {
+                Ok(_) => break,
+                Err(send) => {
+                    msg = send.message;
+
+                    match send.source {
+                        NetworkError::ConnectionClosed
+                        | NetworkError::ConnectError(_)
+                        | NetworkError::Timeout(_) => {
+                            self.server_manager.renew(&mut self.server).await?
+                        }
+                        _ => return Err(send.source.into()),
+                    }
+                }
+            }
+        }
+
+        // the message has been sent, but there is no way to be sure it has been
+        // received by the other peer, so we wait for the response indefinitely.
+        // it's up to the appender's timeouts to try again
+        token.recv().await.map_err(NetworkError::from)
+    }
+}
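One way the join_next TODO above could be addressed is wrapping each `join_next` call in `tokio::time::timeout` and gray-listing whatever has not answered once the budget runs out. The sketch below only illustrates that idea with plain tokio types and made-up durations; the `RemoteLogServer`/`Status` plumbing from the patch is replaced by stand-ins.

use std::time::Duration;

use tokio::{task::JoinSet, time::timeout};

#[tokio::main]
async fn main() {
    // Stand-in for one Sender::send() round trip per log server:
    // each spawned task resolves to (node id, acked?) after some latency.
    let mut waiters: JoinSet<(u32, bool)> = JoinSet::new();
    for node in 0..3u32 {
        waiters.spawn(async move {
            tokio::time::sleep(Duration::from_millis(40 * node as u64)).await;
            (node, node != 2) // pretend node 2 answers with a bad status
        });
    }

    let mut replicated = Vec::new();
    let mut gray_list = Vec::new();

    loop {
        // bound how long we wait for each response instead of waiting forever
        match timeout(Duration::from_millis(60), waiters.join_next()).await {
            Ok(None) => break,                                   // every store task finished
            Ok(Some(Ok((node, true)))) => replicated.push(node), // acked in time
            Ok(Some(Ok((node, false)))) => gray_list.push(node), // answered with a bad status
            Ok(Some(Err(_join_err))) => continue,                // task panicked or was aborted
            Err(_elapsed) => {
                // per-wave budget exhausted: drop what is left and let the
                // next wave pick a different spread
                waiters.abort_all();
                break;
            }
        }
    }

    println!("replicated={replicated:?} gray_list={gray_list:?}");
}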
diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
index 6b9e9058d..66967fdfb 100644
--- a/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
@@ -8,4 +8,31 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+mod append;
 mod node;
+
+use restate_types::{replicated_loglet::ReplicatedLogletId, GenerationalNodeId};
+
+use crate::loglet::util::TailOffsetWatch;
+
+/// A shareable part of the sequencer state. This is shared with node workers.
+#[derive(Debug)]
+pub(crate) struct SequencerGlobalState {
+    node_id: GenerationalNodeId,
+    loglet_id: ReplicatedLogletId,
+    committed_tail: TailOffsetWatch,
+}
+
+impl SequencerGlobalState {
+    pub fn node_id(&self) -> &GenerationalNodeId {
+        &self.node_id
+    }
+
+    pub fn loglet_id(&self) -> &ReplicatedLogletId {
+        &self.loglet_id
+    }
+
+    pub fn committed_tail(&self) -> &TailOffsetWatch {
+        &self.committed_tail
+    }
+}
diff --git a/crates/types/src/time.rs b/crates/types/src/time.rs
index 487425f68..45546f468 100644
--- a/crates/types/src/time.rs
+++ b/crates/types/src/time.rs
@@ -15,7 +15,17 @@ use std::time::{Duration, SystemTime};
 
 /// Milliseconds since the unix epoch
 #[derive(
-    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
+    Debug,
+    Clone,
+    Copy,
+    PartialEq,
+    Eq,
+    PartialOrd,
+    Ord,
+    Hash,
+    serde::Serialize,
+    serde::Deserialize,
+    derive_more::Into,
 )]
 #[serde(transparent)]
 #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
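The only change to time.rs is the added `derive_more::Into`. On a single-field newtype that derive expands to a `From<Wrapper> for Inner` impl, so the wrapper converts into its raw integer with `.into()`. A minimal sketch with an assumed stand-in type (the real struct name and inner type sit just below this hunk and are not shown), assuming `derive_more` with its `into` feature:

use derive_more::Into;

// Stand-in newtype for the milliseconds-since-epoch wrapper touched in time.rs;
// the actual struct name and inner type are not visible in this hunk.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Into)]
struct Millis(u64);

fn main() {
    let ts = Millis(1_700_000_000_000);
    // derive_more::Into generates `impl From<Millis> for u64`,
    // so the wrapper converts into its raw representation with `.into()`.
    let raw: u64 = ts.into();
    assert_eq!(raw, 1_700_000_000_000);
}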