Skip to content

Commit

Permalink
scaffolding leader Sequencer (#1982)
Browse files Browse the repository at this point in the history
* scaffolding appender

* scaffolding leader Sequencer
  • Loading branch information
muhamadazmy authored Sep 25, 2024
1 parent cc40ed9 commit 750cc50
Show file tree
Hide file tree
Showing 9 changed files with 830 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tracing = { workspace = true }

[dev-dependencies]
restate-core = { workspace = true, features = ["test-util"] }
restate-log-server = { workspace = true }
restate-metadata-store = { workspace = true }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }
Expand Down
17 changes: 17 additions & 0 deletions crates/bifrost/src/loglet/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@ impl TailOffsetWatch {
Ok(())
}

pub async fn wait_for_offset_or_seal(
&self,
offset: LogletOffset,
) -> Result<TailState<LogletOffset>, ShutdownError> {
let mut receiver = self.sender.subscribe();
receiver.mark_changed();
receiver
.wait_for(|tail_state| match tail_state {
TailState::Sealed(_) => true,
TailState::Open(tail) if *tail >= offset => true,
_ => false,
})
.await
.map(|m| *m)
.map_err(|_| ShutdownError)
}

/// The first yielded value is the latest known tail
pub fn to_stream(&self) -> WatchStream<TailState<LogletOffset>> {
let mut receiver = self.sender.subscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum SpreadSelectorError {
InsufficientWriteableNodes,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum SelectorStrategy {
/// Selects all writeable nodes in the nodeset, this might lead to over-replication,
/// and it's up to the appender state machine to continue replicating beyond the
Expand All @@ -38,6 +38,7 @@ pub enum SelectorStrategy {
}

/// Spread selector is thread-safe and can be used concurrently.
#[derive(Clone)]
pub struct SpreadSelector {
nodeset: NodeSet,
strategy: SelectorStrategy,
Expand All @@ -57,6 +58,14 @@ impl SpreadSelector {
}
}

pub fn nodeset(&self) -> &NodeSet {
&self.nodeset
}

pub fn replication_property(&self) -> &ReplicationProperty {
&self.replication_property
}

/// Generates a spread or fails if it's not possible to generate a spread out of
/// the nodeset modulo the non-writeable nodes in the nodes configuration and after excluding
/// the set of nodes passed in `exclude_nodes`.
Expand Down
Loading

0 comments on commit 750cc50

Please sign in to comment.