Skip to content

Commit

Permalink
[Bifrost] Replicated loglet provider basic wiring
Browse files Browse the repository at this point in the history
This PR makes some progress in wiring up a real replicated loglet factory. Note that the `ReplicatedLoglet` is a stub and its inputs are not correct yet.
  • Loading branch information
AhmedSoliman committed Sep 20, 2024
1 parent 06e0e04 commit d37408a
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 26 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ restate-metadata-store = { workspace = true }
anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
dashmap = { workspace = true }
derive_more = { workspace = true }
enum-map = { workspace = true, features = ["serde"] }
futures = { workspace = true }
Expand All @@ -31,6 +32,7 @@ pin-project = { workspace = true }
rand = { workspace = true }
rocksdb = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
smallvec = { workspace = true }
static_assertions = { workspace = true }
thiserror = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion crates/bifrost/src/providers/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ mod tests {

let loglet = Arc::new(LocalLoglet::create(
params
.as_str()
.parse()
.expect("loglet params can be converted into u64"),
log_store,
Expand Down
1 change: 0 additions & 1 deletion crates/bifrost/src/providers/local_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ impl LogletProvider for LocalLogletProvider {
// NOTE: local-loglet expects params to be a `u64` string-encoded unique identifier under the hood.
let loglet = LocalLoglet::create(
params
.as_str()
.parse()
.expect("loglet params can be converted into u64"),
self.log_store.clone(),
Expand Down
53 changes: 53 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2024-2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use restate_core::ShutdownError;
use restate_types::errors::MaybeRetryableError;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::LogId;

use crate::loglet::OperationError;

#[derive(Debug, thiserror::Error)]
pub(crate) enum ReplicatedLogletError {
#[error("cannot parse loglet configuration for log_id={0} at segment_index={1}: {2}")]
LogletParamsParsingError(LogId, SegmentIndex, serde_json::Error),
#[error(transparent)]
Shutdown(#[from] ShutdownError),
}

impl MaybeRetryableError for ReplicatedLogletError {
fn retryable(&self) -> bool {
match self {
Self::LogletParamsParsingError(..) => false,
Self::Shutdown(_) => false,
}
}
}

impl From<ReplicatedLogletError> for OperationError {
fn from(value: ReplicatedLogletError) -> Self {
match value {
ReplicatedLogletError::Shutdown(e) => OperationError::Shutdown(e),
e => OperationError::Other(Arc::new(e)),
}
}
}

impl From<ReplicatedLogletError> for crate::Error {
fn from(value: ReplicatedLogletError) -> Self {
match value {
ReplicatedLogletError::Shutdown(e) => crate::Error::Shutdown(e),
e => crate::Error::LogletError(Arc::new(e)),
}
}
}
71 changes: 71 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use async_trait::async_trait;
use futures::stream::BoxStream;

use restate_core::ShutdownError;
use restate_types::logs::{KeyFilter, LogletOffset, Record, TailState};
use restate_types::replicated_loglet::ReplicatedLogletParams;

use crate::loglet::{Loglet, LogletCommit, OperationError, SendableLogletReadStream};

#[derive(derive_more::Debug)]
pub(super) struct ReplicatedLoglet {
_my_params: ReplicatedLogletParams,
}

impl ReplicatedLoglet {
pub fn new(my_params: ReplicatedLogletParams) -> Self {
Self {
_my_params: my_params,
}
}
}

#[async_trait]
impl Loglet for ReplicatedLoglet {
async fn create_read_stream(
self: Arc<Self>,
_filter: KeyFilter,
_from: LogletOffset,
_to: Option<LogletOffset>,
) -> Result<SendableLogletReadStream, OperationError> {
todo!()
}

fn watch_tail(&self) -> BoxStream<'static, TailState<LogletOffset>> {
todo!()
}

async fn enqueue_batch(&self, _payloads: Arc<[Record]>) -> Result<LogletCommit, ShutdownError> {
todo!()
}

async fn find_tail(&self) -> Result<TailState<LogletOffset>, OperationError> {
todo!()
}

async fn get_trim_point(&self) -> Result<Option<LogletOffset>, OperationError> {
todo!()
}

/// Trim the log to the minimum of new_trim_point and last_committed_offset
/// new_trim_point is inclusive (will be trimmed)
async fn trim(&self, _new_trim_point: LogletOffset) -> Result<(), OperationError> {
todo!()
}

async fn seal(&self) -> Result<(), OperationError> {
todo!()
}
}
2 changes: 2 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod error;
mod loglet;
pub(crate) mod metric_definitions;
mod provider;
pub mod replication;
Expand Down
80 changes: 68 additions & 12 deletions crates/bifrost/src/providers/replicated_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,28 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// todo: remove when fleshed out
#![allow(unused)]

use std::sync::Arc;

use async_trait::async_trait;
use dashmap::DashMap;

use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::{MessageRouterBuilder, Networking};
use restate_core::Metadata;
use restate_core::{Metadata, TaskCenter};
use restate_metadata_store::MetadataStoreClient;
use restate_types::config::ReplicatedLogletOptions;
use restate_types::live::BoxedLiveLoad;
use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex};
use restate_types::logs::LogId;
use restate_types::replicated_loglet::ReplicatedLogletParams;

use super::loglet::ReplicatedLoglet;
use super::metric_definitions;
use crate::loglet::{Loglet, LogletProvider, LogletProviderFactory, OperationError};
use crate::providers::replicated_loglet::error::ReplicatedLogletError;
use crate::Error;

pub struct Factory {
task_center: TaskCenter,
opts: BoxedLiveLoad<ReplicatedLogletOptions>,
metadata: Metadata,
metadata_store_client: MetadataStoreClient,
Expand All @@ -37,13 +38,18 @@ pub struct Factory {

impl Factory {
pub fn new(
task_center: TaskCenter,
opts: BoxedLiveLoad<ReplicatedLogletOptions>,
metadata_store_client: MetadataStoreClient,
metadata: Metadata,
networking: Networking,
_router_builder: &mut MessageRouterBuilder,
) -> Self {
// todo(asoli):
// - Create the shared RpcRouter(s)
// - A Handler to answer to control plane monitoring questions
Self {
task_center,
opts,
metadata,
metadata_store_client,
Expand All @@ -60,22 +66,72 @@ impl LogletProviderFactory for Factory {

async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider>, OperationError> {
metric_definitions::describe_metrics();
Ok(Arc::new(ReplicatedLogletProvider))
Ok(Arc::new(ReplicatedLogletProvider::new(
self.task_center,
self.opts,
self.metadata,
self.metadata_store_client,
self.networking,
)))
}
}

struct ReplicatedLogletProvider;
struct ReplicatedLogletProvider {
active_loglets: DashMap<(LogId, SegmentIndex), Arc<ReplicatedLoglet>>,
_task_center: TaskCenter,
_opts: BoxedLiveLoad<ReplicatedLogletOptions>,
_metadata: Metadata,
_metadata_store_client: MetadataStoreClient,
_networking: Networking,
}

impl ReplicatedLogletProvider {
fn new(
task_center: TaskCenter,
opts: BoxedLiveLoad<ReplicatedLogletOptions>,
metadata: Metadata,
metadata_store_client: MetadataStoreClient,
networking: Networking,
) -> Self {
// todo(asoli): create all global state here that'll be shared across loglet instances
// - RecordCache.
// - NodeState map.
Self {
active_loglets: Default::default(),
_task_center: task_center,
_opts: opts,
_metadata: metadata,
_metadata_store_client: metadata_store_client,
_networking: networking,
}
}
}

#[async_trait]
impl LogletProvider for ReplicatedLogletProvider {
async fn get_loglet(
&self,
// todo: we need richer params
_log_id: LogId,
_segment_index: SegmentIndex,
_params: &LogletParams,
log_id: LogId,
segment_index: SegmentIndex,
params: &LogletParams,
) -> Result<Arc<dyn Loglet>, Error> {
todo!("Not implemented yet")
let loglet = match self.active_loglets.entry((log_id, segment_index)) {
dashmap::Entry::Vacant(entry) => {
// NOTE: replicated-loglet expects params to be a `json` string.
let params =
ReplicatedLogletParams::deserialize_from(params.as_bytes()).map_err(|e| {
ReplicatedLogletError::LogletParamsParsingError(log_id, segment_index, e)
})?;

// Create loglet
let loglet = ReplicatedLoglet::new(params);
let key_value = entry.insert(Arc::new(loglet));
Arc::clone(key_value.value())
}
dashmap::Entry::Occupied(entry) => entry.get().clone(),
};

Ok(loglet as Arc<dyn Loglet>)
}

async fn shutdown(&self) -> Result<(), OperationError> {
Expand Down
1 change: 1 addition & 0 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ impl Node {
// replicated-loglet
#[cfg(feature = "replicated-loglet")]
let replicated_loglet_factory = restate_bifrost::providers::replicated_loglet::Factory::new(
tc.clone(),
updateable_config
.clone()
.map(|c| &c.bifrost.replicated_loglet)
Expand Down
15 changes: 3 additions & 12 deletions crates/types/src/logs/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

use std::collections::{BTreeMap, HashMap, HashSet};

use bytes::Bytes;
use bytestring::ByteString;
use enum_map::Enum;
use rand::RngCore;
Expand Down Expand Up @@ -108,7 +107,9 @@ pub struct LogletConfig {
/// for a loglet kind to construct a configured loglet instance modulo the log-id
/// and start-lsn. It's provided by bifrost on loglet creation. This allows the
/// parameters to be shared between segments and logs if needed.
#[derive(Debug, Clone, Hash, Eq, PartialEq, derive_more::From, Serialize, Deserialize)]
#[derive(
Debug, Clone, Hash, Eq, PartialEq, derive_more::From, derive_more::Deref, Serialize, Deserialize,
)]
pub struct LogletParams(ByteString);

impl From<String> for LogletParams {
Expand Down Expand Up @@ -167,16 +168,6 @@ impl LogletConfig {
}
}

impl LogletParams {
pub fn as_str(&self) -> &str {
&self.0
}

pub fn as_bytes(&self) -> &Bytes {
self.0.as_bytes()
}
}

impl Logs {
pub fn new(version: Version, logs: HashMap<LogId, Chain>) -> Self {
Self { version, logs }
Expand Down

0 comments on commit d37408a

Please sign in to comment.