diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index 70f6bb200..75af90cbe 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -15,47 +15,101 @@ use std::sync::Arc; use async_trait::async_trait; use futures::stream::BoxStream; +use restate_types::logs::metadata::SegmentIndex; +use tracing::debug; use restate_core::network::{Networking, TransportConnect}; use restate_core::{Metadata, ShutdownError, TaskCenter}; -use restate_types::logs::{KeyFilter, LogletOffset, Record, TailState}; +use restate_types::logs::{KeyFilter, LogId, LogletOffset, Record, SequenceNumber, TailState}; use restate_types::replicated_loglet::ReplicatedLogletParams; +use crate::loglet::util::TailOffsetWatch; use crate::loglet::{Loglet, LogletCommit, OperationError, SendableLogletReadStream}; -use super::rpc_routers::LogServersRpc; +use super::record_cache::RecordCache; +use super::rpc_routers::{LogServersRpc, SequencersRpc}; #[derive(derive_more::Debug)] pub(super) struct ReplicatedLoglet { + /// This is used only to populate header of outgoing request to a remotely owned sequencer. + /// Otherwise, it's unused. + log_id: LogId, + /// This is used only to populate header of outgoing request to a remotely owned sequencer. + /// Otherwise, it's unused. + segment_index: SegmentIndex, my_params: ReplicatedLogletParams, #[debug(skip)] task_center: TaskCenter, #[debug(skip)] metadata: Metadata, #[debug(skip)] + networking: Networking, + #[debug(skip)] logservers_rpc: LogServersRpc, #[debug(skip)] - networking: Networking, + record_cache: RecordCache, + /// A shared watch for the last known global tail of the loglet. + /// Note that this comes with a few caveats: + /// - On startup, this defaults to `Open(OLDEST)` + /// - find_tail() should use this value iff we have a local sequencer for all other cases, we + /// should run a proper tail search. + known_global_tail: TailOffsetWatch, + sequencer: SequencerAccess, } impl ReplicatedLoglet { - pub fn new( + pub fn start( + log_id: LogId, + segment_index: SegmentIndex, my_params: ReplicatedLogletParams, task_center: TaskCenter, metadata: Metadata, networking: Networking, logservers_rpc: LogServersRpc, + sequencers_rpc: &SequencersRpc, + record_cache: RecordCache, ) -> Self { + let sequencer = if metadata.my_node_id() == my_params.sequencer { + debug!( + loglet_id = %my_params.loglet_id, + "We are the sequencer node for this loglet" + ); + SequencerAccess::Local { + // create the sequencer and store the handle + } + } else { + SequencerAccess::Remote { + sequencers_rpc: sequencers_rpc.clone(), + } + }; Self { + log_id, + segment_index, my_params, task_center, metadata, networking, logservers_rpc, + record_cache, + known_global_tail: TailOffsetWatch::new(TailState::Open(LogletOffset::OLDEST)), + sequencer, } } } +// todo(asoli): This will hold a handle to access the local sequencer, or a swappable handle if +// it's a remote sequencer. +#[derive(derive_more::Debug, derive_more::IsVariant)] +pub enum SequencerAccess { + /// The sequencer is remote (or retired/preempted) + #[debug("Remote")] + Remote { sequencers_rpc: SequencersRpc }, + /// We are the loglet leaders + #[debug("Local")] + // todo (add handle) + Local {}, +} + #[async_trait] impl Loglet for ReplicatedLoglet { async fn create_read_stream( @@ -68,7 +122,10 @@ impl Loglet for ReplicatedLoglet { } fn watch_tail(&self) -> BoxStream<'static, TailState> { - todo!() + // It's acceptable for watch_tail to return an outdated value in the beginning, + // but if the loglet is unsealed, we need to ensure that we have a mechanism to update + // this value if we don't have a local sequencer. + Box::pin(self.known_global_tail.to_stream()) } async fn enqueue_batch(&self, _payloads: Arc<[Record]>) -> Result { diff --git a/crates/bifrost/src/providers/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs index f0e2cc692..ab71d1c86 100644 --- a/crates/bifrost/src/providers/replicated_loglet/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/mod.rs @@ -11,7 +11,9 @@ mod error; mod loglet; pub(crate) mod metric_definitions; +mod network; mod provider; +mod record_cache; pub mod replication; mod rpc_routers; #[allow(dead_code)] diff --git a/crates/bifrost/src/providers/replicated_loglet/network.rs b/crates/bifrost/src/providers/replicated_loglet/network.rs new file mode 100644 index 000000000..567d73578 --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/network.rs @@ -0,0 +1,65 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +// todo(asoli): remove when fleshed out +#![allow(dead_code)] + +use std::pin::Pin; +use std::sync::Arc; + +use futures::Stream; + +use restate_core::network::{Incoming, MessageRouterBuilder, TransportConnect}; +use restate_core::{cancellation_watcher, Metadata}; +use restate_types::config::ReplicatedLogletOptions; +use restate_types::net::replicated_loglet::Append; +use tracing::trace; + +use super::provider::ReplicatedLogletProvider; + +type MessageStream = Pin> + Send + Sync + 'static>>; + +pub struct RequestPump { + metadata: Metadata, + append_stream: MessageStream, +} + +impl RequestPump { + pub fn new( + _opts: &ReplicatedLogletOptions, + metadata: Metadata, + router_builder: &mut MessageRouterBuilder, + ) -> Self { + // todo(asoli) read from opts + let queue_length = 10; + let append_stream = router_builder.subscribe_to_stream(queue_length); + Self { + metadata, + append_stream, + } + } + + /// Must run in task-center context + pub async fn run( + self, + _provider: Arc>, + ) -> anyhow::Result<()> { + trace!("Starting replicated loglet request pump"); + let mut cancel = std::pin::pin!(cancellation_watcher()); + loop { + tokio::select! { + _ = &mut cancel => { + break; + } + } + } + Ok(()) + } +} diff --git a/crates/bifrost/src/providers/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs index bd09c2fc2..d2336a500 100644 --- a/crates/bifrost/src/providers/replicated_loglet/provider.rs +++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs @@ -12,9 +12,10 @@ use std::sync::Arc; use async_trait::async_trait; use dashmap::DashMap; +use tracing::trace; use restate_core::network::{MessageRouterBuilder, Networking, TransportConnect}; -use restate_core::{Metadata, TaskCenter}; +use restate_core::{task_center, Metadata, TaskCenter, TaskKind}; use restate_metadata_store::MetadataStoreClient; use restate_types::config::ReplicatedLogletOptions; use restate_types::live::BoxedLiveLoad; @@ -24,7 +25,9 @@ use restate_types::replicated_loglet::ReplicatedLogletParams; use super::loglet::ReplicatedLoglet; use super::metric_definitions; -use super::rpc_routers::LogServersRpc; +use super::network::RequestPump; +use super::record_cache::RecordCache; +use super::rpc_routers::{LogServersRpc, SequencersRpc}; use crate::loglet::{Loglet, LogletProvider, LogletProviderFactory, OperationError}; use crate::providers::replicated_loglet::error::ReplicatedLogletError; use crate::Error; @@ -36,18 +39,24 @@ pub struct Factory { metadata_store_client: MetadataStoreClient, networking: Networking, logserver_rpc_routers: LogServersRpc, + sequencer_rpc_routers: SequencersRpc, + request_pump: RequestPump, } impl Factory { pub fn new( task_center: TaskCenter, - opts: BoxedLiveLoad, + mut opts: BoxedLiveLoad, metadata_store_client: MetadataStoreClient, metadata: Metadata, networking: Networking, router_builder: &mut MessageRouterBuilder, ) -> Self { + // Handling Sequencer(s) incoming requests + let request_pump = RequestPump::new(opts.live_load(), metadata.clone(), router_builder); + let logserver_rpc_routers = LogServersRpc::new(router_builder); + let sequencer_rpc_routers = SequencersRpc::new(router_builder); // todo(asoli): Create a handler to answer to control plane monitoring questions Self { task_center, @@ -56,6 +65,8 @@ impl Factory { metadata_store_client, networking, logserver_rpc_routers, + sequencer_rpc_routers, + request_pump, } } } @@ -68,47 +79,67 @@ impl LogletProviderFactory for Factory { async fn create(self: Box) -> Result, OperationError> { metric_definitions::describe_metrics(); - Ok(Arc::new(ReplicatedLogletProvider::new( + let provider = Arc::new(ReplicatedLogletProvider::new( self.task_center, self.opts, self.metadata, self.metadata_store_client, self.networking, self.logserver_rpc_routers, - ))) + self.sequencer_rpc_routers, + )); + // run the request pump. The request pump handles/routes incoming messages to our + // locally hosted sequencers. + task_center().spawn( + TaskKind::NetworkMessageHandler, + "sequencers-ingress", + None, + { + let request_pump = self.request_pump; + let provider = provider.clone(); + async { request_pump.run(provider).await } + }, + )?; + + Ok(provider) } } -struct ReplicatedLogletProvider { +pub(super) struct ReplicatedLogletProvider { active_loglets: DashMap<(LogId, SegmentIndex), Arc>>, task_center: TaskCenter, _opts: BoxedLiveLoad, metadata: Metadata, _metadata_store_client: MetadataStoreClient, networking: Networking, + record_cache: RecordCache, logserver_rpc_routers: LogServersRpc, + sequencer_rpc_routers: SequencersRpc, } impl ReplicatedLogletProvider { fn new( task_center: TaskCenter, - opts: BoxedLiveLoad, + _opts: BoxedLiveLoad, metadata: Metadata, metadata_store_client: MetadataStoreClient, networking: Networking, logserver_rpc_routers: LogServersRpc, + sequencer_rpc_routers: SequencersRpc, ) -> Self { // todo(asoli): create all global state here that'll be shared across loglet instances - // - RecordCache. // - NodeState map. Self { active_loglets: Default::default(), task_center, - _opts: opts, + _opts, metadata, _metadata_store_client: metadata_store_client, networking, + // todo(asoli): read memory budget from ReplicatedLogletOptions + record_cache: RecordCache::new(20_000_000), // 20MB logserver_rpc_routers, + sequencer_rpc_routers, } } } @@ -129,13 +160,27 @@ impl LogletProvider for ReplicatedLogletProvider { ReplicatedLogletError::LogletParamsParsingError(log_id, segment_index, e) })?; + trace!( + log_id = %log_id, + segment_index = %segment_index, + loglet_id = %params.loglet_id, + nodeset = ?params.nodeset, + sequencer = %params.sequencer, + replication = ?params.replication, + "Creating a replicated loglet client" + ); + // Create loglet - let loglet = ReplicatedLoglet::new( + let loglet = ReplicatedLoglet::start( + log_id, + segment_index, params, self.task_center.clone(), self.metadata.clone(), self.networking.clone(), self.logserver_rpc_routers.clone(), + &self.sequencer_rpc_routers, + self.record_cache.clone(), ); let key_value = entry.insert(Arc::new(loglet)); Arc::clone(key_value.value()) diff --git a/crates/bifrost/src/providers/replicated_loglet/record_cache.rs b/crates/bifrost/src/providers/replicated_loglet/record_cache.rs new file mode 100644 index 000000000..384f31ad0 --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/record_cache.rs @@ -0,0 +1,25 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +/// A placeholder for a global record cache. +/// +/// This can be safely shared between all ReplicatedLoglet(s) and the LocalSequencers or the +/// RemoteSequencers +/// +/// +/// This is a streaming LRU-cache with total memory budget tracking and enforcement. +#[derive(Clone, Debug)] +pub struct RecordCache {} + +impl RecordCache { + pub fn new(_memory_budget_bytes: usize) -> Self { + Self {} + } +} diff --git a/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs b/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs index b0bacbf24..23f09ba60 100644 --- a/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs +++ b/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs @@ -14,6 +14,7 @@ use restate_core::network::rpc_router::RpcRouter; use restate_core::network::MessageRouterBuilder; use restate_types::net::log_server::{GetLogletInfo, GetRecords, Release, Seal, Store, Trim}; +use restate_types::net::replicated_loglet::Append; /// Used by replicated loglets to send requests and receive responses from log-servers /// Cloning this is cheap and all clones will share the same internal trackers. @@ -48,3 +49,21 @@ impl LogServersRpc { } } } + +/// Used by replicated loglets to send requests and receive responses from sequencers (other nodes +/// running replicated loglets) +/// Cloning this is cheap and all clones will share the same internal trackers. +#[derive(Clone)] +pub struct SequencersRpc { + pub append: RpcRouter, +} + +impl SequencersRpc { + /// Registers all routers into the supplied message router. This ensures that + /// responses are routed correctly. + pub fn new(router_builder: &mut MessageRouterBuilder) -> Self { + let append = RpcRouter::new(router_builder); + + Self { append } + } +} diff --git a/crates/types/src/net/replicated_loglet.rs b/crates/types/src/net/replicated_loglet.rs index f4d84ece5..3006ec336 100644 --- a/crates/types/src/net/replicated_loglet.rs +++ b/crates/types/src/net/replicated_loglet.rs @@ -16,7 +16,8 @@ use serde::{Deserialize, Serialize}; use super::log_server::Status; use super::TargetName; -use crate::logs::{LogletOffset, Record, SequenceNumber, TailState}; +use crate::logs::metadata::SegmentIndex; +use crate::logs::{LogId, LogletOffset, Record, SequenceNumber, TailState}; use crate::net::define_rpc; use crate::replicated_loglet::ReplicatedLogletId; @@ -28,6 +29,16 @@ define_rpc! { @response_target = TargetName::ReplicatedLogletAppended, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommonRequestHeader { + /// This is used only to locate the loglet params if this operation activates + /// the remote loglet + pub log_id: LogId, + pub segment_index: SegmentIndex, + /// The loglet_id id globally unique + pub loglet_id: ReplicatedLogletId, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CommonResponseHeader { pub known_global_tail: Option, @@ -56,7 +67,8 @@ impl CommonResponseHeader { // ** APPEND #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Append { - pub loglet_id: ReplicatedLogletId, + #[serde(flatten)] + pub header: CommonRequestHeader, pub payloads: Vec, }