diff --git a/Cargo.lock b/Cargo.lock index d75323a1e..fadf7408d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5493,6 +5493,7 @@ dependencies = [ "async-trait", "bytes", "criterion", + "dashmap 6.0.1", "derive_more", "enum-map", "enumset", @@ -5511,6 +5512,7 @@ dependencies = [ "rlimit", "rust-rocksdb", "serde", + "serde_json", "smallvec", "static_assertions", "tempfile", diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index 65b0b4143..cc28df2d5 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -22,6 +22,7 @@ restate-metadata-store = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } +dashmap = { workspace = true } derive_more = { workspace = true } enum-map = { workspace = true, features = ["serde"] } futures = { workspace = true } @@ -31,6 +32,7 @@ pin-project = { workspace = true } rand = { workspace = true } rocksdb = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } smallvec = { workspace = true } static_assertions = { workspace = true } thiserror = { workspace = true } diff --git a/crates/bifrost/src/providers/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs index 2ce78cdd3..a1d1b20cc 100644 --- a/crates/bifrost/src/providers/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -324,7 +324,6 @@ mod tests { let loglet = Arc::new(LocalLoglet::create( params - .as_str() .parse() .expect("loglet params can be converted into u64"), log_store, diff --git a/crates/bifrost/src/providers/local_loglet/provider.rs b/crates/bifrost/src/providers/local_loglet/provider.rs index b4fedd4c7..53bc92dab 100644 --- a/crates/bifrost/src/providers/local_loglet/provider.rs +++ b/crates/bifrost/src/providers/local_loglet/provider.rs @@ -91,7 +91,6 @@ impl LogletProvider for LocalLogletProvider { // NOTE: local-loglet expects params to be a `u64` string-encoded unique identifier under the hood. let loglet = LocalLoglet::create( params - .as_str() .parse() .expect("loglet params can be converted into u64"), self.log_store.clone(), diff --git a/crates/bifrost/src/providers/replicated_loglet/error.rs b/crates/bifrost/src/providers/replicated_loglet/error.rs new file mode 100644 index 000000000..22f1ea545 --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/error.rs @@ -0,0 +1,53 @@ +// Copyright (c) 2024-2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::sync::Arc; + +use restate_core::ShutdownError; +use restate_types::errors::MaybeRetryableError; +use restate_types::logs::metadata::SegmentIndex; +use restate_types::logs::LogId; + +use crate::loglet::OperationError; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum ReplicatedLogletError { + #[error("cannot parse loglet configuration for log_id={0} at segment_index={1}: {2}")] + LogletParamsParsingError(LogId, SegmentIndex, serde_json::Error), + #[error(transparent)] + Shutdown(#[from] ShutdownError), +} + +impl MaybeRetryableError for ReplicatedLogletError { + fn retryable(&self) -> bool { + match self { + Self::LogletParamsParsingError(..) => false, + Self::Shutdown(_) => false, + } + } +} + +impl From for OperationError { + fn from(value: ReplicatedLogletError) -> Self { + match value { + ReplicatedLogletError::Shutdown(e) => OperationError::Shutdown(e), + e => OperationError::Other(Arc::new(e)), + } + } +} + +impl From for crate::Error { + fn from(value: ReplicatedLogletError) -> Self { + match value { + ReplicatedLogletError::Shutdown(e) => crate::Error::Shutdown(e), + e => crate::Error::LogletError(Arc::new(e)), + } + } +} diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs new file mode 100644 index 000000000..95ff4af84 --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -0,0 +1,71 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::sync::Arc; + +use async_trait::async_trait; +use futures::stream::BoxStream; + +use restate_core::ShutdownError; +use restate_types::logs::{KeyFilter, LogletOffset, Record, TailState}; +use restate_types::replicated_loglet::ReplicatedLogletParams; + +use crate::loglet::{Loglet, LogletCommit, OperationError, SendableLogletReadStream}; + +#[derive(derive_more::Debug)] +pub(super) struct ReplicatedLoglet { + _my_params: ReplicatedLogletParams, +} + +impl ReplicatedLoglet { + pub fn new(my_params: ReplicatedLogletParams) -> Self { + Self { + _my_params: my_params, + } + } +} + +#[async_trait] +impl Loglet for ReplicatedLoglet { + async fn create_read_stream( + self: Arc, + _filter: KeyFilter, + _from: LogletOffset, + _to: Option, + ) -> Result { + todo!() + } + + fn watch_tail(&self) -> BoxStream<'static, TailState> { + todo!() + } + + async fn enqueue_batch(&self, _payloads: Arc<[Record]>) -> Result { + todo!() + } + + async fn find_tail(&self) -> Result, OperationError> { + todo!() + } + + async fn get_trim_point(&self) -> Result, OperationError> { + todo!() + } + + /// Trim the log to the minimum of new_trim_point and last_committed_offset + /// new_trim_point is inclusive (will be trimmed) + async fn trim(&self, _new_trim_point: LogletOffset) -> Result<(), OperationError> { + todo!() + } + + async fn seal(&self) -> Result<(), OperationError> { + todo!() + } +} diff --git a/crates/bifrost/src/providers/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs index c4e599060..bd725734c 100644 --- a/crates/bifrost/src/providers/replicated_loglet/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/mod.rs @@ -8,6 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +mod error; +mod loglet; pub(crate) mod metric_definitions; mod provider; pub mod replication; diff --git a/crates/bifrost/src/providers/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs index ece0c984b..c2cdb9ca5 100644 --- a/crates/bifrost/src/providers/replicated_loglet/provider.rs +++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs @@ -8,27 +8,28 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -// todo: remove when fleshed out -#![allow(unused)] - use std::sync::Arc; use async_trait::async_trait; +use dashmap::DashMap; -use restate_core::network::rpc_router::RpcRouter; use restate_core::network::{MessageRouterBuilder, Networking}; -use restate_core::Metadata; +use restate_core::{Metadata, TaskCenter}; use restate_metadata_store::MetadataStoreClient; use restate_types::config::ReplicatedLogletOptions; use restate_types::live::BoxedLiveLoad; use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; use restate_types::logs::LogId; +use restate_types::replicated_loglet::ReplicatedLogletParams; +use super::loglet::ReplicatedLoglet; use super::metric_definitions; use crate::loglet::{Loglet, LogletProvider, LogletProviderFactory, OperationError}; +use crate::providers::replicated_loglet::error::ReplicatedLogletError; use crate::Error; pub struct Factory { + task_center: TaskCenter, opts: BoxedLiveLoad, metadata: Metadata, metadata_store_client: MetadataStoreClient, @@ -37,13 +38,18 @@ pub struct Factory { impl Factory { pub fn new( + task_center: TaskCenter, opts: BoxedLiveLoad, metadata_store_client: MetadataStoreClient, metadata: Metadata, networking: Networking, _router_builder: &mut MessageRouterBuilder, ) -> Self { + // todo(asoli): + // - Create the shared RpcRouter(s) + // - A Handler to answer to control plane monitoring questions Self { + task_center, opts, metadata, metadata_store_client, @@ -60,22 +66,72 @@ impl LogletProviderFactory for Factory { async fn create(self: Box) -> Result, OperationError> { metric_definitions::describe_metrics(); - Ok(Arc::new(ReplicatedLogletProvider)) + Ok(Arc::new(ReplicatedLogletProvider::new( + self.task_center, + self.opts, + self.metadata, + self.metadata_store_client, + self.networking, + ))) } } -struct ReplicatedLogletProvider; +struct ReplicatedLogletProvider { + active_loglets: DashMap<(LogId, SegmentIndex), Arc>, + _task_center: TaskCenter, + _opts: BoxedLiveLoad, + _metadata: Metadata, + _metadata_store_client: MetadataStoreClient, + _networking: Networking, +} + +impl ReplicatedLogletProvider { + fn new( + task_center: TaskCenter, + opts: BoxedLiveLoad, + metadata: Metadata, + metadata_store_client: MetadataStoreClient, + networking: Networking, + ) -> Self { + // todo(asoli): create all global state here that'll be shared across loglet instances + // - RecordCache. + // - NodeState map. + Self { + active_loglets: Default::default(), + _task_center: task_center, + _opts: opts, + _metadata: metadata, + _metadata_store_client: metadata_store_client, + _networking: networking, + } + } +} #[async_trait] impl LogletProvider for ReplicatedLogletProvider { async fn get_loglet( &self, - // todo: we need richer params - _log_id: LogId, - _segment_index: SegmentIndex, - _params: &LogletParams, + log_id: LogId, + segment_index: SegmentIndex, + params: &LogletParams, ) -> Result, Error> { - todo!("Not implemented yet") + let loglet = match self.active_loglets.entry((log_id, segment_index)) { + dashmap::Entry::Vacant(entry) => { + // NOTE: replicated-loglet expects params to be a `json` string. + let params = + ReplicatedLogletParams::deserialize_from(params.as_bytes()).map_err(|e| { + ReplicatedLogletError::LogletParamsParsingError(log_id, segment_index, e) + })?; + + // Create loglet + let loglet = ReplicatedLoglet::new(params); + let key_value = entry.insert(Arc::new(loglet)); + Arc::clone(key_value.value()) + } + dashmap::Entry::Occupied(entry) => entry.get().clone(), + }; + + Ok(loglet as Arc) } async fn shutdown(&self) -> Result<(), OperationError> { diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index aa371383b..61a1966e7 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -168,6 +168,7 @@ impl Node { // replicated-loglet #[cfg(feature = "replicated-loglet")] let replicated_loglet_factory = restate_bifrost::providers::replicated_loglet::Factory::new( + tc.clone(), updateable_config .clone() .map(|c| &c.bifrost.replicated_loglet) diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index 81d5b02ba..4d179c6cd 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -10,7 +10,6 @@ use std::collections::{BTreeMap, HashMap, HashSet}; -use bytes::Bytes; use bytestring::ByteString; use enum_map::Enum; use rand::RngCore; @@ -108,7 +107,9 @@ pub struct LogletConfig { /// for a loglet kind to construct a configured loglet instance modulo the log-id /// and start-lsn. It's provided by bifrost on loglet creation. This allows the /// parameters to be shared between segments and logs if needed. -#[derive(Debug, Clone, Hash, Eq, PartialEq, derive_more::From, Serialize, Deserialize)] +#[derive( + Debug, Clone, Hash, Eq, PartialEq, derive_more::From, derive_more::Deref, Serialize, Deserialize, +)] pub struct LogletParams(ByteString); impl From for LogletParams { @@ -167,16 +168,6 @@ impl LogletConfig { } } -impl LogletParams { - pub fn as_str(&self) -> &str { - &self.0 - } - - pub fn as_bytes(&self) -> &Bytes { - self.0.as_bytes() - } -} - impl Logs { pub fn new(version: Version, logs: HashMap) -> Self { Self { version, logs }