-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Bifrost] BackgroundAppender implementation #1789
Conversation
49d5d2d
to
7e2e5b7
Compare
Test Results 5 files 5 suites 6m 7s ⏱️ Results for commit 35b10b9. ♻️ This comment has been updated with latest results. |
988a3aa
to
68e2d74
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great to have the BackgroundAppender
:-) LGTM. +1 for merging.
let keys = record.record_keys(); | ||
StorageCodec::encode(&record, &mut appender.serde_buffer) | ||
.expect("record serde is infallible"); | ||
// todo, optimize to avoid copying the payload bytes again | ||
let payload = Payload::new(appender.serde_buffer.split().freeze()); | ||
StorageCodec::encode(payload, &mut appender.serde_buffer) | ||
.expect("record serde is infallible"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe factor out into a function encode_as_payload<T: StorageEncode + HasRecordKeys>(value: &T, ...)
? I think I have seen this snippet at a few places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea but this code gets removed in a follow-up PR anyway so I wouldn't worry so much about it.
// What to do if task panics! | ||
if let Err(e) = task.await { | ||
warn!( | ||
?e, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe err
so that from the field name is clear that this is the error?
#[strum(props(OnCancel = "wait", OnError = "log"))] | ||
BifrostAppender, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are awaiting on cancel but do not listen to the task's cancellation token, we assume that this task will always be canceled together with the task that holds the AppenderHandle
, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's correct. appender handle's description explains this.
Envelope implements a new trait `HasRecordKeys` to support future matching of records based on a `KeyMatcher`. The new appender is the canonical way to write to a single log. It tries to recycle the write buffer (although it doesn't use freelist which would be a potential future improvement) and it defers serialization to unlock the potential for better serialization write buffer management. Appender is created by `bifrost.create_appender(log_id)?` and it's the core abstraction that `BackgroundAppender` builds upon (next PR)
Introduces a new way to enqueue messages to Bifrost. This PR adds `bifrost.create_background_append(...).start()` that starts a background appender. The background appender automatically batches appends and provides flexible control over draining it or auto-destruction when all senders are dropped (see `detach()` on `AppenderHandle`). With this, `ActionEffectHandler` makes use of this new ability and will not block the partition processor loop when writing effects to bifrost.
[Bifrost] BackgroundAppender implementation
Introduces a new way to enqueue messages to Bifrost. This PR adds
bifrost.create_background_append(...).start()
that starts a background appender. The background appender automatically batches appends and provides flexible control over draining it or auto-destruction when all senders are dropped (seedetach()
onAppenderHandle
).With this,
ActionEffectHandler
makes use of this new ability and will not block the partition processor loop when writing effects to bifrost.Stack created with Sapling. Best reviewed with ReviewStack.