diff --git a/crates/bifrost/src/loglet/mod.rs b/crates/bifrost/src/loglet/mod.rs index 76b129968..7d70cc03a 100644 --- a/crates/bifrost/src/loglet/mod.rs +++ b/crates/bifrost/src/loglet/mod.rs @@ -156,21 +156,40 @@ pub type SendableLogletReadStream = Pin>; #[allow(dead_code)] pub(crate) struct LogletCommitResolver { - tx: oneshot::Sender>, + tx: Option>>, } #[allow(dead_code)] impl LogletCommitResolver { - pub fn sealed(self) { - let _ = self.tx.send(Err(AppendError::Sealed)); + pub fn sealed(mut self) { + let _ = self + .tx + .take() + .expect("must be set") + .send(Err(AppendError::Sealed)); } - pub fn offset(self, offset: LogletOffset) { - let _ = self.tx.send(Ok(offset)); + pub fn offset(mut self, offset: LogletOffset) { + let _ = self.tx.take().expect("must be set").send(Ok(offset)); } - pub fn error(self, err: AppendError) { - let _ = self.tx.send(Err(err)); + pub fn error(mut self, err: AppendError) { + let _ = self.tx.take().expect("must be set").send(Err(err)); + } +} + +#[derive(Debug, Clone, Copy, thiserror::Error)] +#[error("Commit resolver was dropped")] +struct CommitCancelled; + +/// To avoid lost commit resolver, If a commit +/// resolver was ever dropped without being resolved +/// it will still resolve the commit as 'Cancelled' +impl Drop for LogletCommitResolver { + fn drop(&mut self) { + if let Some(tx) = self.tx.take() { + let _ = tx.send(Err(AppendError::retryable(CommitCancelled))); + } } } @@ -194,7 +213,7 @@ impl LogletCommit { #[allow(dead_code)] pub(crate) fn deferred() -> (Self, LogletCommitResolver) { let (tx, rx) = oneshot::channel(); - (Self { rx }, LogletCommitResolver { tx }) + (Self { rx }, LogletCommitResolver { tx: Some(tx) }) } } diff --git a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs index 2c486da84..9f6cba282 100644 --- a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs +++ b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs @@ -17,8 +17,10 @@ use std::{ }; use tokio::sync::{mpsc, Mutex, OwnedSemaphorePermit, Semaphore}; +use tracing::{debug, instrument, Span}; use restate_core::{ + cancellation_watcher, network::{ rpc_router::{RpcRouter, RpcToken}, NetworkError, NetworkSendError, Networking, Outgoing, TransportConnect, WeakConnection, @@ -33,7 +35,6 @@ use restate_types::{ replicated_loglet::ReplicatedLogletParams, GenerationalNodeId, }; -use tracing::instrument; use super::rpc_routers::SequencersRpc; use crate::loglet::{ @@ -107,7 +108,7 @@ where } #[instrument( - level="trace", + level="debug", skip_all, fields( otel.name = "replicated_loglet::remote_sequencer: append", @@ -148,16 +149,14 @@ where Ok(token) => break token, Err(err) => { match err.source { - NetworkError::ConnectError(_) - | NetworkError::ConnectionClosed(_) - | NetworkError::Timeout(_) => { - // we retry to re-connect one time + err @ NetworkError::Full => return Err(err.into()), + _ => { + // we retry on any other network error connection = self.renew_connection(connection).await?; msg = err.original; continue; } - err => return Err(err.into()), } } }; @@ -170,6 +169,7 @@ where } /// Gets or starts a new remote sequencer connection + #[instrument(level = "debug", skip_all)] async fn get_connection(&self) -> Result { let mut guard = self.connection.lock().await; if let Some(connection) = guard.deref() { @@ -190,6 +190,7 @@ where /// Renew a connection to a remote sequencer. This guarantees that only a single connection /// to the sequencer is available. + #[instrument(level = "debug", skip_all, fields(renewed = false))] async fn renew_connection( &self, old: RemoteSequencerConnection, @@ -207,6 +208,8 @@ where .node_connection(self.params.sequencer) .await?; + Span::current().record("renewed", true); + let connection = RemoteSequencerConnection::start(self.known_global_tail.clone(), connection)?; @@ -232,6 +235,7 @@ struct RemoteSequencerConnection { } impl RemoteSequencerConnection { + #[instrument(level = "debug", name = "connection_start", skip_all)] fn start( known_global_tail: TailOffsetWatch, connection: WeakConnection, @@ -239,7 +243,7 @@ impl RemoteSequencerConnection { let (tx, rx) = mpsc::unbounded_channel(); task_center().spawn( - TaskKind::NetworkMessageHandler, + TaskKind::SequencerAppender, "remote-sequencer-connection", None, Self::handle_appended_responses(known_global_tail, connection.clone(), rx), @@ -260,6 +264,16 @@ impl RemoteSequencerConnection { sequencer: GenerationalNodeId, msg: Append, ) -> Result, NetworkSendError> { + // there are other reasons that can render this connection unusable + // even if the underlying connection is still valid. + // if the channel is closed, this connection cannot be used anymore + if self.tx.is_closed() { + return Err(NetworkSendError::new( + msg, + NetworkError::Unavailable("Inflight commits channel is closed".into()), + )); + } + let outgoing = Outgoing::new(sequencer, msg).assign_connection(self.inner.clone()); rpc_router @@ -283,6 +297,7 @@ impl RemoteSequencerConnection { if let Err(err) = self.tx.send(inflight_append) { // if we failed to push this to be processed by the connection reactor task // then we need to notify the caller + debug!("Inflight channel closed. Resolve commit as connection closed"); err.0 .commit_resolver .error(AppendError::retryable(NetworkError::ConnectionClosed( @@ -296,55 +311,84 @@ impl RemoteSequencerConnection { /// This task will run until the [`AppendStream`] is dropped. Once dropped /// all pending commits will be resolved with an error. it's up to the enqueuer /// to retry if needed. + #[instrument(level = "debug", skip_all)] async fn handle_appended_responses( known_global_tail: TailOffsetWatch, connection: WeakConnection, mut rx: mpsc::UnboundedReceiver, ) -> anyhow::Result<()> { let mut closed = std::pin::pin!(connection.closed()); + let cancelled = cancellation_watcher(); + + let termination_err = tokio::select! { + _ = &mut closed => { + AppendError::retryable(NetworkError::ConnectionClosed(connection.peer())) + } + _ = cancelled => { + AppendError::Shutdown(ShutdownError) + } + err = Self::handler_loop(known_global_tail, connection.peer(), &mut rx) => { + err + } + }; + + // close channel to stop any further appends calls on the same connection + rx.close(); + + // Drain and resolve ALL pending appends on this connection. + // + // todo(azmy): The order of the RemoteInflightAppend's on the channel + // does not necessary matches the actual append calls. This is + // since sending on the connection and pushing on the rx channel is not an atomic + // operation. Which means that, it's possible when we are draining + // the pending requests here that we also end up cancelling some inflight appends + // that has already received a positive response from the sequencer. + // + // For now this should not be a problem since they can (possibly) retry + // to do the write again later. + debug!(cause=%termination_err, "Draining inflight channel"); + let mut count = 0; + while let Some(inflight) = rx.recv().await { + inflight.commit_resolver.error(termination_err.clone()); + count += 1; + } + + debug!("Drained/Cancelled {count} inflight commits"); + + Ok(()) + } + async fn handler_loop( + known_global_tail: TailOffsetWatch, + peer: GenerationalNodeId, + rx: &mut mpsc::UnboundedReceiver, + ) -> AppendError { // handle all rpc tokens in an infinite loop // this loop only breaks when it encounters a terminal // AppendError. // When this happens, the receiver channel is closed // and drained. The same error is then used to resolve // all pending tokens - let err = loop { - let inflight = tokio::select! { - inflight = rx.recv() => { - inflight - } - _ = &mut closed => { - break AppendError::retryable(NetworkError::ConnectionClosed(connection.peer())); + loop { + let inflight = match rx.recv().await { + Some(inflight) => inflight, + None => { + return AppendError::retryable(NetworkError::ConnectionClosed(peer)); } }; - let Some(inflight) = inflight else { - // connection was dropped. - break AppendError::retryable(NetworkError::ConnectionClosed(connection.peer())); - }; - let RemoteInflightAppend { rpc_token, commit_resolver, permit: _permit, } = inflight; - let appended = tokio::select! { - incoming = rpc_token.recv() => { - incoming.map_err(AppendError::Shutdown) - }, - _ = &mut closed => { - Err(AppendError::retryable(NetworkError::ConnectionClosed(connection.peer()))) - } - }; - - let appended = match appended { + let appended = match rpc_token.recv().await { Ok(appended) => appended.into_body(), Err(err) => { // this can only be a terminal error (either shutdown or connection is closing) - commit_resolver.error(err.clone()); - break err; + commit_resolver.error(AppendError::Shutdown(err)); + return AppendError::Shutdown(err); } }; @@ -363,7 +407,7 @@ impl RemoteSequencerConnection { // A sealed status returns a terminal error since we can immediately cancel // all inflight append jobs. commit_resolver.sealed(); - break AppendError::Sealed; + return AppendError::Sealed; } SequencerStatus::UnknownLogId | SequencerStatus::UnknownSegmentIndex @@ -375,30 +419,11 @@ impl RemoteSequencerConnection { // While the UnknownLoglet status is non-terminal for the connection // (since only one request is bad), // the AppendError for the caller is terminal + debug!(error=%err, "Resolve commit with error"); commit_resolver.error(AppendError::other(err)); } } - }; - - // close channel to stop any further appends calls on the same connection - rx.close(); - - // Drain and resolve ALL pending appends on this connection. - // - // todo(azmy): The order of the RemoteInflightAppend's on the channel - // does not necessary matches the actual append calls. This is - // since sending on the connection and pushing on the rx channel is not an atomic - // operation. Which means that, it's possible when we are draining - // the pending requests here that we also end up cancelling some inflight appends - // that has already received a positive response from the sequencer. - // - // For now this should not be a problem since they can (possibly) retry - // to do the write again later. - while let Some(inflight) = rx.recv().await { - inflight.commit_resolver.error(err.clone()); } - - Ok(()) } }