Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm): expose ConnectionId and add conn duration metric #3927

Merged
merged 10 commits into from
May 17, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions misc/metrics/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
- Raise MSRV to 1.65.
See [PR 3715].

- Replace `libp2p_swarm_connections_closed` `Counter` with `libp2p_swarm_connections_duration` `Histogram` which additionally tracks the duration of a connection.
Note that you can use the `_count` metric of the `Histogram` as a replacement for the `Counter`.
See [PR 3927].

[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3927]: https://github.com/libp2p/rust-libp2p/pull/3927
mxinden marked this conversation as resolved.
Show resolved Hide resolved

## 0.12.0

Expand Down
1 change: 1 addition & 0 deletions misc/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ relay = ["libp2p-relay"]
dcutr = ["libp2p-dcutr"]

[dependencies]
instant = "0.1.11"
libp2p-core = { workspace = true }
libp2p-dcutr = { workspace = true, optional = true }
libp2p-identify = { workspace = true, optional = true }
Expand Down
108 changes: 78 additions & 30 deletions misc/metrics/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,25 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use crate::protocol_stack;
use instant::Instant;
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;
use prometheus_client::registry::{Registry, Unit};

pub(crate) struct Metrics {
connections_incoming: Family<AddressLabels, Counter>,
connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>,

connections_established: Family<ConnectionEstablishedLabels, Counter>,
connections_establishment_duration: Family<ConnectionEstablishmentDurationLabels, Histogram>,
connections_closed: Family<ConnectionClosedLabels, Counter>,
connections_established: Family<ConnectionLabels, Counter>,
connections_establishment_duration: Family<ConnectionLabels, Histogram>,
connections_duration: Family<ConnectionClosedLabels, Histogram>,

new_listen_addr: Family<AddressLabels, Counter>,
expired_listen_addr: Family<AddressLabels, Counter>,
Expand All @@ -41,6 +46,8 @@ pub(crate) struct Metrics {

dial_attempt: Counter,
outgoing_connection_error: Family<OutgoingConnectionErrorLabels, Counter>,

connections: Arc<Mutex<HashMap<ConnectionId, Instant>>>,
}

impl Metrics {
Expand Down Expand Up @@ -110,34 +117,42 @@ impl Metrics {
connections_established.clone(),
);

let connections_closed = Family::default();
let connections_establishment_duration = {
let constructor: fn() -> Histogram =
|| Histogram::new(exponential_buckets(0.01, 1.5, 20));
Family::new_with_constructor(constructor)
};
sub_registry.register(
"connections_closed",
"Number of connections closed",
connections_closed.clone(),
"connections_establishment_duration",
"Time it took (locally) to establish connections",
connections_establishment_duration.clone(),
);

let connections_establishment_duration = Family::new_with_constructor(
create_connection_establishment_duration_histogram as fn() -> Histogram,
);
sub_registry.register(
// Histogram of how long each connection stayed open, observed when the
// connection closes. Buckets grow exponentially: 20 buckets starting at 0.01
// with a factor of 3.0.
let connections_duration = {
    // `Family::new_with_constructor` is given a plain `fn` pointer, so the
    // closure is coerced explicitly.
    let constructor: fn() -> Histogram =
        || Histogram::new(exponential_buckets(0.01, 3.0, 20));
    Family::new_with_constructor(constructor)
};
// Register the duration family under its own name. Previously this call
// re-registered `connections_establishment_duration`, so the connection
// duration histogram was never exported.
sub_registry.register_with_unit(
    "connections_duration",
    "Time a connection was alive",
    Unit::Seconds,
    connections_duration.clone(),
);

Self {
connections_incoming,
connections_incoming_error,
connections_established,
connections_closed,
new_listen_addr,
expired_listen_addr,
listener_closed,
listener_error,
dial_attempt,
outgoing_connection_error,
connections_establishment_duration,
connections_duration,
connections: Default::default(),
}
}
}
Expand All @@ -149,24 +164,44 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
libp2p_swarm::SwarmEvent::ConnectionEstablished {
endpoint,
established_in: time_taken,
connection_id,
..
} => {
let labels = ConnectionEstablishedLabels {
let labels = ConnectionLabels {
role: endpoint.into(),
protocols: protocol_stack::as_string(endpoint.get_remote_address()),
};
self.connections_established.get_or_create(&labels).inc();
self.connections_establishment_duration
.get_or_create(&labels)
.observe(time_taken.as_secs_f64());
self.connections
.lock()
.expect("lock not to be poisoned")
.insert(*connection_id, Instant::now());
}
libp2p_swarm::SwarmEvent::ConnectionClosed { endpoint, .. } => {
self.connections_closed
.get_or_create(&ConnectionClosedLabels {
libp2p_swarm::SwarmEvent::ConnectionClosed {
endpoint,
connection_id,
cause,
..
} => {
let labels = ConnectionClosedLabels {
connection: ConnectionLabels {
role: endpoint.into(),
protocols: protocol_stack::as_string(endpoint.get_remote_address()),
})
.inc();
},
cause: cause.as_ref().expect("TODO remove see definition").into(),
};
self.connections_duration.get_or_create(&labels).observe(
self.connections
.lock()
.expect("lock not to be poisoned")
.remove(connection_id)
.expect("closed connection to previously be established")
.elapsed()
.as_secs_f64(),
);
}
libp2p_swarm::SwarmEvent::IncomingConnection { send_back_addr, .. } => {
self.connections_incoming
Expand All @@ -187,7 +222,7 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
})
.inc();
}
libp2p_swarm::SwarmEvent::OutgoingConnectionError { error, peer_id } => {
libp2p_swarm::SwarmEvent::OutgoingConnectionError { error, peer_id, .. } => {
let peer = match peer_id {
Some(_) => PeerStatus::Known,
None => PeerStatus::Unknown,
Expand Down Expand Up @@ -261,25 +296,42 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
libp2p_swarm::SwarmEvent::ListenerError { .. } => {
self.listener_error.inc();
}
libp2p_swarm::SwarmEvent::Dialing(_) => {
libp2p_swarm::SwarmEvent::Dialing { .. } => {
self.dial_attempt.inc();
}
}
}
}

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct ConnectionEstablishedLabels {
struct ConnectionLabels {
role: Role,
protocols: String,
}

type ConnectionEstablishmentDurationLabels = ConnectionEstablishedLabels;

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct ConnectionClosedLabels {
role: Role,
protocols: String,
// TODO: Should be Option<ConnectionError>. Needs https://github.com/prometheus/client_rust/pull/137
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we wait for this before merging here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oversight. Fixed with 9578315. Thanks for the catch Thomas!

cause: ConnectionError,
#[prometheus(flatten)]
connection: ConnectionLabels,
}

/// Payload-free mirror of `libp2p_swarm::ConnectionError`, encodable as a
/// Prometheus label value (it derives `EncodeLabelValue`).
///
/// Used as the type of the `cause` label on closed-connection metrics.
#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
enum ConnectionError {
/// Converted from `libp2p_swarm::ConnectionError::IO`.
Io,
/// Converted from `libp2p_swarm::ConnectionError::KeepAliveTimeout`.
KeepAliveTimeout,
/// Converted from `libp2p_swarm::ConnectionError::Handler`.
Handler,
}

impl<E> From<&libp2p_swarm::ConnectionError<E>> for ConnectionError {
fn from(value: &libp2p_swarm::ConnectionError<E>) -> Self {
match value {
libp2p_swarm::ConnectionError::IO(_) => ConnectionError::Io,
libp2p_swarm::ConnectionError::KeepAliveTimeout => ConnectionError::KeepAliveTimeout,
libp2p_swarm::ConnectionError::Handler(_) => ConnectionError::Handler,
}
}
}

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
Expand Down Expand Up @@ -359,7 +411,3 @@ impl From<&libp2p_swarm::ListenError> for IncomingConnectionError {
}
}
}

fn create_connection_establishment_duration_histogram() -> Histogram {
Histogram::new(exponential_buckets(0.01, 1.5, 20))
}
5 changes: 5 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
- Remove deprecated `NetworkBehaviourAction` type.
See [PR 3919].

- Expose `ConnectionId` on `SwarmEvent::{ConnectionEstablished,ConnectionClosed,IncomingConnection,IncomingConnectionError,OutgoingConnectionError,Dialing}`.
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
Also emit `SwarmEvent::Dialing` for dials with unknown `PeerId`.
See [PR 3927].
mxinden marked this conversation as resolved.
Show resolved Hide resolved

[PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605
[PR 3651]: https://github.com/libp2p/rust-libp2p/pull/3651
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
Expand All @@ -62,6 +66,7 @@
[PR 3886]: https://github.com/libp2p/rust-libp2p/pull/3886
[PR 3912]: https://github.com/libp2p/rust-libp2p/pull/3912
[PR 3919]: https://github.com/libp2p/rust-libp2p/pull/3919
[PR 3927]: https://github.com/libp2p/rust-libp2p/pull/3927
mxinden marked this conversation as resolved.
Show resolved Hide resolved

## 0.42.2

Expand Down
34 changes: 30 additions & 4 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
ConnectionEstablished {
/// Identity of the peer that we have connected to.
peer_id: PeerId,
/// Identifier of the connection.
connection_id: ConnectionId,
/// Endpoint of the connection that has been opened.
endpoint: ConnectedPoint,
/// Number of established connections to this peer, including the one that has just been
Expand All @@ -204,6 +206,8 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
ConnectionClosed {
/// Identity of the peer that we have connected to.
peer_id: PeerId,
/// Identifier of the connection.
connection_id: ConnectionId,
/// Endpoint of the connection that has been closed.
endpoint: ConnectedPoint,
/// Number of other remaining connections to this same peer.
Expand All @@ -218,6 +222,8 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be
/// generated for this connection.
IncomingConnection {
/// Identifier of the connection.
connection_id: ConnectionId,
/// Local connection address.
/// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
/// event.
Expand All @@ -230,6 +236,8 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// This can include, for example, an error during the handshake of the encryption layer, or
/// the connection unexpectedly closed.
IncomingConnectionError {
/// Identifier of the connection.
connection_id: ConnectionId,
/// Local connection address.
/// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
/// event.
Expand All @@ -241,6 +249,8 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
},
/// An error happened on an outbound connection.
OutgoingConnectionError {
/// Identifier of the connection.
connection_id: ConnectionId,
/// If known, [`PeerId`] of the peer we tried to reach.
peer_id: Option<PeerId>,
/// Error that has been encountered.
Expand Down Expand Up @@ -286,7 +296,13 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// reported if the dialing attempt succeeds, otherwise a
/// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError) event
/// is reported.
Dialing(PeerId),
Dialing {
/// Identity of the peer that we are connecting to.
peer_id: Option<PeerId>,

/// Identifier of the connection.
connection_id: ConnectionId,
},
}

impl<TBehaviourOutEvent, THandlerErr> SwarmEvent<TBehaviourOutEvent, THandlerErr> {
Expand Down Expand Up @@ -773,6 +789,7 @@ where

return Some(SwarmEvent::OutgoingConnectionError {
peer_id: Some(peer_id),
connection_id: id,
error: dial_error,
});
}
Expand Down Expand Up @@ -801,6 +818,7 @@ where
));

return Some(SwarmEvent::IncomingConnectionError {
connection_id: id,
send_back_addr,
local_addr,
error: listen_error,
Expand Down Expand Up @@ -856,6 +874,7 @@ where
self.supported_protocols = supported_protocols;
return Some(SwarmEvent::ConnectionEstablished {
peer_id,
connection_id: id,
num_established,
endpoint,
concurrent_dial_errors,
Expand Down Expand Up @@ -884,6 +903,7 @@ where

return Some(SwarmEvent::OutgoingConnectionError {
peer_id: peer,
connection_id,
error,
});
}
Expand All @@ -904,6 +924,7 @@ where
connection_id: id,
}));
return Some(SwarmEvent::IncomingConnectionError {
connection_id: id,
local_addr,
send_back_addr,
error,
Expand Down Expand Up @@ -946,6 +967,7 @@ where
}));
return Some(SwarmEvent::ConnectionClosed {
peer_id,
connection_id: id,
endpoint,
cause: error,
num_established,
Expand Down Expand Up @@ -1008,6 +1030,7 @@ where
}));

return Some(SwarmEvent::IncomingConnectionError {
connection_id,
local_addr,
send_back_addr,
error: listen_error,
Expand All @@ -1025,6 +1048,7 @@ where
);

Some(SwarmEvent::IncomingConnection {
connection_id,
local_addr,
send_back_addr,
})
Expand Down Expand Up @@ -1111,10 +1135,12 @@ where
ToSwarm::GenerateEvent(event) => return Some(SwarmEvent::Behaviour(event)),
ToSwarm::Dial { opts } => {
let peer_id = opts.get_or_parse_peer_id();
let connection_id = opts.connection_id();
if let Ok(()) = self.dial(opts) {
if let Ok(Some(peer_id)) = peer_id {
return Some(SwarmEvent::Dialing(peer_id));
}
return Some(SwarmEvent::Dialing {
peer_id: peer_id.ok().flatten(),
connection_id,
});
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
}
ToSwarm::NotifyHandler {
Expand Down