From 986ff6c420026654179bc398ba139c87a2277ea7 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Mon, 29 Jan 2024 20:37:09 +0100 Subject: [PATCH] feat: add runOnTransientConnection option to pass to registrar (#485) fix: add runOnTransientConnection option to pass to registrar Adds an option to allow running gossipsub over transient (e.g. relayed) connections. By default this is not allowed since transient connections are by design data/time limited and breaching those limits can cause the connection to be closed abruptly, but sometimes people control the relay servers and disable the limits so allow them to use gossipsub over relayed connections if they choose. --- src/index.ts | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/src/index.ts b/src/index.ts index 78422347..4effb4e8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -71,7 +71,8 @@ import type { SubscriptionChangeData, TopicValidatorFn, Logger, - ComponentLogger + ComponentLogger, + Topology } from '@libp2p/interface' import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager' import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar' @@ -157,6 +158,15 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit { */ maxOutboundStreams?: number + /** + * Pass true to run on transient connections - data or time-limited + * connections that may be closed at any time such as circuit relay + * connections. + * + * @default false + */ + runOnTransientConnection?: boolean + /** * Specify max buffer size in bytes for OutboundStream. * If full it will throw and reject sending any more data. @@ -382,6 +392,7 @@ export class GossipSub extends TypedEventEmitter implements Pub private status: GossipStatus = { code: GossipStatusCode.stopped } private readonly maxInboundStreams?: number private readonly maxOutboundStreams?: number + private readonly runOnTransientConnection?: boolean private readonly allowedTopics: Set | null private heartbeatTimer: { @@ -513,6 +524,7 @@ export class GossipSub extends TypedEventEmitter implements Pub this.maxInboundStreams = options.maxInboundStreams this.maxOutboundStreams = options.maxOutboundStreams + this.runOnTransientConnection = options.runOnTransientConnection this.allowedTopics = (opts.allowedTopics != null) ? new Set(opts.allowedTopics) : null } @@ -566,7 +578,8 @@ export class GossipSub extends TypedEventEmitter implements Pub this.multicodecs.map(async (multicodec) => registrar.handle(multicodec, this.onIncomingStream.bind(this), { maxInboundStreams: this.maxInboundStreams, - maxOutboundStreams: this.maxOutboundStreams + maxOutboundStreams: this.maxOutboundStreams, + runOnTransientConnection: this.runOnTransientConnection }) ) ) @@ -590,9 +603,10 @@ export class GossipSub extends TypedEventEmitter implements Pub // register protocol with topology // Topology callbacks called on connection manager changes - const topology = { + const topology: Topology = { onConnect: this.onPeerConnected.bind(this), - onDisconnect: this.onPeerDisconnected.bind(this) + onDisconnect: this.onPeerDisconnected.bind(this), + notifyOnTransient: this.runOnTransientConnection } const registrarTopologyIds = await Promise.all( this.multicodecs.map(async (multicodec) => registrar.register(multicodec, topology)) @@ -752,7 +766,9 @@ export class GossipSub extends TypedEventEmitter implements Pub try { const stream = new OutboundStream( - await connection.newStream(this.multicodecs), + await connection.newStream(this.multicodecs, { + runOnTransientConnection: this.runOnTransientConnection + }), (e) => { this.log.error('outbound pipe error', e) }, { maxBufferSize: this.opts.maxOutboundBufferSize } )