Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Mar 29, 2024
1 parent 05c71a6 commit 83763d2
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 806 deletions.
5 changes: 0 additions & 5 deletions src/raft/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ pub enum Event {
},
/// Followers confirm loyalty to leader after heartbeats.
ConfirmLeader {
/// The commit_index of the original leader heartbeat, to confirm
/// read requests.
///
/// TODO: remove this when migrated to read_seq.
commit_index: Index,
/// If false, the follower does not have the entry at commit_index
/// and would like the leader to replicate it.
has_committed: bool,
Expand Down
2 changes: 1 addition & 1 deletion src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ pub use self::log::{Entry, Index, Log};
pub use message::{Address, Event, Message, ReadSequence, Request, RequestID, Response};
pub use node::{Node, NodeID, Status, Term};
pub use server::Server;
pub use state::{Driver, Instruction, State};
pub use state::State;
39 changes: 15 additions & 24 deletions src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,21 +168,19 @@ impl RawNode<Candidate> {

#[cfg(test)]
mod tests {
use super::super::super::{Entry, Instruction, Log, Request};
use super::super::super::state::tests::TestState;
use super::super::super::{Entry, Log, Request};
use super::super::tests::{assert_messages, assert_node};
use super::*;
use crate::storage;
use tokio::sync::mpsc;

#[allow(clippy::type_complexity)]
fn setup() -> Result<(
RawNode<Candidate>,
mpsc::UnboundedReceiver<Message>,
mpsc::UnboundedReceiver<Instruction>,
)> {
fn setup() -> Result<(RawNode<Candidate>, mpsc::UnboundedReceiver<Message>)> {
let (node_tx, node_rx) = mpsc::unbounded_channel();
let (state_tx, state_rx) = mpsc::unbounded_channel();
let state = Box::new(TestState::new(0));
let mut log = Log::new(storage::engine::Memory::new(), false)?;

log.append(1, Some(vec![0x01]))?;
log.append(1, Some(vec![0x02]))?;
log.append(2, Some(vec![0x03]))?;
Expand All @@ -194,18 +192,18 @@ mod tests {
peers: HashSet::from([2, 3, 4, 5]),
term: 3,
log,
state,
node_tx,
state_tx,
role: Candidate::new(),
};
node.role.votes.insert(1);
Ok((node, node_rx, state_rx))
Ok((node, node_rx))
}

#[test]
// Heartbeat for current term converts to follower and emits ConfirmLeader.
fn step_heartbeat_current_term() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let mut node = candidate.step(Message {
from: Address::Node(2),
to: Address::Node(1),
Expand All @@ -219,18 +217,17 @@ mod tests {
from: Address::Node(1),
to: Address::Node(2),
term: 3,
event: Event::ConfirmLeader { commit_index: 2, has_committed: true, read_seq: 7 },
event: Event::ConfirmLeader { has_committed: true, read_seq: 7 },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
// Heartbeat for future term converts to follower and emits ConfirmLeader
// event.
fn step_heartbeat_future_term() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let mut node = candidate.step(Message {
from: Address::Node(2),
to: Address::Node(1),
Expand All @@ -244,17 +241,16 @@ mod tests {
from: Address::Node(1),
to: Address::Node(2),
term: 4,
event: Event::ConfirmLeader { commit_index: 2, has_committed: true, read_seq: 7 },
event: Event::ConfirmLeader { has_committed: true, read_seq: 7 },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
// Heartbeat for past term is ignored
fn step_heartbeat_past_term() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let mut node = candidate.step(Message {
from: Address::Node(2),
to: Address::Node(1),
Expand All @@ -263,13 +259,12 @@ mod tests {
})?;
assert_node(&mut node).is_candidate().term(3);
assert_messages(&mut node_rx, vec![]);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
fn step_grantvote() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let peers = candidate.peers.clone();
let mut node = Node::Candidate(candidate);

Expand All @@ -282,7 +277,6 @@ mod tests {
})?;
assert_node(&mut node).is_candidate().term(3);
assert_messages(&mut node_rx, vec![]);
assert_messages(&mut state_rx, vec![]);

// However, the second external vote makes us leader
node = node.step(Message {
Expand Down Expand Up @@ -320,14 +314,13 @@ mod tests {
}

assert_messages(&mut node_rx, vec![]);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
// ClientRequest returns Error::Abort.
fn step_clientrequest() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let mut node = Node::Candidate(candidate);

node = node.step(Message {
Expand All @@ -346,13 +339,12 @@ mod tests {
event: Event::ClientResponse { id: vec![0x01], response: Err(Error::Abort) },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
fn tick() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let timeout = candidate.role.election_timeout;
let mut node = Node::Candidate(candidate);

Expand All @@ -372,7 +364,6 @@ mod tests {
event: Event::SolicitVote { last_index: 3, last_term: 2 },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}
}
Loading

0 comments on commit 83763d2

Please sign in to comment.