Skip to content

Commit

Permalink
server: move SQL engine setup into serve_sql()
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Apr 7, 2024
1 parent b65f2a7 commit 03c44d9
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ mod state;
pub use self::log::{Entry, Index, Log};
pub use message::{Address, Event, Message, ReadSequence, Request, RequestID, Response, Status};
pub use node::{Node, NodeID, Term};
pub use server::Server;
pub use server::{ClientSender, Server};
pub use state::State;
12 changes: 6 additions & 6 deletions src/raft/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ use tokio_stream::StreamExt as _;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use uuid::Uuid;

// TODO: simplify these types.
pub type ClientSender = mpsc::UnboundedSender<(Request, oneshot::Sender<Result<Response>>)>;
pub type ClientReceiver = mpsc::UnboundedReceiver<(Request, oneshot::Sender<Result<Response>>)>;

/// The interval between Raft ticks, the unit of time for e.g. heartbeats and
/// elections.
const TICK_INTERVAL: Duration = Duration::from_millis(100);
Expand Down Expand Up @@ -40,11 +44,7 @@ impl Server {
}

/// Connects to peers and serves requests.
pub async fn serve(
self,
listener: TcpListener,
client_rx: mpsc::UnboundedReceiver<(Request, oneshot::Sender<Result<Response>>)>,
) -> Result<()> {
pub async fn serve(self, listener: TcpListener, client_rx: ClientReceiver) -> Result<()> {
let (tcp_in_tx, tcp_in_rx) = mpsc::unbounded_channel::<Message>();
let (tcp_out_tx, tcp_out_rx) = mpsc::unbounded_channel::<Message>();
let (task, tcp_receiver) = Self::tcp_receive(listener, tcp_in_tx).remote_handle();
Expand All @@ -64,7 +64,7 @@ impl Server {
async fn eventloop(
mut node: Node,
node_rx: mpsc::UnboundedReceiver<Message>,
client_rx: mpsc::UnboundedReceiver<(Request, oneshot::Sender<Result<Response>>)>,
client_rx: ClientReceiver,
tcp_rx: mpsc::UnboundedReceiver<Message>,
tcp_tx: mpsc::UnboundedSender<Message>,
) -> Result<()> {
Expand Down
7 changes: 3 additions & 4 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,20 @@ impl Server {
);

let (raft_tx, raft_rx) = mpsc::unbounded_channel();
let sql_engine = sql::engine::Raft::new(raft_tx);

tokio::try_join!(
self.raft.serve(raft_listener, raft_rx),
Self::serve_sql(sql_listener, sql_engine),
Self::serve_sql(sql_listener, raft_tx),
)?;
Ok(())
}

/// Serves SQL clients.
async fn serve_sql(listener: TcpListener, engine: sql::engine::Raft) -> Result<()> {
async fn serve_sql(listener: TcpListener, raft_tx: raft::ClientSender) -> Result<()> {
let mut listener = TcpListenerStream::new(listener);
while let Some(socket) = listener.try_next().await? {
let peer = socket.peer_addr()?;
let session = Session::new(engine.clone());
let session = Session::new(sql::engine::Raft::new(raft_tx.clone()));
tokio::spawn(async move {
info!("Client {} connected", peer);
match session.handle(socket).await {
Expand Down

0 comments on commit 03c44d9

Please sign in to comment.