From 82ba5ea228f0b726f337dc841afbdaf6544aafe2 Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Mon, 23 Sep 2024 13:54:57 +0200 Subject: [PATCH] scaffold log-server manager --- .../src/providers/replicated_loglet/mod.rs | 2 + .../replicated_loglet/sequencer/mod.rs | 11 + .../replicated_loglet/sequencer/node.rs | 189 ++++++++++++++++++ crates/core/src/network/connection.rs | 6 + 4 files changed, 208 insertions(+) create mode 100644 crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs create mode 100644 crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs diff --git a/crates/bifrost/src/providers/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs index c4e599060..b7e84a9fd 100644 --- a/crates/bifrost/src/providers/replicated_loglet/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/mod.rs @@ -11,6 +11,8 @@ pub(crate) mod metric_definitions; mod provider; pub mod replication; +#[allow(dead_code)] +pub mod sequencer; #[cfg(any(test, feature = "test-util"))] pub mod test_util; diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs new file mode 100644 index 000000000..6b9e9058d --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs @@ -0,0 +1,11 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +mod node; diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs new file mode 100644 index 000000000..5051d030d --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs @@ -0,0 +1,189 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::{collections::BTreeMap, ops::Deref, sync::Arc}; + +use tokio::sync::Mutex; + +use restate_core::{ + network::{ConnectionSender, NetworkError, Networking}, + Metadata, +}; +use restate_types::{ + logs::{LogletOffset, SequenceNumber, TailState}, + replicated_loglet::{NodeSet, ReplicatedLogletId}, + PlainNodeId, +}; + +use crate::loglet::util::TailOffsetWatch; + +type LogServerLock = Mutex>; + +/// LogServer instance +#[derive(Clone)] +pub struct RemoteLogServer { + loglet_id: ReplicatedLogletId, + node: PlainNodeId, + tail: TailOffsetWatch, + //todo(azmy): maybe use ArcSwap here to update + sender: ConnectionSender, +} + +impl RemoteLogServer { + pub fn node_id(&self) -> PlainNodeId { + self.node + } + + pub fn loglet_id(&self) -> ReplicatedLogletId { + self.loglet_id + } + + pub fn tail(&self) -> &TailOffsetWatch { + &self.tail + } + + pub fn sender(&mut self) -> &mut ConnectionSender { + &mut self.sender + } +} + +struct RemoteLogServerManagerInner { + loglet_id: ReplicatedLogletId, + servers: BTreeMap, + node_set: NodeSet, + metadata: Metadata, + networking: Networking, +} + +/// LogServerManager maintains a set of [`RemoteLogServer`]s that provided via the +/// [`NodeSet`]. +/// +/// The manager makes sure there is only one active connection per server. +/// It's up to the user of the client to do [`LogServerManager::renew`] if needed +pub(crate) struct RemoteLogServerManager { + inner: Arc, +} + +impl Clone for RemoteLogServerManager { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + +impl RemoteLogServerManager { + /// creates the node set and start the appenders + pub fn new( + loglet_id: ReplicatedLogletId, + metadata: Metadata, + networking: Networking, + node_set: NodeSet, + ) -> Self { + let mut servers = BTreeMap::default(); + for node_id in node_set.iter() { + servers.insert(*node_id, LogServerLock::default()); + } + + let inner = RemoteLogServerManagerInner { + loglet_id, + servers, + node_set, + metadata, + networking, + }; + + Self { + inner: Arc::new(inner), + } + } + + async fn connect(&self, id: PlainNodeId) -> Result { + let conf = self.inner.metadata.nodes_config_ref(); + let node = conf.find_node_by_id(id)?; + let connection = self + .inner + .networking + .connection_manager() + .get_node_sender(node.current_generation) + .await?; + + Ok(connection) + } + + /// gets a log-server instance. On first time it will initialize a new connection + /// to log server. It will make sure all following get call will hold the same + /// connection. + /// + /// it's up to the client to call [`Self::renew`] if the connection it holds + /// is closed + pub async fn get(&self, id: PlainNodeId) -> Result { + let server = self.inner.servers.get(&id).expect("node is in nodeset"); + + let mut guard = server.lock().await; + + if let Some(current) = guard.deref() { + return Ok(current.clone()); + } + + // initialize a new instance + let server = RemoteLogServer { + loglet_id: self.inner.loglet_id, + node: id, + tail: TailOffsetWatch::new(TailState::Open(LogletOffset::OLDEST)), + sender: self.connect(id).await?, + }; + + // we need to update initialize it + *guard = Some(server.clone()); + + Ok(server) + } + + /// renew makes sure server connection is renewed if and only if + /// the provided server holds an outdated connection. Otherwise + /// the latest connection associated with this server is used. + /// + /// It's up the holder of the log server instance to retry to renew + /// if that connection is not valid. + /// + /// It also guarantees that concurrent calls to renew on the same server instance + /// will only renew the connection once for all callers + /// + /// However, this does not affect copies of LogServer that have been already retrieved + /// by calling [`Self::get()`]. + /// + /// Holder of old instances will have to call renew if they need to. + pub async fn renew(&self, server: &mut RemoteLogServer) -> Result<(), NetworkError> { + // this key must already be in the map + let current = self + .inner + .servers + .get(&server.node) + .expect("node is in nodeset"); + + let mut guard = current.lock().await; + + // if you calling renew then the LogServer has already been initialized + let inner = guard.as_mut().expect("initialized log server instance"); + + if inner.sender != server.sender { + // someone else has already renewed the connection + server.sender = inner.sender.clone(); + return Ok(()); + } + + let sender = self.connect(server.node).await?; + inner.sender = sender.clone(); + server.sender = sender.clone(); + + Ok(()) + } +} diff --git a/crates/core/src/network/connection.rs b/crates/core/src/network/connection.rs index 90a5e8ec2..70baefcad 100644 --- a/crates/core/src/network/connection.rs +++ b/crates/core/src/network/connection.rs @@ -336,4 +336,10 @@ impl ConnectionSender { } } +impl PartialEq for ConnectionSender { + fn eq(&self, other: &Self) -> bool { + self.connection.ptr_eq(&other.connection) + } +} + static_assertions::assert_impl_all!(ConnectionSender: Send, Sync);