Skip to content

Commit

Permalink
Add unit test StateMachine for killing inboxed invocations
Browse files Browse the repository at this point in the history
This commit adds a unit test for the StateMachine for killing inboxed invocations.
It asserts that the inboxed invocation is properly removed from the inbox state in
the storage and that it inserts an outgoing response into the outbox.
  • Loading branch information
tillrohrmann committed Dec 12, 2023
1 parent 7102984 commit dc4a3df
Showing 1 changed file with 101 additions and 4 deletions.
105 changes: 101 additions & 4 deletions crates/worker/src/partition/state_machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,13 @@ mod tests {
use crate::partition::types::{InvokerEffect, InvokerEffectKind};
use bytes::Bytes;
use bytestring::ByteString;
use googletest::matcher::Matcher;
use googletest::{all, assert_that, pat, property};
use restate_invoker_api::InvokeInputJournal;
use restate_service_protocol::codec::ProtobufRawEntryCodec;
use restate_storage_api::inbox_table::InboxTable;
use restate_storage_api::journal_table::{JournalEntry, JournalTable};
use restate_storage_api::outbox_table::OutboxTable;
use restate_storage_api::status_table::{
InvocationMetadata, JournalMetadata, NotificationTarget,
};
Expand All @@ -98,12 +101,14 @@ mod tests {
use restate_storage_rocksdb::RocksDBStorage;
use restate_test_util::matchers::*;
use restate_test_util::test;
use restate_types::errors::UserErrorCode;
use restate_types::identifiers::{
FullInvocationId, InvocationUuid, PartitionKey, ServiceId, WithPartitionKey,
FullInvocationId, InvocationId, InvocationUuid, PartitionId, PartitionKey, ServiceId,
WithPartitionKey,
};
use restate_types::invocation::{
InvocationResponse, MaybeFullInvocationId, ResponseResult, ServiceInvocation,
ServiceInvocationSpanContext,
ServiceInvocationResponseSink, ServiceInvocationSpanContext,
};
use restate_types::journal::enriched::EnrichedRawEntry;
use restate_types::journal::raw::EntryHeader;
Expand Down Expand Up @@ -136,6 +141,10 @@ mod tests {
}

impl MockStateMachine {
pub fn partition_id(&self) -> PartitionId {
0
}

pub fn new(inbox_seq_number: MessageIndex, outbox_seq_number: MessageIndex) -> Self {
let temp_dir = tempdir().unwrap();
info!("Using RocksDB temp directory {}", temp_dir.path().display());
Expand All @@ -153,12 +162,13 @@ mod tests {

pub async fn apply(&mut self, command: AckCommand) -> Vec<Action> {
let transaction = self.rocksdb_storage.transaction();
let partition_id = self.partition_id();
self.state_machine
.apply(
command,
&mut self.effects_buffer,
crate::partition::storage::Transaction::new(
0,
partition_id,
0..=PartitionKey::MAX,
transaction,
),
Expand All @@ -174,12 +184,13 @@ mod tests {

pub async fn apply_cmd(&mut self, command: Command) -> Vec<Action> {
let transaction = self.rocksdb_storage.transaction();
let partition_id = self.partition_id();
self.state_machine
.apply(
AckCommand::no_ack(command),
&mut self.effects_buffer,
crate::partition::storage::Transaction::new(
0,
partition_id,
0..=PartitionKey::MAX,
transaction,
),
Expand Down Expand Up @@ -331,6 +342,92 @@ mod tests {
);
}

#[test(tokio::test)]
async fn kill_inboxed_invocation() -> anyhow::Result<()> {
let mut state_machine = MockStateMachine::default();

let fid = FullInvocationId::generate("svc", "key");
let inboxed_fid = FullInvocationId::generate("svc", "key");
let caller_fid = FullInvocationId::mock_random();

let _ = state_machine
.apply_cmd(Command::Invocation(ServiceInvocation {
fid,
..ServiceInvocation::mock()
}))
.await;

let _ = state_machine
.apply_cmd(Command::Invocation(ServiceInvocation {
fid: inboxed_fid.clone(),
response_sink: Some(ServiceInvocationResponseSink::PartitionProcessor {
caller: caller_fid.clone(),
entry_index: 0,
}),
..ServiceInvocation::mock()
}))
.await;

let result = state_machine
.storage()
.transaction()
.get_inbox_entry(inboxed_fid.clone())
.await?;

// assert that inboxed invocation is in inbox
assert!(result.is_some());

let actions = state_machine
.apply_cmd(Command::Kill(InvocationId::from(inboxed_fid.clone())))
.await;

let result = state_machine
.storage()
.transaction()
.get_inbox_entry(inboxed_fid.clone())
.await?;

// assert that inboxed invocation has been removed
assert!(result.is_none());

fn outbox_message_matcher(
caller_fid: FullInvocationId,
) -> impl Matcher<ActualT = restate_storage_api::outbox_table::OutboxMessage> {
pat!(
restate_storage_api::outbox_table::OutboxMessage::ServiceResponse(pat!(
restate_types::invocation::InvocationResponse {
id: eq(MaybeFullInvocationId::Full(caller_fid)),
entry_index: eq(0),
result: pat!(ResponseResult::Failure(
eq(UserErrorCode::Aborted),
eq(ByteString::from_static("killed"))
))
}
))
)
}

assert_that!(
actions,
contains(pat!(Action::NewOutboxMessage {
message: outbox_message_matcher(caller_fid.clone())
}))
);

let outbox_message = state_machine
.storage()
.transaction()
.get_next_outbox_message(state_machine.partition_id(), 0)
.await?;

assert_that!(
outbox_message,
some((ge(0), outbox_message_matcher(caller_fid.clone())))
);

Ok(())
}

mod virtual_invocation {
use super::*;

Expand Down

0 comments on commit dc4a3df

Please sign in to comment.