Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds header to bifrost payloads #1520

Merged
merged 3 commits on May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 16 additions & 4 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use bytes::BytesMut;
use enum_map::EnumMap;
use once_cell::sync::OnceCell;
use tracing::{error, instrument};

use restate_core::{metadata, Metadata, MetadataKind};
use restate_types::logs::metadata::ProviderKind;
use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::storage::StorageCodec;
use restate_types::Version;
use tracing::{error, instrument};

use crate::loglet::{LogletBase, LogletProvider, LogletWrapper};
use crate::watchdog::{WatchdogCommand, WatchdogSender};
Expand Down Expand Up @@ -158,14 +160,20 @@ impl BifrostInner {
pub async fn append(&self, log_id: LogId, payload: Payload) -> Result<Lsn, Error> {
self.fail_if_shutting_down()?;
let loglet = self.writeable_loglet(log_id).await?;
loglet.append(payload).await
let mut buf = BytesMut::default();
StorageCodec::encode(payload, &mut buf).expect("serialization to bifrost is infallible");
loglet.append(buf.freeze()).await
}

pub async fn read_next_single(&self, log_id: LogId, after: Lsn) -> Result<LogRecord, Error> {
self.fail_if_shutting_down()?;

let loglet = self.find_loglet_for_lsn(log_id, after.next()).await?;
loglet.read_next_single(after).await
Ok(loglet
.read_next_single(after)
.await?
.decode()
.expect("decoding a bifrost envelope succeeds"))
}

pub async fn read_next_single_opt(
Expand All @@ -176,7 +184,11 @@ impl BifrostInner {
self.fail_if_shutting_down()?;

let loglet = self.find_loglet_for_lsn(log_id, after.next()).await?;
loglet.read_next_single_opt(after).await
Ok(loglet.read_next_single_opt(after).await?.map(|record| {
record
.decode()
.expect("decoding a bifrost envelope succeeds")
}))
}

pub async fn find_tail(
Expand Down
2 changes: 2 additions & 0 deletions crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ mod error;
mod loglet;
mod loglets;
mod read_stream;
mod record;
mod service;
mod types;
mod watchdog;

pub use bifrost::Bifrost;
pub use error::{Error, ProviderError};
pub use read_stream::LogReadStream;
pub use record::*;
pub use service::BifrostService;
pub use types::*;
21 changes: 12 additions & 9 deletions crates/bifrost/src/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use std::sync::Arc;

use async_trait::async_trait;

use bytes::Bytes;
use restate_types::config::Configuration;
use restate_types::logs::metadata::{LogletParams, ProviderKind};
use restate_types::logs::{Lsn, Payload, SequenceNumber};
use restate_types::logs::{Lsn, SequenceNumber};

use crate::{Error, LogRecord, LsnExt, ProviderError};

Expand Down Expand Up @@ -104,7 +105,7 @@ pub trait LogletBase: Send + Sync {
type Offset: SequenceNumber;

/// Append a record to the loglet.
async fn append(&self, payload: Payload) -> Result<Self::Offset, Error>;
async fn append(&self, data: Bytes) -> Result<Self::Offset, Error>;

/// Find the tail of the loglet. If the loglet is empty or has been trimmed, the loglet should
/// return `None`.
Expand All @@ -116,22 +117,24 @@ pub trait LogletBase: Send + Sync {

/// Read or wait for the record following the `after` offset, or the next available record if
/// that offset isn't defined for the loglet.
async fn read_next_single(&self, after: Self::Offset)
-> Result<LogRecord<Self::Offset>, Error>;
async fn read_next_single(
&self,
after: Self::Offset,
) -> Result<LogRecord<Self::Offset, Bytes>, Error>;

/// Read the next record if it's been committed, otherwise, return None without waiting.
async fn read_next_single_opt(
&self,
after: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset>>, Error>;
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, Error>;
}

#[async_trait]
impl LogletBase for LogletWrapper {
type Offset = Lsn;

async fn append(&self, payload: Payload) -> Result<Lsn, Error> {
let offset = self.loglet.append(payload).await?;
async fn append(&self, data: Bytes) -> Result<Lsn, Error> {
let offset = self.loglet.append(data).await?;
// Return the LSN given the loglet offset.
Ok(self.base_lsn.offset_by(offset))
}
Expand All @@ -146,7 +149,7 @@ impl LogletBase for LogletWrapper {
Ok(self.base_lsn.offset_by(offset))
}

async fn read_next_single(&self, after: Lsn) -> Result<LogRecord<Lsn>, Error> {
async fn read_next_single(&self, after: Lsn) -> Result<LogRecord<Lsn, Bytes>, Error> {
// convert LSN to loglet offset
let offset = after.into_offset(self.base_lsn);
self.loglet
Expand All @@ -158,7 +161,7 @@ impl LogletBase for LogletWrapper {
async fn read_next_single_opt(
&self,
after: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset>>, Error> {
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, Error> {
let offset = after.into_offset(self.base_lsn);
self.loglet
.read_next_single_opt(offset)
Expand Down
17 changes: 10 additions & 7 deletions crates/bifrost/src/loglets/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub use log_store::LogStoreError;
use metrics::{counter, histogram};
pub use provider::LocalLogletProvider;
use restate_core::ShutdownError;
use restate_types::logs::{Payload, SequenceNumber};
use restate_types::logs::SequenceNumber;
use tokio::sync::Mutex;
use tracing::{debug, warn};

Expand Down Expand Up @@ -95,7 +95,10 @@ impl LocalLoglet {
self.release_watch.notify(release_pointer);
}

fn read_after(&self, after: LogletOffset) -> Result<Option<LogRecord<LogletOffset>>, Error> {
fn read_after(
&self,
after: LogletOffset,
) -> Result<Option<LogRecord<LogletOffset, Bytes>>, Error> {
let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed));
// Is the `after` position before the trim point? Note that if `trim_point` == `after`
// then we don't return a trim gap, because the next record is potentially a data
Expand Down Expand Up @@ -138,15 +141,15 @@ impl LocalLoglet {
return Ok(None);
}
let data = Bytes::from(data);
Ok(Some(LogRecord::new_data(key.offset, Payload::from(data))))
Ok(Some(LogRecord::new_data(key.offset, data)))
}
}
}

#[async_trait]
impl LogletBase for LocalLoglet {
type Offset = LogletOffset;
async fn append(&self, payload: Payload) -> Result<LogletOffset, Error> {
async fn append(&self, payload: Bytes) -> Result<LogletOffset, Error> {
counter!(BIFROST_LOCAL_APPEND).increment(1);
let start_time = std::time::Instant::now();
// We hold the lock to ensure that offsets are enqueued in the order of
Expand All @@ -162,7 +165,7 @@ impl LogletBase for LocalLoglet {
.enqueue_put_record(
self.log_id,
offset,
payload.into(),
payload,
true, /* release_immediately */
)
.await?;
Expand Down Expand Up @@ -199,7 +202,7 @@ impl LogletBase for LocalLoglet {
async fn read_next_single(
&self,
after: Self::Offset,
) -> Result<LogRecord<Self::Offset>, Error> {
) -> Result<LogRecord<Self::Offset, Bytes>, Error> {
loop {
let next_record = self.read_after(after)?;
if let Some(next_record) = next_record {
Expand All @@ -213,7 +216,7 @@ impl LogletBase for LocalLoglet {
async fn read_next_single_opt(
&self,
after: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset>>, Error> {
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, Error> {
self.read_after(after)
}
}
43 changes: 23 additions & 20 deletions crates/bifrost/src/loglets/memory_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
use restate_types::logs::metadata::LogletParams;
use restate_types::logs::{Payload, SequenceNumber};
use restate_types::logs::SequenceNumber;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::Mutex as AsyncMutex;
use tracing::{debug, info};
Expand Down Expand Up @@ -111,7 +112,7 @@ impl Ord for OffsetWatcher {
pub struct MemoryLoglet {
// We treat params as an opaque identifier for the underlying loglet.
params: LogletParams,
log: Mutex<Vec<Payload>>,
log: Mutex<Vec<Bytes>>,
// internal offset of the first record (or slot available)
trim_point_offset: AtomicU64,
last_committed_offset: AtomicU64,
Expand Down Expand Up @@ -173,7 +174,10 @@ impl MemoryLoglet {
}
}

fn read_after(&self, after: LogletOffset) -> Result<Option<LogRecord<LogletOffset>>, Error> {
fn read_after(
&self,
after: LogletOffset,
) -> Result<Option<LogRecord<LogletOffset, Bytes>>, Error> {
let guard = self.log.lock().unwrap();
let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Acquire));
// are we reading after before the trim point? Note that if trim_point == after then we
Expand Down Expand Up @@ -201,7 +205,7 @@ impl MemoryLoglet {
impl LogletBase for MemoryLoglet {
type Offset = LogletOffset;

async fn append(&self, payload: Payload) -> Result<LogletOffset, Error> {
async fn append(&self, payload: Bytes) -> Result<LogletOffset, Error> {
let mut log = self.log.lock().unwrap();
let offset = self.index_to_offset(log.len());
debug!(
Expand Down Expand Up @@ -233,24 +237,23 @@ impl LogletBase for MemoryLoglet {
async fn read_next_single(
&self,
after: LogletOffset,
) -> Result<LogRecord<Self::Offset>, Error> {
) -> Result<LogRecord<Self::Offset, Bytes>, Error> {
loop {
let next_record = self.read_after(after)?;
if let Some(next_record) = next_record {
break Ok(next_record);
} else {
// Wait and respond when available.
let receiver = self.watch_for_offset(after.next());
receiver.await.unwrap();
continue;
}
// Wait and respond when available.
let receiver = self.watch_for_offset(after.next());
receiver.await.unwrap();
continue;
}
}

async fn read_next_single_opt(
&self,
after: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset>>, Error> {
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, Error> {
self.read_after(after)
}
}
Expand All @@ -273,19 +276,19 @@ mod tests {
assert_eq!(None, loglet.find_tail().await?);

// Append 1
let offset = loglet.append(Payload::from("record1")).await?;
let offset = loglet.append(Bytes::from_static(b"record1")).await?;
assert_eq!(LogletOffset::OLDEST, offset);
assert_eq!(LogletOffset::INVALID, loglet.get_trim_point().await?);
assert_eq!(Some(LogletOffset::OLDEST), loglet.find_tail().await?);

// Append 2
let offset = loglet.append(Payload::from("record2")).await?;
let offset = loglet.append(Bytes::from_static(b"record2")).await?;
assert_eq!(LogletOffset(2), offset);
assert_eq!(LogletOffset::INVALID, loglet.get_trim_point().await?);
assert_eq!(Some(LogletOffset(2)), loglet.find_tail().await?);

// Append 3
let offset = loglet.append(Payload::from("record3")).await?;
let offset = loglet.append(Bytes::from_static(b"record3")).await?;
assert_eq!(LogletOffset(3), offset);
assert_eq!(LogletOffset::INVALID, loglet.get_trim_point().await?);
assert_eq!(Some(LogletOffset(3)), loglet.find_tail().await?);
Expand All @@ -296,17 +299,17 @@ mod tests {
assert_eq!(offset, loglet.get_trim_point().await?.next());
assert_eq!(LogletOffset::OLDEST, offset);
assert!(record.is_data());
assert_eq!(Payload::from("record1"), record.into_payload_unchecked());
assert_eq!(Some(&Bytes::from_static(b"record1")), record.payload());

// read record 2 (reading next after OLDEST)
let LogRecord { offset, record } = loglet.read_next_single(offset).await?;
assert_eq!(LogletOffset(2), offset);
assert_eq!(Payload::from("record2"), record.into_payload_unchecked());
assert_eq!(Some(&Bytes::from_static(b"record2")), record.payload());

// read record 3
let LogRecord { offset, record } = loglet.read_next_single(offset).await?;
assert_eq!(LogletOffset(3), offset);
assert_eq!(Payload::from("record3"), record.into_payload_unchecked());
assert_eq!(Some(&Bytes::from_static(b"record3")), record.payload());

// read from the future returns None
assert!(loglet
Expand All @@ -320,7 +323,7 @@ mod tests {
// read future record 4
let LogRecord { offset, record } = loglet.read_next_single(LogletOffset(3)).await?;
assert_eq!(LogletOffset(4), offset);
assert_eq!(Payload::from("record4"), record.into_payload_unchecked());
assert_eq!(Some(&Bytes::from_static(b"record4")), record.payload());
Ok(())
}
});
Expand All @@ -332,7 +335,7 @@ mod tests {
// read future record 10
let LogRecord { offset, record } = loglet.read_next_single(LogletOffset(9)).await?;
assert_eq!(LogletOffset(10), offset);
assert_eq!(Payload::from("record10"), record.into_payload_unchecked());
assert_eq!(Some(&Bytes::from_static(b"record10")), record.payload());
Ok(())
}
});
Expand All @@ -342,7 +345,7 @@ mod tests {
assert!(!handle1.is_finished());

// Append 4
let offset = loglet.append(Payload::from("record4")).await?;
let offset = loglet.append(Bytes::from_static(b"record4")).await?;
assert_eq!(LogletOffset(4), offset);
assert_eq!(LogletOffset::INVALID, loglet.get_trim_point().await?);
assert_eq!(Some(LogletOffset(4)), loglet.find_tail().await?);
Expand Down
Loading
Loading