[Bifrost] LocalLoglet push-down filtering and new record format
This introduces push-down key-based filtering support in local-loglet, along with a new on-disk record layout that supports key checking without copying the payload. The layout is extensible through an as-yet-unused flags field and currently stores the key length and the key itself.
The new record layout is not enabled by default; this makes the next release the first version that is safe to roll back to once the new format is enabled.

Keys are now also stored in the legacy payload holder, in a backward-compatible fashion.

Code is not my prettiest, but the logic is solid ;)
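
As a rough mental model of what the commit message describes, here is a minimal sketch of such a layout: a small header carrying the flags and the keys, followed by the opaque payload, so a reader can check keys against a filter by looking only at the header. The field order, widths, and names are illustrative assumptions, not the actual `record_format` encoding.

```rust
// Illustrative layout only; the real `record_format` encoding may differ in
// field order, widths, and flag semantics.
//
//   [flags: u16][key_count: u16][keys: key_count * u64][payload: rest]

fn encode_record(flags: u16, keys: &[u64], payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(4 + keys.len() * 8 + payload.len());
    buf.extend_from_slice(&flags.to_le_bytes()); // reserved for future extensions
    buf.extend_from_slice(&(keys.len() as u16).to_le_bytes());
    for key in keys {
        buf.extend_from_slice(&key.to_le_bytes());
    }
    buf.extend_from_slice(payload); // opaque body, written as-is
    buf
}

/// Checks whether any stored key matches `wanted` by reading only the header;
/// the payload bytes are never touched, let alone copied.
fn matches_key(record: &[u8], wanted: u64) -> bool {
    let key_count = u16::from_le_bytes([record[2], record[3]]) as usize;
    (0..key_count).any(|i| {
        let start = 4 + i * 8;
        u64::from_le_bytes(record[start..start + 8].try_into().unwrap()) == wanted
    })
}

fn main() {
    let record = encode_record(0, &[42, 7], b"payload bytes");
    assert!(matches_key(&record, 42));
    assert!(!matches_key(&record, 99));
}
```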
AhmedSoliman committed Aug 12, 2024
1 parent 55bb627 commit e07c449
Showing 9 changed files with 368 additions and 81 deletions.
22 changes: 11 additions & 11 deletions Cargo.lock


2 changes: 1 addition & 1 deletion crates/bifrost/src/bifrost.rs
@@ -102,7 +102,7 @@ impl Bifrost {
}

/// Appends a batch of records to a log. The log id must exist, otherwise the
/// operation fails with [`Error::UnknownLogId`]. The returned Lsn is the Lsn of the first
/// operation fails with [`Error::UnknownLogId`]. The returned Lsn is the Lsn of the last
/// record in this batch. This will only return after all records have been stored.
pub async fn append_batch<T: StorageEncode>(
&self,
42 changes: 1 addition & 41 deletions crates/bifrost/src/providers/local_loglet/log_store.rs
@@ -10,26 +10,20 @@

use std::sync::Arc;

use bytes::Bytes;
use restate_types::logs::Keys;
use rocksdb::{BoundColumnFamily, DBCompressionType, SliceTransform, DB};
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;

use restate_rocksdb::{
CfExactPattern, CfName, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
};
use restate_types::config::{LocalLogletOptions, RocksDbOptions};
use restate_types::flexbuffers_storage_encode_decode;
use restate_types::live::BoxedLiveLoad;
use restate_types::storage::{PolyBytes, StorageDecodeError, StorageEncodeError};
use restate_types::time::NanosSinceEpoch;
use restate_types::storage::{StorageDecodeError, StorageEncodeError};

use super::keys::{MetadataKey, MetadataKind, DATA_KEY_PREFIX_LENGTH};
use super::log_state::{log_state_full_merge, log_state_partial_merge, LogState};
use super::log_store_writer::LogStoreWriter;
use crate::loglet::LogletError;
use crate::Record;

// matches the default directory name
pub(crate) const DB_NAME: &str = "local-loglet";
@@ -241,37 +235,3 @@ fn cf_metadata_options(
opts
}
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub(super) struct LocalLogletHeader {
pub created_at: NanosSinceEpoch,
}

impl Default for LocalLogletHeader {
fn default() -> Self {
Self {
created_at: NanosSinceEpoch::now(),
}
}
}

/// Owned payload.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub(super) struct LocalLogletPayload {
pub header: LocalLogletHeader,
pub body: Bytes,
// future work
// pub keys: Keys,
}

impl LocalLogletPayload {
pub fn into_record(self) -> Record {
let header = crate::Header {
created_at: self.header.created_at,
};
// todo(asoli): add keys
Record::from_parts(header, Keys::None, PolyBytes::Bytes(self.body))
}
}

flexbuffers_storage_encode_decode!(LocalLogletPayload);
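
For context on the backward-compatible part: the removed `LocalLogletPayload` above had keys listed as future work, and the commit message says keys are now also stored in the legacy payload holder. With a serde-based codec such as flexbuffers, adding a field this way usually relies on a default so that older records written without the field still decode; a sketch under that assumption (the types are simplified stand-ins, not the real restate ones):

```rust
use serde::{Deserialize, Serialize};

// Simplified stand-in for restate_types::logs::Keys.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
enum Keys {
    #[default]
    None,
    Single(u64),
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct LegacyPayload {
    body: Vec<u8>,
    // New field with a default: records written before the field existed
    // still deserialize, which is what keeps the change backward compatible.
    #[serde(default)]
    keys: Keys,
}
```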
20 changes: 4 additions & 16 deletions crates/bifrost/src/providers/local_loglet/log_store_writer.rs
@@ -25,14 +25,14 @@ use restate_rocksdb::{IoMode, Priority, RocksDb};
use restate_types::config::LocalLogletOptions;
use restate_types::live::BoxedLiveLoad;
use restate_types::logs::SequenceNumber;
use restate_types::storage::StorageCodec;

use super::keys::{MetadataKey, MetadataKind, RecordKey};
use super::log_state::LogStateUpdates;
use super::log_store::{LocalLogletHeader, LocalLogletPayload, DATA_CF, METADATA_CF};
use super::log_store::{DATA_CF, METADATA_CF};
use super::metric_definitions::{
BIFROST_LOCAL_WRITE_BATCH_COUNT, BIFROST_LOCAL_WRITE_BATCH_SIZE_BYTES,
};
use super::record_format::{encode_record_and_split, FORMAT_FOR_NEW_APPENDS};
use crate::loglet::{LogletOffset, OperationError};
use crate::record::ErasedInputRecord;

@@ -226,20 +226,8 @@ impl LogStoreWriter {
serde_buffer.reserve(payloads.len() * RECORD_SIZE_GUESS);
for payload in payloads.iter() {
let key_bytes = RecordKey::new(id, offset).encode_and_split(serde_buffer);
// todo(asoli) store keys
// let _keys = payload.keys;
let body_bytes = StorageCodec::encode_and_split(&*payload.body, serde_buffer)
.expect("record serde is infallible");

let final_payload = LocalLogletPayload {
header: LocalLogletHeader {
created_at: payload.header.created_at,
},
body: body_bytes.freeze(),
};

let value_bytes = StorageCodec::encode_and_split(&final_payload, serde_buffer)
.expect("record serde is infallible");
let value_bytes =
encode_record_and_split(FORMAT_FOR_NEW_APPENDS, payload, serde_buffer);
write_batch.put_cf(data_cf, key_bytes, value_bytes);
// advance the offset for the next record
offset = offset.next();
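
The `*_and_split` helpers used above appear to follow the usual `bytes` pattern of serializing into one shared `BytesMut` and splitting off just the bytes written, so the whole batch reuses a single buffer (hence the `RECORD_SIZE_GUESS` reserve). A generic sketch of that pattern, with the real storage codec elided:

```rust
use bytes::{Bytes, BytesMut};

// Serialize each item into a shared buffer and split off the freshly written
// bytes, reusing the buffer's remaining capacity for the next item.
fn encode_and_split(item: &[u8], buf: &mut BytesMut) -> Bytes {
    buf.extend_from_slice(item); // stand-in for the real serialization step
    buf.split().freeze()         // hand out only the bytes written above
}

fn main() {
    let mut buf = BytesMut::with_capacity(1024);
    let a = encode_and_split(b"record-a", &mut buf);
    let b = encode_and_split(b"record-b", &mut buf);
    assert_eq!(&a[..], b"record-a");
    assert_eq!(&b[..], b"record-b");
}
```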
1 change: 1 addition & 0 deletions crates/bifrost/src/providers/local_loglet/mod.rs
@@ -15,6 +15,7 @@ mod log_store_writer;
mod metric_definitions;
mod provider;
mod read_stream;
mod record_format;

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
25 changes: 14 additions & 11 deletions crates/bifrost/src/providers/local_loglet/read_stream.rs
@@ -21,10 +21,9 @@ use tracing::{debug, error, warn};
use restate_core::ShutdownError;
use restate_rocksdb::RocksDbPerfGuard;
use restate_types::logs::{KeyFilter, SequenceNumber};
use restate_types::storage::StorageCodec;

use crate::loglet::{LogletBase, LogletOffset, LogletReadStream, OperationError};
use crate::providers::local_loglet::log_store::LocalLogletPayload;
use crate::providers::local_loglet::record_format::decode_and_filter_record;
use crate::providers::local_loglet::LogStoreError;
use crate::{LogEntry, Result, TailState};

@@ -33,6 +32,7 @@ use super::LocalLoglet;

pub(crate) struct LocalLogletReadStream {
log_id: u64,
filter: KeyFilter,
/// Buffer for serialization
serde_buffer: BytesMut,
// the next record this stream will attempt to read
@@ -64,7 +64,7 @@ unsafe fn ignore_iterator_lifetime<'a>(
impl LocalLogletReadStream {
pub(crate) async fn create(
loglet: Arc<LocalLoglet>,
_filter: KeyFilter,
filter: KeyFilter,
from_offset: LogletOffset,
to: Option<LogletOffset>,
) -> Result<Self, OperationError> {
@@ -114,6 +114,7 @@ impl LocalLogletReadStream {

Ok(Self {
log_id: loglet.loglet_id,
filter,
serde_buffer,
loglet,
read_pointer: from_offset,
@@ -170,6 +171,7 @@ impl Stream for LocalLogletReadStream {

match maybe_tail_state {
Some(tail_state) => {
// tail has been updated.
self.last_known_tail = tail_state.offset();
continue;
}
@@ -254,16 +256,17 @@ impl Stream for LocalLogletReadStream {
return Poll::Ready(None);
}

let mut raw_value = self.iterator.value().expect("log record exists");
let internal_payload: LocalLogletPayload =
StorageCodec::decode(&mut raw_value).expect("record serde is infallible");

self.read_pointer = loaded_key.offset.next();
let raw_value = self.iterator.value().expect("log record exists");

let maybe_record = decode_and_filter_record(raw_value, &self.filter)
.map_err(OperationError::terminal)?;

return Poll::Ready(Some(Ok(LogEntry::new_data(
key.offset,
internal_payload.into_record(),
))));
// The record matches the filter, good to return.
if let Some(record) = maybe_record {
return Poll::Ready(Some(Ok(LogEntry::new_data(key.offset, record))));
}
// Didn't match, loop and read the next record if possible.
}
}
}
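
The new `record_format` module itself is not part of the excerpt above, so the following only sketches the filtering semantics implied by this loop: compare the keys stored with the record against the reader's `KeyFilter` and return nothing on a mismatch, so the stream advances to the next offset without materializing the payload. `Filter` below is a simplified stand-in rather than the real `restate_types::logs::KeyFilter`:

```rust
use std::ops::RangeInclusive;

// Simplified stand-in for a key filter; the real KeyFilter lives in
// restate_types::logs and may have different variants.
enum Filter {
    Any,
    Include(u64),
    Within(RangeInclusive<u64>),
}

/// Decide whether a record whose header carries `keys` should be yielded to
/// this reader; a `false` here corresponds to the "didn't match, read the next
/// record" branch in the stream loop above.
fn record_matches(keys: &[u64], filter: &Filter) -> bool {
    match filter {
        Filter::Any => true,
        Filter::Include(k) => keys.contains(k),
        Filter::Within(range) => keys.iter().any(|k| range.contains(k)),
    }
}

fn main() {
    assert!(record_matches(&[7, 42], &Filter::Include(42)));
    assert!(!record_matches(&[7], &Filter::Within(10..=20)));
    assert!(record_matches(&[], &Filter::Any));
}
```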