Skip to content

Commit

Permalink
scaffolding node appender worker
Browse files Browse the repository at this point in the history
  • Loading branch information
muhamadazmy committed Sep 18, 2024
1 parent 3743ffc commit 3acfbe2
Show file tree
Hide file tree
Showing 6 changed files with 404 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ futures = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true }
pin-project = { workspace = true }
rand = { workspace = true }
rocksdb = { workspace = true }
serde = { workspace = true }
smallvec = { workspace = true }
Expand All @@ -40,6 +41,7 @@ tracing = { workspace = true }

[dev-dependencies]
restate-core = { workspace = true, features = ["test-util"] }
restate-log-server = { workspace = true }
restate-metadata-store = { workspace = true }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@

pub(crate) mod metric_definitions;
mod provider;
#[allow(dead_code)]
mod sequencer;

pub use provider::Factory;
136 changes: 136 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
use std::{
collections::{BTreeMap, VecDeque},
sync::{
atomic::{self, Ordering},
Arc,
},
time::Duration,
};

use tokio::sync::{mpsc, oneshot};
use worker::{Payload, WorkerEvent};

use restate_core::{
cancellation_token, network::NetworkError, ShutdownError, TaskCenter, TaskKind,
};

use restate_types::{
logs::{LogletOffset, Lsn, Record, SequenceNumber, TailState},
net::log_server::{LogletInfo, Status, Store, Stored},
replicated_loglet::ReplicatedLogletId,
time::MillisSinceEpoch,
GenerationalNodeId, PlainNodeId,
};

use crate::loglet::{LogletCommit, Resolver};

mod worker;

#[derive(Debug, Default)]
pub struct NodeStatus {
// todo: this should be monotonic
last_response_time: atomic::AtomicU64,
}

impl NodeStatus {
pub(crate) fn touch(&self) {
// update value with latest timestamp
self.last_response_time
.store(MillisSinceEpoch::now().into(), Ordering::Relaxed);
}

pub fn last_response_time(&self) -> MillisSinceEpoch {
self.last_response_time.load(Ordering::Relaxed).into()
}

pub fn duration_since_last_response(&self) -> Duration {
// last_response_time should be monotonic
self.last_response_time().elapsed()
}
}

/// LogletHandler trait abstracts the log-server loglet interface. One of possible implementations
/// is a grpc client to running log server
#[async_trait::async_trait]
pub trait NodeClient {
async fn enqueue_store(&self, msg: Store) -> Result<(), NetworkError>;
async fn enqueue_get_loglet_info(&self) -> Result<(), NetworkError>;
}

struct NodeInner<C> {
client: C,
state: NodeStatus,
}

pub struct Node<C> {
inner: Arc<NodeInner<C>>,
}

impl<C> Clone for Node<C> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}

impl<C> Node<C> {
fn new(client: C) -> Self {
Self {
inner: Arc::new(NodeInner {
client,
state: NodeStatus::default(),
}),
}
}
pub fn client(&self) -> &C {
&self.inner.client
}

pub fn status(&self) -> &NodeStatus {
&self.inner.state
}
}

/// part of state that is shared between multiple appenders
#[derive(Debug)]
pub(crate) struct SequencerGlobalState {
node_id: GenerationalNodeId,
loglet_id: ReplicatedLogletId,
global_committed_tail: atomic::AtomicU32,
}

impl SequencerGlobalState {
pub fn node_id(&self) -> &GenerationalNodeId {
&self.node_id
}

pub fn loglet_id(&self) -> &ReplicatedLogletId {
&self.loglet_id
}

pub fn committed_tail(&self) -> LogletOffset {
LogletOffset::new(self.global_committed_tail.load(Ordering::Acquire))
}

pub(crate) fn set_committed_tail(&self, tail: LogletOffset) {
self.global_committed_tail
.fetch_max(tail.into(), Ordering::Release);
}
}

//todo: improve error names and description
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("cannot satisfy spread")]
CannotSatisfySpread,
#[error("malformed batch")]
MalformedBatch,
#[error("invalid node set")]
InvalidNodeSet,
#[error("node {0} queue is full")]
TemporaryUnavailable(PlainNodeId),
#[error(transparent)]

Check warning on line 133 in crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/restate/restate/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs

Check warning on line 133 in crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/restate/restate/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
Shutdown(#[from] ShutdownError),
}

Loading

0 comments on commit 3acfbe2

Please sign in to comment.