Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(request-response): deprecate Config::set_connection_keep_alive #4029

Merged
merged 16 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion protocols/perf/src/bin/perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ async fn swarm<B: NetworkBehaviour + Default>() -> Result<Swarm<B>> {
Default::default(),
local_peer_id,
Config::with_tokio_executor()
.with_substream_upgrade_protocol_override(upgrade::Version::V1Lazy),
.with_substream_upgrade_protocol_override(upgrade::Version::V1Lazy)
.with_idle_connection_timeout(Duration::from_secs(60 * 5)),
);

Ok(swarm)
Expand Down
1 change: 0 additions & 1 deletion protocols/perf/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ pub struct Behaviour {
impl Default for Behaviour {
fn default() -> Self {
let mut req_resp_config = request_response::Config::default();
req_resp_config.set_connection_keep_alive(Duration::from_secs(60 * 5));
req_resp_config.set_request_timeout(Duration::from_secs(60 * 5));
Self {
connected: Default::default(),
Expand Down
1 change: 0 additions & 1 deletion protocols/perf/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ pub struct Behaviour {
impl Default for Behaviour {
fn default() -> Self {
let mut req_resp_config = request_response::Config::default();
req_resp_config.set_connection_keep_alive(Duration::from_secs(60 * 5));
req_resp_config.set_request_timeout(Duration::from_secs(60 * 5));

Self {
Expand Down
3 changes: 3 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 0.25.2 - unreleased

- Deprecate `request_response::Config::set_connection_keep_alive` in favor of `SwarmBuilder::idle_connection_timeout`.
See [PR 4029](https://github.com/libp2p/rust-libp2p/pull/4029).

<!-- Internal changes

- Allow deprecated usage of `KeepAlive::Until`
Expand Down
109 changes: 58 additions & 51 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ use handler::Handler;
use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{
behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
behaviour::{AddressChange, ConnectionClosed, DialFailure, FromSwarm},
dial_opts::DialOpts,
ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, PollParameters, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler,
PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use smallvec::SmallVec;
use std::{
Expand Down Expand Up @@ -298,6 +298,9 @@ impl Default for Config {

impl Config {
/// Sets the keep-alive timeout of idle connections.
#[deprecated(
note = "Set a global idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead."
)]
pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self {
self.connection_keep_alive = v;
self
Expand Down Expand Up @@ -605,36 +608,7 @@ where
.iter_mut()
.find(|c| c.id == connection_id)
.expect("Address change can only happen on an established connection.");
connection.address = new_address;
}

fn on_connection_established(
&mut self,
ConnectionEstablished {
peer_id,
connection_id,
endpoint,
other_established,
..
}: ConnectionEstablished,
) {
let address = match endpoint {
ConnectedPoint::Dialer { address, .. } => Some(address.clone()),
ConnectedPoint::Listener { .. } => None,
};
self.connected
.entry(peer_id)
.or_default()
.push(Connection::new(connection_id, address));

if other_established == 0 {
if let Some(pending) = self.pending_outbound_requests.remove(&peer_id) {
for request in pending {
let request = self.try_send_request(&peer_id, request);
assert!(request.is_none());
}
}
}
connection.remote_address = new_address;
}

fn on_connection_closed(
Expand Down Expand Up @@ -701,6 +675,28 @@ where
}
}
}

/// Preloads a new [`Handler`] with requests that are waiting to be sent to the newly connected peer.
fn preload_new_handler(
&mut self,
handler: &mut Handler<TCodec>,
peer: PeerId,
connection_id: ConnectionId,
remote_address: Option<Multiaddr>,
) {
let mut connection = Connection::new(connection_id, remote_address);

if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) {
for request in pending_requests {
connection
.pending_inbound_responses
.insert(request.request_id);
handler.on_behaviour_event(request);
}
}

self.connected.entry(peer).or_default().push(connection);
}
}

impl<TCodec> NetworkBehaviour for Behaviour<TCodec>
Expand All @@ -712,18 +708,22 @@ where

fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
connection_id: ConnectionId,
peer: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(
let mut handler = Handler::new(
self.inbound_protocols.clone(),
self.codec.clone(),
self.config.connection_keep_alive,
self.config.request_timeout,
self.config.connection_keep_alive,
self.next_inbound_id.clone(),
))
);

self.preload_new_handler(&mut handler, peer, connection_id, None);

Ok(handler)
}

fn handle_pending_outbound_connection(
Expand All @@ -740,7 +740,7 @@ where

let mut addresses = Vec::new();
if let Some(connections) = self.connected.get(&peer) {
addresses.extend(connections.iter().filter_map(|c| c.address.clone()))
addresses.extend(connections.iter().filter_map(|c| c.remote_address.clone()))
}
if let Some(more) = self.addresses.get(&peer) {
addresses.extend(more.into_iter().cloned());
Expand All @@ -751,25 +751,32 @@ where

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
connection_id: ConnectionId,
peer: PeerId,
remote_address: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(
let mut handler = Handler::new(
self.inbound_protocols.clone(),
self.codec.clone(),
self.config.connection_keep_alive,
self.config.request_timeout,
self.config.connection_keep_alive,
self.next_inbound_id.clone(),
))
);

self.preload_new_handler(
&mut handler,
peer,
connection_id,
Some(remote_address.clone()),
);

Ok(handler)
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(connection_established) => {
self.on_connection_established(connection_established)
}
FromSwarm::ConnectionEstablished(_) => {}
FromSwarm::ConnectionClosed(connection_closed) => {
self.on_connection_closed(connection_closed)
}
Expand Down Expand Up @@ -924,7 +931,7 @@ const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100;
/// Internal information tracked for an established connection.
struct Connection {
id: ConnectionId,
address: Option<Multiaddr>,
remote_address: Option<Multiaddr>,
/// Pending outbound responses where corresponding inbound requests have
/// been received on this connection and emitted via `poll` but have not yet
/// been answered.
Expand All @@ -935,10 +942,10 @@ struct Connection {
}

impl Connection {
fn new(id: ConnectionId, address: Option<Multiaddr>) -> Self {
fn new(id: ConnectionId, remote_address: Option<Multiaddr>) -> Self {
Self {
id,
address,
remote_address,
pending_outbound_responses: Default::default(),
pending_inbound_responses: Default::default(),
}
Expand Down
Loading