diff --git a/src/index.ts b/src/index.ts index 78422347..4effb4e8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -71,7 +71,8 @@ import type { SubscriptionChangeData, TopicValidatorFn, Logger, - ComponentLogger + ComponentLogger, + Topology } from '@libp2p/interface' import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager' import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar' @@ -157,6 +158,15 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit { */ maxOutboundStreams?: number + /** + * Pass true to run on transient connections - data or time-limited + * connections that may be closed at any time such as circuit relay + * connections. + * + * @default false + */ + runOnTransientConnection?: boolean + /** * Specify max buffer size in bytes for OutboundStream. * If full it will throw and reject sending any more data. @@ -382,6 +392,7 @@ export class GossipSub extends TypedEventEmitter implements Pub private status: GossipStatus = { code: GossipStatusCode.stopped } private readonly maxInboundStreams?: number private readonly maxOutboundStreams?: number + private readonly runOnTransientConnection?: boolean private readonly allowedTopics: Set | null private heartbeatTimer: { @@ -513,6 +524,7 @@ export class GossipSub extends TypedEventEmitter implements Pub this.maxInboundStreams = options.maxInboundStreams this.maxOutboundStreams = options.maxOutboundStreams + this.runOnTransientConnection = options.runOnTransientConnection this.allowedTopics = (opts.allowedTopics != null) ? new Set(opts.allowedTopics) : null } @@ -566,7 +578,8 @@ export class GossipSub extends TypedEventEmitter implements Pub this.multicodecs.map(async (multicodec) => registrar.handle(multicodec, this.onIncomingStream.bind(this), { maxInboundStreams: this.maxInboundStreams, - maxOutboundStreams: this.maxOutboundStreams + maxOutboundStreams: this.maxOutboundStreams, + runOnTransientConnection: this.runOnTransientConnection }) ) ) @@ -590,9 +603,10 @@ export class GossipSub extends TypedEventEmitter implements Pub // register protocol with topology // Topology callbacks called on connection manager changes - const topology = { + const topology: Topology = { onConnect: this.onPeerConnected.bind(this), - onDisconnect: this.onPeerDisconnected.bind(this) + onDisconnect: this.onPeerDisconnected.bind(this), + notifyOnTransient: this.runOnTransientConnection } const registrarTopologyIds = await Promise.all( this.multicodecs.map(async (multicodec) => registrar.register(multicodec, topology)) @@ -752,7 +766,9 @@ export class GossipSub extends TypedEventEmitter implements Pub try { const stream = new OutboundStream( - await connection.newStream(this.multicodecs), + await connection.newStream(this.multicodecs, { + runOnTransientConnection: this.runOnTransientConnection + }), (e) => { this.log.error('outbound pipe error', e) }, { maxBufferSize: this.opts.maxOutboundBufferSize } )