Skip to content

Commit

Permalink
feat: Added the configuration capabilities on connection manager to s…
Browse files Browse the repository at this point in the history
…eperate incoming and outgoing connection limits (libp2p#1508)
  • Loading branch information
maschad committed Jan 16, 2023
1 parent b8bb367 commit d2cad14
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 46 deletions.
3 changes: 2 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const DefaultConfig: Partial<Libp2pInit> = {
announceFilter: (multiaddrs: Multiaddr[]) => multiaddrs
},
connectionManager: {
maxConnections: 300,
maxIncomingConnections: 300,
maxOutgoingConnections: 300,
minConnections: 50,
autoDial: true,
autoDialInterval: 10000,
Expand Down
4 changes: 2 additions & 2 deletions src/connection-manager/auto-dialler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export interface AutoDiallerInit {
enabled?: boolean

/**
* The minimum number of connections to avoid pruning
* The minimum number of incoming connections to avoid pruning
*/
minConnections?: number

Expand Down Expand Up @@ -107,7 +107,7 @@ export class AutoDialler implements Startable {
this.autoDialTimeout.clear()
}

const minConnections = this.options.minConnections
const { minConnections } = this.options

// Already has enough connections
if (this.components.connectionManager.getConnections().length >= minConnections) {
Expand Down
113 changes: 70 additions & 43 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ export interface ConnectionManagerConfig {
/**
* The maximum number of connections libp2p is willing to have before it starts disconnecting. Defaults to `Infinity`
*/
maxConnections: number
maxIncomingConnections: number

/**
* The maximum number of outgoing connections to keep open
*/
maxOutgoingConnections: number

/**
* The minimum number of connections below which libp2p not activate preemptive disconnections. Defaults to `0`.
Expand Down Expand Up @@ -100,7 +105,7 @@ export interface ConnectionManagerConfig {

/**
* A list of multiaddrs that will always be allowed (except if they are in the
* deny list) to open connections to this node even if we've reached maxConnections
* deny list) to open connections to this node even if we've reached maxIncomingConnections
*/
allow?: string[]

Expand All @@ -121,10 +126,17 @@ export interface ConnectionManagerConfig {
* complete the connection upgrade - e.g. choosing connection encryption, muxer, etc
*/
maxIncomingPendingConnections?: number

/**
* The maximum number of parallel outgoing connections allowed that have yet to
* complete the connection upgrade - e.g. choosing connection encryption, muxer, etc
*/
maxOutgoingPendingConnections?: number
}

const defaultOptions: Partial<ConnectionManagerConfig> = {
maxConnections: Infinity,
maxIncomingConnections: Infinity,
maxOutgoingConnections: Infinity,
minConnections: 0,
maxEventLoopDelay: Infinity,
pollInterval: 2000,
Expand Down Expand Up @@ -167,8 +179,18 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven

this.opts = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, init)

if (this.opts.maxConnections < this.opts.minConnections) {
throw errCode(new Error('Connection Manager maxConnections must be greater than minConnections'), codes.ERR_INVALID_PARAMETERS)
if (this.opts.maxIncomingConnections < this.opts.minConnections) {
throw errCode(
new Error('Connection Manager maxIncomingConnections must be greater than minConnections'),
codes.ERR_INVALID_PARAMETERS
)
}

if (this.opts.maxOutgoingConnections < this.opts.minConnections) {
throw errCode(
new Error('Connection Manager maxOutgoingConnections must be greater than minConnections'),
codes.ERR_INVALID_PARAMETERS
)
}

log('options: %o', this.opts)
Expand All @@ -192,16 +214,16 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
try {
// This emitter gets listened to a lot
setMaxListeners?.(Infinity, this)
} catch {}
} catch { }

this.onConnect = this.onConnect.bind(this)
this.onDisconnect = this.onDisconnect.bind(this)

this.startupReconnectTimeout = init.startupReconnectTimeout ?? STARTUP_RECONNECT_TIMEOUT
this.dialTimeout = init.dialTimeout ?? 30000

this.allow = (init.allow ?? []).map(ma => multiaddr(ma))
this.deny = (init.deny ?? []).map(ma => multiaddr(ma))
this.allow = (init.allow ?? []).map((ma) => multiaddr(ma))
this.deny = (init.deny ?? []).map((ma) => multiaddr(ma))

this.inboundConnectionRateLimiter = new RateLimiterMemory({
points: this.opts.inboundConnectionThreshold,
Expand Down Expand Up @@ -318,7 +340,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven

for (const peer of await this.components.peerStore.all()) {
const tags = await this.components.peerStore.getTags(peer.id)
const hasKeepAlive = tags.filter(tag => tag.name === KEEP_ALIVE).length > 0
const hasKeepAlive = tags.filter((tag) => tag.name === KEEP_ALIVE).length > 0

if (hasKeepAlive) {
keepAlivePeers.push(peer.id)
Expand All @@ -331,20 +353,19 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
try {
// fails on node < 15.4
setMaxListeners?.(Infinity, this.connectOnStartupController.signal)
} catch {}
} catch { }

await Promise.all(
keepAlivePeers.map(async peer => {
keepAlivePeers.map(async (peer) => {
await this.openConnection(peer, {
signal: this.connectOnStartupController?.signal
}).catch((err) => {
log.error(err)
})
.catch(err => {
log.error(err)
})
})
)
})
.catch(err => {
.catch((err) => {
log.error(err)
})
.finally(() => {
Expand Down Expand Up @@ -379,13 +400,15 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
const tasks: Array<Promise<void>> = []
for (const connectionList of this.connections.values()) {
for (const connection of connectionList) {
tasks.push((async () => {
try {
await connection.close()
} catch (err) {
log.error(err)
}
})())
tasks.push(
(async () => {
try {
await connection.close()
} catch (err) {
log.error(err)
}
})()
)
}
}

Expand All @@ -395,7 +418,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}

onConnect (evt: CustomEvent<Connection>) {
void this._onConnect(evt).catch(err => {
void this._onConnect(evt).catch((err) => {
log.error(err)
})
}
Expand Down Expand Up @@ -427,9 +450,9 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}

const numConnections = this.getConnections().length
const toPrune = numConnections - this.opts.maxConnections
const toPrune = numConnections - this.opts.maxIncomingConnections

await this._checkMaxLimit('maxConnections', numConnections, toPrune)
await this._checkMaxLimit('maxIncomingConnections', numConnections, toPrune)
this.dispatchEvent(new CustomEvent<Connection>('peer:connect', { detail: connection }))
}

Expand Down Expand Up @@ -498,7 +521,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
} catch {}
} catch { }
}

try {
Expand Down Expand Up @@ -537,7 +560,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
const connections = this.connections.get(peerId.toString()) ?? []

await Promise.all(
connections.map(async connection => {
connections.map(async (connection) => {
return await connection.close()
})
)
Expand All @@ -556,7 +579,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven

// Return all open connections
if (connections != null) {
return connections.filter(connection => connection.stat.status === STATUS.OPEN)
return connections.filter((connection) => connection.stat.status === STATUS.OPEN)
}

return []
Expand All @@ -568,10 +591,9 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
_onLatencyMeasure (evt: CustomEvent<SummaryObject>) {
const { detail: summary } = evt

this._checkMaxLimit('maxEventLoopDelay', summary.avgMs, 1)
.catch(err => {
log.error(err)
})
this._checkMaxLimit('maxEventLoopDelay', summary.avgMs, 1).catch((err) => {
log.error(err)
})
}

/**
Expand Down Expand Up @@ -611,9 +633,12 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
const tags = await this.components.peerStore.getTags(remotePeer)

// sum all tag values
peerValues.set(remotePeer, tags.reduce((acc, curr) => {
return acc + curr.value
}, 0))
peerValues.set(
remotePeer,
tags.reduce((acc, curr) => {
return acc + curr.value
}, 0)
)
}

// sort by value, lowest to highest
Expand Down Expand Up @@ -658,24 +683,26 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven

// close connections
await Promise.all(
toClose.map(async connection => {
toClose.map(async (connection) => {
try {
await connection.close()
} catch (err) {
log.error(err)
}

// TODO: should not need to invoke this manually
this.onDisconnect(new CustomEvent<Connection>('connectionEnd', {
detail: connection
}))
this.onDisconnect(
new CustomEvent<Connection>('connectionEnd', {
detail: connection
})
)
})
)
}

async acceptIncomingConnection (maConn: MultiaddrConnection): Promise<boolean> {
// check deny list
const denyConnection = this.deny.some(ma => {
const denyConnection = this.deny.some((ma) => {
return maConn.remoteAddr.toString().startsWith(ma.toString())
})

Expand All @@ -685,7 +712,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}

// check allow list
const allowConnection = this.allow.some(ma => {
const allowConnection = this.allow.some((ma) => {
return maConn.remoteAddr.toString().startsWith(ma.toString())
})

Expand All @@ -712,13 +739,13 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}
}

if (this.getConnections().length < this.opts.maxConnections) {
if (this.getConnections().length < this.opts.maxIncomingConnections) {
this.incomingPendingConnections++

return true
}

log('connection from %s refused - maxConnections exceeded', maConn.remoteAddr)
log('connection from %s refused - maxIncomingConnections exceeded', maConn.remoteAddr)
return false
}

Expand Down

0 comments on commit d2cad14

Please sign in to comment.