diff --git a/crates/core/src/task_center_types.rs b/crates/core/src/task_center_types.rs index 38a8f894f..fccb81156 100644 --- a/crates/core/src/task_center_types.rs +++ b/crates/core/src/task_center_types.rs @@ -75,6 +75,7 @@ pub enum TaskKind { #[strum(props(OnError = "log"))] ConnectionReactor, Shuffle, + Cleaner, MetadataStore, // -- Bifrost Tasks /// A background task that the system needs for its operation. The task requires a system diff --git a/crates/ingress-dispatcher/src/dispatcher.rs b/crates/ingress-dispatcher/src/dispatcher.rs index ca875bff0..b7d6b0ba7 100644 --- a/crates/ingress-dispatcher/src/dispatcher.rs +++ b/crates/ingress-dispatcher/src/dispatcher.rs @@ -316,7 +316,7 @@ mod tests { ); invocation.argument = argument.clone(); invocation.idempotency_key = Some(idempotency_key.clone()); - invocation.completion_retention_time = Some(Duration::from_secs(60)); + invocation.completion_retention_duration = Some(Duration::from_secs(60)); let (ingress_req, _, res) = IngressDispatcherRequest::invocation(invocation); dispatcher.dispatch_ingress_request(ingress_req).await?; @@ -343,7 +343,7 @@ mod tests { invocation_target: eq(invocation_target.clone()), argument: eq(argument.clone()), idempotency_key: some(eq(idempotency_key.clone())), - completion_retention_time: some(eq(Duration::from_secs(60))) + completion_retention_duration: some(eq(Duration::from_secs(60))) }) ); let_assert!( diff --git a/crates/ingress-http/src/handler/service_handler.rs b/crates/ingress-http/src/handler/service_handler.rs index a3e68aa9f..bf6f5c8aa 100644 --- a/crates/ingress-http/src/handler/service_handler.rs +++ b/crates/ingress-http/src/handler/service_handler.rs @@ -180,7 +180,7 @@ where let mut service_invocation = ServiceInvocation::initialize(invocation_id, invocation_target, Source::Ingress); service_invocation.with_related_span(SpanRelation::Parent(ingress_span_context)); - service_invocation.completion_retention_time = + service_invocation.completion_retention_duration = 
invocation_target_meta.compute_retention(idempotency_key.is_some()); if let Some(key) = idempotency_key { service_invocation.idempotency_key = Some(key); diff --git a/crates/ingress-http/src/handler/tests.rs b/crates/ingress-http/src/handler/tests.rs index b470118db..ead18e550 100644 --- a/crates/ingress-http/src/handler/tests.rs +++ b/crates/ingress-http/src/handler/tests.rs @@ -373,7 +373,7 @@ async fn idempotency_key_parsing() { Some(ByteString::from_static("123456")) ); assert_eq!( - service_invocation.completion_retention_time, + service_invocation.completion_retention_duration, Some(Duration::from_secs(60 * 60 * 24)) ); @@ -437,7 +437,7 @@ async fn idempotency_key_and_send() { Some(ByteString::from_static("123456")) ); assert_eq!( - service_invocation.completion_retention_time, + service_invocation.completion_retention_duration, Some(Duration::from_secs(60 * 60 * 24)) ); assert_that!( @@ -497,7 +497,7 @@ async fn idempotency_key_and_send_with_different_invocation_id() { Some(ByteString::from_static("123456")) ); assert_eq!( - service_invocation.completion_retention_time, + service_invocation.completion_retention_duration, Some(Duration::from_secs(60 * 60 * 24)) ); assert_that!( diff --git a/crates/partition-store/tests/integration_test.rs b/crates/partition-store/tests/integration_test.rs index 64f385480..7df752e5e 100644 --- a/crates/partition-store/tests/integration_test.rs +++ b/crates/partition-store/tests/integration_test.rs @@ -92,7 +92,7 @@ pub(crate) fn mock_service_invocation(service_id: ServiceId) -> ServiceInvocatio span_context: Default::default(), headers: vec![], execution_time: None, - completion_retention_time: None, + completion_retention_duration: None, idempotency_key: None, submit_notification_sink: None, } diff --git a/crates/partition-store/tests/invocation_status_table_test/mod.rs b/crates/partition-store/tests/invocation_status_table_test/mod.rs index f69428746..0798fadf1 100644 --- 
a/crates/partition-store/tests/invocation_status_table_test/mod.rs +++ b/crates/partition-store/tests/invocation_status_table_test/mod.rs @@ -82,9 +82,9 @@ fn invoked_status( journal_metadata: JournalMetadata::initialize(ServiceInvocationSpanContext::empty()), pinned_deployment: None, response_sinks: HashSet::new(), - timestamps: StatusTimestamps::new(MillisSinceEpoch::new(0), MillisSinceEpoch::new(0)), + timestamps: StatusTimestamps::init(MillisSinceEpoch::new(0)), source: Source::Ingress, - completion_retention_time: Duration::ZERO, + completion_retention_duration: Duration::ZERO, idempotency_key: None, source_table, }) @@ -100,9 +100,9 @@ fn suspended_status( journal_metadata: JournalMetadata::initialize(ServiceInvocationSpanContext::empty()), pinned_deployment: None, response_sinks: HashSet::new(), - timestamps: StatusTimestamps::new(MillisSinceEpoch::new(0), MillisSinceEpoch::new(0)), + timestamps: StatusTimestamps::init(MillisSinceEpoch::new(0)), source: Source::Ingress, - completion_retention_time: Duration::ZERO, + completion_retention_duration: Duration::ZERO, idempotency_key: None, source_table, }, diff --git a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto index 5d05532a3..9196f69e6 100644 --- a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto +++ b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto @@ -112,15 +112,21 @@ message NeoInvocationStatus { InvocationTarget invocation_target = 2; Source source = 3; SpanContext span_context = 4; + repeated ServiceInvocationResponseSink response_sinks = 7; + Duration completion_retention_duration = 11; + + // Timestamps uint64 creation_time = 5; uint64 modification_time = 6; - repeated ServiceInvocationResponseSink response_sinks = 7; + optional uint64 inboxed_transition_time = 19; + optional uint64 scheduled_transition_time = 20; + optional uint64 running_transition_time = 21; + optional uint64 
completed_transition_time = 22; // Scheduled/Inboxed optional bytes argument = 8; repeated Header headers = 9; optional uint64 execution_time = 10; - Duration completion_retention_time = 11; optional string idempotency_key = 12; // Inboxed diff --git a/crates/storage-api/src/invocation_status_table/mod.rs b/crates/storage-api/src/invocation_status_table/mod.rs index e64530cb8..23957f5a2 100644 --- a/crates/storage-api/src/invocation_status_table/mod.rs +++ b/crates/storage-api/src/invocation_status_table/mod.rs @@ -35,40 +35,154 @@ pub enum SourceTable { pub struct StatusTimestamps { creation_time: MillisSinceEpoch, modification_time: MillisSinceEpoch, + + inboxed_transition_time: Option, + scheduled_transition_time: Option, + running_transition_time: Option, + completed_transition_time: Option, } impl StatusTimestamps { - pub fn new(creation_time: MillisSinceEpoch, modification_time: MillisSinceEpoch) -> Self { + pub fn init(creation_time: MillisSinceEpoch) -> Self { Self { creation_time, - modification_time, + modification_time: creation_time, + inboxed_transition_time: None, + scheduled_transition_time: None, + running_transition_time: None, + completed_transition_time: None, } } - pub fn now() -> Self { - StatusTimestamps::new(MillisSinceEpoch::now(), MillisSinceEpoch::now()) + pub fn new( + creation_time: MillisSinceEpoch, + modification_time: MillisSinceEpoch, + inboxed_transition_time: Option, + scheduled_transition_time: Option, + running_transition_time: Option, + completed_transition_time: Option, + ) -> Self { + Self { + creation_time, + modification_time, + inboxed_transition_time, + scheduled_transition_time, + running_transition_time, + completed_transition_time, + } } /// Update the statistics with an updated [`Self::modification_time()`]. + /// + /// # Safety + /// The value of this time is not consistent across replicas of a partition, because it's not agreed. 
+ /// You **MUST NOT** use it within the Partition processor business logic, but only for observability purposes. pub fn update(&mut self) { self.modification_time = MillisSinceEpoch::now() } + /// Create a new StatusTimestamps data structure using the system time. + /// + /// # Safety + /// The value of this time is not consistent across replicas of a partition, because it's not agreed. + /// You **MUST NOT** use it within the Partition processor business logic, but only for observability purposes. + pub fn now() -> Self { + StatusTimestamps::init(MillisSinceEpoch::now()) + } + + /// Update the statistics with an updated [`Self::inboxed_transition_time()`]. + /// + /// # Safety + /// The value of this time is not consistent across replicas of a partition, because it's not agreed. + /// You **MUST NOT** use it within the Partition processor business logic, but only for observability purposes. + fn record_inboxed_transition_time(&mut self) { + self.update(); + self.inboxed_transition_time = Some(self.modification_time) + } + + /// Update the statistics with an updated [`Self::scheduled_transition_time()`]. + /// + /// # Safety + /// The value of this time is not consistent across replicas of a partition, because it's not agreed. + /// You **MUST NOT** use it within the Partition processor business logic, but only for observability purposes. + fn record_scheduled_transition_time(&mut self) { + self.update(); + self.scheduled_transition_time = Some(self.modification_time) + } + + /// Update the statistics with an updated [`Self::running_transition_time()`]. + /// + /// # Safety + /// The value of this time is not consistent across replicas of a partition, because it's not agreed. + /// You **MUST NOT** use it within the Partition processor business logic, but only for observability purposes. 
+ fn record_running_transition_time(&mut self) { + self.update(); + self.running_transition_time = Some(self.modification_time) + } + + /// Update the statistics with an updated [`Self::completed_transition_time()`]. + /// + /// # Safety + /// The value of this time is not consistent across replicas of a partition, because it's not agreed. + /// You **MUST NOT** use it within the Partition processor business logic, but only for observability purposes. + fn record_completed_transition_time(&mut self) { + self.update(); + self.completed_transition_time = Some(self.modification_time) + } + /// Creation time of the [`InvocationStatus`]. /// - /// Note: The value of this time is not consistent across replicas of a partition, because it's not agreed. - /// You **MUST NOT** use it for business logic, but only for observability purposes. - pub fn creation_time(&self) -> MillisSinceEpoch { + /// # Safety + /// The value of this time is not consistent across replicas of a partition, because it's not agreed. + /// You **MUST NOT** use it within the Partition processor business logic, but only for observability purposes. + pub unsafe fn creation_time(&self) -> MillisSinceEpoch { self.creation_time } /// Modification time of the [`InvocationStatus`]. /// - /// Note: The value of this time is not consistent across replicas of a partition, because it's not agreed. - /// You **MUST NOT** use it for business logic, but only for observability purposes. - pub fn modification_time(&self) -> MillisSinceEpoch { + /// # Safety + /// The value of this time is not consistent across replicas of a partition, because it's not agreed. + /// You **MUST NOT** use it within the Partition processor business logic, but only for observability purposes. + pub unsafe fn modification_time(&self) -> MillisSinceEpoch { self.modification_time } + + /// Inboxed transition time of the [`InvocationStatus`], if any. 
+ /// + /// # Safety + /// The value of this time is not consistent across replicas of a partition, because it's not agreed. + /// You **MUST NOT** use it within the Partition processor business logic, but only for observability purposes. + pub unsafe fn inboxed_transition_time(&self) -> Option { + self.inboxed_transition_time + } + + /// Scheduled transition time of the [`InvocationStatus`], if any. + /// + /// # Safety + /// The value of this time is not consistent across replicas of a partition, because it's not agreed. + /// You **MUST NOT** use it within the Partition processor business logic, but only for observability purposes. + pub unsafe fn scheduled_transition_time(&self) -> Option { + self.scheduled_transition_time + } + + /// First transition to Running time of the [`InvocationStatus`], if any. + /// + /// # Safety + /// The value of this time is not consistent across replicas of a partition, because it's not agreed. + /// You **MUST NOT** use it within the Partition processor business logic, but only for observability purposes. + pub unsafe fn running_transition_time(&self) -> Option { + self.running_transition_time + } + + /// Completed transition time of the [`InvocationStatus`], if any. + /// + /// # Safety + /// The value of this time is not consistent across replicas of a partition, because it's not agreed. + /// You **MUST NOT** use it within the Partition processor business logic, but only for observability purposes. + pub unsafe fn completed_transition_time(&self) -> Option { + self.completed_transition_time + } } /// Status of an invocation. 
@@ -202,13 +316,15 @@ impl InvocationStatus { } } - pub fn update_timestamps(&mut self) { + #[inline] + pub fn get_timestamps_mut(&mut self) -> Option<&mut StatusTimestamps> { match self { - InvocationStatus::Scheduled(metadata) => metadata.metadata.timestamps.update(), - InvocationStatus::Inboxed(metadata) => metadata.metadata.timestamps.update(), - InvocationStatus::Invoked(metadata) => metadata.timestamps.update(), - InvocationStatus::Suspended { metadata, .. } => metadata.timestamps.update(), - _ => {} + InvocationStatus::Scheduled(metadata) => Some(&mut metadata.metadata.timestamps), + InvocationStatus::Inboxed(metadata) => Some(&mut metadata.metadata.timestamps), + InvocationStatus::Invoked(metadata) => Some(&mut metadata.timestamps), + InvocationStatus::Suspended { metadata, .. } => Some(&mut metadata.timestamps), + InvocationStatus::Completed(completed) => Some(&mut completed.timestamps), + _ => None, } } } @@ -258,7 +374,7 @@ pub struct PreFlightInvocationMetadata { /// Time when the request should be executed pub execution_time: Option, /// If zero, the invocation completion will not be retained. 
- pub completion_retention_time: Duration, + pub completion_retention_duration: Duration, pub idempotency_key: Option, /// Used by the Table implementation to pick where to write @@ -271,7 +387,9 @@ pub struct ScheduledInvocation { } impl ScheduledInvocation { - pub fn from_pre_flight_invocation_metadata(metadata: PreFlightInvocationMetadata) -> Self { + pub fn from_pre_flight_invocation_metadata(mut metadata: PreFlightInvocationMetadata) -> Self { + metadata.timestamps.record_scheduled_transition_time(); + Self { metadata } } } @@ -290,8 +408,8 @@ impl PreFlightInvocationMetadata { span_context: service_invocation.span_context, headers: service_invocation.headers, execution_time: service_invocation.execution_time, - completion_retention_time: service_invocation - .completion_retention_time + completion_retention_duration: service_invocation + .completion_retention_duration .unwrap_or_default(), idempotency_key: service_invocation.idempotency_key, source_table, @@ -309,9 +427,11 @@ pub struct InboxedInvocation { impl InboxedInvocation { pub fn from_pre_flight_invocation_metadata( - metadata: PreFlightInvocationMetadata, + mut metadata: PreFlightInvocationMetadata, inbox_sequence_number: u64, ) -> Self { + metadata.timestamps.record_inboxed_transition_time(); + Self { inbox_sequence_number, metadata, @@ -319,15 +439,13 @@ impl InboxedInvocation { } pub fn from_scheduled_invocation( - mut scheduled_invocation: ScheduledInvocation, + scheduled_invocation: ScheduledInvocation, inbox_sequence_number: u64, ) -> Self { - scheduled_invocation.metadata.timestamps.update(); - - Self { + Self::from_pre_flight_invocation_metadata( + scheduled_invocation.metadata, inbox_sequence_number, - metadata: scheduled_invocation.metadata, - } + ) } } @@ -340,7 +458,7 @@ pub struct InFlightInvocationMetadata { pub timestamps: StatusTimestamps, pub source: Source, /// If zero, the invocation completion will not be retained. 
- pub completion_retention_time: Duration, + pub completion_retention_duration: Duration, pub idempotency_key: Option, /// Used by the Table implementation to pick where to write @@ -349,8 +467,12 @@ pub struct InFlightInvocationMetadata { impl InFlightInvocationMetadata { pub fn from_pre_flight_invocation_metadata( - pre_flight_invocation_metadata: PreFlightInvocationMetadata, + mut pre_flight_invocation_metadata: PreFlightInvocationMetadata, ) -> (Self, InvocationInput) { + pre_flight_invocation_metadata + .timestamps + .record_running_transition_time(); + ( Self { invocation_target: pre_flight_invocation_metadata.invocation_target, @@ -361,7 +483,8 @@ impl InFlightInvocationMetadata { response_sinks: pre_flight_invocation_metadata.response_sinks, timestamps: pre_flight_invocation_metadata.timestamps, source: pre_flight_invocation_metadata.source, - completion_retention_time: pre_flight_invocation_metadata.completion_retention_time, + completion_retention_duration: pre_flight_invocation_metadata + .completion_retention_duration, idempotency_key: pre_flight_invocation_metadata.idempotency_key, source_table: pre_flight_invocation_metadata.source_table, }, @@ -373,10 +496,8 @@ impl InFlightInvocationMetadata { } pub fn from_inboxed_invocation( - mut inboxed_invocation: InboxedInvocation, + inboxed_invocation: InboxedInvocation, ) -> (Self, InvocationInput) { - inboxed_invocation.metadata.timestamps.update(); - Self::from_pre_flight_invocation_metadata(inboxed_invocation.metadata) } @@ -397,6 +518,7 @@ pub struct CompletedInvocation { pub idempotency_key: Option, pub timestamps: StatusTimestamps, pub response_result: ResponseResult, + pub completion_retention_duration: Duration, /// Used by the Table implementation to pick where to write pub source_table: SourceTable, @@ -407,7 +529,9 @@ impl CompletedInvocation { mut in_flight_invocation_metadata: InFlightInvocationMetadata, response_result: ResponseResult, ) -> (Self, Duration) { - 
in_flight_invocation_metadata.timestamps.update(); + in_flight_invocation_metadata + .timestamps + .record_completed_transition_time(); ( Self { @@ -416,9 +540,11 @@ impl CompletedInvocation { idempotency_key: in_flight_invocation_metadata.idempotency_key, timestamps: in_flight_invocation_metadata.timestamps, response_result, + completion_retention_duration: in_flight_invocation_metadata + .completion_retention_duration, source_table: in_flight_invocation_metadata.source_table, }, - in_flight_invocation_metadata.completion_retention_time, + in_flight_invocation_metadata.completion_retention_duration, ) } } @@ -473,10 +599,50 @@ mod test_util { response_sinks: HashSet::new(), timestamps: StatusTimestamps::now(), source: Source::Ingress, - completion_retention_time: Duration::ZERO, + completion_retention_duration: Duration::ZERO, + idempotency_key: None, + source_table: SourceTable::New, + } + } + } + + impl CompletedInvocation { + pub fn mock_neo() -> Self { + let mut timestamps = StatusTimestamps::now(); + timestamps.record_running_transition_time(); + timestamps.record_completed_transition_time(); + + CompletedInvocation { + invocation_target: InvocationTarget::virtual_object( + "MyService", + "MyKey", + "mock", + VirtualObjectHandlerType::Exclusive, + ), + source: Source::Ingress, idempotency_key: None, + timestamps, + response_result: ResponseResult::Success(Bytes::from_static(b"123")), + completion_retention_duration: Duration::from_secs(60 * 60), source_table: SourceTable::New, } } + + pub fn mock_old() -> Self { + CompletedInvocation { + invocation_target: InvocationTarget::virtual_object( + "MyService", + "MyKey", + "mock", + VirtualObjectHandlerType::Exclusive, + ), + source: Source::Ingress, + idempotency_key: None, + timestamps: StatusTimestamps::now(), + response_result: ResponseResult::Success(Bytes::from_static(b"123")), + completion_retention_duration: Duration::from_secs(60 * 60), + source_table: SourceTable::Old, + } + } } } diff --git 
a/crates/storage-api/src/storage.rs b/crates/storage-api/src/storage.rs index 156c1b64e..14a4420e3 100644 --- a/crates/storage-api/src/storage.rs +++ b/crates/storage-api/src/storage.rs @@ -310,10 +310,14 @@ pub mod v1 { creation_time, modification_time, response_sinks, + inboxed_transition_time, + scheduled_transition_time, + running_transition_time, + completed_transition_time, argument, headers, execution_time, - completion_retention_time, + completion_retention_duration, idempotency_key, inbox_sequence_number, journal_length, @@ -327,6 +331,10 @@ pub mod v1 { let timestamps = crate::invocation_status_table::StatusTimestamps::new( MillisSinceEpoch::new(creation_time), MillisSinceEpoch::new(modification_time), + inboxed_transition_time.map(MillisSinceEpoch::new), + scheduled_transition_time.map(MillisSinceEpoch::new), + running_transition_time.map(MillisSinceEpoch::new), + completed_transition_time.map(MillisSinceEpoch::new), ); let source = expect_or_fail!(source)?.try_into()?; let response_sinks = response_sinks @@ -358,9 +366,10 @@ pub mod v1 { span_context: expect_or_fail!(span_context)?.try_into()?, headers, execution_time: execution_time.map(MillisSinceEpoch::new), - completion_retention_time: completion_retention_time - .unwrap_or_default() - .try_into()?, + completion_retention_duration: + completion_retention_duration + .unwrap_or_default() + .try_into()?, idempotency_key: idempotency_key.map(ByteString::from), source_table: crate::invocation_status_table::SourceTable::New, @@ -382,9 +391,10 @@ pub mod v1 { span_context: expect_or_fail!(span_context)?.try_into()?, headers, execution_time: execution_time.map(MillisSinceEpoch::new), - completion_retention_time: completion_retention_time - .unwrap_or_default() - .try_into()?, + completion_retention_duration: + completion_retention_duration + .unwrap_or_default() + .try_into()?, idempotency_key: idempotency_key.map(ByteString::from), source_table: crate::invocation_status_table::SourceTable::New, @@ -407,7 
+417,7 @@ pub mod v1 { service_protocol_version, )?, source, - completion_retention_time: completion_retention_time + completion_retention_duration: completion_retention_duration .unwrap_or_default() .try_into()?, idempotency_key: idempotency_key.map(ByteString::from), @@ -430,7 +440,7 @@ pub mod v1 { service_protocol_version, )?, source, - completion_retention_time: completion_retention_time + completion_retention_duration: completion_retention_duration .unwrap_or_default() .try_into()?, idempotency_key: idempotency_key.map(ByteString::from), @@ -450,6 +460,9 @@ pub mod v1 { idempotency_key: idempotency_key.map(ByteString::from), source_table: crate::invocation_status_table::SourceTable::New, response_result: expect_or_fail!(result)?.try_into()?, + completion_retention_duration: completion_retention_duration + .unwrap_or_default() + .try_into()?, }, )) } @@ -479,7 +492,7 @@ pub mod v1 { span_context, headers, execution_time, - completion_retention_time, + completion_retention_duration, idempotency_key, source_table: _, }, @@ -489,8 +502,20 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(span_context.into()), - creation_time: timestamps.creation_time().as_u64(), - modification_time: timestamps.modification_time().as_u64(), + creation_time: unsafe { timestamps.creation_time() }.as_u64(), + modification_time: unsafe { timestamps.modification_time() }.as_u64(), + inboxed_transition_time: unsafe { timestamps.inboxed_transition_time() } + .map(|t| t.as_u64()), + scheduled_transition_time: unsafe { + timestamps.scheduled_transition_time() + } + .map(|t| t.as_u64()), + running_transition_time: unsafe { timestamps.running_transition_time() } + .map(|t| t.as_u64()), + completed_transition_time: unsafe { + timestamps.completed_transition_time() + } + .map(|t| t.as_u64()), response_sinks: response_sinks .into_iter() .map(|s| ServiceInvocationResponseSink::from(Some(s))) @@ -498,7 +523,7 @@ pub mod v1 { 
argument: Some(argument), headers: headers.into_iter().map(Into::into).collect(), execution_time: execution_time.map(|t| t.as_u64()), - completion_retention_time: Some(completion_retention_time.into()), + completion_retention_duration: Some(completion_retention_duration.into()), idempotency_key: idempotency_key.map(|key| key.to_string()), inbox_sequence_number: None, journal_length: 0, @@ -519,7 +544,7 @@ pub mod v1 { span_context, headers, execution_time, - completion_retention_time, + completion_retention_duration, idempotency_key, source_table: _, }, @@ -530,8 +555,20 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(span_context.into()), - creation_time: timestamps.creation_time().as_u64(), - modification_time: timestamps.modification_time().as_u64(), + creation_time: unsafe { timestamps.creation_time() }.as_u64(), + modification_time: unsafe { timestamps.modification_time() }.as_u64(), + inboxed_transition_time: unsafe { timestamps.inboxed_transition_time() } + .map(|t| t.as_u64()), + scheduled_transition_time: unsafe { + timestamps.scheduled_transition_time() + } + .map(|t| t.as_u64()), + running_transition_time: unsafe { timestamps.running_transition_time() } + .map(|t| t.as_u64()), + completed_transition_time: unsafe { + timestamps.completed_transition_time() + } + .map(|t| t.as_u64()), response_sinks: response_sinks .into_iter() .map(|s| ServiceInvocationResponseSink::from(Some(s))) @@ -539,7 +576,7 @@ pub mod v1 { argument: Some(argument), headers: headers.into_iter().map(Into::into).collect(), execution_time: execution_time.map(|t| t.as_u64()), - completion_retention_time: Some(completion_retention_time.into()), + completion_retention_duration: Some(completion_retention_duration.into()), idempotency_key: idempotency_key.map(|key| key.to_string()), inbox_sequence_number: Some(inbox_sequence_number), journal_length: 0, @@ -556,7 +593,7 @@ pub mod v1 { response_sinks, timestamps, source, - 
completion_retention_time, + completion_retention_duration, idempotency_key, source_table: _, }, @@ -574,8 +611,24 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(journal_metadata.span_context.into()), - creation_time: timestamps.creation_time().as_u64(), - modification_time: timestamps.modification_time().as_u64(), + creation_time: unsafe { timestamps.creation_time() }.as_u64(), + modification_time: unsafe { timestamps.modification_time() }.as_u64(), + inboxed_transition_time: unsafe { + timestamps.inboxed_transition_time() + } + .map(|t| t.as_u64()), + scheduled_transition_time: unsafe { + timestamps.scheduled_transition_time() + } + .map(|t| t.as_u64()), + running_transition_time: unsafe { + timestamps.running_transition_time() + } + .map(|t| t.as_u64()), + completed_transition_time: unsafe { + timestamps.completed_transition_time() + } + .map(|t| t.as_u64()), response_sinks: response_sinks .into_iter() .map(|s| ServiceInvocationResponseSink::from(Some(s))) @@ -583,7 +636,9 @@ pub mod v1 { argument: None, headers: vec![], execution_time: None, - completion_retention_time: Some(completion_retention_time.into()), + completion_retention_duration: Some( + completion_retention_duration.into(), + ), idempotency_key: idempotency_key.map(|key| key.to_string()), inbox_sequence_number: None, journal_length: journal_metadata.length, @@ -602,7 +657,7 @@ pub mod v1 { response_sinks, timestamps, source, - completion_retention_time, + completion_retention_duration, idempotency_key, source_table: _, }, @@ -621,8 +676,24 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(journal_metadata.span_context.into()), - creation_time: timestamps.creation_time().as_u64(), - modification_time: timestamps.modification_time().as_u64(), + creation_time: unsafe { timestamps.creation_time() }.as_u64(), + modification_time: unsafe { timestamps.modification_time() 
}.as_u64(), + inboxed_transition_time: unsafe { + timestamps.inboxed_transition_time() + } + .map(|t| t.as_u64()), + scheduled_transition_time: unsafe { + timestamps.scheduled_transition_time() + } + .map(|t| t.as_u64()), + running_transition_time: unsafe { + timestamps.running_transition_time() + } + .map(|t| t.as_u64()), + completed_transition_time: unsafe { + timestamps.completed_transition_time() + } + .map(|t| t.as_u64()), response_sinks: response_sinks .into_iter() .map(|s| ServiceInvocationResponseSink::from(Some(s))) @@ -630,7 +701,9 @@ pub mod v1 { argument: None, headers: vec![], execution_time: None, - completion_retention_time: Some(completion_retention_time.into()), + completion_retention_duration: Some( + completion_retention_duration.into(), + ), idempotency_key: idempotency_key.map(|key| key.to_string()), inbox_sequence_number: None, journal_length: journal_metadata.length, @@ -649,6 +722,7 @@ pub mod v1 { idempotency_key, timestamps, response_result, + completion_retention_duration, source_table: _, }, ) => NeoInvocationStatus { @@ -656,13 +730,25 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: None, - creation_time: timestamps.creation_time().as_u64(), - modification_time: timestamps.modification_time().as_u64(), + creation_time: unsafe { timestamps.creation_time() }.as_u64(), + modification_time: unsafe { timestamps.modification_time() }.as_u64(), + inboxed_transition_time: unsafe { timestamps.inboxed_transition_time() } + .map(|t| t.as_u64()), + scheduled_transition_time: unsafe { + timestamps.scheduled_transition_time() + } + .map(|t| t.as_u64()), + running_transition_time: unsafe { timestamps.running_transition_time() } + .map(|t| t.as_u64()), + completed_transition_time: unsafe { + timestamps.completed_transition_time() + } + .map(|t| t.as_u64()), response_sinks: vec![], argument: None, headers: vec![], execution_time: None, - completion_retention_time: None, + 
completion_retention_duration: Some(completion_retention_duration.into()), idempotency_key: idempotency_key.map(|key| key.to_string()), inbox_sequence_number: None, journal_length: 0, @@ -834,9 +920,13 @@ pub mod v1 { timestamps: crate::invocation_status_table::StatusTimestamps::new( MillisSinceEpoch::new(value.creation_time), MillisSinceEpoch::new(value.modification_time), + None, + None, + None, + None, ), source, - completion_retention_time, + completion_retention_duration: completion_retention_time, idempotency_key, source_table: crate::invocation_status_table::SourceTable::Old, }) @@ -852,7 +942,7 @@ pub mod v1 { journal_metadata, timestamps, source, - completion_retention_time, + completion_retention_duration: completion_retention_time, idempotency_key, source_table: _, } = value; @@ -874,8 +964,8 @@ pub mod v1 { deployment_id, service_protocol_version, journal_meta: Some(JournalMeta::from(journal_metadata)), - creation_time: timestamps.creation_time().as_u64(), - modification_time: timestamps.modification_time().as_u64(), + creation_time: unsafe { timestamps.creation_time() }.as_u64(), + modification_time: unsafe { timestamps.modification_time() }.as_u64(), source: Some(Source::from(source)), completion_retention_time: Some(Duration::from(completion_retention_time)), idempotency_key: idempotency_key.map(|key| key.to_string()), @@ -942,9 +1032,13 @@ pub mod v1 { timestamps: crate::invocation_status_table::StatusTimestamps::new( MillisSinceEpoch::new(value.creation_time), MillisSinceEpoch::new(value.modification_time), + None, + None, + None, + None, ), source: caller, - completion_retention_time, + completion_retention_duration: completion_retention_time, idempotency_key, source_table: crate::invocation_status_table::SourceTable::Old, }, @@ -987,12 +1081,12 @@ pub mod v1 { journal_meta: Some(journal_meta), deployment_id, service_protocol_version, - creation_time: metadata.timestamps.creation_time().as_u64(), - modification_time: 
metadata.timestamps.modification_time().as_u64(), + creation_time: unsafe { metadata.timestamps.creation_time() }.as_u64(), + modification_time: unsafe { metadata.timestamps.modification_time() }.as_u64(), waiting_for_completed_entries, source: Some(Source::from(metadata.source)), completion_retention_time: Some(Duration::from( - metadata.completion_retention_time, + metadata.completion_retention_duration, )), idempotency_key: metadata.idempotency_key.map(|key| key.to_string()), } @@ -1058,6 +1152,10 @@ pub mod v1 { timestamps: crate::invocation_status_table::StatusTimestamps::new( MillisSinceEpoch::new(value.creation_time), MillisSinceEpoch::new(value.modification_time), + None, + None, + None, + None, ), source, span_context, @@ -1065,7 +1163,7 @@ pub mod v1 { argument: value.argument, execution_time, idempotency_key, - completion_retention_time, + completion_retention_duration: completion_retention_time, invocation_target, source_table: crate::invocation_status_table::SourceTable::Old, }, @@ -1086,7 +1184,7 @@ pub mod v1 { span_context, headers, execution_time, - completion_retention_time, + completion_retention_duration: completion_retention_time, idempotency_key, source_table: _, }, @@ -1102,8 +1200,8 @@ pub mod v1 { .into_iter() .map(|s| ServiceInvocationResponseSink::from(Some(s))) .collect(), - creation_time: timestamps.creation_time().as_u64(), - modification_time: timestamps.modification_time().as_u64(), + creation_time: unsafe { timestamps.creation_time() }.as_u64(), + modification_time: unsafe { timestamps.modification_time() }.as_u64(), source: Some(Source::from(source)), span_context: Some(SpanContext::from(span_context)), headers, @@ -1139,6 +1237,10 @@ pub mod v1 { timestamps: crate::invocation_status_table::StatusTimestamps::new( MillisSinceEpoch::new(value.creation_time), MillisSinceEpoch::new(value.modification_time), + None, + None, + None, + None, ), response_result: value .result @@ -1146,6 +1248,9 @@ pub mod v1 { .try_into()?, 
idempotency_key, source_table: crate::invocation_status_table::SourceTable::Old, + // The value Duration::MAX here disables the new cleaner task business logic. + // Look at crates/worker/src/partition/cleaner.rs for more details. + completion_retention_duration: std::time::Duration::MAX, }) } } @@ -1158,6 +1263,8 @@ pub mod v1 { idempotency_key, timestamps, response_result, + // We don't store this in the old invocation status table + completion_retention_duration: _, source_table: _, } = value; @@ -1165,8 +1272,8 @@ pub mod v1 { invocation_target: Some(InvocationTarget::from(invocation_target)), source: Some(Source::from(source)), result: Some(ResponseResult::from(response_result)), - creation_time: timestamps.creation_time().as_u64(), - modification_time: timestamps.modification_time().as_u64(), + creation_time: unsafe { timestamps.creation_time() }.as_u64(), + modification_time: unsafe { timestamps.modification_time() }.as_u64(), idempotency_key: idempotency_key.map(|s| s.to_string()), } } @@ -1374,7 +1481,7 @@ pub mod v1 { span_context, headers, execution_time, - completion_retention_time, + completion_retention_duration: completion_retention_time, idempotency_key, submit_notification_sink: submit_notification_sink, }) @@ -1398,7 +1505,9 @@ pub mod v1 { source: Some(source), headers, execution_time: value.execution_time.map(|m| m.as_u64()).unwrap_or_default(), - completion_retention_time: value.completion_retention_time.map(Duration::from), + completion_retention_time: value + .completion_retention_duration + .map(Duration::from), idempotency_key: value.idempotency_key.map(|s| s.to_string()), submit_notification_sink: value.submit_notification_sink.map(Into::into), } diff --git a/crates/storage-api/src/timer_table/mod.rs b/crates/storage-api/src/timer_table/mod.rs index 5de76fc12..9348a0df1 100644 --- a/crates/storage-api/src/timer_table/mod.rs +++ b/crates/storage-api/src/timer_table/mod.rs @@ -183,6 +183,7 @@ pub enum Timer { // TODO remove this variant when 
removing the old invocation status table Invoke(ServiceInvocation), CompleteJournalEntry(InvocationId, u32), + // TODO remove this variant when removing the old invocation status table CleanInvocationStatus(InvocationId), NeoInvoke(InvocationId), } diff --git a/crates/storage-query-datafusion/src/invocation_status/row.rs b/crates/storage-query-datafusion/src/invocation_status/row.rs index d6fffe9f7..87d13a47e 100644 --- a/crates/storage-query-datafusion/src/invocation_status/row.rs +++ b/crates/storage-query-datafusion/src/invocation_status/row.rs @@ -130,8 +130,8 @@ fn fill_invoked_by(row: &mut SysInvocationStatusRowBuilder, output: &mut String, #[inline] fn fill_timestamps(row: &mut SysInvocationStatusRowBuilder, stat: &StatusTimestamps) { - row.created_at(stat.creation_time().as_u64() as i64); - row.modified_at(stat.modification_time().as_u64() as i64); + row.created_at(unsafe { stat.creation_time() }.as_u64() as i64); + row.modified_at(unsafe { stat.modification_time() }.as_u64() as i64); } #[inline] diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index de7a1e76c..1e6c284e9 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -8,12 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; use std::num::{NonZeroU64, NonZeroUsize}; use std::path::PathBuf; use std::time::Duration; - -use serde::{Deserialize, Serialize}; -use serde_with::serde_as; use tracing::warn; use restate_serde_util::NonZeroByteCount; @@ -37,6 +36,17 @@ pub struct WorkerOptions { /// The number of timers in memory limit is used to bound the amount of timers loaded in memory. If this limit is set, when exceeding it, the timers farther in the future will be spilled to disk. 
num_timers_in_memory_limit: Option, + /// # Cleanup interval + /// + /// In order to clean up completed invocations, that is invocations invoked with an idempotency id, or workflows, + /// Restate periodically scans among the completed invocations to check whether they need to be removed or not. + /// This interval sets the scan interval of the cleanup procedure. Default: 1 hour. + /// + /// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format. + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + cleanup_interval: humantime::Duration, + #[cfg_attr(feature = "schemars", schemars(skip))] experimental_feature_new_invocation_status_table: bool, @@ -54,6 +64,10 @@ impl WorkerOptions { self.num_timers_in_memory_limit.map(Into::into) } + pub fn cleanup_interval(&self) -> Duration { + self.cleanup_interval.into() + } + pub fn experimental_feature_new_invocation_status_table(&self) -> bool { self.experimental_feature_new_invocation_status_table } @@ -64,6 +78,7 @@ impl Default for WorkerOptions { Self { internal_queue_length: NonZeroUsize::new(10000).unwrap(), num_timers_in_memory_limit: None, + cleanup_interval: Duration::from_secs(60 * 60).into(), experimental_feature_new_invocation_status_table: false, storage: StorageOptions::default(), invoker: Default::default(), diff --git a/crates/types/src/invocation.rs b/crates/types/src/invocation.rs index d2d402457..150113016 100644 --- a/crates/types/src/invocation.rs +++ b/crates/types/src/invocation.rs @@ -268,7 +268,7 @@ pub struct ServiceInvocation { pub headers: Vec
, /// Time when the request should be executed pub execution_time: Option, - pub completion_retention_time: Option, + pub completion_retention_duration: Option, pub idempotency_key: Option, // Where to send the response, if any @@ -302,7 +302,7 @@ impl ServiceInvocation { span_context: ServiceInvocationSpanContext::empty(), headers: vec![], execution_time: None, - completion_retention_time: None, + completion_retention_duration: None, idempotency_key: None, submit_notification_sink: None, } @@ -866,7 +866,7 @@ mod mocks { span_context: Default::default(), headers: vec![], execution_time: None, - completion_retention_time: None, + completion_retention_duration: None, idempotency_key: None, submit_notification_sink: None, } diff --git a/crates/worker/src/partition/cleaner.rs b/crates/worker/src/partition/cleaner.rs new file mode 100644 index 000000000..d099bdd5c --- /dev/null +++ b/crates/worker/src/partition/cleaner.rs @@ -0,0 +1,309 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use anyhow::Context; +use futures::StreamExt; +use restate_bifrost::Bifrost; +use restate_core::cancellation_watcher; +use restate_storage_api::invocation_status_table::{ + InvocationStatus, ReadOnlyInvocationStatusTable, +}; +use restate_types::identifiers::WithPartitionKey; +use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey}; +use restate_types::invocation::PurgeInvocationRequest; +use restate_types::GenerationalNodeId; +use restate_wal_protocol::{ + append_envelope_to_bifrost, Command, Destination, Envelope, Header, Source, +}; +use std::ops::RangeInclusive; +use std::time::{Duration, SystemTime}; +use tokio::time::MissedTickBehavior; +use tracing::{debug, instrument, warn}; + +pub(super) struct Cleaner { + partition_id: PartitionId, + leader_epoch: LeaderEpoch, + node_id: GenerationalNodeId, + partition_key_range: RangeInclusive, + storage: Storage, + bifrost: Bifrost, + cleanup_interval: Duration, +} + +impl Cleaner +where + Storage: ReadOnlyInvocationStatusTable + Send + Sync + 'static, +{ + pub(super) fn new( + partition_id: PartitionId, + leader_epoch: LeaderEpoch, + node_id: GenerationalNodeId, + storage: Storage, + bifrost: Bifrost, + partition_key_range: RangeInclusive, + cleanup_interval: Duration, + ) -> Self { + Self { + partition_id, + leader_epoch, + node_id, + partition_key_range, + storage, + bifrost, + cleanup_interval, + } + } + + #[instrument(skip_all, fields(restate.node = %self.node_id, restate.partition.id = %self.partition_id))] + pub(super) async fn run(self) -> anyhow::Result<()> { + let Self { + partition_id, + leader_epoch, + node_id, + partition_key_range, + storage, + bifrost, + cleanup_interval, + } = self; + debug!("Running cleaner"); + + let bifrost_envelope_source = Source::Processor { + partition_id, + partition_key: None, + leader_epoch, + node_id, + }; + + let mut interval = tokio::time::interval(cleanup_interval); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + 
tokio::select! { + _ = interval.tick() => { + if let Err(e) = Self::do_cleanup(&storage, &bifrost, partition_key_range.clone(), &bifrost_envelope_source).await { + warn!("Error when trying to cleanup completed invocations: {e:?}"); + } + }, + _ = cancellation_watcher() => { + break; + } + } + } + + debug!("Stopping cleaner"); + + Ok(()) + } + + pub(super) async fn do_cleanup( + storage: &Storage, + bifrost: &Bifrost, + partition_key_range: RangeInclusive, + bifrost_envelope_source: &Source, + ) -> anyhow::Result<()> { + debug!("Executing completed invocations cleanup"); + + let invocations_stream = storage.all_invocation_statuses(partition_key_range); + tokio::pin!(invocations_stream); + + while let Some((invocation_id, invocation_status)) = invocations_stream + .next() + .await + .transpose() + .context("Cannot read the next item of the invocation status table")? + { + let InvocationStatus::Completed(completed_invocation) = invocation_status else { + continue; + }; + + // SAFETY: it's ok to use the completed_transition_time here, + // because only the leader runs this cleaner code, so there's no need to use an agreed time. + let Some(completed_time) = + (unsafe { completed_invocation.timestamps.completed_transition_time() }) + else { + // If completed time is unavailable, the invocation is on the old invocation table, + // thus it will be cleaned up with the old timer. 
+ continue; + }; + let Some(expiration_time) = SystemTime::from(completed_time) + .checked_add(completed_invocation.completion_retention_duration) + else { + // If the sum overflows, then the cleanup time lies far enough in the future + continue; + }; + + if SystemTime::now() >= expiration_time { + append_envelope_to_bifrost( + bifrost, + Envelope { + header: Header { + source: bifrost_envelope_source.clone(), + dest: Destination::Processor { + partition_key: invocation_id.partition_key(), + dedup: None, + }, + }, + command: Command::PurgeInvocation(PurgeInvocationRequest { invocation_id }), + }, + ) + .await + .context("Cannot append to bifrost")?; + }; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use futures::{stream, Stream} + use googletest::prelude::*; + use restate_core::{TaskKind, TestCoreEnvBuilder}; + use restate_storage_api::invocation_status_table::{ + CompletedInvocation, InFlightInvocationMetadata, InvocationStatus, + }; + use restate_types::identifiers::{InvocationId, InvocationUuid}; + use restate_types::invocation::InvocationTarget; + use restate_types::partition_table::{FindPartition, PartitionTable}; + use restate_types::Version; + use std::future::Future; + use test_log::test; + + #[allow(dead_code)] + struct MockInvocationStatusReader(Vec<(InvocationId, InvocationStatus)>); + + impl ReadOnlyInvocationStatusTable for MockInvocationStatusReader { + fn get_invocation_status( + &mut self, + _: &InvocationId, + ) -> impl Future> + Send { + todo!(); + #[allow(unreachable_code)] + std::future::pending() + } + + fn invoked_invocations( + &mut self, + _: RangeInclusive, + ) -> impl Stream> + Send + { + todo!(); + #[allow(unreachable_code)] + stream::empty() + } + + fn all_invocation_statuses( + &self, + _: RangeInclusive, + ) -> impl Stream> + Send + { + stream::iter(self.0.clone()).map(Ok) + } + } + + // Start paused makes sure the timer is immediately fired + #[test(tokio::test(start_paused = true))] + pub async fn cleanup_works()
{ + let env = TestCoreEnvBuilder::new_with_mock_network() + .with_partition_table(PartitionTable::with_equally_sized_partitions( + Version::MIN, + 1, + )) + .build() + .await; + let tc = &env.tc; + let bifrost = tc + .run_in_scope( + "init bifrost", + None, + Bifrost::init_in_memory(env.metadata.clone()), + ) + .await; + + let expired_invocation = InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::new()); + let not_expired_invocation_1 = + InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::new()); + let not_expired_invocation_2 = + InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::new()); + let not_completed_invocation = + InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::new()); + + let mock_storage = MockInvocationStatusReader(vec![ + ( + expired_invocation, + InvocationStatus::Completed(CompletedInvocation { + completion_retention_duration: Duration::ZERO, + ..CompletedInvocation::mock_neo() + }), + ), + ( + not_expired_invocation_1, + InvocationStatus::Completed(CompletedInvocation { + completion_retention_duration: Duration::MAX, + ..CompletedInvocation::mock_neo() + }), + ), + ( + not_expired_invocation_2, + // Old status invocations are still processed with the cleanup timer in the PP + InvocationStatus::Completed(CompletedInvocation::mock_old()), + ), + ( + not_completed_invocation, + InvocationStatus::Invoked(InFlightInvocationMetadata::mock()), + ), + ]); + + tc.spawn( + TaskKind::Cleaner, + "cleaner", + Some(PartitionId::MIN), + Cleaner::new( + PartitionId::MIN, + LeaderEpoch::INITIAL, + GenerationalNodeId::new(1, 1), + mock_storage, + bifrost.clone(), + RangeInclusive::new(PartitionKey::MIN, PartitionKey::MAX), + Duration::from_secs(1), + ) + .run(), + ) + .unwrap(); + + // By yielding once we let the cleaner task run, and perform the cleanup + tokio::task::yield_now().await; + + // All the invocation ids were created with same partition keys, hence same partition id. 
+ let partition_id = env + .metadata + .partition_table_snapshot() + .find_partition_id(expired_invocation.partition_key()) + .unwrap(); + + let mut log_entries = bifrost.read_all(partition_id.into()).await.unwrap(); + let bifrost_message = log_entries + .remove(0) + .try_decode::() + .unwrap() + .unwrap(); + + assert_that!( + bifrost_message.command, + pat!(Command::PurgeInvocation(pat!(PurgeInvocationRequest { + invocation_id: eq(expired_invocation) + }))) + ); + assert_that!(log_entries, empty()); + } +} diff --git a/crates/worker/src/partition/leadership.rs b/crates/worker/src/partition/leadership.rs index 1aabe8d7d..4f4fbe990 100644 --- a/crates/worker/src/partition/leadership.rs +++ b/crates/worker/src/partition/leadership.rs @@ -46,6 +46,7 @@ use restate_wal_protocol::{Command, Destination, Envelope, Header, Source}; use super::storage::invoker::InvokerStorageReader; use crate::metric_definitions::PARTITION_HANDLE_LEADER_ACTIONS; use crate::partition::action_effect_handler::ActionEffectHandler; +use crate::partition::cleaner::Cleaner; use crate::partition::shuffle::{HintSender, Shuffle, ShuffleMetadata}; use crate::partition::state_machine::Action; use crate::partition::{shuffle, storage}; @@ -87,6 +88,7 @@ pub(crate) struct LeaderState { invoker_stream: ReceiverStream, shuffle_stream: ReceiverStream, + cleaner_task_id: TaskId, } pub enum State { @@ -131,6 +133,7 @@ pub(crate) struct LeadershipState { partition_processor_metadata: PartitionProcessorMetadata, num_timers_in_memory_limit: Option, + cleanup_interval: Duration, channel_size: usize, invoker_tx: I, network_tx: N, @@ -142,9 +145,11 @@ where I: restate_invoker_api::InvokerHandle>, N: NetworkSender + 'static, { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( partition_processor_metadata: PartitionProcessorMetadata, num_timers_in_memory_limit: Option, + cleanup_interval: Duration, channel_size: usize, invoker_tx: I, bifrost: Bifrost, @@ -155,6 +160,7 @@ where state: State::Follower, 
partition_processor_metadata, num_timers_in_memory_limit, + cleanup_interval, channel_size, invoker_tx, bifrost, @@ -344,9 +350,29 @@ where metadata(), ); + let cleaner = Cleaner::new( + self.partition_processor_metadata.partition_id, + leader_epoch, + self.partition_processor_metadata.node_id, + partition_storage.clone().into_inner(), + self.bifrost.clone(), + self.partition_processor_metadata + .partition_key_range + .clone(), + self.cleanup_interval, + ); + + let cleaner_task_id = task_center().spawn_child( + TaskKind::Cleaner, + "cleaner", + Some(self.partition_processor_metadata.partition_id), + cleaner.run(), + )?; + self.state = State::Leader(LeaderState { leader_epoch, shuffle_task_id, + cleaner_task_id, shuffle_hint_tx, timer_service, action_effect_handler, @@ -409,13 +435,16 @@ where if let State::Leader(LeaderState { leader_epoch, shuffle_task_id, + cleaner_task_id, .. }) = self.state { let shuffle_handle = OptionFuture::from(task_center().cancel_task(shuffle_task_id)); + let cleaner_handle = OptionFuture::from(task_center().cancel_task(cleaner_task_id)); - let (shuffle_result, abort_result) = tokio::join!( + let (shuffle_result, cleaner_result, abort_result) = tokio::join!( shuffle_handle, + cleaner_handle, self.invoker_tx.abort_all_partition(( self.partition_processor_metadata.partition_id, leader_epoch @@ -427,6 +456,9 @@ where if let Some(shuffle_result) = shuffle_result { shuffle_result.expect("graceful termination of shuffle task"); } + if let Some(cleaner_result) = cleaner_result { + cleaner_result.expect("graceful termination of cleaner task"); + } } self.state = State::Follower; @@ -685,6 +717,7 @@ mod tests { use restate_wal_protocol::control::AnnounceLeader; use restate_wal_protocol::{Command, Envelope}; use std::ops::RangeInclusive; + use std::time::Duration; use test_log::test; const PARTITION_ID: PartitionId = PartitionId::MIN; @@ -724,6 +757,7 @@ mod tests { let mut state = LeadershipState::new( PARTITION_PROCESSOR_METADATA, None, + 
Duration::from_secs(60 * 60), 42, invoker_tx, bifrost.clone(), diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index dd427c756..1df728ae2 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -47,6 +47,7 @@ use crate::partition::state_machine::{ActionCollector, StateMachine}; use crate::partition::storage::{DedupSequenceNumberResolver, PartitionStorage, Transaction}; mod action_effect_handler; +mod cleaner; mod leadership; pub mod shuffle; mod state_machine; @@ -68,6 +69,7 @@ pub(super) struct PartitionProcessorBuilder { num_timers_in_memory_limit: Option, enable_new_invocation_status_table: bool, + cleanup_interval: Duration, channel_size: usize, status: PartitionProcessorStatus, @@ -88,6 +90,7 @@ where partition_key_range: RangeInclusive, status: PartitionProcessorStatus, num_timers_in_memory_limit: Option, + cleanup_interval: Duration, enable_new_invocation_status_table: bool, channel_size: usize, control_rx: mpsc::Receiver, @@ -101,6 +104,7 @@ where status, num_timers_in_memory_limit, enable_new_invocation_status_table, + cleanup_interval, channel_size, invoker_tx, control_rx, @@ -118,6 +122,7 @@ where partition_id, partition_key_range, num_timers_in_memory_limit, + cleanup_interval, enable_new_invocation_status_table, channel_size, invoker_tx, @@ -155,6 +160,7 @@ where partition_key_range.clone(), ), num_timers_in_memory_limit, + cleanup_interval, channel_size, invoker_tx, bifrost.clone(), diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index b81ab6be4..91f68f6db 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -737,7 +737,9 @@ impl StateMachine { span_context: span_context.clone(), headers: metadata.headers, execution_time: metadata.execution_time, - completion_retention_time: Some(metadata.completion_retention_time), + completion_retention_duration: 
Some( + metadata.completion_retention_duration, + ), idempotency_key: metadata.idempotency_key, response_sink: metadata.response_sinks.into_iter().next(), submit_notification_sink: None, @@ -1635,14 +1637,14 @@ impl StateMachine { invocation_metadata: InFlightInvocationMetadata, ) -> Result<(), Error> { let journal_length = invocation_metadata.journal_metadata.length; - let completion_retention_time = invocation_metadata.completion_retention_time; + let completion_retention_time = invocation_metadata.completion_retention_duration; self.notify_invocation_result( ctx, invocation_id, invocation_metadata.invocation_target.clone(), invocation_metadata.journal_metadata.span_context.clone(), - invocation_metadata.timestamps.creation_time(), + unsafe { invocation_metadata.timestamps.creation_time() }, Ok(()), ); @@ -1713,7 +1715,7 @@ impl StateMachine { invocation_id, invocation_metadata.invocation_target.clone(), invocation_metadata.journal_metadata.span_context.clone(), - invocation_metadata.timestamps.creation_time(), + unsafe { invocation_metadata.timestamps.creation_time() }, Err((error.code(), error.to_string())), ); @@ -1733,7 +1735,7 @@ impl StateMachine { Self::consume_inbox(ctx, &invocation_metadata.invocation_target).await?; // Store the completed status or free it - if !invocation_metadata.completion_retention_time.is_zero() { + if !invocation_metadata.completion_retention_duration.is_zero() { let (completed_invocation, completion_retention_time) = CompletedInvocation::from_in_flight_invocation_metadata( invocation_metadata, @@ -2262,7 +2264,7 @@ impl StateMachine { span_context: span_context.clone(), headers: request.headers, execution_time: None, - completion_retention_time: *completion_retention_time, + completion_retention_duration: *completion_retention_time, idempotency_key: None, submit_notification_sink: None, }; @@ -2312,7 +2314,7 @@ impl StateMachine { span_context: span_context.clone(), headers: request.headers, execution_time: delay, - 
completion_retention_time: *completion_retention_time, + completion_retention_duration: *completion_retention_time, idempotency_key: None, submit_notification_sink: None, }; @@ -2761,17 +2763,21 @@ impl StateMachine { "Effect: Store completed invocation" ); + // New table invocations are cleaned using the Cleaner task. + if let SourceTable::Old = completed_invocation.source_table { + ctx.action_collector + .push(Action::ScheduleInvocationStatusCleanup { + invocation_id, + retention, + }); + } + ctx.storage .store_invocation_status( &invocation_id, InvocationStatus::Completed(completed_invocation), ) .await?; - ctx.action_collector - .push(Action::ScheduleInvocationStatusCleanup { - invocation_id, - retention, - }); Ok(()) } @@ -3205,7 +3211,9 @@ impl StateMachine { .get_response_sinks_mut() .expect("No response sinks available") .insert(additional_response_sink); - previous_invocation_status.update_timestamps(); + if let Some(timestamps) = previous_invocation_status.get_timestamps_mut() { + timestamps.update(); + } ctx.storage .store_invocation_status(&invocation_id, previous_invocation_status) @@ -3518,7 +3526,9 @@ impl StateMachine { journal_meta.length = entry_index + 1; // Update timestamps - previous_invocation_status.update_timestamps(); + if let Some(timestamps) = previous_invocation_status.get_timestamps_mut() { + timestamps.update(); + } // Store invocation status state_storage diff --git a/crates/worker/src/partition/state_machine/tests/delayed_send.rs b/crates/worker/src/partition/state_machine/tests/delayed_send.rs index 389241c71..aa3be3d41 100644 --- a/crates/worker/src/partition/state_machine/tests/delayed_send.rs +++ b/crates/worker/src/partition/state_machine/tests/delayed_send.rs @@ -243,7 +243,7 @@ async fn send_with_delay_and_idempotency_key() { node_id, request_id: request_id_1, }), - completion_retention_time: Some(retention), + completion_retention_duration: Some(retention), // Doesn't matter the execution time here, just needs to be filled 
execution_time: Some(MillisSinceEpoch::from( SystemTime::now() + Duration::from_secs(60), @@ -287,7 +287,7 @@ async fn send_with_delay_and_idempotency_key() { node_id, request_id: request_id_2, }), - completion_retention_time: Some(retention), + completion_retention_duration: Some(retention), // Doesn't matter the execution time here, just needs to be filled execution_time: Some(MillisSinceEpoch::from( SystemTime::now() + Duration::from_secs(60), diff --git a/crates/worker/src/partition/state_machine/tests/idempotency.rs b/crates/worker/src/partition/state_machine/tests/idempotency.rs index 5bef2a94d..11b417c26 100644 --- a/crates/worker/src/partition/state_machine/tests/idempotency.rs +++ b/crates/worker/src/partition/state_machine/tests/idempotency.rs @@ -21,14 +21,15 @@ use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind}; use restate_types::errors::GONE_INVOCATION_ERROR; use restate_types::identifiers::{IdempotencyId, IngressRequestId}; use restate_types::invocation::{ - AttachInvocationRequest, InvocationQuery, InvocationTarget, SubmitNotificationSink, + AttachInvocationRequest, InvocationQuery, InvocationTarget, PurgeInvocationRequest, + SubmitNotificationSink, }; use restate_wal_protocol::timer::TimerKeyValue; use std::time::Duration; use test_log::test; #[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] -async fn start_idempotent_invocation() { +async fn start_and_complete_idempotent_invocation() { let tc = TaskCenterBuilder::default() .default_runtime_handle(tokio::runtime::Handle::current()) .build() @@ -59,7 +60,7 @@ async fn start_idempotent_invocation() { request_id, }), idempotency_key: Some(idempotency_key), - completion_retention_time: Some(retention), + completion_retention_duration: Some(retention), ..ServiceInvocation::mock() })) .await; @@ -142,6 +143,128 @@ async fn start_idempotent_invocation() { ); } +#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn 
start_and_complete_idempotent_invocation_neo_table() { + let tc = TaskCenterBuilder::default() + .default_runtime_handle(tokio::runtime::Handle::current()) + .build() + .expect("task_center builds"); + let mut state_machine = tc + .run_in_scope( + "mock-state-machine", + None, + MockStateMachine::create_with_neo_invocation_status_table(), + ) + .await; + + let idempotency_key = ByteString::from_static("my-idempotency-key"); + let retention = Duration::from_secs(60) * 60 * 24; + let invocation_target = InvocationTarget::mock_virtual_object(); + let invocation_id = InvocationId::generate_with_idempotency_key( + &invocation_target, + Some(idempotency_key.clone()), + ); + let idempotency_id = + IdempotencyId::combine(invocation_id, &invocation_target, idempotency_key.clone()); + let node_id = GenerationalNodeId::new(1, 1); + let request_id = IngressRequestId::default(); + + // Send fresh invocation with idempotency key + let actions = state_machine + .apply(Command::Invoke(ServiceInvocation { + invocation_id, + invocation_target: invocation_target.clone(), + response_sink: Some(ServiceInvocationResponseSink::Ingress { + node_id, + request_id, + }), + idempotency_key: Some(idempotency_key), + completion_retention_duration: Some(retention), + ..ServiceInvocation::mock() + })) + .await; + assert_that!( + actions, + contains(pat!(Action::Invoke { + invocation_id: eq(invocation_id), + invoke_input_journal: pat!(InvokeInputJournal::CachedJournal(_, _)) + })) + ); + + // Assert idempotency key mapping exists + assert_that!( + state_machine + .storage() + .get_idempotency_metadata(&idempotency_id) + .await + .unwrap() + .unwrap(), + pat!(IdempotencyMetadata { + invocation_id: eq(invocation_id), + }) + ); + + // Send output, then end + let response_bytes = Bytes::from_static(b"123"); + let actions = state_machine + .apply_multiple([ + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::JournalEntry { + entry_index: 1, + entry: 
ProtobufRawEntryCodec::serialize_enriched(Entry::output( + EntryResult::Success(response_bytes.clone()), + )), + }, + }), + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::End, + }), + ]) + .await; + + // Assert response and timeout + assert_that!( + actions, + all!( + contains(pat!(Action::IngressResponse(pat!( + IngressResponseEnvelope { + target_node: eq(node_id), + inner: pat!(ingress::InvocationResponse { + request_id: eq(request_id), + invocation_id: some(eq(invocation_id)), + response: eq(IngressResponseResult::Success( + invocation_target.clone(), + response_bytes.clone() + )) + }) + } + )))), + not(contains(pat!(Action::ScheduleInvocationStatusCleanup { + invocation_id: eq(invocation_id) + }))) + ) + ); + + // InvocationStatus contains completed + let invocation_status = state_machine + .storage() + .get_invocation_status(&invocation_id) + .await + .unwrap(); + let_assert!(InvocationStatus::Completed(completed_invocation) = invocation_status); + assert_eq!( + completed_invocation.response_result, + ResponseResult::Success(response_bytes) + ); + assert!(unsafe { completed_invocation.timestamps.completed_transition_time() }.is_some()); + assert_eq!( + completed_invocation.completion_retention_duration, + retention + ); +} + #[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] async fn complete_already_completed_invocation() { let tc = TaskCenterBuilder::default() @@ -176,6 +299,7 @@ async fn complete_already_completed_invocation() { idempotency_key: Some(idempotency_key.clone()), timestamps: StatusTimestamps::now(), response_result: ResponseResult::Success(response_bytes.clone()), + completion_retention_duration: Default::default(), source_table: SourceTable::New, }), ) @@ -270,7 +394,7 @@ async fn known_invocation_id_but_missing_completion() { request_id, }), idempotency_key: Some(idempotency_key), - completion_retention_time: Some(retention), + completion_retention_duration: Some(retention), 
..ServiceInvocation::mock() })) .await; @@ -330,7 +454,7 @@ async fn attach_with_service_invocation_command_while_executing() { request_id: request_id_1, }), idempotency_key: Some(idempotency_key.clone()), - completion_retention_time: Some(retention), + completion_retention_duration: Some(retention), ..ServiceInvocation::mock() })) .await; @@ -447,7 +571,7 @@ async fn attach_with_send_service_invocation() { request_id: request_id_1, }), idempotency_key: Some(idempotency_key.clone()), - completion_retention_time: Some(retention), + completion_retention_duration: Some(retention), ..ServiceInvocation::mock() })) .await; @@ -470,7 +594,7 @@ async fn attach_with_send_service_invocation() { invocation_target: invocation_target.clone(), response_sink: None, idempotency_key: Some(idempotency_key.clone()), - completion_retention_time: Some(retention), + completion_retention_duration: Some(retention), submit_notification_sink: Some(SubmitNotificationSink::Ingress { node_id, request_id: request_id_2, @@ -580,7 +704,7 @@ async fn attach_inboxed_with_send_service_invocation() { invocation_id: attached_invocation_id, invocation_target: invocation_target.clone(), idempotency_key: Some(idempotency_key.clone()), - completion_retention_time: Some(Duration::from_secs(60) * 60 * 24), + completion_retention_duration: Some(Duration::from_secs(60) * 60 * 24), submit_notification_sink: Some(SubmitNotificationSink::Ingress { node_id, request_id: request_id_1, @@ -633,7 +757,7 @@ async fn attach_inboxed_with_send_service_invocation() { invocation_id: original_invocation_id, invocation_target: invocation_target.clone(), idempotency_key: Some(idempotency_key.clone()), - completion_retention_time: Some(Duration::from_secs(60) * 60 * 24), + completion_retention_duration: Some(Duration::from_secs(60) * 60 * 24), submit_notification_sink: Some(SubmitNotificationSink::Ingress { node_id, request_id: request_id_2, @@ -694,7 +818,7 @@ async fn attach_command() { request_id: request_id_1, }), 
idempotency_key: Some(idempotency_key.clone()), - completion_retention_time: Some(completion_retention), + completion_retention_duration: Some(completion_retention), ..ServiceInvocation::mock() })) .await; @@ -775,6 +899,7 @@ async fn attach_command() { ); } +// TODO remove this once we remove the old invocation status table #[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] async fn timer_cleanup() { let tc = TaskCenterBuilder::default() @@ -806,7 +931,8 @@ async fn timer_cleanup() { idempotency_key: Some(idempotency_key.clone()), timestamps: StatusTimestamps::now(), response_result: ResponseResult::Success(Bytes::from_static(b"123")), - source_table: SourceTable::New, + completion_retention_duration: Duration::MAX, + source_table: SourceTable::Old, }), ) .await; @@ -843,3 +969,65 @@ async fn timer_cleanup() { none() ); } + +#[test(tokio::test)] +async fn purge_completed_idempotent_invocation() { + let tc = TaskCenterBuilder::default() + .default_runtime_handle(tokio::runtime::Handle::current()) + .build() + .expect("task_center builds"); + let mut state_machine = tc + .run_in_scope( + "mock-state-machine", + None, + MockStateMachine::create_with_neo_invocation_status_table(), + ) + .await; + + let idempotency_key = ByteString::from_static("my-idempotency-key"); + let invocation_target = InvocationTarget::mock_virtual_object(); + let invocation_id = InvocationId::generate_with_idempotency_key( + &invocation_target, + Some(idempotency_key.clone()), + ); + let idempotency_id = + IdempotencyId::combine(invocation_id, &invocation_target, idempotency_key.clone()); + + // Prepare idempotency metadata and completed status + let mut txn = state_machine.storage().transaction(); + txn.put_idempotency_metadata(&idempotency_id, IdempotencyMetadata { invocation_id }) + .await; + txn.put_invocation_status( + &invocation_id, + InvocationStatus::Completed(CompletedInvocation { + invocation_target, + idempotency_key: Some(idempotency_key.clone()), + 
..CompletedInvocation::mock_neo() + }), + ) + .await; + txn.commit().await.unwrap(); + + // Send purge command + let _ = state_machine + .apply(Command::PurgeInvocation(PurgeInvocationRequest { + invocation_id, + })) + .await; + assert_that!( + state_machine + .storage() + .get_invocation_status(&invocation_id) + .await + .unwrap(), + pat!(InvocationStatus::Free) + ); + assert_that!( + state_machine + .storage() + .get_idempotency_metadata(&idempotency_id) + .await + .unwrap(), + none() + ); +} diff --git a/crates/worker/src/partition/state_machine/tests/mod.rs b/crates/worker/src/partition/state_machine/tests/mod.rs index 67786933a..a58db6629 100644 --- a/crates/worker/src/partition/state_machine/tests/mod.rs +++ b/crates/worker/src/partition/state_machine/tests/mod.rs @@ -665,7 +665,7 @@ async fn send_ingress_response_to_multiple_targets() -> TestResult { span_context: Default::default(), headers: vec![], execution_time: None, - completion_retention_time: None, + completion_retention_duration: None, idempotency_key: None, submit_notification_sink: None, })) @@ -1005,7 +1005,7 @@ async fn mock_start_invocation_with_invocation_target( span_context: Default::default(), headers: vec![], execution_time: None, - completion_retention_time: None, + completion_retention_duration: None, idempotency_key: None, submit_notification_sink: None, })) diff --git a/crates/worker/src/partition/state_machine/tests/workflow.rs b/crates/worker/src/partition/state_machine/tests/workflow.rs index 6c3d1a6b8..ff236f4b4 100644 --- a/crates/worker/src/partition/state_machine/tests/workflow.rs +++ b/crates/worker/src/partition/state_machine/tests/workflow.rs @@ -16,7 +16,9 @@ use restate_storage_api::invocation_status_table::{ use restate_storage_api::service_status_table::ReadOnlyVirtualObjectStatusTable; use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind}; use restate_types::errors::WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR; -use 
restate_types::invocation::{AttachInvocationRequest, InvocationQuery, InvocationTarget}; +use restate_types::invocation::{ + AttachInvocationRequest, InvocationQuery, InvocationTarget, PurgeInvocationRequest, +}; use restate_wal_protocol::timer::TimerKeyValue; use std::time::Duration; use test_log::test; @@ -42,7 +44,7 @@ async fn start_workflow_method() { .apply(Command::Invoke(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), - completion_retention_time: Some(Duration::from_secs(60)), + completion_retention_duration: Some(Duration::from_secs(60)), response_sink: Some(ServiceInvocationResponseSink::Ingress { node_id, request_id: request_id_1, @@ -226,7 +228,7 @@ async fn attach_by_workflow_key() { .apply(Command::Invoke(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), - completion_retention_time: Some(Duration::from_secs(60)), + completion_retention_duration: Some(Duration::from_secs(60)), response_sink: Some(ServiceInvocationResponseSink::Ingress { node_id, request_id: request_id_1, @@ -362,6 +364,7 @@ async fn attach_by_workflow_key() { ); } +// TODO remove this once we remove the old invocation status table #[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] async fn timer_cleanup() { let tc = TaskCenterBuilder::default() @@ -385,6 +388,7 @@ async fn timer_cleanup() { idempotency_key: None, timestamps: StatusTimestamps::now(), response_result: ResponseResult::Success(Bytes::from_static(b"123")), + completion_retention_duration: Default::default(), source_table: SourceTable::New, }), ) @@ -427,3 +431,58 @@ async fn timer_cleanup() { pat!(VirtualObjectStatus::Unlocked) ); } + +#[test(tokio::test)] +async fn purge_completed_workflow() { + let tc = TaskCenterBuilder::default() + .default_runtime_handle(tokio::runtime::Handle::current()) + .build() + .expect("task_center builds"); + let mut state_machine = tc + .run_in_scope("mock-state-machine", None, MockStateMachine::create()) + .await; 
+
+    let invocation_target = InvocationTarget::mock_workflow();
+    let invocation_id = InvocationId::mock_random();
+
+    // Prepare a completed invocation status and lock the virtual object
+    let mut txn = state_machine.storage().transaction();
+    txn.put_invocation_status(
+        &invocation_id,
+        InvocationStatus::Completed(CompletedInvocation {
+            invocation_target: invocation_target.clone(),
+            idempotency_key: None,
+            ..CompletedInvocation::mock_neo()
+        }),
+    )
+    .await;
+    txn.put_virtual_object_status(
+        &invocation_target.as_keyed_service_id().unwrap(),
+        VirtualObjectStatus::Locked(invocation_id),
+    )
+    .await;
+    txn.commit().await.unwrap();
+
+    // Send purge command
+    let _ = state_machine
+        .apply(Command::PurgeInvocation(PurgeInvocationRequest {
+            invocation_id,
+        }))
+        .await;
+    assert_that!(
+        state_machine
+            .storage()
+            .get_invocation_status(&invocation_id)
+            .await
+            .unwrap(),
+        pat!(InvocationStatus::Free)
+    );
+    assert_that!(
+        state_machine
+            .storage()
+            .get_virtual_object_status(&invocation_target.as_keyed_service_id().unwrap())
+            .await
+            .unwrap(),
+        pat!(VirtualObjectStatus::Unlocked)
+    );
+}
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index f1beb94c6..787af7d19 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -613,6 +613,7 @@ impl PartitionProcessorManager {
                 key_range.clone(),
                 status,
                 options.num_timers_in_memory_limit(),
+                options.cleanup_interval(),
                 options.experimental_feature_new_invocation_status_table(),
                 options.internal_queue_length(),
                 control_rx,