diff --git a/crates/bifrost/src/providers/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs
index c4e599060..b7e84a9fd 100644
--- a/crates/bifrost/src/providers/replicated_loglet/mod.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/mod.rs
@@ -11,6 +11,8 @@ pub(crate) mod metric_definitions;
 mod provider;
 pub mod replication;
+#[allow(dead_code)]
+pub mod sequencer;
 #[cfg(any(test, feature = "test-util"))]
 pub mod test_util;
diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
new file mode 100644
index 000000000..ed86e2659
--- /dev/null
+++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
@@ -0,0 +1,15 @@
+use restate_core::network::NetworkError;
+use restate_types::nodes_config::NodesConfigError;
+
+mod node;
+
+#[derive(thiserror::Error, Debug)]
+enum Error {
+    #[error("invalid node-set configuration")]
+    InvalidNodeSet,
+
+    #[error(transparent)]
+    Network(#[from] NetworkError),
+    #[error(transparent)]
+    NodeConfig(#[from] NodesConfigError),
+}
diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs
new file mode 100644
index 000000000..68ead6241
--- /dev/null
+++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs
@@ -0,0 +1,236 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::{collections::BTreeMap, ops::Deref, sync::Arc};
+
+use super::Error;
+use restate_core::{
+    network::{ConnectionManager, ConnectionSender},
+    Metadata,
+};
+use restate_types::{
+    logs::{LogletOffset, SequenceNumber},
+    replicated_loglet::NodeSet,
+    PlainNodeId,
+};
+use tokio::sync::{watch, Mutex};
+
+/// Shareable, in-memory log-server state.
+#[derive(Debug, Clone)]
+pub struct LogServerState {
+    sealed: watch::Sender<bool>,
+    local_tail: watch::Sender<LogletOffset>,
+}
+
+impl Default for LogServerState {
+    fn default() -> Self {
+        Self {
+            sealed: watch::Sender::new(false),
+            local_tail: watch::Sender::new(LogletOffset::OLDEST),
+        }
+    }
+}
+
+impl LogServerState {
+    /// Check whether the log server is sealed.
+    pub fn is_sealed(&self) -> bool {
+        *self.sealed.borrow()
+    }
+
+    /// Seal the log server. This only marks the server as sealed;
+    /// it also notifies all waiters of the seal.
+    pub fn seal(&self) {
+        self.sealed.send_if_modified(|s| {
+            if !*s {
+                *s = true;
+                true
+            } else {
+                false
+            }
+        });
+    }
+
+    /// Get the current local tail.
+    pub fn local_tail(&self) -> LogletOffset {
+        *self.local_tail.borrow()
+    }
+
+    /// Update the server's local tail if and only if the new tail is newer
+    /// than the last known tail value.
+    pub fn maybe_update_local_tail(&self, new_tail: LogletOffset) {
+        self.local_tail.send_if_modified(|m| {
+            if new_tail > *m {
+                *m = new_tail;
+                true
+            } else {
+                false
+            }
+        });
+    }
+
+    /// Wait for the local tail to reach this value or higher.
+    pub async fn wait_for_tail(&self, value: LogletOffset) -> Option<LogletOffset> {
+        let mut receiver = self.local_tail.subscribe();
+        receiver.wait_for(|v| *v >= value).await.map(|f| *f).ok()
+    }
+
+    /// Wait for the log server to be sealed.
+    pub async fn wait_for_seal(&self) -> Option<bool> {
+        let mut receiver = self.sealed.subscribe();
+        receiver.wait_for(|v| *v).await.map(|f| *f).ok()
+    }
+}
+
+/// A log-server instance.
+#[derive(Clone)]
+pub struct LogServer {
+    node: PlainNodeId,
+    state: LogServerState,
+    sender: ConnectionSender,
+}
+
+impl LogServer {
+    pub fn node(&self) -> &PlainNodeId {
+        &self.node
+    }
+
+    pub fn state(&self) -> &LogServerState {
+        &self.state
+    }
+
+    pub fn sender(&self) -> ConnectionSender {
+        self.sender.clone()
+    }
+}
+
+#[derive(Default)]
+struct LogServerLock(Mutex<Option<LogServer>>);
+
+/// LogServerManager maintains the set of log servers that are provided via
+/// the [`NodeSet`].
+///
+/// The manager makes sure there is only one active connection per server.
+/// It's up to the user of the client to call [`LogServerManager::renew`] if needed.
+pub(crate) struct LogServerManager {
+    servers: Arc<BTreeMap<PlainNodeId, LogServerLock>>,
+    node_set: NodeSet,
+    metadata: Metadata,
+    connection_manager: ConnectionManager,
+}
+
+impl Clone for LogServerManager {
+    fn clone(&self) -> Self {
+        Self {
+            servers: Arc::clone(&self.servers),
+            node_set: self.node_set.clone(),
+            metadata: self.metadata.clone(),
+            connection_manager: self.connection_manager.clone(),
+        }
+    }
+}
+
+impl LogServerManager {
+    /// Creates a log-server manager for the given node set.
+    pub fn new(
+        metadata: Metadata,
+        connection_manager: ConnectionManager,
+        node_set: NodeSet,
+    ) -> Result<Self, Error> {
+        let mut servers = BTreeMap::default();
+        for node_id in node_set.iter() {
+            servers.insert(*node_id, LogServerLock::default());
+        }
+
+        Ok(Self {
+            servers: Arc::new(servers),
+            node_set,
+            metadata,
+            connection_manager,
+        })
+    }
+
+    async fn connect(&self, id: PlainNodeId) -> Result<ConnectionSender, Error> {
+        let conf = self.metadata.nodes_config_ref();
+        let node = conf.find_node_by_id(id)?;
+        let connection = self
+            .connection_manager
+            .get_node_sender(node.current_generation)
+            .await?;
+
+        Ok(connection)
+    }
+
+    /// Gets a log-server instance. On the first call this initializes a new
+    /// connection to the log server; all following calls to get hold the same
+    /// connection.
+    ///
+    /// It's up to the client to call [`Self::renew`] if the connection it holds
+    /// is closed.
+    pub async fn get(&self, id: PlainNodeId) -> Result<LogServer, Error> {
+        let server = self.servers.get(&id).ok_or(Error::InvalidNodeSet)?;
+
+        let mut guard = server.0.lock().await;
+
+        if let Some(current) = guard.deref() {
+            return Ok(current.clone());
+        }
+
+        // initialize a new instance
+        let server = LogServer {
+            node: id,
+            state: LogServerState::default(),
+            sender: self.connect(id).await?,
+        };
+
+        // store it so all following calls reuse the same instance
+        *guard = Some(server.clone());
+
+        Ok(server)
+    }
+
+    /// Makes sure the server connection is renewed if and only if
+    /// the provided server holds an outdated connection. Otherwise
+    /// the latest connection associated with this server is used.
+    ///
+    /// It's up to the holder of the log-server instance to retry the renew
+    /// if that connection is not valid.
+    ///
+    /// It also guarantees that concurrent calls to renew on the same server
+    /// will only renew the connection once for all callers.
+    pub async fn renew(&self, server: &mut LogServer) -> Result<(), Error> {
+        if !server.sender.is_closed() {
+            // no need to renew!
+            return Ok(());
+        }
+
+        // this key must already be in the map
+        let current = self
+            .servers
+            .get(&server.node)
+            .ok_or(Error::InvalidNodeSet)?;
+
+        let mut guard = current.0.lock().await;
+
+        // if renew is called, the LogServer must have already been initialized
+        let inner = guard.as_mut().expect("initialized log server instance");
+
+        if inner.sender != server.sender {
+            // someone else has already renewed the connection
+            server.sender = inner.sender.clone();
+            return Ok(());
+        }
+
+        let sender = self.connect(server.node).await?;
+        inner.sender = sender.clone();
+        server.sender = sender;
+
+        Ok(())
+    }
+}
diff --git a/crates/core/src/network/connection.rs b/crates/core/src/network/connection.rs
index 90a5e8ec2..70baefcad 100644
--- a/crates/core/src/network/connection.rs
+++ b/crates/core/src/network/connection.rs
@@ -336,4 +336,10 @@ impl ConnectionSender {
     }
 }
 
+impl PartialEq for ConnectionSender {
+    fn eq(&self, other: &Self) -> bool {
+        self.connection.ptr_eq(&other.connection)
+    }
+}
+
 static_assertions::assert_impl_all!(ConnectionSender: Send, Sync);
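
Reviewer note, not part of the diff: the watch-channel semantics of LogServerState are easiest to see in a throwaway test. All type and method names below come from this patch; the tokio test wiring and `LogletOffset::from(u32)` are assumptions for illustration.

    #[tokio::test]
    async fn tail_and_seal_waiters() {
        let state = LogServerState::default();

        // Park a waiter until the local tail reaches offset 10. wait_for
        // checks the current value first, so there is no subscribe race.
        let waiter = {
            let state = state.clone();
            tokio::spawn(async move { state.wait_for_tail(LogletOffset::from(10)).await })
        };

        // send_if_modified only wakes waiters for a strictly newer tail;
        // stale or equal offsets leave the channel untouched.
        state.maybe_update_local_tail(LogletOffset::from(10));
        assert_eq!(waiter.await.unwrap(), Some(LogletOffset::from(10)));

        // seal() is idempotent: the second call modifies nothing and
        // notifies no one.
        state.seal();
        state.seal();
        assert!(state.is_sealed());
        assert_eq!(state.wait_for_seal().await, Some(true));
    }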
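
A second illustrative sketch: the retry shape that get/renew are designed for on the appender side. The store message and the actual send call are elided because they are not in this patch; `get`, `renew`, `sender`, and `is_closed` are real, everything else is assumed.

    /// Hypothetical appender-side helper (not in this patch).
    async fn send_with_renew(
        manager: &LogServerManager,
        node: PlainNodeId,
    ) -> Result<(), Error> {
        let mut server = manager.get(node).await?;

        if server.sender().is_closed() {
            // Whoever wins the per-node lock reconnects exactly once; every
            // concurrent caller then observes a different sender (pointer
            // inequality via the new PartialEq impl) and adopts it instead
            // of opening a second connection.
            manager.renew(&mut server).await?;
        }

        // ... build the store message and send it over server.sender() ...

        Ok(())
    }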