Skip to content

Commit

Permalink
Re-enable replicated loglet test to run with replication factor of 2
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Dec 23, 2024
1 parent e8152ae commit 736900f
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 28 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/local-cluster-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ publish = false
default = []

[dependencies]
restate-core = { workspace = true }
restate-metadata-store = { workspace = true }
# nb features here will also affect the compiled restate-server binary in integration tests
restate-types = { workspace = true, features = ["unsafe-mutable-config"] }

anyhow = { workspace = true }
arc-swap = { workspace = true }
clap = { workspace = true }
clap-verbosity-flag = { workspace = true }
Expand Down
6 changes: 0 additions & 6 deletions crates/local-cluster-runner/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,6 @@ impl Cluster {
.start_clustered(base_dir.as_path(), &cluster_name)
.await
.map_err(|err| ClusterStartError::NodeStartError(i, err))?;
if node.admin_address().is_some() {
// admin nodes are needed for later nodes to bootstrap. we should wait until they are serving
HealthCheck::Admin
.wait_healthy(&node, Duration::from_secs(30))
.await?;
}
started_nodes.push(node)
}

Expand Down
92 changes: 73 additions & 19 deletions crates/local-cluster-runner/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,31 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::random_socket_address;
use arc_swap::ArcSwapOption;
use enumset::{enum_set, EnumSet};
use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use regex::{Regex, RegexSet};
use restate_core::network::net_util::create_tonic_channel_from_advertised_address;
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
use restate_core::protobuf::node_ctl_svc::{
ProvisionClusterRequest as ProtoProvisionClusterRequest, ProvisionClusterResponseKind,
};
use restate_types::logs::metadata::DefaultProvider;
use restate_types::partition_table::ReplicationStrategy;
use restate_types::retries::RetryPolicy;
use restate_types::{
config::{Configuration, MetadataStoreClient},
errors::GenericError,
metadata_store::keys::NODES_CONFIG_KEY,
net::{AdvertisedAddress, BindAddress},
nodes_config::{NodesConfiguration, Role},
PlainNodeId,
};
use rev_lines::RevLines;
use serde::{Deserialize, Serialize};
use std::num::NonZeroU16;
use std::{
ffi::OsString,
fmt::Display,
Expand All @@ -22,14 +47,6 @@ use std::{
task::{Context, Poll},
time::Duration,
};

use arc_swap::ArcSwapOption;
use enumset::{enum_set, EnumSet};
use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use regex::{Regex, RegexSet};
use rev_lines::RevLines;
use serde::{Deserialize, Serialize};
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
Expand All @@ -40,17 +57,6 @@ use tokio::{process::Command, sync::mpsc::Sender};
use tracing::{error, info, warn};
use typed_builder::TypedBuilder;

use restate_types::{
config::{Configuration, MetadataStoreClient},
errors::GenericError,
metadata_store::keys::NODES_CONFIG_KEY,
net::{AdvertisedAddress, BindAddress},
nodes_config::{NodesConfiguration, Role},
PlainNodeId,
};

use crate::random_socket_address;

#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
pub struct Node {
#[builder(mutators(
Expand Down Expand Up @@ -747,6 +753,54 @@ impl StartedNode {

!nodes_config.get_log_server_storage_state(&node_id).empty()
}

/// Provisions the cluster on this node with the given configuration. Returns true if the
/// cluster was newly provisioned.
pub async fn provision_cluster(
&self,
num_partitions: Option<NonZeroU16>,
placement_strategy: Option<ReplicationStrategy>,
log_provider: Option<DefaultProvider>,
) -> anyhow::Result<bool> {
let channel = create_tonic_channel_from_advertised_address(
self.node_address().clone(),
&Configuration::default().networking,
);

let request = ProtoProvisionClusterRequest {
dry_run: false,
num_partitions: num_partitions.map(|num| u32::from(num.get())),
placement_strategy: placement_strategy
.map(|replication_strategy| replication_strategy.into()),
log_provider: log_provider.map(|log_provider| log_provider.into()),
};

let retry_policy = RetryPolicy::exponential(
Duration::from_millis(10),
2.0,
Some(10),
Some(Duration::from_secs(1)),
);
let client = NodeCtlSvcClient::new(channel);

let response = retry_policy
.retry(|| {
let mut client = client.clone();
let request = request.clone();
async move { client.provision_cluster(request).await }
})
.await?
.into_inner();

Ok(match response.kind() {
ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => {
panic!("unknown cluster response type")
}
ProvisionClusterResponseKind::DryRun => unreachable!("request non dry run"),
ProvisionClusterResponseKind::NewlyProvisioned => true,
ProvisionClusterResponseKind::AlreadyProvisioned => false,
})
}
}

#[derive(Debug, Clone, Copy)]
Expand Down
25 changes: 22 additions & 3 deletions server/tests/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,22 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::num::NonZeroU16;
use std::num::{NonZeroU16, NonZeroU8};
use std::time::Duration;

use enumset::enum_set;
use futures_util::StreamExt;
use googletest::IntoTestResult;
use regex::Regex;
use restate_local_cluster_runner::{
cluster::Cluster,
node::{BinarySource, Node},
};
use restate_types::config::MetadataStoreClient;
use restate_types::logs::metadata::ProviderKind;
use restate_types::logs::metadata::{
DefaultProvider, NodeSetSelectionStrategy, ReplicatedLogletConfig,
};
use restate_types::replicated_loglet::ReplicationProperty;
use restate_types::{config::Configuration, nodes_config::Role, PlainNodeId};
use test_log::test;

Expand Down Expand Up @@ -126,7 +130,8 @@ async fn cluster_name_mismatch() -> googletest::Result<()> {
#[test(restate_core::test)]
async fn replicated_loglet() -> googletest::Result<()> {
let mut base_config = Configuration::default();
base_config.bifrost.default_provider = ProviderKind::Replicated;
// require an explicit provision step to configure the replication property to 2
base_config.common.allow_bootstrap = false;
base_config.common.bootstrap_num_partitions = NonZeroU16::new(1).expect("1 to be non-zero");

let nodes = Node::new_test_nodes_with_metadata(
Expand All @@ -148,6 +153,20 @@ async fn replicated_loglet() -> googletest::Result<()> {
.start()
.await?;

let replicated_loglet_config = ReplicatedLogletConfig {
replication_property: ReplicationProperty::new(NonZeroU8::new(2).expect("to be non-zero")),
nodeset_selection_strategy: NodeSetSelectionStrategy::default(),
};

cluster.nodes[0]
.provision_cluster(
None,
None,
Some(DefaultProvider::Replicated(replicated_loglet_config)),
)
.await
.into_test_result()?;

cluster.wait_healthy(Duration::from_secs(30)).await?;

for partition_processor in &mut partition_processors_starting_up {
Expand Down

0 comments on commit 736900f

Please sign in to comment.