Skip to content

Commit

Permalink
feat: add runOnTransientConnection option to pass to registrar (#485)
Browse files Browse the repository at this point in the history
fix: add runOnTransientConnection option to pass to registrar

Adds an option to allow running gossipsub over transient (e.g. relayed)
connections.

By default this is not allowed since transient connections are by
design data/time limited and breaching those limits can cause the
connection to be closed abruptly, but sometimes people control the
relay servers and disable the limits so allow them to use gossipsub
over relayed connections if they choose.
  • Loading branch information
achingbrain committed Jan 29, 2024
1 parent b77e6ca commit 986ff6c
Showing 1 changed file with 21 additions and 5 deletions.
26 changes: 21 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ import type {
SubscriptionChangeData,
TopicValidatorFn,
Logger,
ComponentLogger
ComponentLogger,
Topology
} from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar'
Expand Down Expand Up @@ -157,6 +158,15 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
*/
maxOutboundStreams?: number

/**
* Pass true to run on transient connections - data or time-limited
* connections that may be closed at any time such as circuit relay
* connections.
*
* @default false
*/
runOnTransientConnection?: boolean

/**
* Specify max buffer size in bytes for OutboundStream.
* If full it will throw and reject sending any more data.
Expand Down Expand Up @@ -382,6 +392,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
private status: GossipStatus = { code: GossipStatusCode.stopped }
private readonly maxInboundStreams?: number
private readonly maxOutboundStreams?: number
private readonly runOnTransientConnection?: boolean
private readonly allowedTopics: Set<TopicStr> | null

private heartbeatTimer: {
Expand Down Expand Up @@ -513,6 +524,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

this.maxInboundStreams = options.maxInboundStreams
this.maxOutboundStreams = options.maxOutboundStreams
this.runOnTransientConnection = options.runOnTransientConnection

this.allowedTopics = (opts.allowedTopics != null) ? new Set(opts.allowedTopics) : null
}
Expand Down Expand Up @@ -566,7 +578,8 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.multicodecs.map(async (multicodec) =>
registrar.handle(multicodec, this.onIncomingStream.bind(this), {
maxInboundStreams: this.maxInboundStreams,
maxOutboundStreams: this.maxOutboundStreams
maxOutboundStreams: this.maxOutboundStreams,
runOnTransientConnection: this.runOnTransientConnection
})
)
)
Expand All @@ -590,9 +603,10 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

// register protocol with topology
// Topology callbacks called on connection manager changes
const topology = {
const topology: Topology = {
onConnect: this.onPeerConnected.bind(this),
onDisconnect: this.onPeerDisconnected.bind(this)
onDisconnect: this.onPeerDisconnected.bind(this),
notifyOnTransient: this.runOnTransientConnection
}
const registrarTopologyIds = await Promise.all(
this.multicodecs.map(async (multicodec) => registrar.register(multicodec, topology))
Expand Down Expand Up @@ -752,7 +766,9 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

try {
const stream = new OutboundStream(
await connection.newStream(this.multicodecs),
await connection.newStream(this.multicodecs, {
runOnTransientConnection: this.runOnTransientConnection
}),
(e) => { this.log.error('outbound pipe error', e) },
{ maxBufferSize: this.opts.maxOutboundBufferSize }
)
Expand Down

0 comments on commit 986ff6c

Please sign in to comment.