Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flatten effects #1837

Merged
merged 10 commits into from
Aug 15, 2024
7 changes: 1 addition & 6 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::metric_definitions::{
PP_APPLY_RECORD_DURATION,
};
use crate::partition::leadership::{LeadershipState, PartitionProcessorMetadata};
use crate::partition::state_machine::{ActionCollector, Effects, StateMachine};
use crate::partition::state_machine::{ActionCollector, StateMachine};
use crate::partition::storage::{DedupSequenceNumberResolver, PartitionStorage, Transaction};

mod action_effect_handler;
Expand Down Expand Up @@ -316,7 +316,6 @@ where
let actuator_effects_handled = counter!(PARTITION_ACTUATOR_HANDLED);

let mut action_collector = ActionCollector::default();
let mut effects = Effects::default();

loop {
tokio::select! {
Expand Down Expand Up @@ -345,12 +344,10 @@ where

// clear buffers used when applying the next record
action_collector.clear();
effects.clear();

let leadership_change = self.apply_record(
record,
&mut transaction,
&mut effects,
&mut action_collector).await?;

if let Some((header, announce_leader)) = leadership_change {
Expand Down Expand Up @@ -433,7 +430,6 @@ where
&mut self,
record: (Lsn, Envelope),
transaction: &mut Transaction<RocksDBTransaction<'_>>,
effects: &mut Effects,
action_collector: &mut ActionCollector,
) -> Result<Option<(Header, AnnounceLeader)>, state_machine::Error> {
let (lsn, envelope) = record;
Expand Down Expand Up @@ -477,7 +473,6 @@ where
self.state_machine
.apply(
envelope.command,
effects,
transaction,
action_collector,
self.leadership_state.is_leader(),
Expand Down
2 changes: 2 additions & 0 deletions crates/worker/src/partition/state_machine/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use restate_types::message::MessageIndex;
use restate_wal_protocol::timer::TimerKeyValue;
use std::time::Duration;

pub type ActionCollector = Vec<Action>;

#[derive(Debug, strum::IntoStaticStr)]
pub enum Action {
Invoke {
Expand Down
Loading
Loading