From 8571725a991a6771569ffb414a122a2198537a1a Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Thu, 19 Sep 2024 12:49:56 +0100 Subject: [PATCH 1/2] [Bifrost] Initial NodeSetChecker implementation This introduces ReplicationChecker utility that performs write and f-majority quorum checks. Note that the current implementation is not LocationScope aware and will only consider node-level scope. The utility is designed to allow attaching any attribute type to nodes in the nodeset and run the check against the set of nodes that match a predicate against those attributes. --- Cargo.lock | 1 + crates/bifrost/Cargo.toml | 1 + .../src/providers/replicated_loglet/mod.rs | 3 + .../replicated_loglet/replication/checker.rs | 379 ++++++++++++++++++ .../replicated_loglet/replication/mod.rs | 12 + .../providers/replicated_loglet/test_util.rs | 40 ++ crates/types/src/nodes_config.rs | 36 ++ crates/types/src/replicated_loglet/params.rs | 98 ++++- 8 files changed, 564 insertions(+), 6 deletions(-) create mode 100644 crates/bifrost/src/providers/replicated_loglet/replication/checker.rs create mode 100644 crates/bifrost/src/providers/replicated_loglet/replication/mod.rs create mode 100644 crates/bifrost/src/providers/replicated_loglet/test_util.rs diff --git a/Cargo.lock b/Cargo.lock index 7df62ae99..d16ebc8fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5495,6 +5495,7 @@ dependencies = [ "criterion", "derive_more", "enum-map", + "enumset", "futures", "googletest", "metrics", diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index 3d36b2e8e..ba5e6a229 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -45,6 +45,7 @@ restate-test-util = { workspace = true } restate-types = { workspace = true, features = ["test-util"] } criterion = { workspace = true, features = ["async_tokio"] } +enumset = { workspace = true } googletest = { workspace = true, features = ["anyhow"] } paste = { workspace = true } rlimit = { workspace = true } diff --git a/crates/bifrost/src/providers/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs index 55e281a88..c4e599060 100644 --- a/crates/bifrost/src/providers/replicated_loglet/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/mod.rs @@ -10,5 +10,8 @@ pub(crate) mod metric_definitions; mod provider; +pub mod replication; +#[cfg(any(test, feature = "test-util"))] +pub mod test_util; pub use provider::Factory; diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs new file mode 100644 index 000000000..d85b63741 --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs @@ -0,0 +1,379 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
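+
+//! A rough usage sketch of the checker introduced in this file. This is
+//! illustrative only; the `nodeset` and `nodes_config` bindings are assumed to
+//! be an existing `NodeSet` and `NodesConfiguration` at the call site and are
+//! not part of this patch:
+//!
+//! ```ignore
+//! use restate_types::replicated_loglet::ReplicationProperty;
+//! use restate_types::PlainNodeId;
+//!
+//! let replication = ReplicationProperty::new(3.try_into().unwrap());
+//! let mut checker: NodeSetChecker<bool> =
+//!     NodeSetChecker::new(&nodeset, &nodes_config, &replication);
+//! // tag the nodes that acknowledged a store
+//! for n in [1, 2, 4] {
+//!     checker.set_attribute(PlainNodeId::new(n), true);
+//! }
+//! // with a {node: 3} replication property this passes once three writeable
+//! // nodes are tagged
+//! assert!(checker.check_write_quorum(|acked| *acked));
+//! ```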
+
+use std::collections::HashMap;
+
+use restate_types::nodes_config::{NodesConfiguration, StorageState};
+use restate_types::replicated_loglet::{NodeSet, ReplicationProperty};
+use restate_types::PlainNodeId;
+
+/// NodeSetChecker maintains a set of nodes that can be tagged with
+/// an attribute, and provides an API for querying the replication properties of
+/// the subset of nodes with certain values for this attribute, given a
+/// replication requirement across several failure domains.
+///
+/// The checker is created with the default value of `Attribute` set on all nodes.
+///
+/// **NOTE:** Currently, this will not perform any failure-domain-aware quorum
+/// checks; this will be implemented in the near future.
+///
+/// The utility provides two methods:
+/// - `check_write_quorum()`: Can be used to check if it'd be possible to replicate a record on the
+///   subset of nodes that have a certain value for the attribute
+/// - `check_fmajority()`: Used to check if enough nodes have certain values for the
+///   attribute so that the set of nodes forms an f-majority for at
+///   least one of the scopes for which there is a replication
+///   requirement.
+///   An example usage of this method is during loglet seal, where
+///   each node gets tagged with "SEALED" once that node has been sealed.
+///   The seal operation can then decide whether the seal can be
+///   considered complete or not.
+///
+/// Note that this doesn't track changes that happen to the storage-states after instantiation.
+/// For a fresh view, rebuild this with a new nodes configuration.
+pub struct NodeSetChecker<'a, Attribute> {
+    node_attribute: HashMap<PlainNodeId, Attribute>,
+    /// Mapping between node-id and its log-server storage state
+    storage_states: HashMap<PlainNodeId, StorageState>,
+    replication_property: &'a ReplicationProperty,
+}
+
+impl<'a, Attribute> NodeSetChecker<'a, Attribute> {
+    // Note that this doesn't track changes that happen to the storage-states after instantiation.
+    // For a fresh view, rebuild this with a new nodes configuration.
+    pub fn new(
+        nodeset: &NodeSet,
+        nodes_config: &NodesConfiguration,
+        replication_property: &'a ReplicationProperty,
+    ) -> Self
+    where
+        Attribute: Default,
+    {
+        Self::with_factory(nodeset, nodes_config, replication_property, |_| {
+            Default::default()
+        })
+    }
+
+    pub fn with_factory(
+        nodeset: &NodeSet,
+        nodes_config: &NodesConfiguration,
+        replication_property: &'a ReplicationProperty,
+        attribute_factory: impl Fn(PlainNodeId) -> Attribute,
+    ) -> Self {
+        let storage_states: HashMap<_, _> = nodeset
+            .iter()
+            .filter_map(|n| {
+                match nodes_config.get_log_server_storage_state(n) {
+                    // only include nodes that enable reads or above
+                    storage_state if !storage_state.empty() => Some((*n, storage_state)),
+                    // node is not readable or doesn't exist. Treat as DISABLED
+                    _ => None,
+                }
+            })
+            .collect();
+
+        let node_attribute: HashMap<_, _> = storage_states
+            .keys()
+            .map(|node_id| (*node_id, attribute_factory(*node_id)))
+            .collect();
+
+        Self {
+            node_attribute,
+            storage_states,
+            replication_property,
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.node_attribute.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.node_attribute.is_empty()
+    }
+
+    /// Set the attribute value of a node. Note that a node can only be
+    /// associated with one attribute value at a time, so if the node has an
+    /// existing attribute value, the value will be replaced.
+    ///
+    /// Returns the old attribute if it was set
+    pub fn set_attribute(
+        &mut self,
+        node_id: PlainNodeId,
+        attribute: Attribute,
+    ) -> Option<Attribute> {
+        // ignore if the node is not in the original nodeset
+        if self.storage_states.contains_key(&node_id) {
+            self.node_attribute.insert(node_id, attribute)
+        } else {
+            None
+        }
+    }
+
+    pub fn set_attribute_on_each(&mut self, nodes: &[PlainNodeId], f: impl Fn() -> Attribute) {
+        for node in nodes {
+            // ignore if the node is not in the original nodeset
+            if self.storage_states.contains_key(node) {
+                self.node_attribute.insert(*node, f());
+            }
+        }
+    }
+
+    pub fn remove_attribute(&mut self, node_id: &PlainNodeId) {
+        self.node_attribute.remove(node_id);
+    }
+
+    pub fn get_attribute(&mut self, node_id: &PlainNodeId) -> Option<&Attribute> {
+        self.node_attribute.get(node_id)
+    }
+
+    /// Check if nodes that match the predicate meet the write-quorum rules according to the
+    /// replication property. For instance, if the replication property is set to {node: 3, zone: 2}
+    /// then this function will return `true` if at least 3 nodes match the predicate and they are
+    /// spread across at least 2 zones.
+    pub fn check_write_quorum<Predicate>(&self, predicate: Predicate) -> bool
+    where
+        Predicate: Fn(&Attribute) -> bool,
+    {
+        let filtered = self.node_attribute.iter().filter(|(node_id, v)| {
+            predicate(v)
+                && self
+                    .storage_states
+                    .get(node_id)
+                    .expect("node must be in node-set")
+                    // only consider nodes that are writeable
+                    .can_write_to()
+        });
+        // todo(asoli): Location-aware quorum check
+        filtered.count() >= self.replication_property.num_copies().into()
+    }
+
+    /// Check if enough nodes have certain values for the attribute so that the
+    /// set of nodes forms an f-majority for at least one of the scopes for which
+    /// there is a replication requirement.
+    ///
+    /// Two ways to form a mental model about this:
+    /// 1) Nodes that match the predicate (storage-state considered) will form an f-majority.
+    /// 2) Do we lose quorum-read availability if we were to lose all nodes that match the predicate?
+    pub fn check_fmajority<Predicate>(&self, predicate: Predicate) -> FMajorityResult
+    where
+        Predicate: Fn(&Attribute) -> bool,
+    {
+        let filtered = self
+            .node_attribute
+            .iter()
+            .filter(|(_, v)| predicate(v))
+            // `node_attribute` nodes must be in storage_states
+            .map(|(node_id, _)| self.storage_states.get(node_id).unwrap());
+
+        let mut authoritative = 0;
+        let mut non_authoritative = 0;
+        for state in filtered {
+            // at the moment, data-loss is the only non-authoritative state
+            if state.is_data_loss() {
+                non_authoritative += 1;
+            } else {
+                authoritative += 1;
+            }
+        }
+
+        // todo(asoli): Location-aware quorum check
+        let fmajority_requires: usize =
+            self.storage_states.len() - usize::from(self.replication_property.num_copies()) + 1;
+
+        if non_authoritative + authoritative < fmajority_requires {
+            // not enough nodes to form an f-majority
+            return FMajorityResult::None;
+        }
+
+        if non_authoritative > 0 {
+            // either BestEffort or SuccessWithRisk, depending on how many authoritative nodes we have
+            if authoritative >= fmajority_requires {
+                return FMajorityResult::SuccessWithRisk;
+            }
+            return FMajorityResult::BestEffort;
+        }
+        FMajorityResult::Success
+    }
+}
+
+/// Possible results of f-majority checks for a subset of the NodeSet.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum FMajorityResult {
+    /// The subset of nodes neither satisfies the authoritative f-majority
+    /// property, nor does it contain all authoritative nodes.
+    ///
+    /// * Bad. No f-majority is possible.
+    None,
+    /// There are enough nodes with `DataLoss` to prevent us from
+    /// authoritatively deciding the replication state of records. Because of this,
+    /// the subset of nodes does not satisfy the authoritative f-majority property.
+    /// However, the subset of nodes already contains all authoritative
+    /// nodes in the NodeSet. As a best effort, the subset of nodes is
+    /// considered to be a non-authoritative f-majority.
+    ///
+    /// * Bad, but with a chance of success
+    BestEffort,
+    /// The subset of nodes satisfies the authoritative f-majority property, and it
+    /// has _all_ authoritative nodes in the NodeSet.
+    ///
+    /// * Good
+    Success,
+    /// The subset of nodes satisfies the authoritative f-majority property, and
+    /// it has sufficient but _not_ all authoritative nodes in the
+    /// NodeSet.
+    ///
+    /// * Good
+    SuccessWithRisk,
+}
+
+impl FMajorityResult {
+    pub fn passed(&self) -> bool {
+        matches!(
+            self,
+            FMajorityResult::Success | FMajorityResult::SuccessWithRisk
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use googletest::prelude::*;
+
+    use restate_types::Version;
+
+    use crate::providers::replicated_loglet::test_util::{
+        generate_logserver_node, generate_logserver_nodes_config,
+    };
+
+    #[test]
+    fn test_replication_checker_basics() -> Result<()> {
+        // all_authoritative
+        let nodes_config = generate_logserver_nodes_config(10, StorageState::ReadWrite);
+
+        let nodeset: NodeSet = (1..=5).collect();
+        let replication = ReplicationProperty::new(3.try_into().unwrap());
+        let mut checker: NodeSetChecker<bool> =
+            NodeSetChecker::new(&nodeset, &nodes_config, &replication);
+
+        // all nodes in the nodeset are authoritative
+        assert_that!(checker.len(), eq(5));
+        // all nodes are false by default. Can't establish write quorum.
+        assert_that!(checker.check_write_quorum(|attr| *attr), eq(false));
+
+        checker.set_attribute_on_each(
+            &[
+                PlainNodeId::new(1),
+                PlainNodeId::new(2),
+                PlainNodeId::new(4),
+            ],
+            || true,
+        );
+        // nodes 1, 2, and 4 are now true; that's enough to establish write-quorum
+        assert_that!(checker.check_write_quorum(|attr| *attr), eq(true));
+
+        // 2 nodes are false in this node-set, not enough for write-quorum
+        assert_that!(checker.check_write_quorum(|attr| !(*attr)), eq(false));
+
+        assert_that!(
+            checker.check_fmajority(|attr| *attr),
+            eq(FMajorityResult::Success)
+        );
+
+        // we only have 2 nodes with false, impossible to achieve f-majority.
+ assert_that!( + checker.check_fmajority(|attr| !(*attr)), + eq(FMajorityResult::None) + ); + Ok(()) + } + + #[test] + fn test_replication_checker_mixed() -> Result<()> { + let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); + nodes_config.upsert_node(generate_logserver_node(1, StorageState::Disabled)); + nodes_config.upsert_node(generate_logserver_node(2, StorageState::ReadWrite)); + nodes_config.upsert_node(generate_logserver_node(3, StorageState::ReadOnly)); + nodes_config.upsert_node(generate_logserver_node(4, StorageState::ReadWrite)); + nodes_config.upsert_node(generate_logserver_node(5, StorageState::DataLoss)); + nodes_config.upsert_node(generate_logserver_node(6, StorageState::DataLoss)); + + // effective will be [2-6] because 1 is disabled (authoritatively drained) + let nodeset: NodeSet = (1..=6).collect(); + let replication = ReplicationProperty::new(3.try_into().unwrap()); + let mut checker: NodeSetChecker = + NodeSetChecker::new(&nodeset, &nodes_config, &replication); + + // 1 is removed + assert_that!(checker.len(), eq(5)); + + checker.set_attribute_on_each( + &[ + // validates that we actually ignore this + PlainNodeId::new(1), + PlainNodeId::new(2), + PlainNodeId::new(3), + PlainNodeId::new(4), + PlainNodeId::new(5), + ], + || true, + ); + // we cannot write on nodes 3, 5. This should fail the write quorum check because we only have + // 2 nodes that pass the predicate *and* are writeable (2, 4) and we need 3 for replication. + assert_that!(checker.check_write_quorum(|attr| *attr), eq(false)); + + // do we have f-majority? + // [nodeset] 2 3 4 5 6 + // [predicate] x x x x x + // [storage-state] RW RO RW DL DL + // + // We need 3 nodes of authoritative nodes for successful f-majority. Yes, we have them (2, 3, 4). + // But some nodes are non-authoritative, so we should observe + // FMajorityResult::SuccessWithRisk + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::SuccessWithRisk) + ); + + // Can we lose Node 3? No. + checker.set_attribute(PlainNodeId::new(3), false); + + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::BestEffort) + ); + assert!(!checker.check_fmajority(|attr| *attr).passed()); + + Ok(()) + } + + #[test] + fn test_replication_single_copy_single_node() -> Result<()> { + let nodes_config = generate_logserver_nodes_config(1, StorageState::ReadWrite); + + let replication = ReplicationProperty::new(1.try_into().unwrap()); + let mut checker: NodeSetChecker = NodeSetChecker::new( + &NodeSet::from_single(PlainNodeId::new(1)), + &nodes_config, + &replication, + ); + assert_that!(checker.len(), eq(1)); + checker.set_attribute(PlainNodeId::new(1), true); + + assert_that!(checker.check_write_quorum(|attr| *attr), eq(true)); + assert_that!( + checker.check_fmajority(|attr| *attr), + eq(FMajorityResult::Success) + ); + + Ok(()) + } +} diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs b/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs new file mode 100644 index 000000000..32a998bd1 --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs @@ -0,0 +1,12 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. +mod checker; + +pub use checker::*; diff --git a/crates/bifrost/src/providers/replicated_loglet/test_util.rs b/crates/bifrost/src/providers/replicated_loglet/test_util.rs new file mode 100644 index 000000000..b2640469a --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/test_util.rs @@ -0,0 +1,40 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_types::nodes_config::{ + LogServerConfig, NodeConfig, NodesConfiguration, Role, StorageState, +}; +use restate_types::{GenerationalNodeId, PlainNodeId, Version}; + +pub fn generate_logserver_node( + id: impl Into, + storage_state: StorageState, +) -> NodeConfig { + let id: PlainNodeId = id.into(); + NodeConfig::new( + format!("node-{}", id), + GenerationalNodeId::new(id.into(), 1), + format!("unix:/tmp/my_socket-{}", id).parse().unwrap(), + Role::LogServer.into(), + LogServerConfig { storage_state }, + ) +} + +pub fn generate_logserver_nodes_config( + num_nodes: u32, + storage_state: StorageState, +) -> NodesConfiguration { + let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); + // all_authoritative + for i in 1..=num_nodes { + nodes_config.upsert_node(generate_logserver_node(i, storage_state)); + } + nodes_config +} diff --git a/crates/types/src/nodes_config.rs b/crates/types/src/nodes_config.rs index 784604fc8..ab6905d7d 100644 --- a/crates/types/src/nodes_config.rs +++ b/crates/types/src/nodes_config.rs @@ -190,6 +190,18 @@ impl NodesConfiguration { self.find_node_by_id(*id).ok() } + /// Returns [`StorageState::Disabled`] if a node is deleted or unrecognized + pub fn get_log_server_storage_state(&self, node_id: &PlainNodeId) -> StorageState { + let maybe = self.nodes.get(node_id); + let Some(maybe) = maybe else { + return StorageState::Disabled; + }; + match maybe { + MaybeNode::Tombstone => StorageState::Disabled, + MaybeNode::Node(found) => found.log_server_config.storage_state, + } + } + /// Returns _an_ admin node. pub fn get_admin_node(&self) -> Option<&NodeConfig> { self.nodes.values().find_map(|maybe| match maybe { @@ -229,6 +241,7 @@ impl Versioned for NodesConfiguration { PartialEq, Ord, PartialOrd, + derive_more::IsVariant, serde::Serialize, serde::Deserialize, strum::Display, @@ -270,6 +283,29 @@ pub enum StorageState { DataLoss, } +impl StorageState { + pub fn can_write_to(&self) -> bool { + use StorageState::*; + match self { + Provisioning | Disabled | ReadOnly | DataLoss => false, + ReadWrite => true, + } + } + + pub fn should_read_from(&self) -> bool { + use StorageState::*; + match self { + ReadOnly | ReadWrite | DataLoss => true, + Provisioning | Disabled => false, + } + } + + /// Empty nodes are automatically excluded from node sets. 
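+    ///
+    /// A quick illustration (this mirrors the match below); `Provisioning` and
+    /// `Disabled` are the two states considered empty:
+    ///
+    /// ```
+    /// use restate_types::nodes_config::StorageState;
+    ///
+    /// assert!(StorageState::Provisioning.empty());
+    /// assert!(StorageState::Disabled.empty());
+    /// assert!(!StorageState::ReadWrite.empty());
+    /// ```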
+    pub fn empty(&self) -> bool {
+        matches!(self, StorageState::Provisioning | StorageState::Disabled)
+    }
+}
+
 #[derive(Clone, Default, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
 pub struct LogServerConfig {
     pub storage_state: StorageState,
diff --git a/crates/types/src/replicated_loglet/params.rs b/crates/types/src/replicated_loglet/params.rs
index c6d53b91a..70b792588 100644
--- a/crates/types/src/replicated_loglet/params.rs
+++ b/crates/types/src/replicated_loglet/params.rs
@@ -12,6 +12,7 @@ use std::collections::HashSet;
 
 use serde_with::DisplayFromStr;
 
+use crate::nodes_config::NodesConfiguration;
 use crate::{GenerationalNodeId, PlainNodeId};
 
 use super::ReplicationProperty;
@@ -73,21 +74,43 @@ impl ReplicatedLogletId {
 }
 
 #[serde_with::serde_as]
-#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Eq, PartialEq)]
+#[derive(
+    serde::Serialize,
+    serde::Deserialize,
+    Debug,
+    Clone,
+    Eq,
+    PartialEq,
+    derive_more::IntoIterator,
+    derive_more::From,
+)]
 pub struct NodeSet(#[serde_as(as = "HashSet<DisplayFromStr>")] HashSet<PlainNodeId>);
 
 impl NodeSet {
+    pub fn empty() -> Self {
+        Self(HashSet::new())
+    }
+
     pub fn from_single(node: PlainNodeId) -> Self {
         let mut set = HashSet::new();
         set.insert(node);
         Self(set)
     }
 
-    pub fn len(&self) -> u8 {
-        self.0
-            .len()
-            .try_into()
-            .expect("nodeset cannot exceed 255 nodes")
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    pub fn contains(&self, node: &PlainNodeId) -> bool {
+        self.0.contains(node)
+    }
+
+    pub fn insert(&mut self, node: PlainNodeId) {
+        self.0.insert(node);
+    }
+
+    pub fn remove(&mut self, node: &PlainNodeId) {
+        self.0.remove(node);
     }
 
     pub fn is_empty(&self) -> bool {
@@ -97,4 +120,67 @@
     pub fn iter(&self) -> impl Iterator<Item = &PlainNodeId> {
         self.0.iter()
     }
+
+    /// Filters out nodes that are not part of the effective nodeset (empty nodes)
+    pub fn to_effective(&self, nodes_config: &NodesConfiguration) -> EffectiveNodeSet {
+        EffectiveNodeSet::new(self, nodes_config)
+    }
+}
+
+impl<const N: usize> From<[PlainNodeId; N]> for NodeSet {
+    fn from(value: [PlainNodeId; N]) -> Self {
+        Self(From::from(value))
+    }
+}
+
+impl<const N: usize> From<[u32; N]> for NodeSet {
+    fn from(value: [u32; N]) -> Self {
+        Self(value.into_iter().map(PlainNodeId::from).collect())
+    }
+}
+
+impl From<NodeSet> for Vec<PlainNodeId> {
+    fn from(value: NodeSet) -> Self {
+        value.0.into_iter().collect()
+    }
+}
+
+impl From<NodeSet> for Box<[PlainNodeId]> {
+    fn from(value: NodeSet) -> Self {
+        value.0.into_iter().collect()
+    }
+}
+
+impl<A: Into<PlainNodeId>> FromIterator<A> for NodeSet {
+    fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
+        Self(HashSet::from_iter(iter.into_iter().map(Into::into)))
+    }
+}
+
+#[serde_with::serde_as]
+#[derive(
+    serde::Serialize,
+    serde::Deserialize,
+    Debug,
+    Clone,
+    Eq,
+    PartialEq,
+    derive_more::Deref,
+    derive_more::AsRef,
+    derive_more::DerefMut,
+    derive_more::IntoIterator,
+    derive_more::Into,
+)]
+pub struct EffectiveNodeSet(NodeSet);
+
+impl EffectiveNodeSet {
+    pub fn new(nodeset: &NodeSet, nodes_config: &NodesConfiguration) -> Self {
+        Self(
+            nodeset
+                .iter()
+                .copied()
+                .filter(|node_id| !nodes_config.get_log_server_storage_state(node_id).empty())
+                .collect(),
+        )
+    }
+}

From faff5b95475ca81c2a991c39f6f4e8ff5abd417a Mon Sep 17 00:00:00 2001
From: Ahmed Farghal
Date: Thu, 19 Sep 2024 13:44:28 +0100
Subject: [PATCH 2/2] [Bifrost] SpreadSelector initial implementation

Implements a flood selector (selects all writeable nodes in the effective nodeset).
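
A rough sketch of the intended call pattern (the variable names here are
illustrative and not part of this change):

    let selector = SpreadSelector::new(nodeset, SelectorStrategy::Flood, replication);
    // exclude nodes that already acknowledged a copy of the record
    let spread = selector.select(&mut rand::thread_rng(), &nodes_config, &exclude)?;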
--- Cargo.lock | 1 + crates/bifrost/Cargo.toml | 1 + .../replicated_loglet/replication/mod.rs | 1 + .../replication/spread_selector.rs | 223 ++++++++++++++++++ crates/types/src/replicated_loglet/mod.rs | 2 + crates/types/src/replicated_loglet/spread.rs | 50 ++++ 6 files changed, 278 insertions(+) create mode 100644 crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs create mode 100644 crates/types/src/replicated_loglet/spread.rs diff --git a/Cargo.lock b/Cargo.lock index d16ebc8fe..d75323a1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5502,6 +5502,7 @@ dependencies = [ "parking_lot", "paste", "pin-project", + "rand", "restate-core", "restate-metadata-store", "restate-rocksdb", diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index ba5e6a229..65b0b4143 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -28,6 +28,7 @@ futures = { workspace = true } metrics = { workspace = true } parking_lot = { workspace = true } pin-project = { workspace = true } +rand = { workspace = true } rocksdb = { workspace = true } serde = { workspace = true } smallvec = { workspace = true } diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs b/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs index 32a998bd1..3573a7722 100644 --- a/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/replication/mod.rs @@ -8,5 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. mod checker; +pub mod spread_selector; pub use checker::*; diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs b/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs new file mode 100644 index 000000000..2bf22ab21 --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs @@ -0,0 +1,223 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +#[cfg(any(test, feature = "test-util"))] +use std::sync::Arc; + +#[cfg(any(test, feature = "test-util"))] +use parking_lot::Mutex; +use rand::prelude::*; + +use restate_types::nodes_config::NodesConfiguration; +use restate_types::replicated_loglet::{NodeSet, ReplicationProperty, Spread}; + +use crate::providers::replicated_loglet::replication::NodeSetChecker; + +#[derive(Debug, Clone, thiserror::Error)] +pub enum SpreadSelectorError { + #[error("Insufficient writeable nodes in the nodeset")] + InsufficientWriteableNodes, +} + +#[derive(Debug)] +pub enum SelectorStrategy { + /// Selects all writeable nodes in the nodeset, this might lead to over-replication, + /// and it's up to the appender state machine to continue replicating beyond the + /// write-quorum requirements or not. + Flood, + #[cfg(any(test, feature = "test-util"))] + /// Used in testing, generates deterministically static spreads + Fixed(FixedSpreadSelector), +} + +/// Spread selector is thread-safe and can be used concurrently. 
+pub struct SpreadSelector {
+    nodeset: NodeSet,
+    strategy: SelectorStrategy,
+    replication_property: ReplicationProperty,
+}
+
+impl SpreadSelector {
+    pub fn new(
+        nodeset: NodeSet,
+        strategy: SelectorStrategy,
+        replication_property: ReplicationProperty,
+    ) -> Self {
+        Self {
+            nodeset,
+            strategy,
+            replication_property,
+        }
+    }
+
+    /// Generates a spread, or fails if it's not possible to generate a spread out of
+    /// the nodeset after excluding the non-writeable nodes in the nodes configuration and
+    /// the set of nodes passed in `exclude_nodes`.
+    ///
+    /// The selector avoids non-writeable nodes.
+    pub fn select<R: Rng>(
+        &self,
+        rng: &mut R,
+        nodes_config: &NodesConfiguration,
+        exclude_nodes: &NodeSet,
+    ) -> Result<Spread, SpreadSelectorError> {
+        // Get the list of non-empty nodes from the nodeset given the nodes configuration
+        let effective_nodeset = self.nodeset.to_effective(nodes_config);
+        let mut writeable_nodes: Vec<_> = effective_nodeset
+            .into_iter()
+            .filter(|node_id| !exclude_nodes.contains(node_id))
+            .filter(|node_id| {
+                nodes_config
+                    .get_log_server_storage_state(node_id)
+                    .can_write_to()
+            })
+            .collect();
+        if writeable_nodes.len() < self.replication_property.num_copies().into() {
+            return Err(SpreadSelectorError::InsufficientWriteableNodes);
+        }
+
+        let selected: Spread = match &self.strategy {
+            SelectorStrategy::Flood => {
+                writeable_nodes.shuffle(rng);
+                Spread::from(writeable_nodes)
+            }
+            #[cfg(any(test, feature = "test-util"))]
+            SelectorStrategy::Fixed(selector) => selector.select()?,
+        };
+
+        // validate that we can have write quorum with this spread
+        let mut checker =
+            NodeSetChecker::new(&self.nodeset, nodes_config, &self.replication_property);
+        checker.set_attribute_on_each(&selected, || true);
+        if !checker.check_write_quorum(|attr| *attr) {
+            return Err(SpreadSelectorError::InsufficientWriteableNodes);
+        }
+
+        Ok(selected)
+    }
+}
+
+static_assertions::assert_impl_all!(SpreadSelector: Send, Sync);
+
+#[cfg(any(test, feature = "test-util"))]
+#[derive(Debug, Clone)]
+pub struct FixedSpreadSelector {
+    pub result: Arc<Mutex<Result<Spread, SpreadSelectorError>>>,
+}
+
+#[cfg(any(test, feature = "test-util"))]
+impl FixedSpreadSelector {
+    pub fn select(&self) -> Result<Spread, SpreadSelectorError> {
+        let guard = self.result.lock();
+        (*guard).clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use googletest::prelude::*;
+
+    use restate_types::nodes_config::StorageState;
+    use restate_types::PlainNodeId;
+
+    use crate::providers::replicated_loglet::test_util::generate_logserver_nodes_config;
+
+    #[test]
+    fn test_with_fixed_spread_selector() -> Result<()> {
+        let nodes_config = generate_logserver_nodes_config(10, StorageState::ReadWrite);
+        let replication = ReplicationProperty::new(3.try_into().unwrap());
+        let nodeset: NodeSet = (1..=5).collect();
+
+        // smoke test
+        let strategy = FixedSpreadSelector {
+            result: Arc::new(Mutex::new(Ok(Spread::from([1, 2, 3])))),
+        };
+        let selector = SpreadSelector::new(
+            nodeset,
+            SelectorStrategy::Fixed(strategy.clone()),
+            replication,
+        );
+        let mut rng = rand::thread_rng();
+        let spread = selector.select(&mut rng, &nodes_config, &NodeSet::empty())?;
+        assert_that!(spread, eq(Spread::from([1, 2, 3])));
+
+        // Fixed selector ignores exclude nodes as long as sufficient nodes are passed down
+        let spread = selector.select(&mut rng, &nodes_config, &NodeSet::from([1]))?;
+        assert_that!(spread, eq(Spread::from([1, 2, 3])));
+
+        // Not enough nodes to select from if the nodes config is too small, or if enough nodes are
+        // excluded to make the effective nodeset too small
+        //
+        // 
only 2 nodes left in the nodeset + let spread = selector.select(&mut rng, &nodes_config, &NodeSet::from([1, 2, 3])); + assert_that!( + spread, + err(pat!(SpreadSelectorError::InsufficientWriteableNodes)) + ); + + let nodes_config = generate_logserver_nodes_config(2, StorageState::ReadWrite); + let replication = ReplicationProperty::new(3.try_into().unwrap()); + let nodeset: NodeSet = (1..=3).collect(); + let selector = SpreadSelector::new(nodeset, SelectorStrategy::Fixed(strategy), replication); + + let spread = selector.select(&mut rng, &nodes_config, &NodeSet::empty()); + assert_that!( + spread, + err(pat!(SpreadSelectorError::InsufficientWriteableNodes)) + ); + + Ok(()) + } + + #[test] + fn test_flood_spread_selector() -> Result<()> { + let nodes_config = generate_logserver_nodes_config(10, StorageState::ReadWrite); + let replication = ReplicationProperty::new(3.try_into().unwrap()); + let nodeset: NodeSet = (1..=5).collect(); + + let selector = SpreadSelector::new(nodeset, SelectorStrategy::Flood, replication); + let mut rng = rand::thread_rng(); + let spread = selector.select(&mut rng, &nodes_config, &NodeSet::empty())?; + let spread = spread.to_vec(); + + assert_that!( + spread, + unordered_elements_are![ + eq(PlainNodeId::new(1)), + eq(PlainNodeId::new(2)), + eq(PlainNodeId::new(3)), + eq(PlainNodeId::new(4)), + eq(PlainNodeId::new(5)) + ] + ); + + let spread = selector.select(&mut rng, &nodes_config, &NodeSet::from([1, 4]))?; + let spread = spread.to_vec(); + + assert_that!( + spread, + unordered_elements_are![ + eq(PlainNodeId::new(2)), + eq(PlainNodeId::new(3)), + eq(PlainNodeId::new(5)) + ] + ); + + let spread = selector.select(&mut rng, &nodes_config, &NodeSet::from([1, 4, 2])); + assert_that!( + spread, + err(pat!(SpreadSelectorError::InsufficientWriteableNodes)) + ); + + Ok(()) + } +} diff --git a/crates/types/src/replicated_loglet/mod.rs b/crates/types/src/replicated_loglet/mod.rs index e5f89496c..189293dfb 100644 --- a/crates/types/src/replicated_loglet/mod.rs +++ b/crates/types/src/replicated_loglet/mod.rs @@ -10,6 +10,8 @@ mod params; mod replication_property; +mod spread; pub use params::*; pub use replication_property::*; +pub use spread::*; diff --git a/crates/types/src/replicated_loglet/spread.rs b/crates/types/src/replicated_loglet/spread.rs new file mode 100644 index 000000000..67629ed3f --- /dev/null +++ b/crates/types/src/replicated_loglet/spread.rs @@ -0,0 +1,50 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::PlainNodeId; + +#[derive( + Debug, + Clone, + PartialEq, + Eq, + Hash, + derive_more::Deref, + derive_more::DerefMut, + derive_more::IntoIterator, + derive_more::From, + derive_more::Index, + derive_more::IndexMut, +)] +pub struct Spread(Box<[PlainNodeId]>); + +impl From> for Spread { + fn from(v: Vec) -> Self { + Self(v.into_boxed_slice()) + } +} + +impl From> for Spread { + fn from(v: Vec) -> Self { + Self(v.into_iter().map(PlainNodeId::from).collect()) + } +} + +impl From<[PlainNodeId; N]> for Spread { + fn from(value: [PlainNodeId; N]) -> Self { + Self(From::from(value)) + } +} + +impl From<[u32; N]> for Spread { + fn from(value: [u32; N]) -> Self { + Self(value.into_iter().map(PlainNodeId::from).collect()) + } +}