Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gs1.1 Integrate peer score #87

Merged
merged 3 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"err-code": "^2.0.0",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.0.1",
"libp2p-pubsub": "~0.5.2",
"libp2p-pubsub": "~0.5.3",
"p-map": "^4.0.0",
"peer-id": "~0.13.12",
"protons": "^1.0.1",
Expand Down
3 changes: 2 additions & 1 deletion test/2-nodes.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const {
createGossipsubNodes,
createGossipsubConnectedNodes,
mockRegistrar,
mockConnectionManager,
expectSet,
ConnectionPair,
first
Expand All @@ -26,7 +27,7 @@ describe('1 node', () => {
let gossipsub

before(async () => {
gossipsub = await createGossipsub(mockRegistrar)
gossipsub = await createGossipsub(mockRegistrar, mockConnectionManager)
})

after(() => gossipsub.stop())
Expand Down
7 changes: 4 additions & 3 deletions test/emit-self.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ const expect = chai.expect

const {
createGossipsub,
mockRegistrar
mockRegistrar,
mockConnectionManager
} = require('./utils')

const shouldNotHappen = (_) => expect.fail()
Expand All @@ -20,7 +21,7 @@ describe('emit self', () => {

describe('enabled', () => {
before(async () => {
gossipsub = await createGossipsub(mockRegistrar, true, { emitSelf: true })
gossipsub = await createGossipsub(mockRegistrar, mockConnectionManager, true, { emitSelf: true })
gossipsub.subscribe(topic)
})

Expand All @@ -37,7 +38,7 @@ describe('emit self', () => {

describe('disabled', () => {
before(async () => {
gossipsub = await createGossipsub(mockRegistrar, true, { emitSelf: false })
gossipsub = await createGossipsub(mockRegistrar, mockConnectionManager, true, { emitSelf: false })
gossipsub.subscribe(topic)
})

Expand Down
11 changes: 6 additions & 5 deletions test/floodsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const {
expectSet,
createMockRegistrar,
ConnectionPair,
mockConnectionManager,
first
} = require('./utils')

Expand All @@ -30,7 +31,7 @@ describe('gossipsub fallbacks to floodsub', () => {
registrarRecords[0] = {}
registrarRecords[1] = {}

nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true)
nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), mockConnectionManager, true)
nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true)
})

Expand Down Expand Up @@ -67,7 +68,7 @@ describe('gossipsub fallbacks to floodsub', () => {
registrarRecords[0] = {}
registrarRecords[1] = {}

nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true, { fallbackToFloodsub: false })
nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), mockConnectionManager, true, { fallbackToFloodsub: false })
nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true)
})

Expand Down Expand Up @@ -107,7 +108,7 @@ describe('gossipsub fallbacks to floodsub', () => {
registrarRecords[0] = {}
registrarRecords[1] = {}

nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true)
nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), mockConnectionManager, true)
nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true)

const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
Expand Down Expand Up @@ -178,7 +179,7 @@ describe('gossipsub fallbacks to floodsub', () => {
registrarRecords[0] = {}
registrarRecords[1] = {}

nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true)
nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), mockConnectionManager, true)
nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true)

const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
Expand Down Expand Up @@ -315,7 +316,7 @@ describe('gossipsub fallbacks to floodsub', () => {
registrarRecords[0] = {}
registrarRecords[1] = {}

nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true)
nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), mockConnectionManager, true)
nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true)

const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
Expand Down
4 changes: 2 additions & 2 deletions test/heartbeat.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ const { expect } = require('chai')

const Gossipsub = require('../src')
const { GossipsubHeartbeatInterval } = require('../src/constants')
const { createPeerId, mockRegistrar } = require('./utils')
const { createPeerId, mockRegistrar, mockConnectionManager } = require('./utils')

describe('heartbeat', () => {
let gossipsub

before(async () => {
const peerId = await createPeerId()
gossipsub = new Gossipsub(peerId, mockRegistrar, { emitSelf: true })
gossipsub = new Gossipsub(peerId, mockRegistrar, mockConnectionManager, { emitSelf: true })
await gossipsub.start()
})

Expand Down
5 changes: 3 additions & 2 deletions test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ const sinon = require('sinon')
const { utils } = require('libp2p-pubsub')
const {
createGossipsub,
mockRegistrar
mockRegistrar,
mockConnectionManager
} = require('./utils')

describe('Pubsub', () => {
let gossipsub

before(async () => {
gossipsub = await createGossipsub(mockRegistrar, true)
gossipsub = await createGossipsub(mockRegistrar, mockConnectionManager, true)
})

after(() => gossipsub.stop())
Expand Down
14 changes: 11 additions & 3 deletions test/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ const createPeerId = async () => {

exports.createPeerId = createPeerId

const createGossipsub = async (registrar, shouldStart = false, options) => {
const createGossipsub = async (registrar, connectionManager, shouldStart = false, options) => {
const peerId = await createPeerId()
const gs = new Gossipsub(peerId, registrar, options)
const gs = new Gossipsub(peerId, registrar, connectionManager, options)

if (shouldStart) {
await gs.start()
Expand All @@ -43,7 +43,7 @@ const createGossipsubNodes = async (n, shouldStart, options) => {
const nodes = await pTimes(n, (index) => {
registrarRecords[index] = {}

return createGossipsub(createMockRegistrar(registrarRecords[index]), shouldStart, options)
return createGossipsub(createMockRegistrar(registrarRecords[index]), exports.mockConnectionManager, shouldStart, options)
})

return {
Expand Down Expand Up @@ -164,3 +164,11 @@ const ConnectionPair = () => {
}

exports.ConnectionPair = ConnectionPair

exports.mockConnectionManager = {
getAll(id) {
return []
},
on() {},
off() {}
}
54 changes: 49 additions & 5 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,25 @@ import { Heartbeat } from './heartbeat'
import { getGossipPeers } from './getGossipPeers'
import { createGossipRpc } from './utils'
import { Peer, Registrar } from './peer'
import { PeerScore, PeerScoreParams, PeerScoreThresholds, createPeerScoreParams, createPeerScoreThresholds, ConnectionManager } from './score'
// @ts-ignore
import TimeCache = require('time-cache')
import PeerId = require('peer-id')
import BasicPubsub = require('./pubsub')

interface GossipOptions {
interface GossipInputOptions {
emitSelf: boolean
gossipIncoming: boolean
fallbackToFloodsub: boolean
msgIdFn: (msg: Message) => string
messageCache: MessageCache
scoreParams: Partial<PeerScoreParams>
scoreThresholds: Partial<PeerScoreThresholds>
}

interface GossipOptions extends GossipInputOptions {
scoreParams: PeerScoreParams
scoreThresholds: PeerScoreThresholds
}

class Gossipsub extends BasicPubsub {
Expand All @@ -33,6 +41,7 @@ class Gossipsub extends BasicPubsub {
lastpub: Map<string, number>
gossip: Map<Peer, ControlIHave[]>
control: Map<Peer, ControlMessage>
score: PeerScore
_options: GossipOptions

public static multicodec: string = constants.GossipsubIDv10
Expand All @@ -43,21 +52,31 @@ class Gossipsub extends BasicPubsub {
* @param {function} registrar.handle
* @param {function} registrar.register
* @param {function} registrar.unregister
* @param {Object} connectionManager
* @param {Object} [options]
* @param {bool} [options.emitSelf] if publish should emit to self, if subscribed, defaults to false
* @param {bool} [options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true
* @param {bool} [options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true
* @param {function} [options.msgIdFn] override the default message id function
* @param {Object} [options.messageCache] override the default MessageCache
* @param {Object} [options.scoreParams] peer score parameters
* @param {Object} [options.scoreThresholds] peer score thresholds
* @constructor
*/
constructor (peerId: PeerId, registrar: Registrar, options: Partial<GossipOptions> = {}) {
constructor (
peerId: PeerId,
registrar: Registrar,
connectionManager: ConnectionManager,
options: Partial<GossipInputOptions> = {}
) {
const multicodecs = [constants.GossipsubIDv10]
const _options = {
gossipIncoming: true,
fallbackToFloodsub: true,
...options
}
...options,
scoreParams: createPeerScoreParams(options.scoreParams),
scoreThresholds: createPeerScoreThresholds(options.scoreThresholds)
} as GossipOptions

// Also wants to get notified of peers connected using floodsub
if (_options.fallbackToFloodsub) {
Expand All @@ -69,7 +88,7 @@ class Gossipsub extends BasicPubsub {
multicodecs,
peerId,
registrar,
options: _options as GossipOptions
options: _options
})

/**
Expand Down Expand Up @@ -129,6 +148,26 @@ class Gossipsub extends BasicPubsub {
* A heartbeat timer that maintains the mesh
*/
this.heartbeat = new Heartbeat(this)

/**
* Peer score tracking
*/
this.score = new PeerScore(this._options.scoreParams, connectionManager, this._msgIdFn)
}

/**
* Add a peer to the router
* @param {PeerId} peerId
* @param {Array<string>} protocols
* @returns {Peer}
*/
_addPeer (peerId: PeerId, protocols: string[]): Peer {
const p = super._addPeer(peerId, protocols)

// Add to peer scoring
this.score.addPeer(peerId.toB58String())

return p
}

/**
Expand Down Expand Up @@ -157,6 +196,9 @@ class Gossipsub extends BasicPubsub {
// Remove from control mapping
this.control.delete(peer)

// Remove from peer scoring
this.score.removePeer(peer.id.toB58String())

return peer
}

Expand Down Expand Up @@ -380,6 +422,7 @@ class Gossipsub extends BasicPubsub {
async start (): Promise<void> {
await super.start()
this.heartbeat.start()
this.score.start()
}

/**
Expand All @@ -390,6 +433,7 @@ class Gossipsub extends BasicPubsub {
async stop (): Promise<void> {
await super.stop()
this.heartbeat.stop()
this.score.stop()

this.mesh = new Map()
this.fanout = new Map()
Expand Down
2 changes: 1 addition & 1 deletion ts/score/peerScore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ interface Connection {
remotePeer: PeerId
}

interface ConnectionManager {
export interface ConnectionManager {
getAll(id: string): Connection[]
// eslint-disable-next-line @typescript-eslint/ban-types
on(evt: string, fn: Function): void
Expand Down