scaffolding appender
muhamadazmy committed Sep 20, 2024
1 parent 6af16c2 commit 4835885
Showing 5 changed files with 361 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/bifrost/Cargo.toml
@@ -41,6 +41,7 @@ tracing = { workspace = true }

[dev-dependencies]
restate-core = { workspace = true, features = ["test-util"] }
restate-log-server = { workspace = true }
restate-metadata-store = { workspace = true }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }
321 changes: 321 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/sequencer/append.rs
@@ -0,0 +1,321 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use futures::StreamExt;
use tokio::task::JoinSet;

use restate_core::{
network::{rpc_router::ResponseTracker, Incoming, NetworkError, Outgoing},
Metadata,
};
use restate_types::{
logs::{LogletOffset, Record, SequenceNumber, TailState},
net::log_server::{Status, Store, StoreFlags, Stored},
replicated_loglet::NodeSet,
};

use super::{
node::{RemoteLogServer, RemoteLogServerManager},
SequencerGlobalState,
};
use crate::providers::replicated_loglet::replication::spread_selector::SpreadSelector;

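/// State of an in-flight append: either another wave is needed (tracking which
/// nodes have replicated the batch and which are gray-listed) or the append is done.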
enum AppenderState {
Wave {
replicated: NodeSet,
gray_list: NodeSet,
},
Done,
}

/// Appender makes sure a batch of records runs to completion
pub(crate) struct Appender {
global: Arc<SequencerGlobalState>,
log_server_manager: RemoteLogServerManager,
response_tracker: ResponseTracker<Stored>,
selector: SpreadSelector,
metadata: Metadata,
first_offset: LogletOffset,
records: Arc<[Record]>,
}

impl Appender {
pub fn new(
global: Arc<SequencerGlobalState>,
log_server_manager: RemoteLogServerManager,
response_tracker: ResponseTracker<Stored>,
selector: SpreadSelector,
metadata: Metadata,
first_offset: LogletOffset,
records: Arc<[Record]>,
) -> Self {
Self {
global,
log_server_manager,
response_tracker,
selector,
metadata,
first_offset,
records,
}
}

pub async fn run(mut self) {
let mut state = AppenderState::Wave {
replicated: NodeSet::empty(),
gray_list: NodeSet::empty(),
};

// note: the sequencer should have already verified this, so the expect is safe!
let tail = self
.first_offset
.checked_add(self.records.len() as u32)
.expect("possible tail");

loop {
state = match state {
AppenderState::Wave {
replicated,
gray_list,
} => self.wave(replicated, gray_list).await,
AppenderState::Done => {
self.global
.committed_tail()
.notify_offset_update(LogletOffset::from(tail));
break;
}
}
}
}

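/// Runs a single wave: selects a spread that avoids the gray list, resolves
/// connections to the chosen servers, and hands them to send_wave.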
async fn wave(&mut self, replicated: NodeSet, mut gray_list: NodeSet) -> AppenderState {
let spread = match self.selector.select(
&mut rand::thread_rng(),
&self.metadata.nodes_config_ref(),
&gray_list,
) {
Ok(spread) => spread,
Err(_) => {
// todo(azmy): retry without a gray list!
return AppenderState::Wave {
replicated,
gray_list: NodeSet::empty(),
};
}
};

let mut gray = false;
let mut servers = Vec::with_capacity(spread.len());
for id in spread {
// at this stage, if we fail to get a connection to this server, it must be
// the first time we use it, so we can safely gray-list it
let server = match self.log_server_manager.get(id).await {
Ok(server) => server,
Err(err) => {
tracing::error!("failed to connect to {}: {}", id, err);
gray = true;
gray_list.insert(id);
continue;
}
};

servers.push(server);
}

if gray {
// Some nodes have been gray-listed (they weren't in the original gray list).
// todo(azmy): is it possible that we can still get a write quorum on the remaining
// set of nodes? The selector should probably provide an interface to validate that!

// we basically try again with the updated gray list
return AppenderState::Wave {
replicated,
gray_list,
};
}

// otherwise, we try to send the wave.
return self.send_wave(servers).await;
}

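/// Fans the batch out to the given servers in parallel and folds their
/// responses into the next appender state.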
async fn send_wave(&mut self, spread: Vec<RemoteLogServer>) -> AppenderState {
let mut waiters = JoinSet::new();

let replication_factor = spread.len();
let mut replicated = NodeSet::empty();

for server in spread {
// it is possible that we have visited this server
// in a previous wave, so we can short-circuit here
// and just skip it
if server.tail().latest_offset() > self.first_offset {
replicated.insert(*server.node());
continue;
}

let sender = Sender {
global: Arc::clone(&self.global),
server_manager: self.log_server_manager.clone(),
server,
first_offset: self.first_offset,
records: Arc::clone(&self.records),
tracker: self.response_tracker.clone(),
};

waiters.spawn(sender.send());
}

let mut gray_list = NodeSet::empty();

// todo(azmy): join_next should time out if nodes are taking too long to respond!
while let Some(Ok((server, result))) = waiters.join_next().await {
let status = match result {
Ok(status) => status,
Err(_err) => {
// todo(azmy): handle errors differently
gray_list.insert(*server.node());
continue;
}
};

// we got a response from this node; decide what to do based on its status
match status {
Status::Ok => {
replicated.insert(*server.node());
}
Status::Sealed | Status::Sealing => {
server.tail().notify_seal();
}
_ => {
// todo(azmy): handle the other statuses
gray_list.insert(*server.node());
}
}
}

if gray_list.is_empty() && replicated.len() == replication_factor {
AppenderState::Done
} else {
AppenderState::Wave {
replicated,
gray_list,
}
}
}
}

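/// Sends the batch to a single log server, renewing the connection on network
/// errors, and resolves with the server's store status.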
struct Sender {
global: Arc<SequencerGlobalState>,
server_manager: RemoteLogServerManager,
server: RemoteLogServer,
first_offset: LogletOffset,
records: Arc<[Record]>,
tracker: ResponseTracker<Stored>,
}

impl Sender {
async fn send(mut self) -> (RemoteLogServer, Result<Status, NetworkError>) {
let mut global_tail = self.global.committed_tail().to_stream();
let mut local_tail = self.server.tail().to_stream();

loop {
let tail = tokio::select! {
Some(tail) = global_tail.next() => {
tail
},
Some(tail) = local_tail.next() => {
tail
}
};

match tail {
TailState::Sealed(_) => {
// either the local or the global tail is sealed,
return (self.server, Ok(Status::Sealed));
}
TailState::Open(offset) => {
if offset > self.first_offset {
// somehow the global (or local) offset has moved
// past our first offset!
// for now we assume we don't need to make the
// write!
return (self.server, Ok(Status::Ok));
} else if offset == self.first_offset {
break;
}
}
}
}

let incoming = match self.try_send().await {
Ok(incoming) => incoming,
Err(err) => {
return (self.server, Err(err));
}
};

// quick actions: on success, publish the server's updated local tail
if let Status::Ok = incoming.status {
self.server.tail().notify_offset_update(incoming.local_tail);
}

(self.server, Ok(incoming.status))
}

async fn try_send(&mut self) -> Result<Incoming<Stored>, NetworkError> {
// if we got here, either we are at the global committed tail or the node's tail
// is at first_offset. In either case, we can try to send our Store message!
let store = Store {
first_offset: self.first_offset,
flags: StoreFlags::empty(),
known_archived: LogletOffset::INVALID,
known_global_tail: self.global.committed_tail.latest_offset(),
loglet_id: self.server.loglet_id(),
payloads: Vec::from_iter(self.records.iter().cloned()),
sequencer: self.global.node_id,
timeout_at: None,
};

let mut msg = Outgoing::new(*self.server.node(), store);
let token = self
.tracker
.new_token(msg.msg_id())
.expect("unique message id");

loop {
match self.server.sender().send(msg).await {
Ok(_) => break,
Err(send) => {
msg = send.message;

match send.source {
NetworkError::ConnectionClosed
| NetworkError::ConnectError(_)
| NetworkError::Timeout(_) => {
self.server_manager.renew(&mut self.server).await?
}
_ => return Err(send.source.into()),
}
}
}
}

// the message has been sent, but there is no way
// to be sure it has been received by the other peer,
// so we wait for the response indefinitely.
// it's up to the appender's timeouts to try again
token.recv().await.map_err(NetworkError::from)
}
}
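
For reference, the appender's retry behavior boils down to the pattern below. This is a minimal, self-contained sketch with hypothetical stand-ins (NodeId, WaveOutcome, and run_waves are not restate APIs): each wave excludes the nodes that failed in the previous one, and, like the scaffolding above, it retries indefinitely until a wave fully succeeds.

use std::collections::HashSet;

type NodeId = u32;

enum WaveOutcome {
    // every node in the spread acknowledged the store
    Done,
    // some nodes failed; avoid them when selecting the next spread
    Retry { gray_list: HashSet<NodeId> },
}

// Hypothetical driver mirroring Appender::run's loop.
fn run_waves(mut send_wave: impl FnMut(&HashSet<NodeId>) -> WaveOutcome) {
    let mut gray_list: HashSet<NodeId> = HashSet::new();
    loop {
        match send_wave(&gray_list) {
            WaveOutcome::Done => break,
            WaveOutcome::Retry { gray_list: failed } => gray_list = failed,
        }
    }
}

fn main() {
    let mut waves = 0;
    run_waves(|_gray| {
        waves += 1;
        if waves < 3 {
            // pretend node 7 failed this wave
            WaveOutcome::Retry { gray_list: HashSet::from([7]) }
        } else {
            WaveOutcome::Done
        }
    });
    println!("committed after {waves} waves");
}

The real code additionally carries the replicated set across waves so servers that already stored the batch are skipped; the sketch omits that bookkeeping.
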
27 changes: 27 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
@@ -8,4 +8,31 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod append;
mod node;

use restate_types::{replicated_loglet::ReplicatedLogletId, GenerationalNodeId};

use crate::loglet::util::TailOffsetWatch;

/// A shareable part of the sequencer state. This is shared with the node workers.
#[derive(Debug)]
pub(crate) struct SequencerGlobalState {
node_id: GenerationalNodeId,
loglet_id: ReplicatedLogletId,
committed_tail: TailOffsetWatch,
}

impl SequencerGlobalState {
pub fn node_id(&self) -> &GenerationalNodeId {
&self.node_id
}

pub fn loglet_id(&self) -> &ReplicatedLogletId {
&self.loglet_id
}

pub fn committed_tail(&self) -> &TailOffsetWatch {
&self.committed_tail
}
}
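
The committed_tail watch is what each node worker observes to learn about commit progress. Below is a minimal sketch of the sharing pattern, assuming TailOffsetWatch behaves like a tokio watch channel (GlobalState here is a hypothetical stand-in, not the real type):

use std::sync::Arc;
use tokio::sync::watch;

struct GlobalState {
    committed_tail: watch::Sender<u64>,
}

#[tokio::main]
async fn main() {
    let global = Arc::new(GlobalState {
        committed_tail: watch::channel(0u64).0,
    });

    // each worker clones the Arc and gets its own receiver on the same watch
    let mut rx = global.committed_tail.subscribe();
    let worker = tokio::spawn(async move {
        while rx.changed().await.is_ok() {
            println!("observed committed tail: {}", *rx.borrow());
        }
    });

    global.committed_tail.send(42).expect("receiver still alive");
    drop(global); // dropping the last sender ends the worker's loop
    worker.await.unwrap();
}
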
12 changes: 11 additions & 1 deletion crates/types/src/time.rs
@@ -15,7 +15,17 @@ use std::time::{Duration, SystemTime};

/// Milliseconds since the unix epoch
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
Debug,
Clone,
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
serde::Serialize,
serde::Deserialize,
derive_more::Into,
)]
#[serde(transparent)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
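
Besides reformatting the derive list, the time.rs change adds derive_more::Into. Assuming the type is a newtype over u64 (its definition is below the fold; the name and field here are an assumption based on the doc comment), and with derive_more's "into" feature enabled, the derive generates a plain conversion into the inner value:

use derive_more::Into;

#[derive(Debug, Clone, Copy, Into)]
struct MillisSinceEpoch(u64); // assumed shape, not copied from the diff

fn main() {
    let ts = MillisSinceEpoch(1_726_790_400_000);
    let raw: u64 = ts.into(); // From<MillisSinceEpoch> for u64, generated by the derive
    println!("{raw}");
}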
