diff --git a/package.json b/package.json index b1a0fd6d..8ce6db83 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,7 @@ "err-code": "^2.0.0", "it-length-prefixed": "^3.0.0", "it-pipe": "^1.0.1", - "libp2p-pubsub": "~0.5.2", + "libp2p-pubsub": "~0.5.3", "p-map": "^4.0.0", "peer-id": "~0.13.12", "protons": "^1.0.1", diff --git a/test/2-nodes.spec.js b/test/2-nodes.spec.js index 7c584efc..8994f143 100644 --- a/test/2-nodes.spec.js +++ b/test/2-nodes.spec.js @@ -14,6 +14,7 @@ const { createGossipsubNodes, createGossipsubConnectedNodes, mockRegistrar, + mockConnectionManager, expectSet, ConnectionPair, first @@ -26,7 +27,7 @@ describe('1 node', () => { let gossipsub before(async () => { - gossipsub = await createGossipsub(mockRegistrar) + gossipsub = await createGossipsub(mockRegistrar, mockConnectionManager) }) after(() => gossipsub.stop()) diff --git a/test/emit-self.spec.js b/test/emit-self.spec.js index 8c2b15a8..f46e8567 100644 --- a/test/emit-self.spec.js +++ b/test/emit-self.spec.js @@ -8,7 +8,8 @@ const expect = chai.expect const { createGossipsub, - mockRegistrar + mockRegistrar, + mockConnectionManager } = require('./utils') const shouldNotHappen = (_) => expect.fail() @@ -20,7 +21,7 @@ describe('emit self', () => { describe('enabled', () => { before(async () => { - gossipsub = await createGossipsub(mockRegistrar, true, { emitSelf: true }) + gossipsub = await createGossipsub(mockRegistrar, mockConnectionManager, true, { emitSelf: true }) gossipsub.subscribe(topic) }) @@ -37,7 +38,7 @@ describe('emit self', () => { describe('disabled', () => { before(async () => { - gossipsub = await createGossipsub(mockRegistrar, true, { emitSelf: false }) + gossipsub = await createGossipsub(mockRegistrar, mockConnectionManager, true, { emitSelf: false }) gossipsub.subscribe(topic) }) diff --git a/test/floodsub.spec.js b/test/floodsub.spec.js index 831c1919..e00ad74e 100644 --- a/test/floodsub.spec.js +++ b/test/floodsub.spec.js @@ -16,6 +16,7 @@ const { expectSet, createMockRegistrar, ConnectionPair, + mockConnectionManager, first } = require('./utils') @@ -30,7 +31,7 @@ describe('gossipsub fallbacks to floodsub', () => { registrarRecords[0] = {} registrarRecords[1] = {} - nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true) + nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), mockConnectionManager, true) nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true) }) @@ -67,7 +68,7 @@ describe('gossipsub fallbacks to floodsub', () => { registrarRecords[0] = {} registrarRecords[1] = {} - nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true, { fallbackToFloodsub: false }) + nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), mockConnectionManager, true, { fallbackToFloodsub: false }) nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true) }) @@ -107,7 +108,7 @@ describe('gossipsub fallbacks to floodsub', () => { registrarRecords[0] = {} registrarRecords[1] = {} - nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true) + nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), mockConnectionManager, true) nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true) const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect @@ -178,7 +179,7 @@ describe('gossipsub fallbacks to floodsub', () => { registrarRecords[0] = {} registrarRecords[1] = {} - nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true) + nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), mockConnectionManager, true) nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true) const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect @@ -315,7 +316,7 @@ describe('gossipsub fallbacks to floodsub', () => { registrarRecords[0] = {} registrarRecords[1] = {} - nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true) + nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), mockConnectionManager, true) nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true) const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect diff --git a/test/heartbeat.spec.js b/test/heartbeat.spec.js index e590b917..360b5866 100644 --- a/test/heartbeat.spec.js +++ b/test/heartbeat.spec.js @@ -5,14 +5,14 @@ const { expect } = require('chai') const Gossipsub = require('../src') const { GossipsubHeartbeatInterval } = require('../src/constants') -const { createPeerId, mockRegistrar } = require('./utils') +const { createPeerId, mockRegistrar, mockConnectionManager } = require('./utils') describe('heartbeat', () => { let gossipsub before(async () => { const peerId = await createPeerId() - gossipsub = new Gossipsub(peerId, mockRegistrar, { emitSelf: true }) + gossipsub = new Gossipsub(peerId, mockRegistrar, mockConnectionManager, { emitSelf: true }) await gossipsub.start() }) diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 4b5bd93b..e8663b52 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -10,14 +10,15 @@ const sinon = require('sinon') const { utils } = require('libp2p-pubsub') const { createGossipsub, - mockRegistrar + mockRegistrar, + mockConnectionManager } = require('./utils') describe('Pubsub', () => { let gossipsub before(async () => { - gossipsub = await createGossipsub(mockRegistrar, true) + gossipsub = await createGossipsub(mockRegistrar, mockConnectionManager, true) }) after(() => gossipsub.stop()) diff --git a/test/utils/index.js b/test/utils/index.js index 0e74c8af..6b844315 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -24,9 +24,9 @@ const createPeerId = async () => { exports.createPeerId = createPeerId -const createGossipsub = async (registrar, shouldStart = false, options) => { +const createGossipsub = async (registrar, connectionManager, shouldStart = false, options) => { const peerId = await createPeerId() - const gs = new Gossipsub(peerId, registrar, options) + const gs = new Gossipsub(peerId, registrar, connectionManager, options) if (shouldStart) { await gs.start() @@ -43,7 +43,7 @@ const createGossipsubNodes = async (n, shouldStart, options) => { const nodes = await pTimes(n, (index) => { registrarRecords[index] = {} - return createGossipsub(createMockRegistrar(registrarRecords[index]), shouldStart, options) + return createGossipsub(createMockRegistrar(registrarRecords[index]), exports.mockConnectionManager, shouldStart, options) }) return { @@ -164,3 +164,11 @@ const ConnectionPair = () => { } exports.ConnectionPair = ConnectionPair + +exports.mockConnectionManager = { + getAll(id) { + return [] + }, + on() {}, + off() {} +} diff --git a/ts/index.ts b/ts/index.ts index 322a074a..3318c9cc 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -12,17 +12,25 @@ import { Heartbeat } from './heartbeat' import { getGossipPeers } from './getGossipPeers' import { createGossipRpc } from './utils' import { Peer, Registrar } from './peer' +import { PeerScore, PeerScoreParams, PeerScoreThresholds, createPeerScoreParams, createPeerScoreThresholds, ConnectionManager } from './score' // @ts-ignore import TimeCache = require('time-cache') import PeerId = require('peer-id') import BasicPubsub = require('./pubsub') -interface GossipOptions { +interface GossipInputOptions { emitSelf: boolean gossipIncoming: boolean fallbackToFloodsub: boolean msgIdFn: (msg: Message) => string messageCache: MessageCache + scoreParams: Partial + scoreThresholds: Partial +} + +interface GossipOptions extends GossipInputOptions { + scoreParams: PeerScoreParams + scoreThresholds: PeerScoreThresholds } class Gossipsub extends BasicPubsub { @@ -33,6 +41,7 @@ class Gossipsub extends BasicPubsub { lastpub: Map gossip: Map control: Map + score: PeerScore _options: GossipOptions public static multicodec: string = constants.GossipsubIDv10 @@ -43,21 +52,31 @@ class Gossipsub extends BasicPubsub { * @param {function} registrar.handle * @param {function} registrar.register * @param {function} registrar.unregister + * @param {Object} connectionManager * @param {Object} [options] * @param {bool} [options.emitSelf] if publish should emit to self, if subscribed, defaults to false * @param {bool} [options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true * @param {bool} [options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true * @param {function} [options.msgIdFn] override the default message id function * @param {Object} [options.messageCache] override the default MessageCache + * @param {Object} [options.scoreParams] peer score parameters + * @param {Object} [options.scoreThresholds] peer score thresholds * @constructor */ - constructor (peerId: PeerId, registrar: Registrar, options: Partial = {}) { + constructor ( + peerId: PeerId, + registrar: Registrar, + connectionManager: ConnectionManager, + options: Partial = {} + ) { const multicodecs = [constants.GossipsubIDv10] const _options = { gossipIncoming: true, fallbackToFloodsub: true, - ...options - } + ...options, + scoreParams: createPeerScoreParams(options.scoreParams), + scoreThresholds: createPeerScoreThresholds(options.scoreThresholds) + } as GossipOptions // Also wants to get notified of peers connected using floodsub if (_options.fallbackToFloodsub) { @@ -69,7 +88,7 @@ class Gossipsub extends BasicPubsub { multicodecs, peerId, registrar, - options: _options as GossipOptions + options: _options }) /** @@ -129,6 +148,26 @@ class Gossipsub extends BasicPubsub { * A heartbeat timer that maintains the mesh */ this.heartbeat = new Heartbeat(this) + + /** + * Peer score tracking + */ + this.score = new PeerScore(this._options.scoreParams, connectionManager, this._msgIdFn) + } + + /** + * Add a peer to the router + * @param {PeerId} peerId + * @param {Array} protocols + * @returns {Peer} + */ + _addPeer (peerId: PeerId, protocols: string[]): Peer { + const p = super._addPeer(peerId, protocols) + + // Add to peer scoring + this.score.addPeer(peerId.toB58String()) + + return p } /** @@ -157,6 +196,9 @@ class Gossipsub extends BasicPubsub { // Remove from control mapping this.control.delete(peer) + // Remove from peer scoring + this.score.removePeer(peer.id.toB58String()) + return peer } @@ -380,6 +422,7 @@ class Gossipsub extends BasicPubsub { async start (): Promise { await super.start() this.heartbeat.start() + this.score.start() } /** @@ -390,6 +433,7 @@ class Gossipsub extends BasicPubsub { async stop (): Promise { await super.stop() this.heartbeat.stop() + this.score.stop() this.mesh = new Map() this.fanout = new Map() diff --git a/ts/score/peerScore.ts b/ts/score/peerScore.ts index a1f9f8d2..c8d209cc 100644 --- a/ts/score/peerScore.ts +++ b/ts/score/peerScore.ts @@ -16,7 +16,7 @@ interface Connection { remotePeer: PeerId } -interface ConnectionManager { +export interface ConnectionManager { getAll(id: string): Connection[] // eslint-disable-next-line @typescript-eslint/ban-types on(evt: string, fn: Function): void