diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 5df70fcb7..440c78363 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -86,10 +86,13 @@ mod tests { use crate::partition::types::{InvokerEffect, InvokerEffectKind}; use bytes::Bytes; use bytestring::ByteString; + use googletest::matcher::Matcher; use googletest::{all, assert_that, pat, property}; use restate_invoker_api::InvokeInputJournal; use restate_service_protocol::codec::ProtobufRawEntryCodec; + use restate_storage_api::inbox_table::InboxTable; use restate_storage_api::journal_table::{JournalEntry, JournalTable}; + use restate_storage_api::outbox_table::OutboxTable; use restate_storage_api::status_table::{ InvocationMetadata, JournalMetadata, NotificationTarget, }; @@ -98,12 +101,14 @@ mod tests { use restate_storage_rocksdb::RocksDBStorage; use restate_test_util::matchers::*; use restate_test_util::test; + use restate_types::errors::UserErrorCode; use restate_types::identifiers::{ - FullInvocationId, InvocationUuid, PartitionKey, ServiceId, WithPartitionKey, + FullInvocationId, InvocationId, InvocationUuid, PartitionId, PartitionKey, ServiceId, + WithPartitionKey, }; use restate_types::invocation::{ InvocationResponse, MaybeFullInvocationId, ResponseResult, ServiceInvocation, - ServiceInvocationSpanContext, + ServiceInvocationResponseSink, ServiceInvocationSpanContext, }; use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal::raw::EntryHeader; @@ -136,6 +141,10 @@ mod tests { } impl MockStateMachine { + pub fn partition_id(&self) -> PartitionId { + 0 + } + pub fn new(inbox_seq_number: MessageIndex, outbox_seq_number: MessageIndex) -> Self { let temp_dir = tempdir().unwrap(); info!("Using RocksDB temp directory {}", temp_dir.path().display()); @@ -153,12 +162,13 @@ mod tests { pub async fn apply(&mut self, command: AckCommand) -> Vec { let transaction = self.rocksdb_storage.transaction(); + let partition_id = self.partition_id(); self.state_machine .apply( command, &mut self.effects_buffer, crate::partition::storage::Transaction::new( - 0, + partition_id, 0..=PartitionKey::MAX, transaction, ), @@ -174,12 +184,13 @@ mod tests { pub async fn apply_cmd(&mut self, command: Command) -> Vec { let transaction = self.rocksdb_storage.transaction(); + let partition_id = self.partition_id(); self.state_machine .apply( AckCommand::no_ack(command), &mut self.effects_buffer, crate::partition::storage::Transaction::new( - 0, + partition_id, 0..=PartitionKey::MAX, transaction, ), @@ -331,6 +342,92 @@ mod tests { ); } + #[test(tokio::test)] + async fn kill_inboxed_invocation() -> anyhow::Result<()> { + let mut state_machine = MockStateMachine::default(); + + let fid = FullInvocationId::generate("svc", "key"); + let inboxed_fid = FullInvocationId::generate("svc", "key"); + let caller_fid = FullInvocationId::mock_random(); + + let _ = state_machine + .apply_cmd(Command::Invocation(ServiceInvocation { + fid, + ..ServiceInvocation::mock() + })) + .await; + + let _ = state_machine + .apply_cmd(Command::Invocation(ServiceInvocation { + fid: inboxed_fid.clone(), + response_sink: Some(ServiceInvocationResponseSink::PartitionProcessor { + caller: caller_fid.clone(), + entry_index: 0, + }), + ..ServiceInvocation::mock() + })) + .await; + + let result = state_machine + .storage() + .transaction() + .get_inbox_entry(inboxed_fid.clone()) + .await?; + + // assert that inboxed invocation is in inbox + assert!(result.is_some()); + + let actions = state_machine + .apply_cmd(Command::Kill(InvocationId::from(inboxed_fid.clone()))) + .await; + + let result = state_machine + .storage() + .transaction() + .get_inbox_entry(inboxed_fid.clone()) + .await?; + + // assert that inboxed invocation has been removed + assert!(result.is_none()); + + fn outbox_message_matcher( + caller_fid: FullInvocationId, + ) -> impl Matcher { + pat!( + restate_storage_api::outbox_table::OutboxMessage::ServiceResponse(pat!( + restate_types::invocation::InvocationResponse { + id: eq(MaybeFullInvocationId::Full(caller_fid)), + entry_index: eq(0), + result: pat!(ResponseResult::Failure( + eq(UserErrorCode::Aborted), + eq(ByteString::from_static("killed")) + )) + } + )) + ) + } + + assert_that!( + actions, + contains(pat!(Action::NewOutboxMessage { + message: outbox_message_matcher(caller_fid.clone()) + })) + ); + + let outbox_message = state_machine + .storage() + .transaction() + .get_next_outbox_message(state_machine.partition_id(), 0) + .await?; + + assert_that!( + outbox_message, + some((ge(0), outbox_message_matcher(caller_fid.clone()))) + ); + + Ok(()) + } + mod virtual_invocation { use super::*;