From dc4a3dff08fd266f79165a07ee3716a60dac8aec Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 11 Dec 2023 12:48:24 +0100 Subject: [PATCH] Add unit test StateMachine for killing inboxed invocations This commit adds a unit test for the StateMachine for killing inboxed invocations. It asserts that the inboxed invocation is properly removed from the inbox state in the storage and that it inserts an outgoing response into the outbox. --- .../worker/src/partition/state_machine/mod.rs | 105 +++++++++++++++++- 1 file changed, 101 insertions(+), 4 deletions(-) diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 5df70fcb7..440c78363 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -86,10 +86,13 @@ mod tests { use crate::partition::types::{InvokerEffect, InvokerEffectKind}; use bytes::Bytes; use bytestring::ByteString; + use googletest::matcher::Matcher; use googletest::{all, assert_that, pat, property}; use restate_invoker_api::InvokeInputJournal; use restate_service_protocol::codec::ProtobufRawEntryCodec; + use restate_storage_api::inbox_table::InboxTable; use restate_storage_api::journal_table::{JournalEntry, JournalTable}; + use restate_storage_api::outbox_table::OutboxTable; use restate_storage_api::status_table::{ InvocationMetadata, JournalMetadata, NotificationTarget, }; @@ -98,12 +101,14 @@ mod tests { use restate_storage_rocksdb::RocksDBStorage; use restate_test_util::matchers::*; use restate_test_util::test; + use restate_types::errors::UserErrorCode; use restate_types::identifiers::{ - FullInvocationId, InvocationUuid, PartitionKey, ServiceId, WithPartitionKey, + FullInvocationId, InvocationId, InvocationUuid, PartitionId, PartitionKey, ServiceId, + WithPartitionKey, }; use restate_types::invocation::{ InvocationResponse, MaybeFullInvocationId, ResponseResult, ServiceInvocation, - ServiceInvocationSpanContext, + ServiceInvocationResponseSink, ServiceInvocationSpanContext, }; use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal::raw::EntryHeader; @@ -136,6 +141,10 @@ mod tests { } impl MockStateMachine { + pub fn partition_id(&self) -> PartitionId { + 0 + } + pub fn new(inbox_seq_number: MessageIndex, outbox_seq_number: MessageIndex) -> Self { let temp_dir = tempdir().unwrap(); info!("Using RocksDB temp directory {}", temp_dir.path().display()); @@ -153,12 +162,13 @@ mod tests { pub async fn apply(&mut self, command: AckCommand) -> Vec { let transaction = self.rocksdb_storage.transaction(); + let partition_id = self.partition_id(); self.state_machine .apply( command, &mut self.effects_buffer, crate::partition::storage::Transaction::new( - 0, + partition_id, 0..=PartitionKey::MAX, transaction, ), @@ -174,12 +184,13 @@ mod tests { pub async fn apply_cmd(&mut self, command: Command) -> Vec { let transaction = self.rocksdb_storage.transaction(); + let partition_id = self.partition_id(); self.state_machine .apply( AckCommand::no_ack(command), &mut self.effects_buffer, crate::partition::storage::Transaction::new( - 0, + partition_id, 0..=PartitionKey::MAX, transaction, ), @@ -331,6 +342,92 @@ mod tests { ); } + #[test(tokio::test)] + async fn kill_inboxed_invocation() -> anyhow::Result<()> { + let mut state_machine = MockStateMachine::default(); + + let fid = FullInvocationId::generate("svc", "key"); + let inboxed_fid = FullInvocationId::generate("svc", "key"); + let caller_fid = FullInvocationId::mock_random(); + + let _ = state_machine + .apply_cmd(Command::Invocation(ServiceInvocation { + fid, + ..ServiceInvocation::mock() + })) + .await; + + let _ = state_machine + .apply_cmd(Command::Invocation(ServiceInvocation { + fid: inboxed_fid.clone(), + response_sink: Some(ServiceInvocationResponseSink::PartitionProcessor { + caller: caller_fid.clone(), + entry_index: 0, + }), + ..ServiceInvocation::mock() + })) + .await; + + let result = state_machine + .storage() + .transaction() + .get_inbox_entry(inboxed_fid.clone()) + .await?; + + // assert that inboxed invocation is in inbox + assert!(result.is_some()); + + let actions = state_machine + .apply_cmd(Command::Kill(InvocationId::from(inboxed_fid.clone()))) + .await; + + let result = state_machine + .storage() + .transaction() + .get_inbox_entry(inboxed_fid.clone()) + .await?; + + // assert that inboxed invocation has been removed + assert!(result.is_none()); + + fn outbox_message_matcher( + caller_fid: FullInvocationId, + ) -> impl Matcher { + pat!( + restate_storage_api::outbox_table::OutboxMessage::ServiceResponse(pat!( + restate_types::invocation::InvocationResponse { + id: eq(MaybeFullInvocationId::Full(caller_fid)), + entry_index: eq(0), + result: pat!(ResponseResult::Failure( + eq(UserErrorCode::Aborted), + eq(ByteString::from_static("killed")) + )) + } + )) + ) + } + + assert_that!( + actions, + contains(pat!(Action::NewOutboxMessage { + message: outbox_message_matcher(caller_fid.clone()) + })) + ); + + let outbox_message = state_machine + .storage() + .transaction() + .get_next_outbox_message(state_machine.partition_id(), 0) + .await?; + + assert_that!( + outbox_message, + some((ge(0), outbox_message_matcher(caller_fid.clone()))) + ); + + Ok(()) + } + mod virtual_invocation { use super::*;