Skip to content

Commit

Permalink
Add Cleaner task to clean completed invocations (#1845)
Browse files Browse the repository at this point in the history
* A bunch of changes regarding time.

Expanded the data structure StatusTimestamps that tracks changes within the InvocationStatus. Because those timestamps are currently not agreed between replicas, the functions are marked unsafe and should really be used only in cases where those timestamps don't influence the deterministic business logic of the PP. Also only the NeoInvocationStatus stores those fields, the old invocation status will never fill those.

Now CompletedInvocation propagates the `CompletionRetention`. This field is filled with a `Duration::MAX` for the old invocation status, causing the addition to overflow (thus not causing the cleanup in the new cleaner task).

"Time is what the clock says" A. Einstein

* Implement the new Cleaner task. Also stop scheduling the cleanup timer for NeoInvocationStatus.
  • Loading branch information
slinkydeveloper authored Aug 16, 2024
1 parent 11827fa commit e8cc298
Show file tree
Hide file tree
Showing 22 changed files with 1,039 additions and 134 deletions.
1 change: 1 addition & 0 deletions crates/core/src/task_center_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub enum TaskKind {
#[strum(props(OnError = "log"))]
ConnectionReactor,
Shuffle,
Cleaner,
MetadataStore,
// -- Bifrost Tasks
/// A background task that the system needs for its operation. The task requires a system
Expand Down
4 changes: 2 additions & 2 deletions crates/ingress-dispatcher/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ mod tests {
);
invocation.argument = argument.clone();
invocation.idempotency_key = Some(idempotency_key.clone());
invocation.completion_retention_time = Some(Duration::from_secs(60));
invocation.completion_retention_duration = Some(Duration::from_secs(60));
let (ingress_req, _, res) = IngressDispatcherRequest::invocation(invocation);
dispatcher.dispatch_ingress_request(ingress_req).await?;

Expand All @@ -343,7 +343,7 @@ mod tests {
invocation_target: eq(invocation_target.clone()),
argument: eq(argument.clone()),
idempotency_key: some(eq(idempotency_key.clone())),
completion_retention_time: some(eq(Duration::from_secs(60)))
completion_retention_duration: some(eq(Duration::from_secs(60)))
})
);
let_assert!(
Expand Down
2 changes: 1 addition & 1 deletion crates/ingress-http/src/handler/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ where
let mut service_invocation =
ServiceInvocation::initialize(invocation_id, invocation_target, Source::Ingress);
service_invocation.with_related_span(SpanRelation::Parent(ingress_span_context));
service_invocation.completion_retention_time =
service_invocation.completion_retention_duration =
invocation_target_meta.compute_retention(idempotency_key.is_some());
if let Some(key) = idempotency_key {
service_invocation.idempotency_key = Some(key);
Expand Down
6 changes: 3 additions & 3 deletions crates/ingress-http/src/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ async fn idempotency_key_parsing() {
Some(ByteString::from_static("123456"))
);
assert_eq!(
service_invocation.completion_retention_time,
service_invocation.completion_retention_duration,
Some(Duration::from_secs(60 * 60 * 24))
);

Expand Down Expand Up @@ -437,7 +437,7 @@ async fn idempotency_key_and_send() {
Some(ByteString::from_static("123456"))
);
assert_eq!(
service_invocation.completion_retention_time,
service_invocation.completion_retention_duration,
Some(Duration::from_secs(60 * 60 * 24))
);
assert_that!(
Expand Down Expand Up @@ -497,7 +497,7 @@ async fn idempotency_key_and_send_with_different_invocation_id() {
Some(ByteString::from_static("123456"))
);
assert_eq!(
service_invocation.completion_retention_time,
service_invocation.completion_retention_duration,
Some(Duration::from_secs(60 * 60 * 24))
);
assert_that!(
Expand Down
2 changes: 1 addition & 1 deletion crates/partition-store/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub(crate) fn mock_service_invocation(service_id: ServiceId) -> ServiceInvocatio
span_context: Default::default(),
headers: vec![],
execution_time: None,
completion_retention_time: None,
completion_retention_duration: None,
idempotency_key: None,
submit_notification_sink: None,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ fn invoked_status(
journal_metadata: JournalMetadata::initialize(ServiceInvocationSpanContext::empty()),
pinned_deployment: None,
response_sinks: HashSet::new(),
timestamps: StatusTimestamps::new(MillisSinceEpoch::new(0), MillisSinceEpoch::new(0)),
timestamps: StatusTimestamps::init(MillisSinceEpoch::new(0)),
source: Source::Ingress,
completion_retention_time: Duration::ZERO,
completion_retention_duration: Duration::ZERO,
idempotency_key: None,
source_table,
})
Expand All @@ -100,9 +100,9 @@ fn suspended_status(
journal_metadata: JournalMetadata::initialize(ServiceInvocationSpanContext::empty()),
pinned_deployment: None,
response_sinks: HashSet::new(),
timestamps: StatusTimestamps::new(MillisSinceEpoch::new(0), MillisSinceEpoch::new(0)),
timestamps: StatusTimestamps::init(MillisSinceEpoch::new(0)),
source: Source::Ingress,
completion_retention_time: Duration::ZERO,
completion_retention_duration: Duration::ZERO,
idempotency_key: None,
source_table,
},
Expand Down
10 changes: 8 additions & 2 deletions crates/storage-api/proto/dev/restate/storage/v1/domain.proto
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,21 @@ message NeoInvocationStatus {
InvocationTarget invocation_target = 2;
Source source = 3;
SpanContext span_context = 4;
repeated ServiceInvocationResponseSink response_sinks = 7;
Duration completion_retention_duration = 11;

// Timestamps
uint64 creation_time = 5;
uint64 modification_time = 6;
repeated ServiceInvocationResponseSink response_sinks = 7;
optional uint64 inboxed_transition_time = 19;
optional uint64 scheduled_transition_time = 20;
optional uint64 running_transition_time = 21;
optional uint64 completed_transition_time = 22;

// Scheduled/Inboxed
optional bytes argument = 8;
repeated Header headers = 9;
optional uint64 execution_time = 10;
Duration completion_retention_time = 11;
optional string idempotency_key = 12;

// Inboxed
Expand Down
Loading

0 comments on commit e8cc298

Please sign in to comment.