fix(identify): only report observed address once per connection
At the moment, `libp2p-identify` reports an observed address repeatedly as a **new** external address candidate even if it is the same address _from the same connection_. Unless the underlying transport supports roaming, a connection does not change its observed address.

We change the behaviour of `libp2p-identify` to remember the observed address for each connection and not re-emit the `NewExternalAddrCandidate` event for it. This allows users to probabilistically promote a candidate to an external address based on its report frequency. If an address is reported twice, it means we have two connections on which the remote observed this address. Chances are that port-reuse is enabled for these connections, so the address might be dialable or at least be a good candidate for hole-punching.
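
To illustrate why reporting once per connection makes the report frequency meaningful, here is a minimal, self-contained sketch. It is not the code from this commit: `ObservedAddresses`, `CandidateCounter`, `ConnId` and `Addr` are hypothetical stand-ins (plain integers and strings instead of `ConnectionId` and `Multiaddr`), and the promotion threshold of 2 is an assumption chosen only for the example.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

// Hypothetical stand-ins for libp2p's `ConnectionId` and `Multiaddr`.
type ConnId = u64;
type Addr = String;

/// Producer side: remembers the last observed address per connection.
#[derive(Default)]
struct ObservedAddresses {
    per_connection: HashMap<ConnId, Addr>,
}

impl ObservedAddresses {
    /// Returns `true` only if `observed` is new for this connection,
    /// i.e. if a candidate should be emitted.
    fn report(&mut self, id: ConnId, observed: &str) -> bool {
        match self.per_connection.entry(id) {
            Entry::Vacant(slot) => {
                slot.insert(observed.to_owned());
                true
            }
            // Same address on the same connection: nothing new to report.
            Entry::Occupied(slot) if slot.get().as_str() == observed => false,
            // The address changed (e.g. a roaming transport): report it again.
            Entry::Occupied(mut slot) => {
                *slot.get_mut() = observed.to_owned();
                true
            }
        }
    }

    /// Forget the connection once it is closed.
    fn connection_closed(&mut self, id: ConnId) {
        self.per_connection.remove(&id);
    }
}

/// Consumer side: counts how often each candidate was reported.
#[derive(Default)]
struct CandidateCounter {
    reports: HashMap<Addr, usize>,
}

impl CandidateCounter {
    /// Assumed threshold for this example only.
    const PROMOTION_THRESHOLD: usize = 2;

    fn on_candidate(&mut self, addr: &str) {
        *self.reports.entry(addr.to_owned()).or_insert(0) += 1;
    }

    fn count(&self, addr: &str) -> usize {
        self.reports.get(addr).copied().unwrap_or(0)
    }

    fn should_promote(&self, addr: &str) -> bool {
        self.count(addr) >= Self::PROMOTION_THRESHOLD
    }
}

fn main() {
    let addr = "/ip4/203.0.113.7/tcp/4001";
    let mut observed = ObservedAddresses::default();
    let mut counter = CandidateCounter::default();

    // Connection 1 reports the address three times (periodic identify),
    // connection 2 reports it once.
    for (conn, a) in [(1, addr), (1, addr), (1, addr), (2, addr)] {
        if observed.report(conn, a) {
            counter.on_candidate(a);
        }
    }

    observed.connection_closed(1);
    println!(
        "{addr}: {} report(s), promote = {}",
        counter.count(addr),
        counter.should_promote(addr)
    );
}
```

Without the per-connection check, the three reports from connection 1 alone would push the count past the threshold, so the count would say nothing about how many distinct connections observed the address.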

Related: #4688.

Pull-Request: #4721.
thomaseizinger authored Oct 31, 2023
1 parent ec15717 commit 57c2100
Showing 9 changed files with 289 additions and 107 deletions.
2 changes: 2 additions & 0 deletions misc/metrics/CHANGELOG.md
@@ -1,5 +1,7 @@
## 0.14.0 - unreleased

- Add metrics for `SwarmEvent::{NewExternalAddrCandidate,ExternalAddrConfirmed,ExternalAddrExpired}`.
See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721).

## 0.13.1

51 changes: 50 additions & 1 deletion misc/metrics/src/swarm.rs
@@ -23,7 +23,7 @@ use std::sync::{Arc, Mutex};

use crate::protocol_stack;
use instant::Instant;
use libp2p_swarm::ConnectionId;
use libp2p_swarm::{ConnectionId, SwarmEvent};
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
@@ -41,6 +41,10 @@ pub(crate) struct Metrics {
new_listen_addr: Family<AddressLabels, Counter>,
expired_listen_addr: Family<AddressLabels, Counter>,

external_addr_candidates: Family<AddressLabels, Counter>,
external_addr_confirmed: Family<AddressLabels, Counter>,
external_addr_expired: Family<AddressLabels, Counter>,

listener_closed: Family<AddressLabels, Counter>,
listener_error: Counter,

@@ -82,6 +86,27 @@ impl Metrics {
expired_listen_addr.clone(),
);

let external_addr_candidates = Family::default();
sub_registry.register(
"external_addr_candidates",
"Number of new external address candidates",
external_addr_candidates.clone(),
);

let external_addr_confirmed = Family::default();
sub_registry.register(
"external_addr_confirmed",
"Number of confirmed external addresses",
external_addr_confirmed.clone(),
);

let external_addr_expired = Family::default();
sub_registry.register(
"external_addr_expired",
"Number of expired external addresses",
external_addr_expired.clone(),
);

let listener_closed = Family::default();
sub_registry.register(
"listener_closed",
@@ -146,6 +171,9 @@ impl Metrics {
connections_established,
new_listen_addr,
expired_listen_addr,
external_addr_candidates,
external_addr_confirmed,
external_addr_expired,
listener_closed,
listener_error,
dial_attempt,
@@ -296,6 +324,27 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
libp2p_swarm::SwarmEvent::Dialing { .. } => {
self.dial_attempt.inc();
}
SwarmEvent::NewExternalAddrCandidate { address } => {
self.external_addr_candidates
.get_or_create(&AddressLabels {
protocols: protocol_stack::as_string(address),
})
.inc();
}
SwarmEvent::ExternalAddrConfirmed { address } => {
self.external_addr_confirmed
.get_or_create(&AddressLabels {
protocols: protocol_stack::as_string(address),
})
.inc();
}
SwarmEvent::ExternalAddrExpired { address } => {
self.external_addr_expired
.get_or_create(&AddressLabels {
protocols: protocol_stack::as_string(address),
})
.inc();
}
}
}
}
10 changes: 3 additions & 7 deletions protocols/dcutr/tests/lib.rs
@@ -69,13 +69,8 @@ async fn connect() {

src.dial_and_wait(dst_relayed_addr.clone()).await;

loop {
match src
.next_swarm_event()
.await
.try_into_behaviour_event()
.unwrap()
{
while let Ok(event) = src.next_swarm_event().await.try_into_behaviour_event() {
match event {
ClientEvent::Dcutr(dcutr::Event::RemoteInitiatedDirectConnectionUpgrade {
remote_peer_id,
remote_relayed_addr,
@@ -215,6 +210,7 @@ async fn wait_for_reservation(
addr_observed = true;
}
SwarmEvent::Behaviour(ClientEvent::Identify(_)) => {}
SwarmEvent::NewExternalAddrCandidate { .. } => {}
e => panic!("{e:?}"),
}
}
4 changes: 4 additions & 0 deletions protocols/identify/CHANGELOG.md
@@ -5,6 +5,10 @@
- Remove deprecated `initial_delay`.
Identify requests are always sent instantly after the connection has been established.
See [PR 4735](https://github.com/libp2p/rust-libp2p/pull/4735)
- Don't repeatedly report the same observed address as a `NewExternalAddrCandidate`.
Instead, only report each observed address once per connection.
This allows users to probabilistically deem an address as external if it gets reported as a candidate repeatedly.
See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721).

## 0.43.1

33 changes: 30 additions & 3 deletions protocols/identify/src/behaviour.rs
@@ -30,6 +30,7 @@ use libp2p_swarm::{
};
use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
use lru::LruCache;
use std::collections::hash_map::Entry;
use std::num::NonZeroUsize;
use std::{
collections::{HashMap, HashSet, VecDeque},
@@ -48,6 +49,10 @@ pub struct Behaviour {
config: Config,
/// For each peer we're connected to, the observed address to send back to it.
connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,

/// The address a remote observed for us.
our_observed_addresses: HashMap<ConnectionId, Multiaddr>,

/// Pending events to be emitted when polled.
events: VecDeque<ToSwarm<Event, InEvent>>,
/// The addresses of all peers that we have discovered.
@@ -148,6 +153,7 @@ impl Behaviour {
Self {
config,
connected: HashMap::new(),
our_observed_addresses: Default::default(),
events: VecDeque::new(),
discovered_peers,
listen_addresses: Default::default(),
@@ -253,7 +259,7 @@ impl NetworkBehaviour for Behaviour {
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
_: ConnectionId,
id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
match event {
@@ -269,8 +275,27 @@ impl NetworkBehaviour for Behaviour {
let observed = info.observed_addr.clone();
self.events
.push_back(ToSwarm::GenerateEvent(Event::Received { peer_id, info }));
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed));

match self.our_observed_addresses.entry(id) {
Entry::Vacant(not_yet_observed) => {
not_yet_observed.insert(observed.clone());
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed));
}
Entry::Occupied(already_observed) if already_observed.get() == &observed => {
// No-op, we already observed this address.
}
Entry::Occupied(mut already_observed) => {
log::info!(
"Our observed address on connection {id} changed from {} to {observed}",
already_observed.get()
);

*already_observed.get_mut() = observed.clone();
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed));
}
}
}
handler::Event::Identification => {
self.events
@@ -356,6 +381,8 @@ impl NetworkBehaviour for Behaviour {
} else if let Some(addrs) = self.connected.get_mut(&peer_id) {
addrs.remove(&connection_id);
}

self.our_observed_addresses.remove(&connection_id);
}
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let Some(entry) = peer_id.and_then(|id| self.discovered_peers.get_mut(&id)) {
71 changes: 71 additions & 0 deletions protocols/identify/tests/smoke.rs
@@ -1,7 +1,9 @@
use futures::StreamExt;
use libp2p_core::multiaddr::Protocol;
use libp2p_identify as identify;
use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_swarm_test::SwarmExt;
use std::collections::HashSet;
use std::iter;
use std::time::{Duration, Instant};

@@ -79,6 +81,75 @@ async fn periodic_identify() {
other => panic!("Unexpected events: {other:?}"),
}
}
#[async_std::test]
async fn only_emits_address_candidate_once_per_connection() {
let _ = env_logger::try_init();

let mut swarm1 = Swarm::new_ephemeral(|identity| {
identify::Behaviour::new(
identify::Config::new("a".to_string(), identity.public())
.with_agent_version("b".to_string())
.with_interval(Duration::from_secs(1)),
)
});
let mut swarm2 = Swarm::new_ephemeral(|identity| {
identify::Behaviour::new(
identify::Config::new("c".to_string(), identity.public())
.with_agent_version("d".to_string()),
)
});

swarm2.listen().with_memory_addr_external().await;
swarm1.connect(&mut swarm2).await;

async_std::task::spawn(swarm2.loop_on_next());

let swarm_events = futures::stream::poll_fn(|cx| swarm1.poll_next_unpin(cx))
.take(5)
.collect::<Vec<_>>()
.await;

let infos = swarm_events
.iter()
.filter_map(|e| match e {
SwarmEvent::Behaviour(identify::Event::Received { info, .. }) => Some(info.clone()),
_ => None,
})
.collect::<Vec<_>>();

assert!(
infos.len() > 1,
"should exchange identify payload more than once"
);

let varying_observed_addresses = infos
.iter()
.map(|i| i.observed_addr.clone())
.collect::<HashSet<_>>();
assert_eq!(
varying_observed_addresses.len(),
1,
"Observed address should not vary on persistent connection"
);

let external_address_candidates = swarm_events
.iter()
.filter_map(|e| match e {
SwarmEvent::NewExternalAddrCandidate { address } => Some(address.clone()),
_ => None,
})
.collect::<Vec<_>>();

assert_eq!(
external_address_candidates.len(),
1,
"Expected exactly one external address candidate"
);
assert_eq!(
&external_address_candidates[0],
varying_observed_addresses.iter().next().unwrap()
);
}

#[async_std::test]
async fn identify_push() {
2 changes: 2 additions & 0 deletions swarm/CHANGELOG.md
@@ -16,6 +16,8 @@
See [PR 4715](https://github.com/libp2p/rust-libp2p/pull/4715).
- Log `PeerId` of `Swarm` even when constructed with new `SwarmBuilder`.
See [PR 4671](https://github.com/libp2p/rust-libp2p/pull/4671).
- Add `SwarmEvent::{NewExternalAddrCandidate,ExternalAddrConfirmed,ExternalAddrExpired}` variants.
See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721).
- Remove deprecated symbols.
See [PR 4737](https://github.com/libp2p/rust-libp2p/pull/4737).

9 changes: 7 additions & 2 deletions swarm/src/behaviour.rs
@@ -261,15 +261,20 @@ pub enum ToSwarm<TOutEvent, TInEvent> {
event: TInEvent,
},

/// Reports a new candidate for an external address to the [`Swarm`](crate::Swarm).
/// Reports a **new** candidate for an external address to the [`Swarm`](crate::Swarm).
///
/// The emphasis on a **new** candidate is important.
/// Protocols MUST take care to only emit a candidate once per "source".
/// For example, the observed address of a TCP connection does not change throughout its lifetime.
/// Thus, only one candidate should be emitted per connection.
///
/// This makes the report frequency of an address a meaningful data-point for consumers of this event.
/// This address will be shared with all [`NetworkBehaviour`]s via [`FromSwarm::NewExternalAddrCandidate`].
///
/// This address could come from a variety of sources:
/// - A protocol such as identify obtained it from a remote.
/// - The user provided it based on configuration.
/// - We made an educated guess based on one of our listen addresses.
/// - We established a new relay connection.
NewExternalAddrCandidate(Multiaddr),

/// Indicates to the [`Swarm`](crate::Swarm) that the provided address is confirmed to be externally reachable.