Skip to content

Commit

Permalink
scaffolding appender
Browse files Browse the repository at this point in the history
  • Loading branch information
muhamadazmy committed Sep 25, 2024
1 parent 2c3d08f commit 7da2942
Show file tree
Hide file tree
Showing 9 changed files with 549 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tracing = { workspace = true }

[dev-dependencies]
restate-core = { workspace = true, features = ["test-util"] }
restate-log-server = { workspace = true }
restate-metadata-store = { workspace = true }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }
Expand Down
17 changes: 17 additions & 0 deletions crates/bifrost/src/loglet/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@ impl TailOffsetWatch {
Ok(())
}

pub async fn wait_for_offset_or_seal(
&self,
offset: LogletOffset,
) -> Result<TailState<LogletOffset>, ShutdownError> {
let mut receiver = self.sender.subscribe();
receiver.mark_changed();
receiver
.wait_for(|tail_state| match tail_state {
TailState::Sealed(_) => true,
TailState::Open(tail) if *tail >= offset => true,
_ => false,
})
.await
.map(|m| *m)
.map_err(|_| ShutdownError)
}

/// The first yielded value is the latest known tail
pub fn to_stream(&self) -> WatchStream<TailState<LogletOffset>> {
let mut receiver = self.sender.subscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ impl SpreadSelector {
}
}

pub fn nodeset(&self) -> &NodeSet {
&self.nodeset
}

pub fn replication_property(&self) -> &ReplicationProperty {
&self.replication_property
}

/// Generates a spread or fails if it's not possible to generate a spread out of
/// the nodeset modulo the non-writeable nodes in the nodes configuration and after excluding
/// the set of nodes passed in `exclude_nodes`.
Expand Down
Loading

0 comments on commit 7da2942

Please sign in to comment.