Skip to content

Commit

Permalink
raft: make state machine application synchronous
Browse files Browse the repository at this point in the history
This significantly simplifies Raft processing by getting rid of the
state machine driver and its protocol. It does mean that state
transitions will block other Raft processing, but toyDB should prefer
simplicity over performance.
  • Loading branch information
erikgrinaker committed Mar 30, 2024
1 parent c94682c commit 11474e4
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 818 deletions.
5 changes: 0 additions & 5 deletions src/raft/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ pub enum Event {
},
/// Followers confirm loyalty to leader after heartbeats.
ConfirmLeader {
/// The commit_index of the original leader heartbeat, to confirm
/// read requests.
///
/// TODO: remove this when migrated to read_seq.
commit_index: Index,
/// If false, the follower does not have the entry at commit_index
/// and would like the leader to replicate it.
has_committed: bool,
Expand Down
2 changes: 1 addition & 1 deletion src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ pub use self::log::{Entry, Index, Log};
pub use message::{Address, Event, Message, ReadSequence, Request, RequestID, Response, Status};
pub use node::{Node, NodeID, Term};
pub use server::Server;
pub use state::{Driver, Instruction, State};
pub use state::State;
39 changes: 15 additions & 24 deletions src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,21 +168,19 @@ impl RawNode<Candidate> {

#[cfg(test)]
mod tests {
use super::super::super::{Entry, Instruction, Log, Request};
use super::super::super::state::tests::TestState;
use super::super::super::{Entry, Log, Request};
use super::super::tests::{assert_messages, assert_node};
use super::*;
use crate::storage;
use tokio::sync::mpsc;

#[allow(clippy::type_complexity)]
fn setup() -> Result<(
RawNode<Candidate>,
mpsc::UnboundedReceiver<Message>,
mpsc::UnboundedReceiver<Instruction>,
)> {
fn setup() -> Result<(RawNode<Candidate>, mpsc::UnboundedReceiver<Message>)> {
let (node_tx, node_rx) = mpsc::unbounded_channel();
let (state_tx, state_rx) = mpsc::unbounded_channel();
let state = Box::new(TestState::new(0));
let mut log = Log::new(storage::engine::Memory::new(), false)?;

log.append(1, Some(vec![0x01]))?;
log.append(1, Some(vec![0x02]))?;
log.append(2, Some(vec![0x03]))?;
Expand All @@ -194,18 +192,18 @@ mod tests {
peers: HashSet::from([2, 3, 4, 5]),
term: 3,
log,
state,
node_tx,
state_tx,
role: Candidate::new(),
};
node.role.votes.insert(1);
Ok((node, node_rx, state_rx))
Ok((node, node_rx))
}

#[test]
// Heartbeat for current term converts to follower and emits ConfirmLeader.
fn step_heartbeat_current_term() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let mut node = candidate.step(Message {
from: Address::Node(2),
to: Address::Node(1),
Expand All @@ -219,18 +217,17 @@ mod tests {
from: Address::Node(1),
to: Address::Node(2),
term: 3,
event: Event::ConfirmLeader { commit_index: 2, has_committed: true, read_seq: 7 },
event: Event::ConfirmLeader { has_committed: true, read_seq: 7 },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
// Heartbeat for future term converts to follower and emits ConfirmLeader
// event.
fn step_heartbeat_future_term() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let mut node = candidate.step(Message {
from: Address::Node(2),
to: Address::Node(1),
Expand All @@ -244,17 +241,16 @@ mod tests {
from: Address::Node(1),
to: Address::Node(2),
term: 4,
event: Event::ConfirmLeader { commit_index: 2, has_committed: true, read_seq: 7 },
event: Event::ConfirmLeader { has_committed: true, read_seq: 7 },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
// Heartbeat for past term is ignored
fn step_heartbeat_past_term() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let mut node = candidate.step(Message {
from: Address::Node(2),
to: Address::Node(1),
Expand All @@ -263,13 +259,12 @@ mod tests {
})?;
assert_node(&mut node).is_candidate().term(3);
assert_messages(&mut node_rx, vec![]);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
fn step_grantvote() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let peers = candidate.peers.clone();
let mut node = Node::Candidate(candidate);

Expand All @@ -282,7 +277,6 @@ mod tests {
})?;
assert_node(&mut node).is_candidate().term(3);
assert_messages(&mut node_rx, vec![]);
assert_messages(&mut state_rx, vec![]);

// However, the second external vote makes us leader
node = node.step(Message {
Expand Down Expand Up @@ -320,14 +314,13 @@ mod tests {
}

assert_messages(&mut node_rx, vec![]);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
// ClientRequest returns Error::Abort.
fn step_clientrequest() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let mut node = Node::Candidate(candidate);

node = node.step(Message {
Expand All @@ -346,13 +339,12 @@ mod tests {
event: Event::ClientResponse { id: vec![0x01], response: Err(Error::Abort) },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
fn tick() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let timeout = candidate.role.election_timeout;
let mut node = Node::Candidate(candidate);

Expand All @@ -372,7 +364,6 @@ mod tests {
event: Event::SolicitVote { last_index: 3, last_term: 2 },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}
}
Loading

0 comments on commit 11474e4

Please sign in to comment.