Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Cleaner task to clean completed invocations #1845

Merged
merged 9 commits into from
Aug 16, 2024
1 change: 1 addition & 0 deletions crates/core/src/task_center_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub enum TaskKind {
#[strum(props(OnError = "log"))]
ConnectionReactor,
Shuffle,
Cleaner,
MetadataStore,
// -- Bifrost Tasks
/// A background task that the system needs for its operation. The task requires a system
Expand Down
4 changes: 2 additions & 2 deletions crates/ingress-dispatcher/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ mod tests {
);
invocation.argument = argument.clone();
invocation.idempotency_key = Some(idempotency_key.clone());
invocation.completion_retention_time = Some(Duration::from_secs(60));
invocation.completion_retention_duration = Some(Duration::from_secs(60));
let (ingress_req, _, res) = IngressDispatcherRequest::invocation(invocation);
dispatcher.dispatch_ingress_request(ingress_req).await?;

Expand All @@ -343,7 +343,7 @@ mod tests {
invocation_target: eq(invocation_target.clone()),
argument: eq(argument.clone()),
idempotency_key: some(eq(idempotency_key.clone())),
completion_retention_time: some(eq(Duration::from_secs(60)))
completion_retention_duration: some(eq(Duration::from_secs(60)))
})
);
let_assert!(
Expand Down
2 changes: 1 addition & 1 deletion crates/ingress-http/src/handler/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ where
let mut service_invocation =
ServiceInvocation::initialize(invocation_id, invocation_target, Source::Ingress);
service_invocation.with_related_span(SpanRelation::Parent(ingress_span_context));
service_invocation.completion_retention_time =
service_invocation.completion_retention_duration =
invocation_target_meta.compute_retention(idempotency_key.is_some());
if let Some(key) = idempotency_key {
service_invocation.idempotency_key = Some(key);
Expand Down
6 changes: 3 additions & 3 deletions crates/ingress-http/src/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ async fn idempotency_key_parsing() {
Some(ByteString::from_static("123456"))
);
assert_eq!(
service_invocation.completion_retention_time,
service_invocation.completion_retention_duration,
Some(Duration::from_secs(60 * 60 * 24))
);

Expand Down Expand Up @@ -437,7 +437,7 @@ async fn idempotency_key_and_send() {
Some(ByteString::from_static("123456"))
);
assert_eq!(
service_invocation.completion_retention_time,
service_invocation.completion_retention_duration,
Some(Duration::from_secs(60 * 60 * 24))
);
assert_that!(
Expand Down Expand Up @@ -497,7 +497,7 @@ async fn idempotency_key_and_send_with_different_invocation_id() {
Some(ByteString::from_static("123456"))
);
assert_eq!(
service_invocation.completion_retention_time,
service_invocation.completion_retention_duration,
Some(Duration::from_secs(60 * 60 * 24))
);
assert_that!(
Expand Down
2 changes: 1 addition & 1 deletion crates/partition-store/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub(crate) fn mock_service_invocation(service_id: ServiceId) -> ServiceInvocatio
span_context: Default::default(),
headers: vec![],
execution_time: None,
completion_retention_time: None,
completion_retention_duration: None,
idempotency_key: None,
submit_notification_sink: None,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ fn invoked_status(
journal_metadata: JournalMetadata::initialize(ServiceInvocationSpanContext::empty()),
pinned_deployment: None,
response_sinks: HashSet::new(),
timestamps: StatusTimestamps::new(MillisSinceEpoch::new(0), MillisSinceEpoch::new(0)),
timestamps: StatusTimestamps::init(MillisSinceEpoch::new(0)),
source: Source::Ingress,
completion_retention_time: Duration::ZERO,
completion_retention_duration: Duration::ZERO,
idempotency_key: None,
source_table,
})
Expand All @@ -100,9 +100,9 @@ fn suspended_status(
journal_metadata: JournalMetadata::initialize(ServiceInvocationSpanContext::empty()),
pinned_deployment: None,
response_sinks: HashSet::new(),
timestamps: StatusTimestamps::new(MillisSinceEpoch::new(0), MillisSinceEpoch::new(0)),
timestamps: StatusTimestamps::init(MillisSinceEpoch::new(0)),
source: Source::Ingress,
completion_retention_time: Duration::ZERO,
completion_retention_duration: Duration::ZERO,
idempotency_key: None,
source_table,
},
Expand Down
10 changes: 8 additions & 2 deletions crates/storage-api/proto/dev/restate/storage/v1/domain.proto
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,21 @@ message NeoInvocationStatus {
InvocationTarget invocation_target = 2;
Source source = 3;
SpanContext span_context = 4;
repeated ServiceInvocationResponseSink response_sinks = 7;
Duration completion_retention_duration = 11;

// Timestamps
uint64 creation_time = 5;
uint64 modification_time = 6;
repeated ServiceInvocationResponseSink response_sinks = 7;
optional uint64 inboxed_transition_time = 19;
optional uint64 scheduled_transition_time = 20;
optional uint64 running_transition_time = 21;
optional uint64 completed_transition_time = 22;

// Scheduled/Inboxed
optional bytes argument = 8;
repeated Header headers = 9;
optional uint64 execution_time = 10;
Duration completion_retention_time = 11;
optional string idempotency_key = 12;

// Inboxed
Expand Down
Loading
Loading