Skip to content

Commit

Permalink
feat(request-response): deprecate Config::set_connection_keep_alive
Browse files Browse the repository at this point in the history
We deprecate the `set_connection_keep_alive` function in preparation for removing the `KeepAlive::Until` entirely. This is backwards-compatible.

Pull-Request: #4029.
  • Loading branch information
thomaseizinger authored Oct 18, 2023
1 parent c112703 commit 407dd42
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 54 deletions.
3 changes: 2 additions & 1 deletion protocols/perf/src/bin/perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ async fn swarm<B: NetworkBehaviour + Default>() -> Result<Swarm<B>> {
Default::default(),
local_peer_id,
Config::with_tokio_executor()
.with_substream_upgrade_protocol_override(upgrade::Version::V1Lazy),
.with_substream_upgrade_protocol_override(upgrade::Version::V1Lazy)
.with_idle_connection_timeout(Duration::from_secs(60 * 5)),
);

Ok(swarm)
Expand Down
1 change: 0 additions & 1 deletion protocols/perf/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ pub struct Behaviour {
impl Default for Behaviour {
fn default() -> Self {
let mut req_resp_config = request_response::Config::default();
req_resp_config.set_connection_keep_alive(Duration::from_secs(60 * 5));
req_resp_config.set_request_timeout(Duration::from_secs(60 * 5));
Self {
connected: Default::default(),
Expand Down
1 change: 0 additions & 1 deletion protocols/perf/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ pub struct Behaviour {
impl Default for Behaviour {
fn default() -> Self {
let mut req_resp_config = request_response::Config::default();
req_resp_config.set_connection_keep_alive(Duration::from_secs(60 * 5));
req_resp_config.set_request_timeout(Duration::from_secs(60 * 5));

Self {
Expand Down
3 changes: 3 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 0.25.2 - unreleased

- Deprecate `request_response::Config::set_connection_keep_alive` in favor of `SwarmBuilder::idle_connection_timeout`.
See [PR 4029](https://github.com/libp2p/rust-libp2p/pull/4029).

<!-- Internal changes
- Allow deprecated usage of `KeepAlive::Until`
Expand Down
109 changes: 58 additions & 51 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ use handler::Handler;
use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{
behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
behaviour::{AddressChange, ConnectionClosed, DialFailure, FromSwarm},
dial_opts::DialOpts,
ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, PollParameters, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler,
PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use smallvec::SmallVec;
use std::{
Expand Down Expand Up @@ -298,6 +298,9 @@ impl Default for Config {

impl Config {
/// Sets the keep-alive timeout of idle connections.
#[deprecated(
note = "Set a global idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead."
)]
pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self {
self.connection_keep_alive = v;
self
Expand Down Expand Up @@ -605,36 +608,7 @@ where
.iter_mut()
.find(|c| c.id == connection_id)
.expect("Address change can only happen on an established connection.");
connection.address = new_address;
}

fn on_connection_established(
&mut self,
ConnectionEstablished {
peer_id,
connection_id,
endpoint,
other_established,
..
}: ConnectionEstablished,
) {
let address = match endpoint {
ConnectedPoint::Dialer { address, .. } => Some(address.clone()),
ConnectedPoint::Listener { .. } => None,
};
self.connected
.entry(peer_id)
.or_default()
.push(Connection::new(connection_id, address));

if other_established == 0 {
if let Some(pending) = self.pending_outbound_requests.remove(&peer_id) {
for request in pending {
let request = self.try_send_request(&peer_id, request);
assert!(request.is_none());
}
}
}
connection.remote_address = new_address;
}

fn on_connection_closed(
Expand Down Expand Up @@ -701,6 +675,28 @@ where
}
}
}

/// Preloads a new [`Handler`] with requests that are waiting to be sent to the newly connected peer.
fn preload_new_handler(
&mut self,
handler: &mut Handler<TCodec>,
peer: PeerId,
connection_id: ConnectionId,
remote_address: Option<Multiaddr>,
) {
let mut connection = Connection::new(connection_id, remote_address);

if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) {
for request in pending_requests {
connection
.pending_inbound_responses
.insert(request.request_id);
handler.on_behaviour_event(request);
}
}

self.connected.entry(peer).or_default().push(connection);
}
}

impl<TCodec> NetworkBehaviour for Behaviour<TCodec>
Expand All @@ -712,18 +708,22 @@ where

fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
connection_id: ConnectionId,
peer: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(
let mut handler = Handler::new(
self.inbound_protocols.clone(),
self.codec.clone(),
self.config.connection_keep_alive,
self.config.request_timeout,
self.config.connection_keep_alive,
self.next_inbound_id.clone(),
))
);

self.preload_new_handler(&mut handler, peer, connection_id, None);

Ok(handler)
}

fn handle_pending_outbound_connection(
Expand All @@ -740,7 +740,7 @@ where

let mut addresses = Vec::new();
if let Some(connections) = self.connected.get(&peer) {
addresses.extend(connections.iter().filter_map(|c| c.address.clone()))
addresses.extend(connections.iter().filter_map(|c| c.remote_address.clone()))
}
if let Some(more) = self.addresses.get(&peer) {
addresses.extend(more.into_iter().cloned());
Expand All @@ -751,25 +751,32 @@ where

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
connection_id: ConnectionId,
peer: PeerId,
remote_address: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(
let mut handler = Handler::new(
self.inbound_protocols.clone(),
self.codec.clone(),
self.config.connection_keep_alive,
self.config.request_timeout,
self.config.connection_keep_alive,
self.next_inbound_id.clone(),
))
);

self.preload_new_handler(
&mut handler,
peer,
connection_id,
Some(remote_address.clone()),
);

Ok(handler)
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(connection_established) => {
self.on_connection_established(connection_established)
}
FromSwarm::ConnectionEstablished(_) => {}
FromSwarm::ConnectionClosed(connection_closed) => {
self.on_connection_closed(connection_closed)
}
Expand Down Expand Up @@ -924,7 +931,7 @@ const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100;
/// Internal information tracked for an established connection.
struct Connection {
id: ConnectionId,
address: Option<Multiaddr>,
remote_address: Option<Multiaddr>,
/// Pending outbound responses where corresponding inbound requests have
/// been received on this connection and emitted via `poll` but have not yet
/// been answered.
Expand All @@ -935,10 +942,10 @@ struct Connection {
}

impl Connection {
fn new(id: ConnectionId, address: Option<Multiaddr>) -> Self {
fn new(id: ConnectionId, remote_address: Option<Multiaddr>) -> Self {
Self {
id,
address,
remote_address,
pending_outbound_responses: Default::default(),
pending_inbound_responses: Default::default(),
}
Expand Down

0 comments on commit 407dd42

Please sign in to comment.