Skip to content

Commit

Permalink
feat: add cirvuitv2 client
Browse files Browse the repository at this point in the history
  • Loading branch information
mpetrunic committed Jun 23, 2022
1 parent 3941eeb commit f25943f
Show file tree
Hide file tree
Showing 19 changed files with 221 additions and 424 deletions.
81 changes: 41 additions & 40 deletions src/circuit/auto-relay.ts → src/circuit/client.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
import { logger } from '@libp2p/logger'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { RELAY_V1_CODEC } from './multicodec.js'
import { canHop } from './v1/hop.js'
import { namespaceToCid } from './utils.js'
import { relayV2HopCodec } from './multicodec.js'
import { getExpiration, namespaceToCid } from './utils.js'
import {
CIRCUIT_PROTO_CODE,
HOP_METADATA_KEY,
HOP_METADATA_VALUE,
RELAY_RENDEZVOUS_NS
} from './constants.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
Expand All @@ -18,29 +13,32 @@ import sort from 'it-sort'
import all from 'it-all'
import { pipe } from 'it-pipe'
import { publicAddressesFirst } from '@libp2p/utils/address-sort'
import { reserve } from './v2/index.js'

const log = logger('libp2p:auto-relay')

const noop = () => {}

export interface AutoRelayInit {
addressSorter?: AddressSorter
maxListeners?: number
maxReservations?: number
onError?: (error: Error, msg?: string) => void
}

export class AutoRelay {
export class CircuitClient {
private readonly components: Components
private readonly addressSorter: AddressSorter
private readonly maxListeners: number
private readonly listenRelays: Set<string>
private readonly maxReservations: number
private readonly relays: Set<string>
private readonly reservationMap: Map<PeerId, ReturnType<typeof setTimeout>>
private readonly onError: (error: Error, msg?: string) => void

constructor (components: Components, init: AutoRelayInit) {
this.components = components
this.addressSorter = init.addressSorter ?? publicAddressesFirst
this.maxListeners = init.maxListeners ?? 1
this.listenRelays = new Set()
this.maxReservations = init.maxReservations ?? 1
this.relays = new Set()
this.reservationMap = new Map()
this.onError = init.onError ?? noop

this._onProtocolChange = this._onProtocolChange.bind(this)
Expand Down Expand Up @@ -68,18 +66,18 @@ export class AutoRelay {
const id = peerId.toString()

// Check if it has the protocol
const hasProtocol = protocols.find(protocol => protocol === RELAY_V1_CODEC)
const hasProtocol = protocols.find(protocol => protocol === relayV2HopCodec)

// If no protocol, check if we were keeping the peer before as a listenRelay
if (hasProtocol == null) {
if (this.listenRelays.has(id)) {
if (this.relays.has(id)) {
await this._removeListenRelay(id)
}

return
}

if (this.listenRelays.has(id)) {
if (this.relays.has(id)) {
return
}

Expand All @@ -99,12 +97,7 @@ export class AutoRelay {
return
}

const supportsHop = await canHop({ connection })

if (supportsHop) {
await this.components.getPeerStore().metadataBook.setValue(peerId, HOP_METADATA_KEY, uint8ArrayFromString(HOP_METADATA_VALUE))
await this._addListenRelay(connection, id)
}
await this._addListenRelay(connection, peerId)
} catch (err: any) {
this.onError(err)
}
Expand All @@ -117,9 +110,10 @@ export class AutoRelay {
const connection = evt.detail
const peerId = connection.remotePeer
const id = peerId.toString()
this.reservationMap.delete(peerId)

// Not listening on this relay
if (!this.listenRelays.has(id)) {
if (!this.relays.has(id)) {
return
}

Expand All @@ -131,13 +125,21 @@ export class AutoRelay {
/**
* Attempt to listen on the given relay connection
*/
async _addListenRelay (connection: Connection, id: string): Promise<void> {
async _addListenRelay (connection: Connection, peerId: PeerId): Promise<void> {
const id = peerId.toString()
try {
// Check if already listening on enough relays
if (this.listenRelays.size >= this.maxListeners) {
// Check if already enough relay reservations
if (this.relays.size >= this.maxReservations) {
return
}

const reservation = await reserve(connection)
if (reservation != null) {
this.reservationMap.set(peerId, setTimeout(() => {
// refresh reservation
}, Math.min(getExpiration(reservation.expire) - 100, 0)))
}

// Get peer known addresses and sort them with public addresses first
const remoteAddrs = await pipe(
await this.components.getPeerStore().addressBook.get(connection.remotePeer),
Expand Down Expand Up @@ -170,19 +172,19 @@ export class AutoRelay {
)

if (result.includes(true)) {
this.listenRelays.add(id)
this.relays.add(id)
}
} catch (err: any) {
this.onError(err)
this.listenRelays.delete(id)
this.relays.delete(id)
}
}

/**
* Remove listen relay
*/
async _removeListenRelay (id: string) {
if (this.listenRelays.delete(id)) {
if (this.relays.delete(id)) {
// TODO: this should be responsibility of the connMgr
await this._listenOnAvailableHopRelays([id])
}
Expand All @@ -196,32 +198,31 @@ export class AutoRelay {
* 3. Search the network.
*/
async _listenOnAvailableHopRelays (peersToIgnore: string[] = []) {
// TODO: The peer redial issue on disconnect should be handled by connection gating
// Check if already listening on enough relays
if (this.listenRelays.size >= this.maxListeners) {
if (this.relays.size >= this.maxReservations) {
return
}

const knownHopsToDial = []
const peers = await this.components.getPeerStore().all()

// Check if we have known hop peers to use and attempt to listen on the already connected
for (const { id, metadata } of peers) {
for (const { id, protocols } of peers) {
const idStr = id.toString()

// Continue to next if listening on this or peer to ignore
if (this.listenRelays.has(idStr)) {
if (this.relays.has(idStr)) {
continue
}

if (peersToIgnore.includes(idStr)) {
continue
}

const supportsHop = metadata.get(HOP_METADATA_KEY)
const hasProtocol = protocols.find(protocol => protocol === relayV2HopCodec)

// Continue to next if it does not support Hop
if ((supportsHop == null) || uint8ArrayToString(supportsHop) !== HOP_METADATA_VALUE) {
if (hasProtocol == null) {
continue
}

Expand All @@ -233,10 +234,10 @@ export class AutoRelay {
continue
}

await this._addListenRelay(connections[0], idStr)
await this._addListenRelay(connections[0], id)

// Check if already listening on enough relays
if (this.listenRelays.size >= this.maxListeners) {
if (this.relays.size >= this.maxReservations) {
return
}
}
Expand All @@ -246,7 +247,7 @@ export class AutoRelay {
await this._tryToListenOnRelay(peerId)

// Check if already listening on enough relays
if (this.listenRelays.size >= this.maxListeners) {
if (this.relays.size >= this.maxReservations) {
return
}
}
Expand All @@ -265,7 +266,7 @@ export class AutoRelay {
await this._tryToListenOnRelay(peerId)

// Check if already listening on enough relays
if (this.listenRelays.size >= this.maxListeners) {
if (this.relays.size >= this.maxReservations) {
return
}
}
Expand All @@ -277,7 +278,7 @@ export class AutoRelay {
async _tryToListenOnRelay (peerId: PeerId) {
try {
const connection = await this.components.getConnectionManager().openConnection(peerId)
await this._addListenRelay(connection, peerId.toString())
await this._addListenRelay(connection, peerId)
} catch (err: any) {
log.error('Could not use %p as relay', peerId, err)
this.onError(err, `could not connect and listen on known hop relay ${peerId.toString()}`)
Expand Down
10 changes: 0 additions & 10 deletions src/circuit/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@ export const ADVERTISE_TTL = 30 * minute
*/
export const CIRCUIT_PROTO_CODE = 290

/**
* PeerStore metadaBook key for HOP relay service
*/
export const HOP_METADATA_KEY = 'hop_relay'

/**
* PeerStore metadaBook value for HOP relay service
*/
export const HOP_METADATA_VALUE = 'true'

/**
* Relay HOP relay service namespace for discovery
*/
Expand Down
6 changes: 3 additions & 3 deletions src/circuit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
clearDelayedInterval
// @ts-expect-error set-delayed-interval does not export types
} from 'set-delayed-interval'
import { AutoRelay } from './auto-relay.js'
import { CircuitClient } from './client.js'
import { namespaceToCid } from './utils.js'
import {
RELAY_RENDEZVOUS_NS
Expand Down Expand Up @@ -49,7 +49,7 @@ export class Relay implements Startable {
private readonly components: Components
private readonly init: RelayInit
// @ts-expect-error this field isn't used anywhere?
private readonly autoRelay?: AutoRelay
private readonly autoRelay?: CircuitClient
private timeout?: any
private started: boolean

Expand All @@ -60,7 +60,7 @@ export class Relay implements Startable {
this.components = components
// Create autoRelay if enabled
this.autoRelay = init.autoRelay?.enabled !== false
? new AutoRelay(components, {
? new CircuitClient(components, {
addressSorter: init.addressSorter,
...init.autoRelay
})
Expand Down
6 changes: 3 additions & 3 deletions src/circuit/multicodec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

export const RELAY_V1_CODEC = '/libp2p/circuit/relay/0.1.0'
export const protocolIDv2Hop = '/libp2p/circuit/relay/0.2.0/hop'
export const protocolIDv2Stop = '/libp2p/circuit/relay/0.2.0/stop'
export const relayV1Codec = '/libp2p/circuit/relay/0.1.0'
export const relayV2HopCodec = '/libp2p/circuit/relay/0.2.0/hop'
export const relayV2StopCodec = '/libp2p/circuit/relay/0.2.0/stop'
108 changes: 108 additions & 0 deletions src/circuit/relay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { logger } from '@libp2p/logger'
import { codes } from '../errors.js'
import {
setDelayedInterval,
clearDelayedInterval
// @ts-expect-error set-delayed-interval does not export types
} from 'set-delayed-interval'
import { namespaceToCid } from './utils.js'
import {
RELAY_RENDEZVOUS_NS
} from './constants.js'
import type { AddressSorter } from '@libp2p/interfaces/peer-store'
import type { Startable } from '@libp2p/interfaces/startable'
import type { Components } from '@libp2p/interfaces/components'

const log = logger('libp2p:relay')

export interface RelayAdvertiseConfig {
bootDelay?: number
enabled?: boolean
ttl?: number
}

export interface HopConfig {
enabled?: boolean
active?: boolean
}

export interface AutoRelayConfig {
enabled?: boolean

/**
* maximum number of relays to listen
*/
maxListeners: number
}

export interface RelayInit {
addressSorter?: AddressSorter
maxListeners?: number
onError?: (error: Error, msg?: string) => void
hop: HopConfig
advertise: RelayAdvertiseConfig
}

export class Relay implements Startable {
private readonly components: Components
private readonly init: RelayInit
private timeout?: any
private started: boolean

/**
* Creates an instance of Relay
*/
constructor (components: Components, init: RelayInit) {
this.components = components
this.started = false
this.init = init
this._advertiseService = this._advertiseService.bind(this)
}

isStarted () {
return this.started
}

/**
* Start Relay service
*/
async start () {
// Advertise service if HOP enabled
if (this.init.hop.enabled !== false && this.init.advertise.enabled !== false) {
this.timeout = setDelayedInterval(
this._advertiseService, this.init.advertise.ttl, this.init.advertise.bootDelay
)
}

this.started = true
}

/**
* Stop Relay service
*/
async stop () {
if (this.timeout != null) {
clearDelayedInterval(this.timeout)
}

this.started = false
}

/**
* Advertise hop relay service in the network.
*/
async _advertiseService () {
try {
const cid = await namespaceToCid(RELAY_RENDEZVOUS_NS)
await this.components.getContentRouting().provide(cid)
} catch (err: any) {
if (err.code === codes.ERR_NO_ROUTERS_AVAILABLE) {
log.error('a content router, such as a DHT, must be provided in order to advertise the relay service', err)
// Stop the advertise
await this.stop()
} else {
log.error(err)
}
}
}
}
Loading

0 comments on commit f25943f

Please sign in to comment.