Skip to content

Commit

Permalink
Reduce noise on peer disconnect (#1963)
Browse files Browse the repository at this point in the history
When remote clients disconnect, Restate sometimes logs a full backtrace for
essentially benign "connection reset by peer" errors. With this change we
silence this behavior, limiting it to a debug-level log only if Hyper reports an
incomplete message.
  • Loading branch information
pcholakov authored Sep 19, 2024
1 parent faff5b9 commit 06e0e04
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions crates/core/src/network/net_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,26 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::{cancellation_watcher, task_center, ShutdownError, TaskCenter, TaskKind};
use http::Uri;
use hyper::body::{Body, Incoming};
use hyper::rt::{Read, Write};
use hyper_util::rt::TokioIo;
use restate_types::errors::GenericError;
use restate_types::net::{AdvertisedAddress, BindAddress};
use std::fmt::Debug;
use std::future::Future;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;

use crate::{cancellation_watcher, task_center, ShutdownError, TaskCenter, TaskKind};
use http::Uri;
use hyper::body::{Body, Incoming};
use hyper::rt::{Read, Write};
use hyper_util::rt::TokioIo;
use tokio::io;
use tokio::net::{TcpListener, UnixListener, UnixStream};
use tokio_util::net::Listener;
use tonic::transport::{Channel, Endpoint};
use tracing::{debug, info, instrument, Span};

use restate_types::errors::GenericError;
use restate_types::net::{AdvertisedAddress, BindAddress};

pub fn create_tonic_channel_from_advertised_address(
address: AdvertisedAddress,
) -> Result<Channel, http::Error> {
Expand Down Expand Up @@ -139,7 +141,7 @@ async fn run_listener_loop<L, S, B>(
where
L: Listener,
L::Io: Send + Unpin + 'static,
L::Addr: Debug,
L::Addr: Send + Debug + 'static,
S: hyper::service::Service<http::Request<Incoming>, Response = hyper::Response<B>>
+ Send
+ Clone
Expand All @@ -162,13 +164,13 @@ where
incoming_connection = listener.accept() => {
let (stream, remote_addr) = incoming_connection?;
let io = TokioIo::new(stream);

debug!("Accepting incoming connection from '{remote_addr:?}'.");
debug!(?remote_addr, "Accepting incoming connection");

tc.spawn_child(TaskKind::RpcConnection, server_name, None, handle_connection(
io,
service.clone(),
executor.clone(),
remote_addr,
))?;
}
}
Expand All @@ -177,10 +179,11 @@ where
Ok(())
}

async fn handle_connection<S, B, I>(
async fn handle_connection<S, B, I, A>(
io: I,
service: S,
executor: TaskCenterExecutor,
remote_addr: A,
) -> anyhow::Result<()>
where
S: hyper::service::Service<http::Request<Incoming>, Response = hyper::Response<B>>
Expand All @@ -193,14 +196,22 @@ where
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
I: Read + Write + Unpin + 'static,
A: Send + Debug,
{
let builder = hyper_util::server::conn::auto::Builder::new(executor);
let connection = builder.serve_connection(io, service);

tokio::select! {
res = connection => {
// propagate errors
res.map_err(Error::HandlingConnection)?;
if let Err(e) = res {
if let Some(hyper_error) = e.downcast_ref::<hyper::Error>() {
if hyper_error.is_incomplete_message() {
debug!(?remote_addr, "Connection closed before request completed");
}
} else {
anyhow::bail!(Error::HandlingConnection(e));
}
}
},
_ = cancellation_watcher() => {}
}
Expand Down

0 comments on commit 06e0e04

Please sign in to comment.