From faff5b95475ca81c2a991c39f6f4e8ff5abd417a Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Thu, 19 Sep 2024 13:44:28 +0100 Subject: [PATCH] [Bifrost] SpreadSelector initial implementation Implements a flood selector (selects all writeable nodes in the effective nodeset). --- Cargo.lock | 1 + crates/bifrost/Cargo.toml | 1 + .../replicated_loglet/replication/mod.rs | 1 + .../replication/spread_selector.rs | 223 ++++++++++++++++++ crates/types/src/replicated_loglet/mod.rs | 2 + crates/types/src/replicated_loglet/spread.rs | 50 ++++ 6 files changed, 278 insertions(+) create mode 100644 crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs create mode 100644 crates/types/src/replicated_loglet/spread.rs diff --git a/Cargo.lock b/Cargo.lock index d16ebc8fe..d75323a1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5502,6 +5502,7 @@ dependencies = [ "parking_lot", "paste", "pin-project", + "rand", "restate-core", "restate-metadata-store", "restate-rocksdb", diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index ba5e6a229..65b0b4143 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -28,6 +28,7 @@ futures = { workspace = true } metrics = { workspace = true } parking_lot = { workspace = true } pin-project = { workspace = true } +rand = { workspace = true } rocksdb = { workspace = true } serde = { workspace = true } smallvec = { workspace = true } diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs b/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs index 32a998bd1..3573a7722 100644 --- a/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs @@ -8,5 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. mod checker; +pub mod spread_selector; pub use checker::*; diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs b/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs new file mode 100644 index 000000000..2bf22ab21 --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs @@ -0,0 +1,223 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +#[cfg(any(test, feature = "test-util"))] +use std::sync::Arc; + +#[cfg(any(test, feature = "test-util"))] +use parking_lot::Mutex; +use rand::prelude::*; + +use restate_types::nodes_config::NodesConfiguration; +use restate_types::replicated_loglet::{NodeSet, ReplicationProperty, Spread}; + +use crate::providers::replicated_loglet::replication::NodeSetChecker; + +#[derive(Debug, Clone, thiserror::Error)] +pub enum SpreadSelectorError { + #[error("Insufficient writeable nodes in the nodeset")] + InsufficientWriteableNodes, +} + +#[derive(Debug)] +pub enum SelectorStrategy { + /// Selects all writeable nodes in the nodeset, this might lead to over-replication, + /// and it's up to the appender state machine to continue replicating beyond the + /// write-quorum requirements or not. + Flood, + #[cfg(any(test, feature = "test-util"))] + /// Used in testing, generates deterministically static spreads + Fixed(FixedSpreadSelector), +} + +/// Spread selector is thread-safe and can be used concurrently. +pub struct SpreadSelector { + nodeset: NodeSet, + strategy: SelectorStrategy, + replication_property: ReplicationProperty, +} + +impl SpreadSelector { + pub fn new( + nodeset: NodeSet, + strategy: SelectorStrategy, + replication_property: ReplicationProperty, + ) -> Self { + Self { + nodeset, + strategy, + replication_property, + } + } + + /// Generates a spread or fails if it's not possible to generate a spread out of + /// the nodeset modulo the non-writeable nodes in the nodes configuration and after excluding + /// the set of nodes passed in `exclude_nodes`. + /// + /// The selector avoids nodes non-writeable nodes + pub fn select( + &self, + rng: &mut R, + nodes_config: &NodesConfiguration, + exclude_nodes: &NodeSet, + ) -> Result { + // Get the list of non-empty nodes from the nodeset given the nodes configuration + let effective_nodeset = self.nodeset.to_effective(nodes_config); + let mut writeable_nodes: Vec<_> = effective_nodeset + .into_iter() + .filter(|node_id| !exclude_nodes.contains(node_id)) + .filter(|node_id| { + nodes_config + .get_log_server_storage_state(node_id) + .can_write_to() + }) + .collect(); + if writeable_nodes.len() < self.replication_property.num_copies().into() { + return Err(SpreadSelectorError::InsufficientWriteableNodes); + } + + let selected: Spread = match &self.strategy { + SelectorStrategy::Flood => { + writeable_nodes.shuffle(rng); + Spread::from(writeable_nodes) + } + #[cfg(any(test, feature = "test-util"))] + SelectorStrategy::Fixed(selector) => selector.select()?, + }; + + // validate that we can have write quorum with this spread + let mut checker = + NodeSetChecker::new(&self.nodeset, nodes_config, &self.replication_property); + checker.set_attribute_on_each(&selected, || true); + if !checker.check_write_quorum(|attr| *attr) { + return Err(SpreadSelectorError::InsufficientWriteableNodes); + } + + Ok(selected) + } +} + +static_assertions::assert_impl_all!(SpreadSelector: Send, Sync); + +#[cfg(any(test, feature = "test-util"))] +#[derive(Debug, Clone)] +pub struct FixedSpreadSelector { + pub result: Arc>>, +} + +#[cfg(any(test, feature = "test-util"))] +impl FixedSpreadSelector { + pub fn select(&self) -> Result { + let guard = self.result.lock(); + (*guard).clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use googletest::prelude::*; + + use restate_types::nodes_config::StorageState; + use restate_types::PlainNodeId; + + use crate::providers::replicated_loglet::test_util::generate_logserver_nodes_config; + + #[test] + fn test_with_fixed_spread_selector() -> Result<()> { + let nodes_config = generate_logserver_nodes_config(10, StorageState::ReadWrite); + let replication = ReplicationProperty::new(3.try_into().unwrap()); + let nodeset: NodeSet = (1..=5).collect(); + + // smoke test + let strategy = FixedSpreadSelector { + result: Arc::new(Mutex::new(Ok(Spread::from([1, 2, 3])))), + }; + let selector = SpreadSelector::new( + nodeset, + SelectorStrategy::Fixed(strategy.clone()), + replication, + ); + let mut rng = rand::thread_rng(); + let spread = selector.select(&mut rng, &nodes_config, &NodeSet::empty())?; + assert_that!(spread, eq(Spread::from([1, 2, 3]))); + + // Fixed selector ignores exclude nodes as long as sufficient nodes are passed down + let spread = selector.select(&mut rng, &nodes_config, &NodeSet::from([1]))?; + assert_that!(spread, eq(Spread::from([1, 2, 3]))); + + // No sufficient nodes to select from if nodes config is too small or sufficient nodes are + // excluded to make the effective nodeset too small + // + // only 2 nodes left in the nodeset + let spread = selector.select(&mut rng, &nodes_config, &NodeSet::from([1, 2, 3])); + assert_that!( + spread, + err(pat!(SpreadSelectorError::InsufficientWriteableNodes)) + ); + + let nodes_config = generate_logserver_nodes_config(2, StorageState::ReadWrite); + let replication = ReplicationProperty::new(3.try_into().unwrap()); + let nodeset: NodeSet = (1..=3).collect(); + let selector = SpreadSelector::new(nodeset, SelectorStrategy::Fixed(strategy), replication); + + let spread = selector.select(&mut rng, &nodes_config, &NodeSet::empty()); + assert_that!( + spread, + err(pat!(SpreadSelectorError::InsufficientWriteableNodes)) + ); + + Ok(()) + } + + #[test] + fn test_flood_spread_selector() -> Result<()> { + let nodes_config = generate_logserver_nodes_config(10, StorageState::ReadWrite); + let replication = ReplicationProperty::new(3.try_into().unwrap()); + let nodeset: NodeSet = (1..=5).collect(); + + let selector = SpreadSelector::new(nodeset, SelectorStrategy::Flood, replication); + let mut rng = rand::thread_rng(); + let spread = selector.select(&mut rng, &nodes_config, &NodeSet::empty())?; + let spread = spread.to_vec(); + + assert_that!( + spread, + unordered_elements_are![ + eq(PlainNodeId::new(1)), + eq(PlainNodeId::new(2)), + eq(PlainNodeId::new(3)), + eq(PlainNodeId::new(4)), + eq(PlainNodeId::new(5)) + ] + ); + + let spread = selector.select(&mut rng, &nodes_config, &NodeSet::from([1, 4]))?; + let spread = spread.to_vec(); + + assert_that!( + spread, + unordered_elements_are![ + eq(PlainNodeId::new(2)), + eq(PlainNodeId::new(3)), + eq(PlainNodeId::new(5)) + ] + ); + + let spread = selector.select(&mut rng, &nodes_config, &NodeSet::from([1, 4, 2])); + assert_that!( + spread, + err(pat!(SpreadSelectorError::InsufficientWriteableNodes)) + ); + + Ok(()) + } +} diff --git a/crates/types/src/replicated_loglet/mod.rs b/crates/types/src/replicated_loglet/mod.rs index e5f89496c..189293dfb 100644 --- a/crates/types/src/replicated_loglet/mod.rs +++ b/crates/types/src/replicated_loglet/mod.rs @@ -10,6 +10,8 @@ mod params; mod replication_property; +mod spread; pub use params::*; pub use replication_property::*; +pub use spread::*; diff --git a/crates/types/src/replicated_loglet/spread.rs b/crates/types/src/replicated_loglet/spread.rs new file mode 100644 index 000000000..67629ed3f --- /dev/null +++ b/crates/types/src/replicated_loglet/spread.rs @@ -0,0 +1,50 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::PlainNodeId; + +#[derive( + Debug, + Clone, + PartialEq, + Eq, + Hash, + derive_more::Deref, + derive_more::DerefMut, + derive_more::IntoIterator, + derive_more::From, + derive_more::Index, + derive_more::IndexMut, +)] +pub struct Spread(Box<[PlainNodeId]>); + +impl From> for Spread { + fn from(v: Vec) -> Self { + Self(v.into_boxed_slice()) + } +} + +impl From> for Spread { + fn from(v: Vec) -> Self { + Self(v.into_iter().map(PlainNodeId::from).collect()) + } +} + +impl From<[PlainNodeId; N]> for Spread { + fn from(value: [PlainNodeId; N]) -> Self { + Self(From::from(value)) + } +} + +impl From<[u32; N]> for Spread { + fn from(value: [u32; N]) -> Self { + Self(value.into_iter().map(PlainNodeId::from).collect()) + } +}