Skip to content

Commit

Permalink
scaffolding leader Sequencer
Browse files Browse the repository at this point in the history
  • Loading branch information
muhamadazmy committed Sep 25, 2024
1 parent 7da2942 commit 0fc55ac
Show file tree
Hide file tree
Showing 2 changed files with 275 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum SpreadSelectorError {
InsufficientWriteableNodes,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum SelectorStrategy {
/// Selects all writeable nodes in the nodeset, this might lead to over-replication,
/// and it's up to the appender state machine to continue replicating beyond the
Expand All @@ -38,6 +38,7 @@ pub enum SelectorStrategy {
}

/// Spread selector is thread-safe and can be used concurrently.
#[derive(Clone)]
pub struct SpreadSelector {
nodeset: NodeSet,
strategy: SelectorStrategy,
Expand Down
277 changes: 273 additions & 4 deletions crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,26 @@ mod node;

use std::sync::Arc;

use restate_core::ShutdownError;
use futures::channel::oneshot;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};

use restate_core::{
cancellation_watcher,
network::{rpc_router::RpcRouter, MessageRouterBuilder, Networking, TransportConnect},
task_center, Metadata, ShutdownError, TaskKind,
};
use restate_types::{
logs::{LogletOffset, Record},
replicated_loglet::ReplicatedLogletId,
config::Configuration,
logs::{LogletOffset, Record, SequenceNumber, TailState},
net::log_server::Store,
replicated_loglet::{NodeSet, ReplicatedLogletId},
GenerationalNodeId,
};

use super::replication::spread_selector::SpreadSelector;
use crate::loglet::util::TailOffsetWatch;
use crate::loglet::{util::TailOffsetWatch, LogletCommit};
use append::Appender;
use node::RemoteLogServerManager;

#[derive(thiserror::Error, Debug)]
pub enum SequencerError {
Expand Down Expand Up @@ -55,6 +66,264 @@ impl SequencerSharedState {
}
}

/// internal commands sent over the [`SequencerHandler`] to sequencer main loop
struct SequencerCommand<Input, Output> {
input: Input,
sender: oneshot::Sender<Output>,
}

impl<Input, Output> SequencerCommand<Input, Output> {
fn new(input: Input) -> (oneshot::Receiver<Output>, Self) {
let (sender, receiver) = oneshot::channel();
(receiver, Self { input, sender })
}
}

/// Available sequencer commands
enum SequencerCommands {
/// executed commands
EnqueueBatch(SequencerCommand<Arc<[Record]>, Result<LogletCommit, SequencerError>>),
}

struct SequencerCommandsWithPermit {
permit: OwnedSemaphorePermit,
command: SequencerCommands,
}

/// Available sequencer control and introspection commands
enum SequencerCtrlCommands {
GetClusterState(SequencerCommand<(), ClusterState>),
}

/// Main interaction interface with the sequencer state machine
#[derive(Clone)]
pub struct SequencerHandle {
/// internal commands channel.
commands: mpsc::Sender<SequencerCommandsWithPermit>,
ctrl: mpsc::Sender<SequencerCtrlCommands>,
sequencer_shared_state: Arc<SequencerSharedState>,
permits: Arc<Semaphore>,
}

pub(crate) struct SequencerHandleSink {
commands: mpsc::Receiver<SequencerCommandsWithPermit>,
ctrl: mpsc::Receiver<SequencerCtrlCommands>,
}

impl SequencerHandle {
pub(crate) fn pair(
permits: Arc<Semaphore>,
sequencer_shared_state: Arc<SequencerSharedState>,
) -> (SequencerHandle, SequencerHandleSink) {
// todo: the size of the channel should be 1
let (commands_sender, commands_receiver) = mpsc::channel(1);
let (ctrl_sender, ctrl_received) = mpsc::channel(1);
(
SequencerHandle {
commands: commands_sender,
ctrl: ctrl_sender,
sequencer_shared_state,
permits,
},
SequencerHandleSink {
commands: commands_receiver,
ctrl: ctrl_received,
},
)
}

pub fn sequencer_state(&self) -> &SequencerSharedState {
&self.sequencer_shared_state
}

pub async fn cluster_state(&self) -> Result<ClusterState, ShutdownError> {
let (receiver, command) = SequencerCommand::new(());
self.ctrl
.send(SequencerCtrlCommands::GetClusterState(command))
.await
.map_err(|_| ShutdownError)?;

receiver.await.map_err(|_| ShutdownError)
}

pub async fn enqueue_batch(
&self,
payloads: Arc<[Record]>,
) -> Result<LogletCommit, SequencerError> {
let permit = self.permits.clone().acquire_owned().await.unwrap();

let (receiver, command) = SequencerCommand::new(payloads);
self.commands
.send(SequencerCommandsWithPermit {
permit,
command: SequencerCommands::EnqueueBatch(command),
})
.await
.map_err(|_| ShutdownError)?;

receiver.await.map_err(|_| ShutdownError)?
}
}

#[derive(Clone, Debug)]
pub struct ClusterState {
pub sequencer_id: GenerationalNodeId,
pub global_committed_tail: TailState<LogletOffset>,
}

/// Sequencer
pub struct Sequencer<T> {
sequencer_shared_state: Arc<SequencerSharedState>,
log_server_manager: RemoteLogServerManager<T>,
metadata: Metadata,
next_write_offset: LogletOffset,
rpc_router: RpcRouter<Store>,
handle_sink: SequencerHandleSink,
}

impl<T: TransportConnect> Sequencer<T> {
/// Create a new sequencer instance
pub fn new(
node_id: GenerationalNodeId,
loglet_id: ReplicatedLogletId,
node_set: NodeSet,
selector: SpreadSelector,
metadata: Metadata,
networking: Networking<T>,
router_builder: &mut MessageRouterBuilder,
) -> (SequencerHandle, Self) {
// - register for all potential response streams from the log-server(s).

let permits = Arc::new(Semaphore::new(
Configuration::pinned()
.bifrost
.replicated_loglet
.maximum_inflight_batches
.into(),
));

// shared state with appenders
let sequencer_shared_state = Arc::new(SequencerSharedState {
node_id,
loglet_id,
selector,
committed_tail: TailOffsetWatch::new(TailState::Open(LogletOffset::OLDEST)),
});

// create a command channel to be used by the sequencer handler. The handler then can be used
// to call and execute commands on the sequencer directly
let (handle, handle_sink) =
SequencerHandle::pair(permits, Arc::clone(&sequencer_shared_state));

let rpc_router = RpcRouter::new(router_builder);

let log_server_manager = RemoteLogServerManager::new(loglet_id, networking, node_set);

let sequencer = Sequencer {
sequencer_shared_state,
log_server_manager,
metadata,
next_write_offset: LogletOffset::OLDEST,
rpc_router,
handle_sink,
};

(handle, sequencer)
}

/// Start the sequencer main loop
pub async fn start(mut self) {
let shutdown = cancellation_watcher();
tokio::pin!(shutdown);

// enter main state machine loop
loop {
tokio::select! {
biased;
_ = &mut shutdown => {
break;
},
Some(command) = self.handle_sink.commands.recv() => {
self.process_command(command).await;
}
Some(ctrl) = self.handle_sink.ctrl.recv() => {
self.process_ctrl(ctrl);
}
}
}
}

fn process_ctrl(&mut self, ctrl: SequencerCtrlCommands) {
match ctrl {
SequencerCtrlCommands::GetClusterState(command) => {
let SequencerCommand { sender, .. } = command;
let _ = sender.send(self.cluster_state());
}
}
}
/// process calls from the SequencerHandler.
async fn process_command(&mut self, command: SequencerCommandsWithPermit) {
let SequencerCommandsWithPermit { permit, command } = command;

match command {
SequencerCommands::EnqueueBatch(command) => {
let SequencerCommand {
input: request,
sender,
} = command;

let _ = sender.send(self.enqueue_batch(permit, request).await);
}
}
}

fn cluster_state(&self) -> ClusterState {
ClusterState {
global_committed_tail: self
.sequencer_shared_state
.global_committed_tail()
.get()
.to_owned(),
sequencer_id: self.sequencer_shared_state.node_id,
}
}

async fn enqueue_batch(
&mut self,
permit: OwnedSemaphorePermit,
records: Arc<[Record]>,
) -> Result<LogletCommit, SequencerError> {
if self
.sequencer_shared_state
.global_committed_tail()
.is_sealed()
{
// todo: (question) do we return a sealed loglet commit, or error.
return Ok(LogletCommit::sealed());
}

let next_write_offset = records.last_offset(self.next_write_offset)?.next();

let (loglet_commit, commit_resolver) = LogletCommit::deferred();

let appender = Appender::new(
Arc::clone(&self.sequencer_shared_state),
self.log_server_manager.clone(),
self.rpc_router.clone(),
self.metadata.clone(),
self.next_write_offset,
records,
permit,
commit_resolver,
);

task_center().spawn(TaskKind::BifrostAppender, "appender", None, appender.run())?;
self.next_write_offset = next_write_offset;

Ok(loglet_commit)
}
}

trait BatchExt {
/// tail computes inflight tail after this batch is committed
fn last_offset(&self, first_offset: LogletOffset) -> Result<LogletOffset, SequencerError>;
Expand Down

0 comments on commit 0fc55ac

Please sign in to comment.