diff --git a/Cargo.lock b/Cargo.lock index 436138462..128283d19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6080,6 +6080,7 @@ dependencies = [ "enumset", "futures", "googletest", + "itertools 0.13.0", "metrics", "parking_lot", "paste", diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index 0a74a7078..92d4f494f 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -10,32 +10,30 @@ mod nodeset_selection; -use futures::never::Never; -use rand::prelude::IteratorRandom; -use rand::thread_rng; use std::collections::HashMap; use std::iter; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; + +use futures::never::Never; +use rand::prelude::IteratorRandom; +use rand::thread_rng; use tokio::sync::Semaphore; use tokio::task::JoinSet; use tracing::{debug, error, trace, trace_span, Instrument}; use xxhash_rust::xxh3::Xxh3Builder; use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError}; -use restate_core::metadata_store::{ - retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError, -}; +use restate_core::metadata_store::{MetadataStoreClient, Precondition, ReadWriteError, WriteError}; use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt}; -use restate_types::config::Configuration; use restate_types::errors::GenericError; use restate_types::identifiers::PartitionId; use restate_types::live::Pinned; use restate_types::logs::builder::LogsBuilder; use restate_types::logs::metadata::{ - Chain, DefaultProvider, LogletConfig, LogletParams, Logs, LogsConfiguration, - NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, SegmentIndex, + Chain, LogletConfig, LogletParams, Logs, LogsConfiguration, NodeSetSelectionStrategy, + ProviderConfiguration, ProviderKind, ReplicatedLogletConfig, SegmentIndex, }; use restate_types::logs::{LogId, LogletId, Lsn, TailState}; use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; @@ -67,6 +65,8 @@ pub enum LogsControllerError { LogletParamsToConfiguration(GenericError), #[error(transparent)] Shutdown(#[from] ShutdownError), + #[error(transparent)] + Other(GenericError), } /// Node set selector hints for the [`LogsController`]. @@ -320,17 +320,17 @@ fn try_provisioning( node_set_selector_hints: impl NodeSetSelectorHints, ) -> Option { match logs_configuration.default_provider { - DefaultProvider::Local => { + ProviderConfiguration::Local => { let log_id = LogletId::new(log_id, SegmentIndex::OLDEST); Some(LogletConfiguration::Local(log_id.into())) } #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => { + ProviderConfiguration::InMemory => { let log_id = LogletId::new(log_id, SegmentIndex::OLDEST); Some(LogletConfiguration::Memory(log_id.into())) } #[cfg(feature = "replicated-loglet")] - DefaultProvider::Replicated(ref config) => build_new_replicated_loglet_configuration( + ProviderConfiguration::Replicated(ref config) => build_new_replicated_loglet_configuration( config, LogletId::new(log_id, SegmentIndex::OLDEST), &Metadata::with_current(|m| m.nodes_config_ref()), @@ -436,10 +436,10 @@ impl LogletConfiguration { ) -> bool { match (self, &logs_configuration.default_provider) { #[cfg(any(test, feature = "memory-loglet"))] - (Self::Memory(_), DefaultProvider::InMemory) => false, - (Self::Local(_), DefaultProvider::Local) => false, + (Self::Memory(_), ProviderConfiguration::InMemory) => false, + (Self::Local(_), ProviderConfiguration::Local) => false, #[cfg(feature = "replicated-loglet")] - (Self::Replicated(params), DefaultProvider::Replicated(config)) => { + (Self::Replicated(params), ProviderConfiguration::Replicated(config)) => { let sequencer_change_required = !observed_cluster_state .is_node_alive(params.sequencer) && !observed_cluster_state.alive_nodes.is_empty(); @@ -501,10 +501,14 @@ impl LogletConfiguration { match logs_configuration.default_provider { #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => Some(LogletConfiguration::Memory(loglet_id.next().into())), - DefaultProvider::Local => Some(LogletConfiguration::Local(loglet_id.next().into())), + ProviderConfiguration::InMemory => { + Some(LogletConfiguration::Memory(loglet_id.next().into())) + } + ProviderConfiguration::Local => { + Some(LogletConfiguration::Local(loglet_id.next().into())) + } #[cfg(feature = "replicated-loglet")] - DefaultProvider::Replicated(ref config) => { + ProviderConfiguration::Replicated(ref config) => { let previous_params = match self { Self::Replicated(previous_params) => Some(previous_params), _ => None, @@ -926,24 +930,15 @@ pub struct LogsController { impl LogsController { pub async fn init( - configuration: &Configuration, bifrost: Bifrost, metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, ) -> Result { - // obtain the latest logs or init it with an empty logs variant - let logs = retry_on_network_error( - configuration.common.network_error_retry_policy.clone(), - || { - metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || { - Logs::from_configuration(configuration) - }) - }, - ) - .await?; - - let logs_configuration = logs.configuration().clone(); - metadata_writer.update(Arc::new(logs)).await?; + // fetches latest logs or init it with an empty logs variant + BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client) + .init_metadata() + .await + .map_err(|e| LogsControllerError::Other(e.into()))?; //todo(azmy): make configurable let retry_policy = RetryPolicy::exponential( @@ -955,7 +950,10 @@ impl LogsController { let mut this = Self { effects: Some(Vec::new()), - inner: LogsControllerInner::new(logs_configuration, retry_policy), + inner: LogsControllerInner::new( + Metadata::with_current(|m| m.logs_ref().configuration().clone()), + retry_policy, + ), bifrost, metadata_store_client, metadata_writer, @@ -1279,7 +1277,7 @@ pub mod tests { use enumset::{enum_set, EnumSet}; use restate_types::logs::metadata::{ - DefaultProvider, LogsConfiguration, NodeSetSelectionStrategy, ReplicatedLogletConfig, + LogsConfiguration, NodeSetSelectionStrategy, ProviderConfiguration, ReplicatedLogletConfig, }; use restate_types::logs::LogletId; use restate_types::nodes_config::{ @@ -1452,7 +1450,7 @@ pub mod tests { fn logs_configuration(replication_factor: u8) -> LogsConfiguration { LogsConfiguration { - default_provider: DefaultProvider::Replicated(ReplicatedLogletConfig { + default_provider: ProviderConfiguration::Replicated(ReplicatedLogletConfig { replication_property: ReplicationProperty::new( NonZeroU8::new(replication_factor).expect("must be non zero"), ), @@ -1537,7 +1535,7 @@ pub mod tests { &nodes.observed_state )); - let DefaultProvider::Replicated(ref replicated_loglet_config) = + let ProviderConfiguration::Replicated(ref replicated_loglet_config) = logs_config.default_provider else { unreachable!() @@ -1571,7 +1569,7 @@ pub mod tests { let logs_config = logs_configuration(2); - let DefaultProvider::Replicated(ref replicated_loglet_config) = + let ProviderConfiguration::Replicated(ref replicated_loglet_config) = logs_config.default_provider else { unreachable!() diff --git a/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs b/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs index 8d5a89e22..ebd05a0d6 100644 --- a/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs +++ b/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs @@ -13,9 +13,9 @@ use std::cmp::{max, Ordering}; use itertools::Itertools; use rand::prelude::IteratorRandom; use rand::Rng; -use restate_types::logs::metadata::NodeSetSelectionStrategy; use tracing::trace; +use restate_types::logs::metadata::NodeSetSelectionStrategy; use restate_types::nodes_config::NodesConfiguration; use restate_types::replicated_loglet::{LocationScope, NodeSet, ReplicationProperty}; diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index e5694601f..1c6397831 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -26,7 +26,7 @@ use tracing::{debug, info}; use restate_metadata_store::ReadModifyWriteError; use restate_types::cluster_controller::SchedulingPlan; use restate_types::logs::metadata::{ - DefaultProvider, LogletParams, Logs, LogsConfiguration, ProviderKind, SegmentIndex, + LogletParams, Logs, LogsConfiguration, ProviderConfiguration, ProviderKind, SegmentIndex, }; use restate_types::metadata_store::keys::{ BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY, SCHEDULING_PLAN_KEY, @@ -183,7 +183,7 @@ enum ClusterControllerCommand { UpdateClusterConfiguration { num_partitions: NonZeroU16, replication_strategy: ReplicationStrategy, - default_provider: DefaultProvider, + default_provider: ProviderConfiguration, response_tx: oneshot::Sender>, }, SealAndExtendChain { @@ -249,7 +249,7 @@ impl ClusterControllerHandle { &self, num_partitions: NonZeroU16, replication_strategy: ReplicationStrategy, - default_provider: DefaultProvider, + default_provider: ProviderConfiguration, ) -> Result, ShutdownError> { let (response_tx, response_rx) = oneshot::channel(); @@ -439,7 +439,7 @@ impl Service { &self, num_partitions: u16, replication_strategy: ReplicationStrategy, - default_provider: DefaultProvider, + default_provider: ProviderConfiguration, ) -> anyhow::Result<()> { let logs = self .metadata_store_client @@ -457,8 +457,7 @@ impl Service { // we can only change the default provider if logs.version() != Version::INVALID - && logs.configuration().default_provider.as_provider_kind() - != default_provider.as_provider_kind() + && logs.configuration().default_provider.kind() != default_provider.kind() { { return Err( @@ -786,16 +785,16 @@ impl SealAndExtendTask { let (provider, params) = match &logs.configuration().default_provider { #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => ( + ProviderConfiguration::InMemory => ( ProviderKind::InMemory, u64::from(loglet_id.next()).to_string().into(), ), - DefaultProvider::Local => ( + ProviderConfiguration::Local => ( ProviderKind::Local, u64::from(loglet_id.next()).to_string().into(), ), #[cfg(feature = "replicated-loglet")] - DefaultProvider::Replicated(config) => { + ProviderConfiguration::Replicated(config) => { let schedule_plan = self .metadata_store_client .get::(SCHEDULING_PLAN_KEY.clone()) diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 8b5548839..162178d7c 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -162,7 +162,6 @@ where .await?; let logs_controller = LogsController::init( - &configuration, service.bifrost.clone(), service.metadata_store_client.clone(), service.metadata_writer.clone(), diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index 4b3df3785..e0b9d1348 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -31,6 +31,7 @@ derive_more = { workspace = true } enum-map = { workspace = true, features = ["serde"] } futures = { workspace = true } googletest = { workspace = true, features = ["anyhow"], optional = true } +itertools = { workspace = true } metrics = { workspace = true } parking_lot = { workspace = true } pin-project = { workspace = true } diff --git a/crates/bifrost/src/appender.rs b/crates/bifrost/src/appender.rs index 81d8a26ba..0d4ec6d36 100644 --- a/crates/bifrost/src/appender.rs +++ b/crates/bifrost/src/appender.rs @@ -116,7 +116,7 @@ impl Appender { info!( attempt = attempt, segment_index = %loglet.segment_index(), - "Append batch will be retried (loglet being sealed), waiting for tail to be determined" + "Append batch will be retried (loglet is being sealed), waiting for tail to be determined" ); let new_loglet = Self::wait_next_unsealed_loglet( self.log_id, @@ -131,7 +131,7 @@ impl Appender { Err(AppendError::Other(err)) if err.retryable() => { if let Some(retry_dur) = retry_iter.next() { info!( - ?err, + %err, attempt = attempt, segment_index = %loglet.segment_index(), "Failed to append this batch. Since underlying error is retryable, will retry in {:?}", @@ -140,7 +140,7 @@ impl Appender { tokio::time::sleep(retry_dur).await; } else { warn!( - ?err, + %err, attempt = attempt, segment_index = %loglet.segment_index(), "Failed to append this batch and exhausted all attempts to retry", diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index bb3a6f50f..264b44eb1 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -11,13 +11,13 @@ use std::ops::Deref; use std::sync::Arc; +use restate_core::metadata_store::retry_on_network_error; use tracing::{debug, info, instrument}; use restate_core::{Metadata, MetadataKind, MetadataWriter}; use restate_metadata_store::MetadataStoreClient; use restate_types::config::Configuration; -use restate_types::logs::builder::BuilderError; -use restate_types::logs::metadata::{LogletParams, Logs, ProviderKind, SegmentIndex}; +use restate_types::logs::metadata::{Chain, LogletParams, Logs, ProviderKind, SegmentIndex}; use restate_types::logs::{LogId, Lsn, TailState}; use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::Version; @@ -76,6 +76,61 @@ impl<'a> BifrostAdmin<'a> { self.bifrost.inner.trim(log_id, trim_point).await } + /// Seals a loglet under a set of conditions. + /// + /// The loglet will be sealed if and only if the following is true: + /// - if segment_index is set, the tail loglet must match segment_index. + /// If the intention is to create the log, then `segment_index` must be set to `None`. + /// + /// This will continue to retry sealing for seal retryable errors automatically. + #[instrument(level = "debug", skip(self), err)] + pub async fn seal_and_auto_extend_chain( + &self, + log_id: LogId, + segment_index: Option, + ) -> Result<()> { + self.bifrost.inner.fail_if_shutting_down()?; + let logs = Metadata::with_current(|m| m.logs_snapshot()); + let provider_config = &logs.configuration().default_provider; + let provider = self.bifrost.inner.provider_for(provider_config.kind())?; + // if this is a new log, we don't need to seal and we can immediately write to metadata + // store, otherwise, we need to seal first. + if logs.chain(&log_id).is_none() && segment_index.is_none() { + let proposed_params = + provider.propose_new_loglet_params(log_id, None, provider_config)?; + self.add_log(log_id, provider_config.kind(), proposed_params) + .await?; + return Ok(()); + } + + let segment_index = segment_index + .or_else(|| logs.chain(&log_id).map(|c| c.tail_index())) + .ok_or(Error::UnknownLogId(log_id))?; + + let sealed_segment = loop { + let sealed_segment = self.seal(log_id, segment_index).await?; + if sealed_segment.tail.is_sealed() { + break sealed_segment; + } + debug!(%log_id, %segment_index, "Segment is not sealed yet"); + tokio::time::sleep(Configuration::pinned().bifrost.seal_retry_interval.into()).await; + }; + + let proposed_params = + provider.propose_new_loglet_params(log_id, logs.chain(&log_id), provider_config)?; + + self.add_segment_with_params( + log_id, + segment_index, + sealed_segment.tail.offset(), + provider_config.kind(), + proposed_params, + ) + .await?; + + Ok(()) + } + /// Seals a loglet under a set of conditions. /// /// The loglet will be sealed if and only if the following is true: @@ -187,34 +242,93 @@ impl<'a> BifrostAdmin<'a> { params: LogletParams, ) -> Result<()> { self.bifrost.inner.fail_if_shutting_down()?; - let logs = self - .metadata_store_client - .read_modify_write(BIFROST_CONFIG_KEY.clone(), move |logs: Option| { - let logs = logs.ok_or(Error::UnknownLogId(log_id))?; - - let mut builder = logs.into_builder(); - let mut chain_builder = builder.chain(log_id).ok_or(Error::UnknownLogId(log_id))?; - - if chain_builder.tail().index() != last_segment_index { - // tail is not what we expected. - return Err(Error::from(AdminError::SegmentMismatch { - expected: last_segment_index, - found: chain_builder.tail().index(), - })); - } + let retry_policy = Configuration::pinned() + .common + .network_error_retry_policy + .clone(); + let logs = retry_on_network_error(retry_policy, || { + self.metadata_store_client.read_modify_write( + BIFROST_CONFIG_KEY.clone(), + |logs: Option| { + let logs = logs.ok_or(Error::UnknownLogId(log_id))?; - match chain_builder.append_segment(base_lsn, provider, params.clone()) { - Err(e) => match e { - BuilderError::SegmentConflict(lsn) => { - Err(Error::from(AdminError::SegmentConflict(lsn))) - } - _ => unreachable!("the log must exist at this point"), - }, - Ok(_) => Ok(builder.build()), - } - }) - .await - .map_err(|e| e.transpose())?; + let mut builder = logs.into_builder(); + let mut chain_builder = + builder.chain(log_id).ok_or(Error::UnknownLogId(log_id))?; + + if chain_builder.tail().index() != last_segment_index { + // tail is not what we expected. + return Err(Error::from(AdminError::SegmentMismatch { + expected: last_segment_index, + found: chain_builder.tail().index(), + })); + } + + let _ = chain_builder + .append_segment(base_lsn, provider, params.clone()) + .map_err(AdminError::from)?; + Ok(builder.build()) + }, + ) + }) + .await + .map_err(|e| e.transpose())?; + + self.metadata_writer.update(Arc::new(logs)).await?; + Ok(()) + } + + /// Adds a new log if it doesn't exist. + #[instrument(level = "debug", skip(self), err)] + async fn add_log( + &self, + log_id: LogId, + provider: ProviderKind, + params: LogletParams, + ) -> Result<()> { + self.bifrost.inner.fail_if_shutting_down()?; + let retry_policy = Configuration::pinned() + .common + .network_error_retry_policy + .clone(); + let logs = retry_on_network_error(retry_policy, || { + self.metadata_store_client.read_modify_write::<_, _, Error>( + BIFROST_CONFIG_KEY.clone(), + |logs: Option| { + // We assume that we'll always see a value set in metadata for BIFROST_CONFIG_KEY, + // provisioning the empty logs metadata is not our responsibility. + let logs = logs.ok_or(Error::UnknownLogId(log_id))?; + + let mut builder = logs.into_builder(); + builder + .add_log(log_id, Chain::new(provider, params.clone())) + .map_err(AdminError::from)?; + Ok(builder.build()) + }, + ) + }) + .await + .map_err(|e| e.transpose())?; + + self.metadata_writer.update(Arc::new(logs)).await?; + Ok(()) + } + + /// Creates empty metadata if none exists for bifrost and publishes it to metadata + /// manager. + pub async fn init_metadata(&self) -> Result<(), Error> { + let retry_policy = Configuration::pinned() + .common + .network_error_retry_policy + .clone(); + + let logs = retry_on_network_error(retry_policy, || { + self.metadata_store_client + .get_or_insert(BIFROST_CONFIG_KEY.clone(), || { + Logs::from_configuration(&Configuration::pinned()) + }) + }) + .await?; self.metadata_writer.update(Arc::new(logs)).await?; Ok(()) diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs index 7ee8b6961..f61fb1e05 100644 --- a/crates/bifrost/src/error.rs +++ b/crates/bifrost/src/error.rs @@ -12,6 +12,7 @@ use std::sync::Arc; use restate_core::{ShutdownError, SyncError}; use restate_types::errors::MaybeRetryableError; +use restate_types::logs::builder::BuilderError; use restate_types::logs::metadata::SegmentIndex; use restate_types::logs::{LogId, Lsn}; @@ -53,6 +54,8 @@ pub enum EnqueueError { #[derive(Debug, thiserror::Error)] pub enum AdminError { + #[error("log {0} already exists")] + LogAlreadyExists(LogId), #[error("segment conflicts with existing segment with base_lsn={0}")] SegmentConflict(Lsn), #[error("segment index found in metadata does not match expected {expected}!={found}")] @@ -60,6 +63,8 @@ pub enum AdminError { expected: SegmentIndex, found: SegmentIndex, }, + #[error("loglet params could not be deserialized: {0}")] + ParamsSerde(#[from] serde_json::Error), } impl From for Error { @@ -70,3 +75,13 @@ impl From for Error { } } } + +impl From for AdminError { + fn from(value: BuilderError) -> Self { + match value { + BuilderError::LogAlreadyExists(log_id) => AdminError::LogAlreadyExists(log_id), + BuilderError::ParamsSerde(error) => AdminError::ParamsSerde(error), + BuilderError::SegmentConflict(lsn) => AdminError::SegmentConflict(lsn), + } + } +} diff --git a/crates/bifrost/src/loglet/provider.rs b/crates/bifrost/src/loglet/provider.rs index e0b397f27..59487e575 100644 --- a/crates/bifrost/src/loglet/provider.rs +++ b/crates/bifrost/src/loglet/provider.rs @@ -12,7 +12,9 @@ use std::sync::Arc; use async_trait::async_trait; -use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; +use restate_types::logs::metadata::{ + Chain, LogletParams, ProviderConfiguration, ProviderKind, SegmentIndex, +}; use restate_types::logs::LogId; use super::{Loglet, OperationError}; @@ -37,6 +39,16 @@ pub trait LogletProvider: Send + Sync { params: &LogletParams, ) -> Result>; + /// Create a loglet client for a given segment and configuration. + /// + /// if `chain` is None, this means we no chain exists already for this log. + fn propose_new_loglet_params( + &self, + log_id: LogId, + chain: Option<&Chain>, + defaults: &ProviderConfiguration, + ) -> Result; + /// A hook that's called after provider is started. async fn post_start(&self) {} diff --git a/crates/bifrost/src/providers/local_loglet/provider.rs b/crates/bifrost/src/providers/local_loglet/provider.rs index 2d19cd4f0..685a9858a 100644 --- a/crates/bifrost/src/providers/local_loglet/provider.rs +++ b/crates/bifrost/src/providers/local_loglet/provider.rs @@ -17,8 +17,10 @@ use tracing::debug; use restate_types::config::{LocalLogletOptions, RocksDbOptions}; use restate_types::live::BoxedLiveLoad; -use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; -use restate_types::logs::LogId; +use restate_types::logs::metadata::{ + Chain, LogletParams, ProviderConfiguration, ProviderKind, SegmentIndex, +}; +use restate_types::logs::{LogId, LogletId}; use super::log_store::RocksDbLogStore; use super::log_store_writer::RocksDbLogWriterHandle; @@ -105,6 +107,20 @@ impl LogletProvider for LocalLogletProvider { Ok(loglet as Arc) } + fn propose_new_loglet_params( + &self, + log_id: LogId, + chain: Option<&Chain>, + _defaults: &ProviderConfiguration, + ) -> Result { + let new_segment_index = chain + .map(|c| c.tail_index()) + .unwrap_or(SegmentIndex::OLDEST); + Ok(LogletParams::from( + LogletId::new(log_id, new_segment_index).to_string(), + )) + } + async fn shutdown(&self) -> Result<(), OperationError> { Ok(()) } diff --git a/crates/bifrost/src/providers/memory_loglet.rs b/crates/bifrost/src/providers/memory_loglet.rs index a1bb9ae0f..d90adb583 100644 --- a/crates/bifrost/src/providers/memory_loglet.rs +++ b/crates/bifrost/src/providers/memory_loglet.rs @@ -22,9 +22,11 @@ use tokio::sync::Mutex as AsyncMutex; use tracing::{debug, info}; use restate_core::ShutdownError; -use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; +use restate_types::logs::metadata::{ + Chain, LogletParams, ProviderConfiguration, ProviderKind, SegmentIndex, +}; use restate_types::logs::{ - KeyFilter, LogId, LogletOffset, MatchKeyQuery, Record, SequenceNumber, TailState, + KeyFilter, LogId, LogletId, LogletOffset, MatchKeyQuery, Record, SequenceNumber, TailState, }; use crate::loglet::util::TailOffsetWatch; @@ -100,6 +102,20 @@ impl LogletProvider for MemoryLogletProvider { Ok(loglet as Arc) } + fn propose_new_loglet_params( + &self, + log_id: LogId, + chain: Option<&Chain>, + _defaults: &ProviderConfiguration, + ) -> Result { + let new_segment_index = chain + .map(|c| c.tail_index()) + .unwrap_or(SegmentIndex::OLDEST); + Ok(LogletParams::from( + LogletId::new(log_id, new_segment_index).to_string(), + )) + } + async fn shutdown(&self) -> Result<(), OperationError> { info!("Shutting down in-memory loglet provider"); Ok(()) diff --git a/crates/bifrost/src/providers/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs index dc7b92f0b..4d3f89bc0 100644 --- a/crates/bifrost/src/providers/replicated_loglet/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/mod.rs @@ -13,6 +13,7 @@ mod log_server_manager; mod loglet; pub(crate) mod metric_definitions; mod network; +mod nodeset_selector; mod provider; mod read_path; mod remote_sequencer; diff --git a/crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs b/crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs new file mode 100644 index 000000000..4b48b541e --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs @@ -0,0 +1,515 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::cmp::{max, Ordering}; + +use itertools::Itertools; +use rand::prelude::IteratorRandom; +use rand::Rng; +use tracing::trace; + +use restate_types::logs::metadata::NodeSetSelectionStrategy; +use restate_types::nodes_config::NodesConfiguration; +use restate_types::replicated_loglet::{LocationScope, NodeSet, ReplicationProperty}; +use restate_types::NodeId; + +/// TEMPORARY UNTIL NODE-LEVEL CLUSTER STATE IS ACTUALLY IMPLEMENTED +#[derive(Clone, Debug)] +pub struct ObservedClusterState; + +impl ObservedClusterState { + pub fn is_node_alive(&self, _node_id: impl Into) -> bool { + // assume all nodes are alive + true + } +} + +/// Nodeset selector for picking a set of storage nodes for a replicated loglet out of a broader +/// pool of available nodes. +/// +/// This selector can be reused once constructed to make multiple decisions in a single scheduling +/// iteration, if the node configuration and the replication settings are not changing. +#[derive(Clone)] +pub struct NodeSetSelector<'a> { + nodes_config: &'a NodesConfiguration, + cluster_state: &'a ObservedClusterState, +} + +impl<'a> NodeSetSelector<'a> { + pub fn new( + nodes_config: &'a NodesConfiguration, + cluster_state: &'a ObservedClusterState, + ) -> NodeSetSelector<'a> { + Self { + nodes_config, + cluster_state, + } + } + + /// Determines if a nodeset can be improved by adding or replacing members. Does NOT consider + /// sealability of the current configuration when making decisions! + #[allow(unused)] + pub fn can_improve( + &self, + nodeset: &NodeSet, + strategy: NodeSetSelectionStrategy, + replication_property: &ReplicationProperty, + ) -> bool { + let writable_nodeset = WritableNodeSet::from(self.nodes_config); + let alive_nodeset = writable_nodeset.alive(self.cluster_state); + let current_alive = alive_nodeset.intersect(nodeset); + + let nodeset_size = + nodeset_size_range(&strategy, replication_property, writable_nodeset.len()); + + if current_alive.len() == nodeset_size.target_size { + return false; + } + + // todo: we should check the current segment for sealability, otherwise we might propose + // reconfiguration when we are virtually certain to get stuck! + alive_nodeset.len() >= nodeset_size.minimum_size + && alive_nodeset.len() > current_alive.len() + } + + /// Picks a set of storage nodes for a replicated loglet out of the available pool. Only alive, + /// writable storage nodes will be used. Returns a proposed new nodeset that meets the + /// requirements of the supplied selection strategy and replication, or an explicit error. + pub fn select( + &self, + strategy: NodeSetSelectionStrategy, + replication_property: &ReplicationProperty, + rng: &mut R, + preferred_nodes: &NodeSet, + ) -> Result { + if replication_property.at_greatest_scope().0 != &LocationScope::Node { + // todo: add support for other location scopes + unimplemented!("only node-scoped replication is currently supported"); + } + + let writable_nodeset = WritableNodeSet::from(self.nodes_config); + // Only consider alive, writable storage nodes. + let alive_nodeset = writable_nodeset.alive(self.cluster_state); + + let nodeset_size = + nodeset_size_range(&strategy, replication_property, writable_nodeset.len()); + + if writable_nodeset.len() < nodeset_size.fault_tolerant_size { + trace!( + nodes_count = %writable_nodeset.len(), + ?nodeset_size.minimum_size, + ?nodeset_size.fault_tolerant_size, + cluster_state = ?self.cluster_state, + nodes_config = ?self.nodes_config, + "Not enough nodes to meet the fault tolerant replication requirements" + ); + return Err(NodeSelectionError::InsufficientWriteableNodes); + } + + let nodeset = match strategy { + NodeSetSelectionStrategy::StrictFaultTolerantGreedy => { + let mut nodes = preferred_nodes + .iter() + .copied() + .filter(|node_id| alive_nodeset.contains(node_id)) + .choose_multiple(rng, nodeset_size.target_size); + + if nodes.len() < nodeset_size.target_size { + let remaining = nodeset_size.target_size - nodes.len(); + nodes.extend( + alive_nodeset + .iter() + .filter(|node_id| !preferred_nodes.contains(node_id)) + .choose_multiple(rng, remaining), + ); + } + + if nodes.len() < nodeset_size.minimum_size { + trace!( + "Failed to place replicated loglet: insufficient alive nodes to meet minimum size requirement {} < {}", + nodes.len(), + nodeset_size.minimum_size, + ); + + return Err(NodeSelectionError::InsufficientWriteableNodes); + } + + // last possibility is if the selected nodeset is still + // smaller than fault tolerant size we try to extend from the full nodeset + // which includes possibly dead nodes + if nodes.len() < nodeset_size.fault_tolerant_size { + // greedy approach: Every other node that is not + // already in the set. + let remaining = nodeset_size.fault_tolerant_size - nodes.len(); + + let extension = writable_nodeset + .iter() + .filter(|node_id| !alive_nodeset.contains(node_id)) + .cloned() + .sorted_by(|l, r| { + // sorting nodes by "preferred" nodes. Preferred nodes comes first. + match (preferred_nodes.contains(l), preferred_nodes.contains(r)) { + (true, true) | (false, false) => Ordering::Equal, + (true, false) => Ordering::Less, + (false, true) => Ordering::Greater, + } + }) + .take(remaining); + + nodes.extend(extension); + } + + let nodes_len = nodes.len(); + let nodeset = NodeSet::from_iter(nodes); + assert_eq!( + nodeset.len(), + nodes_len, + "We have accidentally chosen duplicate candidates during nodeset selection" + ); + nodeset + } + }; + + // even with all possible dead node we still can't reach the fault tolerant + // nodeset size. This means there are not enough nodes in the cluster + // so we still return an error. + + // todo: implement location scope-aware selection + if nodeset.len() < nodeset_size.fault_tolerant_size { + trace!( + "Failed to place replicated loglet: insufficient writeable nodes to meet fault tolerant size requirement {} < {}", + nodeset.len(), + nodeset_size.fault_tolerant_size, + ); + return Err(NodeSelectionError::InsufficientWriteableNodes); + } + + Ok(nodeset) + } +} + +#[derive(Debug)] +struct NodeSetSizeRange { + /// Minimum number of nodes required to maintain write availability; + /// dropping below this threshold will result in loss of write availability. + minimum_size: usize, + /// The minimum number of nodes to satisfy replication + /// property with fault tolerance + /// + /// calculated as (minimum_size - 1) * 2 + 1 + fault_tolerant_size: usize, + /// The proposed number of nodes to use if possible + target_size: usize, +} + +fn nodeset_size_range( + strategy: &NodeSetSelectionStrategy, + replication_property: &ReplicationProperty, + writable_nodes_size: usize, +) -> NodeSetSizeRange { + let min_copies = replication_property.num_copies(); + + // ReplicationFactor(f+1) implies a minimum of 2f+1 nodes. At this point we are only + // calculating the nodeset floor size, the actual size will be determined by the specific + // strategy in use. + assert!( + min_copies < u8::MAX >> 1, + "The replication factor implies a cluster size that exceeds the maximum supported size" + ); + + let fault_tolerant_size = (usize::from(min_copies) - 1) * 2 + 1; + assert!( + fault_tolerant_size >= usize::from(min_copies), + "The calculated minimum nodeset size can not be less than the replication factor" + ); + + let (fault_tolerant_size, nodeset_target_size) = match strategy { + // writable_nodes_size includes any writable node (log-server) dead or alive. + // in the greedy strategy we take the max(fault_tolerant, writable_nodes_size) as + // our target size + NodeSetSelectionStrategy::StrictFaultTolerantGreedy => ( + fault_tolerant_size, + max(fault_tolerant_size, writable_nodes_size), + ), + }; + + NodeSetSizeRange { + minimum_size: min_copies.into(), + fault_tolerant_size, + target_size: nodeset_target_size, + } +} + +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum NodeSelectionError { + #[error("Insufficient writeable nodes in the nodeset")] + InsufficientWriteableNodes, +} + +/// Set of all log-server nodeset, regardless of the state +#[derive(Debug, Clone, Eq, PartialEq, derive_more::Into, derive_more::Deref)] +struct WritableNodeSet(NodeSet); + +impl WritableNodeSet { + fn alive(&self, state: &ObservedClusterState) -> AliveNodeSet { + self.iter() + .cloned() + .filter(|id| state.is_node_alive(*id)) + .collect::() + .into() + } +} + +impl From<&NodesConfiguration> for WritableNodeSet { + fn from(value: &NodesConfiguration) -> Self { + Self( + value + .iter() + .filter_map(|(node_id, config)| { + if config.log_server_config.storage_state.can_write_to() { + Some(node_id) + } else { + None + } + }) + .collect(), + ) + } +} + +/// A subset of WritableNodeset that is known to be alive at the time of creation. +#[derive(Debug, Clone, Eq, PartialEq, derive_more::Into, derive_more::Deref, derive_more::From)] +struct AliveNodeSet(NodeSet); + +#[cfg(test)] +pub mod tests { + // ** NOTE ** + // THESE TESTS ARE TEMPORARY DISABLED AND WILL ENABLED AFTER CLUSTER STATE IS IMPLEMENTED + // THIS IS A TRANSITIONAL STATE + + // use std::collections::HashSet; + // + // use enumset::enum_set; + // use rand::thread_rng; + // + // use restate_types::nodes_config::{NodesConfiguration, Role, StorageState}; + // use restate_types::replicated_loglet::{LocationScope, NodeSet, ReplicationProperty}; + // use restate_types::PlainNodeId; + // + // use super::*; + // use crate::cluster_controller::logs_controller::tests::{node, MockNodes}; + // use crate::cluster_controller::observed_cluster_state::ObservedClusterState; + // + // #[test] + // #[should_panic( + // expected = "not implemented: only node-scoped replication is currently supported" + // )] + // fn test_select_log_servers_rejects_unsupported_replication_scope() { + // let replication = + // ReplicationProperty::with_scope(LocationScope::Zone, 1.try_into().unwrap()); + // + // let nodes_config = NodesConfiguration::default(); + // let observed_state = ObservedClusterState::default(); + // + // let preferred_nodes = NodeSet::empty(); + // NodeSetSelector::new(&nodes_config, &observed_state) + // .select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &preferred_nodes, + // ) + // .unwrap(); // panics + // } + // + // #[test] + // fn test_select_log_servers_insufficient_capacity() { + // let nodes: Vec = vec![1.into(), 2.into(), 3.into()]; + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 2.try_into().unwrap()); + // + // let mut nodes_config = NodesConfiguration::default(); + // nodes_config.upsert_node(node(0, enum_set!(Role::Admin), StorageState::Disabled)); + // nodes_config.upsert_node(node( + // 1, + // enum_set!(Role::LogServer | Role::Worker), + // StorageState::Provisioning, + // )); + // nodes_config.upsert_node(node( + // 2, + // enum_set!(Role::LogServer | Role::Worker), + // StorageState::ReadWrite, + // )); + // nodes_config.upsert_node(node( + // 3, + // enum_set!(Role::LogServer | Role::Worker), + // StorageState::ReadOnly, + // )); + // nodes_config.upsert_node(node(4, enum_set!(Role::Worker), StorageState::Disabled)); + // + // let observed_state = ObservedClusterState { + // alive_nodes: nodes + // .iter() + // .copied() + // .map(|id| (id, id.with_generation(1))) + // .collect(), + // dead_nodes: HashSet::default(), + // ..Default::default() + // }; + // + // let strategy = NodeSetSelectionStrategy::StrictFaultTolerantGreedy; + // let preferred_nodes = NodeSet::empty(); + // let selection = NodeSetSelector::new(&nodes_config, &observed_state).select( + // strategy, + // &replication, + // &mut thread_rng(), + // &preferred_nodes, + // ); + // + // assert_eq!( + // selection, + // Err(NodeSelectionError::InsufficientWriteableNodes) + // ); + // } + // + // /// Replicated loglets should work just fine in single-node clusters, with the FT strategy inferring that f=0, + // /// as long as the replication factor is set to 1. + // #[test] + // fn test_select_log_servers_single_node_cluster() { + // let nodes = MockNodes::builder().with_mixed_server_nodes([1]).build(); + // + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 1.try_into().unwrap()); + // + // let strategy = NodeSetSelectionStrategy::StrictFaultTolerantGreedy; + // let preferred_nodes = NodeSet::empty(); + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // strategy, + // &replication, + // &mut thread_rng(), + // &preferred_nodes, + // ); + // + // assert_eq!( + // selection.unwrap(), + // NodeSet::from([1]), + // "A single-node cluster is possible with replication factor of 0" + // ); + // } + // + // /// In this test we have a cluster with 3 nodes and replication factor is 2. The strict FT + // /// strategy can bootstrap a loglet using all 3 nodes but won't choose a new nodeset when only 2 + // /// are alive, as that puts the loglet at risk. The assumption is that the previous nodeset will + // /// carry on in its original configuration - it is the data plane's problem to work around + // /// partial node availability. When an additional log server becomes available, the selector can + // /// reconfigure the loglet to use it. + // #[test] + // fn test_select_log_servers_respects_replication_factor() { + // let mut nodes = MockNodes::builder() + // .with_mixed_server_nodes([1, 2, 3]) + // .build(); + // + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 2.try_into().unwrap()); + // + // // initial selection - no prior preferences + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &NodeSet::empty(), + // ); + // assert!(selection.is_ok()); + // let initial_nodeset = selection.unwrap(); + // assert_eq!(initial_nodeset, NodeSet::from([1, 2, 3])); + // + // nodes.kill_node(1); + // + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &initial_nodeset, // preferred nodes + // ); + // + // // while one node is dead, the selector can still satisfy a write quorum + // // based on supplied replication property. The dead node will be included + // // in the nodeset. + // assert!(selection.is_ok()); + // let initial_nodeset = selection.unwrap(); + // assert_eq!(initial_nodeset, NodeSet::from([1, 2, 3])); + // + // nodes.add_dedicated_log_server_node(4); + // + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &initial_nodeset, // preferred nodes + // ); + // assert_eq!(selection.unwrap(), NodeSet::from([2, 3, 4])); + // } + // + // #[test] + // fn test_select_log_servers_respects_replication_factor_not_enough_nodes() { + // let nodes = MockNodes::builder().with_mixed_server_nodes([1, 2]).build(); + // + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 2.try_into().unwrap()); + // + // // initial selection - no prior preferences + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &NodeSet::empty(), + // ); + // + // // in this case, the entire cluster does not have enough nodes for an optimal + // // nodeset. + // assert_eq!( + // selection, + // Err(NodeSelectionError::InsufficientWriteableNodes), + // "The strict FT strategy does not compromise on the minimum 2f+1 nodeset size" + // ); + // } + // + // #[test] + // fn test_select_log_servers_insufficient_fault_tolerant_capacity() { + // // while we only have 2 alive node, the algorithm will still + // // prefer to use a dead node instead of failing as long as + // // we have write availability + // + // let nodes = MockNodes::builder() + // .with_nodes( + // [1, 2, 3], + // enum_set!(Role::LogServer | Role::Worker), + // StorageState::ReadWrite, + // ) + // .dead_nodes([3]) + // .build(); + // + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 2.try_into().unwrap()); + // + // let strategy = NodeSetSelectionStrategy::StrictFaultTolerantGreedy; + // let preferred_nodes = NodeSet::empty(); + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // strategy, + // &replication, + // &mut thread_rng(), + // &preferred_nodes, + // ); + // + // assert!(selection.is_ok()); + // let selection = selection.unwrap(); + // assert!(selection.contains(&PlainNodeId::from(3))); + // } +} diff --git a/crates/bifrost/src/providers/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs index d164ccbfa..ab4fa6988 100644 --- a/crates/bifrost/src/providers/replicated_loglet/provider.rs +++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs @@ -16,16 +16,19 @@ use dashmap::DashMap; use tracing::debug; use restate_core::network::{MessageRouterBuilder, Networking, TransportConnect}; -use restate_core::{TaskCenter, TaskKind}; +use restate_core::{my_node_id, Metadata, TaskCenter, TaskKind}; use restate_metadata_store::MetadataStoreClient; use restate_types::config::Configuration; -use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; -use restate_types::logs::{LogId, RecordCache}; +use restate_types::logs::metadata::{ + Chain, LogletParams, ProviderConfiguration, ProviderKind, SegmentIndex, +}; +use restate_types::logs::{LogId, LogletId, RecordCache}; use restate_types::replicated_loglet::ReplicatedLogletParams; use super::loglet::ReplicatedLoglet; use super::metric_definitions; use super::network::RequestPump; +use super::nodeset_selector::{NodeSelectionError, NodeSetSelector, ObservedClusterState}; use super::rpc_routers::{LogServersRpc, SequencersRpc}; use crate::loglet::{Loglet, LogletProvider, LogletProviderFactory, OperationError}; use crate::providers::replicated_loglet::error::ReplicatedLogletError; @@ -201,6 +204,76 @@ impl LogletProvider for ReplicatedLogletProvider { Ok(loglet as Arc) } + fn propose_new_loglet_params( + &self, + log_id: LogId, + chain: Option<&Chain>, + defaults: &ProviderConfiguration, + ) -> Result { + let ProviderConfiguration::Replicated(defaults) = defaults else { + panic!("ProviderConfiguration::Replicated is expected"); + }; + + let new_segment_index = chain + .map(|c| c.tail_index()) + .unwrap_or(SegmentIndex::OLDEST); + + let loglet_id = LogletId::new(log_id, new_segment_index); + + let mut rng = rand::thread_rng(); + + let replication = defaults.replication_property.clone(); + let strategy = defaults.nodeset_selection_strategy; + + // if the last loglet in the chain is of the same provider kind, we can use this to + // influence the nodeset selector. + let previous_params = chain.and_then(|chain| { + let tail_config = chain.tail().config; + match tail_config.kind { + ProviderKind::Replicated => Some( + ReplicatedLogletParams::deserialize_from(tail_config.params.as_bytes()) + .expect("params serde must be infallible"), + ), + // Another kind, we don't care about its config + _ => None, + } + }); + + let preferred_nodes = previous_params + .map(|p| p.nodeset.clone()) + .unwrap_or_default(); + let nodes_config = Metadata::with_current(|m| m.nodes_config_ref()); + + let selection = NodeSetSelector::new(&nodes_config, &ObservedClusterState).select( + strategy, + &replication, + &mut rng, + &preferred_nodes, + ); + + match selection { + Ok(nodeset) => Ok(LogletParams::from( + ReplicatedLogletParams { + loglet_id, + // We choose ourselves to be the sequencer for this loglet + sequencer: my_node_id(), + replication, + nodeset, + } + .serialize() + .expect("params serde must be infallible"), + )), + Err(e @ NodeSelectionError::InsufficientWriteableNodes) => { + debug!( + ?loglet_id, + "Insufficient writeable nodes to select new nodeset for replicated loglet" + ); + + Err(OperationError::terminal(e)) + } + } + } + async fn shutdown(&self) -> Result<(), OperationError> { Ok(()) } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs index 068ee3b6f..49398117c 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs @@ -94,6 +94,7 @@ impl CheckSealTask { loglet_id = %my_params.loglet_id, status = %nodeset_checker, effective_nodeset = %effective_nodeset, + replication = %my_params.replication, "Insufficient nodes responded to GetLogletInfo requests, we cannot determine seal status, we'll assume it's unsealed for now", ); return Ok(CheckSealOutcome::ProbablyOpen); diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 178c05d1c..412353f1f 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -233,7 +233,7 @@ pub struct CommonOptions { /// # Network error retry policy /// - /// The retry policy for node network error + /// The retry policy for network related errors pub network_error_retry_policy: RetryPolicy, } diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index 1668910b5..c4954de3a 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -194,7 +194,7 @@ impl TryFrom for NodeSetSelectionStrategy { #[derive(Debug, Clone, Eq, PartialEq, Default, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "kebab-case")] -pub enum DefaultProvider { +pub enum ProviderConfiguration { #[cfg(any(test, feature = "memory-loglet"))] InMemory, #[default] @@ -202,8 +202,8 @@ pub enum DefaultProvider { Replicated(ReplicatedLogletConfig), } -impl DefaultProvider { - pub fn as_provider_kind(&self) -> ProviderKind { +impl ProviderConfiguration { + pub fn kind(&self) -> ProviderKind { match self { #[cfg(any(test, feature = "memory-loglet"))] Self::InMemory => ProviderKind::InMemory, @@ -213,17 +213,17 @@ impl DefaultProvider { } } -impl From for crate::protobuf::cluster::DefaultProvider { - fn from(value: DefaultProvider) -> Self { +impl From for crate::protobuf::cluster::DefaultProvider { + fn from(value: ProviderConfiguration) -> Self { use crate::protobuf::cluster; let mut result = crate::protobuf::cluster::DefaultProvider::default(); match value { - DefaultProvider::Local => result.provider = ProviderKind::Local.to_string(), + ProviderConfiguration::Local => result.provider = ProviderKind::Local.to_string(), #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => result.provider = ProviderKind::InMemory.to_string(), - DefaultProvider::Replicated(config) => { + ProviderConfiguration::InMemory => result.provider = ProviderKind::InMemory.to_string(), + ProviderConfiguration::Replicated(config) => { result.provider = ProviderKind::Replicated.to_string(); result.replicated_config = Some(cluster::ReplicatedProviderConfig { replication_property: config.replication_property.to_string(), @@ -236,7 +236,7 @@ impl From for crate::protobuf::cluster::DefaultProvider { } } -impl TryFrom for DefaultProvider { +impl TryFrom for ProviderConfiguration { type Error = anyhow::Error; fn try_from(value: crate::protobuf::cluster::DefaultProvider) -> Result { let provider_kind: ProviderKind = value.provider.parse()?; @@ -272,7 +272,7 @@ pub struct ReplicatedLogletConfig { #[derive(Debug, Clone, Eq, PartialEq, Default, serde::Serialize, serde::Deserialize)] pub struct LogsConfiguration { - pub default_provider: DefaultProvider, + pub default_provider: ProviderConfiguration, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -325,12 +325,14 @@ impl TryFrom for Logs { // this means we are migrating from an older setup that had replication-property // hardcoded to {node:2} config = Some(LogsConfiguration { - default_provider: DefaultProvider::Replicated(ReplicatedLogletConfig { - nodeset_selection_strategy: NodeSetSelectionStrategy::default(), - replication_property: ReplicationProperty::new( - NonZeroU8::new(2).expect("2 is not 0"), - ), - }), + default_provider: ProviderConfiguration::Replicated( + ReplicatedLogletConfig { + nodeset_selection_strategy: NodeSetSelectionStrategy::default(), + replication_property: ReplicationProperty::new( + NonZeroU8::new(2).expect("2 is not 0"), + ), + }, + ), }) } } @@ -495,14 +497,16 @@ impl Logs { Self::with_logs_configuration(LogsConfiguration { default_provider: match config.bifrost.default_provider { #[cfg(any(test, feature = "memory-loglet"))] - ProviderKind::InMemory => DefaultProvider::InMemory, - ProviderKind::Local => DefaultProvider::Local, - ProviderKind::Replicated => DefaultProvider::Replicated(ReplicatedLogletConfig { - nodeset_selection_strategy: NodeSetSelectionStrategy::default(), - replication_property: ReplicationProperty::new( - NonZeroU8::new(1).expect("1 is not zero"), - ), - }), + ProviderKind::InMemory => ProviderConfiguration::InMemory, + ProviderKind::Local => ProviderConfiguration::Local, + ProviderKind::Replicated => { + ProviderConfiguration::Replicated(ReplicatedLogletConfig { + nodeset_selection_strategy: NodeSetSelectionStrategy::default(), + replication_property: ReplicationProperty::new( + NonZeroU8::new(1).expect("1 is not zero"), + ), + }) + } }, }) } diff --git a/tools/restatectl/src/commands/cluster/config.rs b/tools/restatectl/src/commands/cluster/config.rs index 620a2f488..b985b1278 100644 --- a/tools/restatectl/src/commands/cluster/config.rs +++ b/tools/restatectl/src/commands/cluster/config.rs @@ -16,7 +16,7 @@ use std::fmt::{self, Display, Write}; use cling::prelude::*; use restate_types::{ - logs::metadata::DefaultProvider, partition_table::ReplicationStrategy, + logs::metadata::ProviderConfiguration, partition_table::ReplicationStrategy, protobuf::cluster::ClusterConfiguration, }; @@ -43,7 +43,7 @@ fn cluster_config_string(config: ClusterConfiguration) -> anyhow::Result write_leaf(&mut w, 1, false, "Bifrost replication strategy", strategy)?; - let provider: DefaultProvider = config.default_provider.unwrap_or_default().try_into()?; + let provider: ProviderConfiguration = config.default_provider.unwrap_or_default().try_into()?; write_default_provider(&mut w, 1, provider)?; Ok(w) @@ -52,19 +52,19 @@ fn cluster_config_string(config: ClusterConfiguration) -> anyhow::Result fn write_default_provider( w: &mut W, depth: usize, - provider: DefaultProvider, + provider: ProviderConfiguration, ) -> Result<(), fmt::Error> { let title = "Bifrost Provider"; match provider { #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => { + ProviderConfiguration::InMemory => { write_leaf(w, depth, true, title, "in-memory")?; } - DefaultProvider::Local => { + ProviderConfiguration::Local => { write_leaf(w, depth, true, title, "local")?; } #[cfg(feature = "replicated-loglet")] - DefaultProvider::Replicated(config) => { + ProviderConfiguration::Replicated(config) => { write_leaf(w, depth, true, title, "replicated")?; let depth = depth + 1; write_leaf( diff --git a/tools/restatectl/src/commands/cluster/config/set.rs b/tools/restatectl/src/commands/cluster/config/set.rs index 5c23c01f2..abdece88c 100644 --- a/tools/restatectl/src/commands/cluster/config/set.rs +++ b/tools/restatectl/src/commands/cluster/config/set.rs @@ -23,7 +23,7 @@ use restate_cli_util::_comfy_table::{Cell, Color, Table}; use restate_cli_util::c_println; use restate_cli_util::ui::console::{confirm_or_exit, StyledTable}; use restate_types::logs::metadata::{ - DefaultProvider, NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, + NodeSetSelectionStrategy, ProviderConfiguration, ProviderKind, ReplicatedLogletConfig, }; use restate_types::partition_table::ReplicationStrategy; use restate_types::replicated_loglet::ReplicationProperty; @@ -89,8 +89,8 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> an if let Some(provider) = set_opts.bifrost_provider { let default_provider = match provider { - ProviderKind::InMemory => DefaultProvider::InMemory, - ProviderKind::Local => DefaultProvider::Local, + ProviderKind::InMemory => ProviderConfiguration::InMemory, + ProviderKind::Local => ProviderConfiguration::Local, ProviderKind::Replicated => { let config = ReplicatedLogletConfig { replication_property: set_opts @@ -101,7 +101,7 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> an .nodeset_selection_strategy .unwrap_or_default(), }; - DefaultProvider::Replicated(config) + ProviderConfiguration::Replicated(config) } };