Sweeping networking updates
AhmedSoliman committed Sep 23, 2024
1 parent d37408a commit f93d387
Showing 53 changed files with 2,095 additions and 1,459 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -109,7 +109,7 @@ flexbuffers = { version = "2.0.0" }
futures = "0.3.25"
futures-sink = "0.3.25"
futures-util = "0.3.25"
googletest = "0.10"
googletest = { version = "0.10", features = ["anyhow"] }
hostname = { version = "0.4.0" }
http = "1.1.0"
http-body = "1.0.1"
1 change: 1 addition & 0 deletions crates/admin/Cargo.toml
@@ -69,6 +69,7 @@ restate-types = { workspace = true, features = ["test-util"] }
googletest = { workspace = true }
tempfile = { workspace = true }
test-log = { workspace = true }
tokio-stream = { workspace = true, features = ["net"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }

29 changes: 16 additions & 13 deletions crates/admin/src/cluster_controller/cluster_state.rs
@@ -16,34 +16,32 @@ use std::time::Instant;
use tokio::sync::watch;

use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::{MessageRouterBuilder, NetworkSender, Outgoing};
use restate_core::network::{MessageRouterBuilder, Networking, TransportConnect};
use restate_types::net::partition_processor_manager::GetProcessorsState;
use restate_types::nodes_config::Role;
use restate_types::time::MillisSinceEpoch;

use restate_core::{Metadata, ShutdownError, TaskCenter, TaskHandle};
use restate_types::Version;

pub struct ClusterStateRefresher<N> {
pub struct ClusterStateRefresher<T> {
task_center: TaskCenter,
metadata: Metadata,
get_state_router: RpcRouter<GetProcessorsState, N>,
network_sender: Networking<T>,
get_state_router: RpcRouter<GetProcessorsState>,
in_flight_refresh: Option<TaskHandle<anyhow::Result<()>>>,
cluster_state_update_rx: watch::Receiver<Arc<ClusterState>>,
cluster_state_update_tx: Arc<watch::Sender<Arc<ClusterState>>>,
}

impl<N> ClusterStateRefresher<N>
where
N: NetworkSender + 'static,
{
impl<T: TransportConnect> ClusterStateRefresher<T> {
pub fn new(
task_center: TaskCenter,
metadata: Metadata,
networking: N,
network_sender: Networking<T>,
router_builder: &mut MessageRouterBuilder,
) -> Self {
let get_state_router = RpcRouter::new(networking.clone(), router_builder);
let get_state_router = RpcRouter::new(router_builder);

let initial_state = ClusterState {
last_refreshed: None,
@@ -58,6 +56,7 @@ where
Self {
task_center,
metadata,
network_sender,
get_state_router,
in_flight_refresh: None,
cluster_state_update_rx,
@@ -97,6 +96,7 @@ where
self.in_flight_refresh = Self::start_refresh_task(
self.task_center.clone(),
self.get_state_router.clone(),
self.network_sender.clone(),
Arc::clone(&self.cluster_state_update_tx),
self.metadata.clone(),
)?;
@@ -106,7 +106,8 @@

fn start_refresh_task(
tc: TaskCenter,
get_state_router: RpcRouter<GetProcessorsState, N>,
get_state_router: RpcRouter<GetProcessorsState>,
network_sender: Networking<T>,
cluster_state_tx: Arc<watch::Sender<Arc<ClusterState>>>,
metadata: Metadata,
) -> Result<Option<TaskHandle<anyhow::Result<()>>>, ShutdownError> {
@@ -138,6 +139,7 @@ where

let rpc_router = get_state_router.clone();
let tc = tc.clone();
let network_sender = network_sender.clone();
join_set
.build_task()
.name("get-processors-state")
@@ -148,10 +150,11 @@
tokio::time::timeout(
// todo: make configurable
std::time::Duration::from_secs(1),
rpc_router.call(Outgoing::new(
rpc_router.call(
&network_sender,
node_id,
GetProcessorsState::default(),
)),
),
)
.await,
)
@@ -191,7 +194,7 @@ where
node_id,
NodeState::Alive(AliveNode {
last_heartbeat_at: MillisSinceEpoch::now(),
generational_node_id: from,
generational_node_id: *from.peer(),
partitions: msg.state,
}),
);
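The net effect of this file's changes is a new RPC calling convention: the router is built from the `MessageRouterBuilder` alone, and the `Networking<T>` handle (with `T: TransportConnect`) is supplied at call time instead of being baked into the router's type as `RpcRouter<M, N>`. A rough sketch of the new shape, pieced together from the hunks above — the helper name, the node-id type, and the exact reply handling are assumptions for illustration, not code from this commit:

use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::{MessageRouterBuilder, Networking, TransportConnect};
use restate_types::net::partition_processor_manager::GetProcessorsState;
use restate_types::GenerationalNodeId;

// Hypothetical helper showing the calling convention after this commit.
async fn refresh_node_state<T: TransportConnect>(
    router_builder: &mut MessageRouterBuilder,
    networking: Networking<T>,
    node_id: GenerationalNodeId, // assumed; the real refresher loops over cluster nodes
) -> anyhow::Result<()> {
    // Before this commit: RpcRouter::new(networking.clone(), router_builder)
    let rpc_router: RpcRouter<GetProcessorsState> = RpcRouter::new(router_builder);

    // Before this commit: rpc_router.call(Outgoing::new(node_id, GetProcessorsState::default()))
    let reply = rpc_router
        .call(&networking, node_id, GetProcessorsState::default())
        .await?;

    // The sender is now read off the incoming reply via `peer()` rather than
    // arriving as a bare GenerationalNodeId (compare the `*from.peer()` change above).
    let _peer = *reply.peer();
    Ok(())
}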
104 changes: 68 additions & 36 deletions crates/admin/src/cluster_controller/scheduler.rs
@@ -15,7 +15,7 @@ use rand::seq::IteratorRandom;
use tracing::{debug, trace};

use restate_core::metadata_store::{MetadataStoreClient, Precondition, ReadError, WriteError};
use restate_core::network::{NetworkSender, Outgoing};
use restate_core::network::{NetworkSender, Networking, Outgoing, TransportConnect};
use restate_core::{ShutdownError, SyncError, TaskCenter, TaskKind};
use restate_types::cluster::cluster_state::{ClusterState, NodeState, RunMode};
use restate_types::cluster_controller::{
@@ -45,27 +45,24 @@ pub enum Error {
Shutdown(#[from] ShutdownError),
}

pub struct Scheduler<N> {
pub struct Scheduler<T> {
scheduling_plan: SchedulingPlan,
observed_cluster_state: ObservedClusterState,

task_center: TaskCenter,
metadata_store_client: MetadataStoreClient,
networking: N,
networking: Networking<T>,
}

/// The scheduler is responsible for assigning partition processors to nodes and for electing
/// leaders. It achieves this by deciding on a scheduling plan which is persisted to the metadata
/// store and then driving the observed cluster state to the target state (represented by the
/// scheduling plan).
impl<N> Scheduler<N>
where
N: NetworkSender + 'static,
{
impl<T: TransportConnect> Scheduler<T> {
pub async fn init(
task_center: TaskCenter,
metadata_store_client: MetadataStoreClient,
networking: N,
networking: Networking<T>,
) -> Result<Self, BuildError> {
let scheduling_plan = metadata_store_client
.get(SCHEDULING_PLAN_KEY.clone())
@@ -83,7 +80,7 @@ where

pub async fn on_attach_node(
&mut self,
node: GenerationalNodeId,
node: &GenerationalNodeId,
) -> Result<Vec<Action>, ShutdownError> {
trace!(node = %node, "Node is attaching to cluster");
// the convergence loop will make sure that the node receives its instructions
@@ -418,34 +415,41 @@ impl ObservedPartitionState {

#[cfg(test)]
mod tests {
use crate::cluster_controller::scheduler::{
ObservedClusterState, ObservedPartitionState, Scheduler,
};
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZero;
use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
use googletest::matcher::{Matcher, MatcherResult};
use googletest::matchers::{empty, eq};
use googletest::{assert_that, elements_are, unordered_elements_are};
use http::Uri;
use rand::prelude::ThreadRng;
use rand::Rng;
use restate_core::TestCoreEnvBuilder;
use test_log::test;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

use restate_core::network::{ForwardingHandler, Incoming, MessageCollectorMockConnector};
use restate_core::{TaskCenterBuilder, TestCoreEnvBuilder};
use restate_types::cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode,
};
use restate_types::cluster_controller::{ReplicationStrategy, SchedulingPlan};
use restate_types::identifiers::PartitionId;
use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
use restate_types::net::codec::WireDecode;
use restate_types::net::partition_processor_manager::{ControlProcessors, ProcessorCommand};
use restate_types::net::AdvertisedAddress;
use restate_types::net::{AdvertisedAddress, TargetName};
use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role};
use restate_types::partition_table::PartitionTable;
use restate_types::time::MillisSinceEpoch;
use restate_types::{GenerationalNodeId, PlainNodeId, Version};
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZero;
use std::sync::Arc;
use std::time::Duration;
use test_log::test;

use crate::cluster_controller::scheduler::{
ObservedClusterState, ObservedPartitionState, Scheduler,
};

impl ObservedClusterState {
fn remove_node_from_partition(
@@ -725,29 +729,57 @@ mod tests {
nodes_config.upsert_node(node_config);
}

let mut builder = TestCoreEnvBuilder::new_with_mock_network();
let mut control_processors = builder
.router_builder
.subscribe_to_stream::<ControlProcessors>(32);
let tc = TaskCenterBuilder::default_for_tests()
.build()
.expect("task_center builds");

// network messages going to other nodes are written to `tx`
let (tx, control_recv) = mpsc::channel(100);
let connector = MessageCollectorMockConnector::new(tc.clone(), 10, tx.clone());

let mut builder = TestCoreEnvBuilder::with_transport_connector(tc, connector);
builder.router_builder.add_raw_handler(
TargetName::ControlProcessors,
// network messages going to my node are also written to `tx`
Box::new(ForwardingHandler::new(GenerationalNodeId::new(1, 1), tx)),
);

let mut control_recv = ReceiverStream::new(control_recv)
.filter_map(|(node_id, message)| async move {
if message.body().target() == TargetName::ControlProcessors {
let message = message
.try_map(|mut m| {
ControlProcessors::decode(
&mut m.payload,
restate_types::net::CURRENT_PROTOCOL_VERSION,
)
})
.unwrap();
Some((node_id, message))
} else {
None
}
})
.boxed();

let partition_table =
PartitionTable::with_equally_sized_partitions(Version::MIN, num_partitions);
let initial_scheduling_plan = SchedulingPlan::from(&partition_table, replication_strategy);
let metadata_store_client = builder.metadata_store_client.clone();

let network_sender = builder.network_sender.clone();
let networking = builder.networking.clone();

let env = builder
.with_nodes_config(nodes_config)
.with_partition_table(partition_table.clone())
.with_scheduling_plan(initial_scheduling_plan)
.set_nodes_config(nodes_config.clone())
.set_partition_table(partition_table.clone())
.set_scheduling_plan(initial_scheduling_plan)
.build()
.await;
let tc = env.tc.clone();
env.tc
.run_in_scope("test", None, async move {
let mut scheduler =
Scheduler::init(tc, metadata_store_client.clone(), network_sender).await?;
Scheduler::init(tc, metadata_store_client.clone(), networking).await?;

for _ in 0..num_scheduling_rounds {
let cluster_state = random_cluster_state(&node_ids, num_partitions);
@@ -757,10 +789,9 @@
.on_cluster_state_update(Arc::clone(&cluster_state))
.await?;
// collect all control messages from the network to build up the effective scheduling plan
let control_messages = control_processors
let control_messages = control_recv
.as_mut()
.take_until(tokio::time::sleep(Duration::from_secs(10)))
.map(|message| message.split())
.collect::<Vec<_>>()
.await;

@@ -865,32 +896,33 @@

fn derive_observed_cluster_state(
cluster_state: &ClusterState,
control_messages: Vec<(GenerationalNodeId, ControlProcessors)>,
control_messages: Vec<(GenerationalNodeId, Incoming<ControlProcessors>)>,
) -> ObservedClusterState {
let mut observed_cluster_state = ObservedClusterState::default();
observed_cluster_state.update(cluster_state);

// apply commands
for (node_id, control_processors) in control_messages {
for control_processor in control_processors.commands {
for (target_node, control_processors) in control_messages {
let plain_node_id = target_node.as_plain();
for control_processor in control_processors.into_body().commands {
match control_processor.command {
ProcessorCommand::Stop => {
observed_cluster_state.remove_node_from_partition(
&control_processor.partition_id,
&node_id.as_plain(),
&plain_node_id,
);
}
ProcessorCommand::Follower => {
observed_cluster_state.add_node_to_partition(
control_processor.partition_id,
node_id.as_plain(),
plain_node_id,
RunMode::Follower,
);
}
ProcessorCommand::Leader => {
observed_cluster_state.add_node_to_partition(
control_processor.partition_id,
node_id.as_plain(),
plain_node_id,
RunMode::Leader,
);
}
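On the test side, the implicit mock network (`new_with_mock_network` plus `subscribe_to_stream`) is replaced by an explicit transport connector: outgoing messages are collected on a channel by `MessageCollectorMockConnector`, and messages addressed to the local node are routed onto the same channel through a `ForwardingHandler`. A condensed sketch of that wiring, using only the constructors and setters visible in the diff above — the channel capacities and the node id `(1, 1)` are copied from the test, while the function name and the omitted node/partition configuration are illustrative:

use restate_core::network::{ForwardingHandler, MessageCollectorMockConnector};
use restate_core::{TaskCenterBuilder, TestCoreEnvBuilder};
use restate_types::net::TargetName;
use restate_types::GenerationalNodeId;
use tokio::sync::mpsc;

// Illustrative test-environment setup following this commit's pattern.
async fn build_test_env() {
    let tc = TaskCenterBuilder::default_for_tests()
        .build()
        .expect("task_center builds");

    // Messages sent to *other* nodes are captured on `tx` by the mock connector.
    let (tx, _rx) = mpsc::channel(100);
    let connector = MessageCollectorMockConnector::new(tc.clone(), 10, tx.clone());

    // The env builder is now parameterized by a transport connector instead of
    // an implicit mock network.
    let mut builder = TestCoreEnvBuilder::with_transport_connector(tc, connector);

    // Messages addressed to the local node are forwarded onto the same channel.
    builder.router_builder.add_raw_handler(
        TargetName::ControlProcessors,
        Box::new(ForwardingHandler::new(GenerationalNodeId::new(1, 1), tx)),
    );

    // The builder setters were also renamed in this commit:
    // with_nodes_config / with_partition_table / with_scheduling_plan became
    // set_nodes_config / set_partition_table / set_scheduling_plan (not shown here).
    let _env = builder.build().await;
}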