Skip to content

Commit

Permalink
[Bifrost] More Replicated Loglet Scaffolding (#1986)
Browse files Browse the repository at this point in the history
This introduces a few more bits that happen during loglet instantiation.
  • Loading branch information
AhmedSoliman authored Sep 25, 2024
1 parent d9407b9 commit cc40ed9
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 17 deletions.
67 changes: 62 additions & 5 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,47 +15,101 @@ use std::sync::Arc;

use async_trait::async_trait;
use futures::stream::BoxStream;
use restate_types::logs::metadata::SegmentIndex;
use tracing::debug;

use restate_core::network::{Networking, TransportConnect};
use restate_core::{Metadata, ShutdownError, TaskCenter};
use restate_types::logs::{KeyFilter, LogletOffset, Record, TailState};
use restate_types::logs::{KeyFilter, LogId, LogletOffset, Record, SequenceNumber, TailState};
use restate_types::replicated_loglet::ReplicatedLogletParams;

use crate::loglet::util::TailOffsetWatch;
use crate::loglet::{Loglet, LogletCommit, OperationError, SendableLogletReadStream};

use super::rpc_routers::LogServersRpc;
use super::record_cache::RecordCache;
use super::rpc_routers::{LogServersRpc, SequencersRpc};

/// A loglet implementation for the replicated loglet provider.
///
/// Besides its identity (`log_id`, `segment_index`, `my_params`) it carries the shared handles
/// it was constructed with and records, in `sequencer`, how the sequencer of this loglet is
/// reached.
// NOTE(review): this listing looks like a rendered diff, so both the removed and the added
// `networking` line appear below — confirm against the real file.
#[derive(derive_more::Debug)]
pub(super) struct ReplicatedLoglet<T> {
/// This is used only to populate header of outgoing request to a remotely owned sequencer.
/// Otherwise, it's unused.
log_id: LogId,
/// This is used only to populate header of outgoing request to a remotely owned sequencer.
/// Otherwise, it's unused.
segment_index: SegmentIndex,
my_params: ReplicatedLogletParams,
#[debug(skip)]
task_center: TaskCenter,
#[debug(skip)]
metadata: Metadata,
#[debug(skip)]
networking: Networking<T>,
#[debug(skip)]
logservers_rpc: LogServersRpc,
#[debug(skip)]
networking: Networking<T>,
record_cache: RecordCache,
/// A shared watch for the last known global tail of the loglet.
/// Note that this comes with a few caveats:
/// - On startup, this defaults to `Open(OLDEST)`
/// - find_tail() should use this value iff we have a local sequencer; for all other cases,
///   we should run a proper tail search.
known_global_tail: TailOffsetWatch,
sequencer: SequencerAccess,
}

impl<T: TransportConnect> ReplicatedLoglet<T> {
/// Builds a loglet instance and decides how its sequencer is accessed.
///
/// If this node's id (`metadata.my_node_id()`) equals `my_params.sequencer`, the loglet uses
/// `SequencerAccess::Local`; otherwise it uses `SequencerAccess::Remote` holding a clone of
/// `sequencers_rpc`. The known global tail starts as `Open(LogletOffset::OLDEST)`.
// NOTE(review): the two signature lines below (`new` / `start`) look like the removed and the
// added line of a rendered diff — confirm against the real file.
pub fn new(
pub fn start(
log_id: LogId,
segment_index: SegmentIndex,
my_params: ReplicatedLogletParams,
task_center: TaskCenter,
metadata: Metadata,
networking: Networking<T>,
logservers_rpc: LogServersRpc,
sequencers_rpc: &SequencersRpc,
record_cache: RecordCache,
) -> Self {
// The sequencer is local exactly when the loglet params name this node as the sequencer.
let sequencer = if metadata.my_node_id() == my_params.sequencer {
debug!(
loglet_id = %my_params.loglet_id,
"We are the sequencer node for this loglet"
);
SequencerAccess::Local {
// create the sequencer and store the handle
}
} else {
SequencerAccess::Remote {
sequencers_rpc: sequencers_rpc.clone(),
}
};
Self {
log_id,
segment_index,
my_params,
task_center,
metadata,
networking,
logservers_rpc,
record_cache,
// Starting point only; see the caveats documented on the `known_global_tail` field.
known_global_tail: TailOffsetWatch::new(TailState::Open(LogletOffset::OLDEST)),
sequencer,
}
}
}

// todo(asoli): This will hold a handle to access the local sequencer, or a swappable handle if
// it's a remote sequencer.
/// How a loglet reaches its sequencer.
///
/// `Remote` carries the RPC routers used to talk to a sequencer on another node; `Local` has no
/// payload yet. Both variants print only their name in `Debug` output.
#[derive(derive_more::Debug, derive_more::IsVariant)]
pub enum SequencerAccess {
/// The sequencer is remote (or retired/preempted)
#[debug("Remote")]
Remote { sequencers_rpc: SequencersRpc },
/// We are the loglet leader, i.e. the sequencer runs on this node
#[debug("Local")]
// todo (add handle)
Local {},
}

#[async_trait]
impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
async fn create_read_stream(
Expand All @@ -68,7 +122,10 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
}

/// Returns a stream of tail states for this loglet.
// NOTE(review): `todo!()` below looks like the removed line of a rendered diff and the
// `Box::pin(...)` expression like its replacement — confirm against the real file.
fn watch_tail(&self) -> BoxStream<'static, TailState<LogletOffset>> {
todo!()
// It's acceptable for watch_tail to return an outdated value in the beginning,
// but if the loglet is unsealed, we need to ensure that we have a mechanism to update
// this value if we don't have a local sequencer.
Box::pin(self.known_global_tail.to_stream())
}

async fn enqueue_batch(&self, _payloads: Arc<[Record]>) -> Result<LogletCommit, ShutdownError> {
Expand Down
2 changes: 2 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
mod error;
mod loglet;
pub(crate) mod metric_definitions;
mod network;
mod provider;
mod record_cache;
pub mod replication;
mod rpc_routers;
#[allow(dead_code)]
Expand Down
65 changes: 65 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// todo(asoli): remove when fleshed out
#![allow(dead_code)]

use std::pin::Pin;
use std::sync::Arc;

use futures::Stream;

use restate_core::network::{Incoming, MessageRouterBuilder, TransportConnect};
use restate_core::{cancellation_watcher, Metadata};
use restate_types::config::ReplicatedLogletOptions;
use restate_types::net::replicated_loglet::Append;
use tracing::trace;

use super::provider::ReplicatedLogletProvider;

/// A pinned, boxed stream that yields `Incoming<T>` items.
type MessageStream<T> = Pin<Box<dyn Stream<Item = Incoming<T>> + Send + Sync + 'static>>;

/// State for the task that receives sequencer requests arriving over the network.
pub struct RequestPump {
metadata: Metadata,
/// Incoming `Append` messages, obtained by subscribing on the message router builder.
append_stream: MessageStream<Append>,
}

impl RequestPump {
    /// Subscribes to incoming `Append` messages on `router_builder` and bundles the resulting
    /// stream with `metadata` into a new pump.
    ///
    /// `_opts` is accepted but not read yet; the subscription queue length is fixed for now.
    pub fn new(
        _opts: &ReplicatedLogletOptions,
        metadata: Metadata,
        router_builder: &mut MessageRouterBuilder,
    ) -> Self {
        // todo(asoli) read from opts
        let queue_length = 10;
        Self {
            metadata,
            append_stream: router_builder.subscribe_to_stream(queue_length),
        }
    }

    /// Runs until the cancellation watcher fires, then returns `Ok(())`.
    ///
    /// No incoming message is processed yet: cancellation is the only branch being polled.
    ///
    /// Must run in task-center context
    pub async fn run<T: TransportConnect>(
        self,
        _provider: Arc<ReplicatedLogletProvider<T>>,
    ) -> anyhow::Result<()> {
        trace!("Starting replicated loglet request pump");
        let mut shutdown = std::pin::pin!(cancellation_watcher());
        loop {
            tokio::select! {
                // Cancellation is the only way out of the loop.
                _ = &mut shutdown => return Ok(()),
            }
        }
    }
}
65 changes: 55 additions & 10 deletions crates/bifrost/src/providers/replicated_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use std::sync::Arc;

use async_trait::async_trait;
use dashmap::DashMap;
use tracing::trace;

use restate_core::network::{MessageRouterBuilder, Networking, TransportConnect};
use restate_core::{Metadata, TaskCenter};
use restate_core::{task_center, Metadata, TaskCenter, TaskKind};
use restate_metadata_store::MetadataStoreClient;
use restate_types::config::ReplicatedLogletOptions;
use restate_types::live::BoxedLiveLoad;
Expand All @@ -24,7 +25,9 @@ use restate_types::replicated_loglet::ReplicatedLogletParams;

use super::loglet::ReplicatedLoglet;
use super::metric_definitions;
use super::rpc_routers::LogServersRpc;
use super::network::RequestPump;
use super::record_cache::RecordCache;
use super::rpc_routers::{LogServersRpc, SequencersRpc};
use crate::loglet::{Loglet, LogletProvider, LogletProviderFactory, OperationError};
use crate::providers::replicated_loglet::error::ReplicatedLogletError;
use crate::Error;
Expand All @@ -36,18 +39,24 @@ pub struct Factory<T> {
metadata_store_client: MetadataStoreClient,
networking: Networking<T>,
logserver_rpc_routers: LogServersRpc,
sequencer_rpc_routers: SequencersRpc,
request_pump: RequestPump,
}

impl<T: TransportConnect> Factory<T> {
pub fn new(
task_center: TaskCenter,
opts: BoxedLiveLoad<ReplicatedLogletOptions>,
mut opts: BoxedLiveLoad<ReplicatedLogletOptions>,
metadata_store_client: MetadataStoreClient,
metadata: Metadata,
networking: Networking<T>,
router_builder: &mut MessageRouterBuilder,
) -> Self {
// Handling Sequencer(s) incoming requests
let request_pump = RequestPump::new(opts.live_load(), metadata.clone(), router_builder);

let logserver_rpc_routers = LogServersRpc::new(router_builder);
let sequencer_rpc_routers = SequencersRpc::new(router_builder);
// todo(asoli): Create a handler to answer to control plane monitoring questions
Self {
task_center,
Expand All @@ -56,6 +65,8 @@ impl<T: TransportConnect> Factory<T> {
metadata_store_client,
networking,
logserver_rpc_routers,
sequencer_rpc_routers,
request_pump,
}
}
}
Expand All @@ -68,47 +79,67 @@ impl<T: TransportConnect> LogletProviderFactory for Factory<T> {

/// Builds the replicated loglet provider and starts the sequencer request pump.
///
/// The pump is spawned as a `TaskKind::NetworkMessageHandler` task named
/// "sequencers-ingress"; a spawn failure is propagated to the caller via `?`.
// NOTE(review): the `Ok(Arc::new(` / `let provider = Arc::new(` and `)))` / `));` pairs below
// look like removed/added lines of a rendered diff — confirm against the real file.
async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider>, OperationError> {
metric_definitions::describe_metrics();
Ok(Arc::new(ReplicatedLogletProvider::new(
let provider = Arc::new(ReplicatedLogletProvider::new(
self.task_center,
self.opts,
self.metadata,
self.metadata_store_client,
self.networking,
self.logserver_rpc_routers,
)))
self.sequencer_rpc_routers,
));
// run the request pump. The request pump handles/routes incoming messages to our
// locally hosted sequencers.
task_center().spawn(
TaskKind::NetworkMessageHandler,
"sequencers-ingress",
None,
{
let request_pump = self.request_pump;
// The task gets its own handle to the provider; the original `Arc` is returned below.
let provider = provider.clone();
async { request_pump.run(provider).await }
},
)?;

Ok(provider)
}
}

/// Provider that creates `ReplicatedLoglet` instances and holds the state shared between them.
// NOTE(review): the two header lines below look like the removed and the added line of a
// rendered diff (visibility changed to `pub(super)`) — confirm against the real file.
struct ReplicatedLogletProvider<T> {
pub(super) struct ReplicatedLogletProvider<T> {
/// Loglet instances keyed by log id and segment index.
active_loglets: DashMap<(LogId, SegmentIndex), Arc<ReplicatedLoglet<T>>>,
task_center: TaskCenter,
_opts: BoxedLiveLoad<ReplicatedLogletOptions>,
metadata: Metadata,
_metadata_store_client: MetadataStoreClient,
networking: Networking<T>,
/// A clone of this cache is handed to every loglet that gets created.
record_cache: RecordCache,
logserver_rpc_routers: LogServersRpc,
sequencer_rpc_routers: SequencersRpc,
}

impl<T: TransportConnect> ReplicatedLogletProvider<T> {
/// Assembles the provider from the handles it is given, starting with no active loglets and
/// a freshly created `RecordCache`.
// NOTE(review): the `opts` / `_opts` parameter lines and the `_opts: opts,` / `_opts,` field
// lines look like removed/added pairs of a rendered diff — confirm against the real file.
fn new(
task_center: TaskCenter,
opts: BoxedLiveLoad<ReplicatedLogletOptions>,
_opts: BoxedLiveLoad<ReplicatedLogletOptions>,
metadata: Metadata,
metadata_store_client: MetadataStoreClient,
networking: Networking<T>,
logserver_rpc_routers: LogServersRpc,
sequencer_rpc_routers: SequencersRpc,
) -> Self {
// todo(asoli): create all global state here that'll be shared across loglet instances
// - RecordCache.
// - NodeState map.
Self {
active_loglets: Default::default(),
task_center,
_opts: opts,
_opts,
metadata,
_metadata_store_client: metadata_store_client,
networking,
// todo(asoli): read memory budget from ReplicatedLogletOptions
record_cache: RecordCache::new(20_000_000), // 20MB
logserver_rpc_routers,
sequencer_rpc_routers,
}
}
}
Expand All @@ -129,13 +160,27 @@ impl<T: TransportConnect> LogletProvider for ReplicatedLogletProvider<T> {
ReplicatedLogletError::LogletParamsParsingError(log_id, segment_index, e)
})?;

trace!(
log_id = %log_id,
segment_index = %segment_index,
loglet_id = %params.loglet_id,
nodeset = ?params.nodeset,
sequencer = %params.sequencer,
replication = ?params.replication,
"Creating a replicated loglet client"
);

// Create loglet
let loglet = ReplicatedLoglet::new(
let loglet = ReplicatedLoglet::start(
log_id,
segment_index,
params,
self.task_center.clone(),
self.metadata.clone(),
self.networking.clone(),
self.logserver_rpc_routers.clone(),
&self.sequencer_rpc_routers,
self.record_cache.clone(),
);
let key_value = entry.insert(Arc::new(loglet));
Arc::clone(key_value.value())
Expand Down
25 changes: 25 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/record_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

/// Placeholder for the global record cache.
///
/// It is safe to share between all ReplicatedLoglet(s) and the LocalSequencers or the
/// RemoteSequencers.
///
/// The intended design is a streaming LRU-cache with total memory budget tracking and
/// enforcement; the type carries no state yet.
#[derive(Debug, Clone)]
pub struct RecordCache {}

impl RecordCache {
    /// Creates the (currently stateless) cache.
    ///
    /// `_memory_budget_bytes` is accepted but not used yet.
    pub fn new(_memory_budget_bytes: usize) -> Self {
        RecordCache {}
    }
}
Loading

0 comments on commit cc40ed9

Please sign in to comment.