From e378ea9594b0b8e9fbf44931032b41ac82a44be6 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 28 Jun 2019 10:56:52 +0100 Subject: [PATCH] chore: address dirk review --- package.json | 1 + src/core/config.js | 1 + src/core/provider/index.js | 33 ++++++++++++------ src/core/provider/queue.js | 56 ++++++------------------------ src/core/provider/reprovider.js | 42 ++++++++++++---------- src/core/runtime/config-browser.js | 1 + src/core/runtime/config-nodejs.js | 1 + test/core/provider.spec.js | 3 ++ 8 files changed, 64 insertions(+), 74 deletions(-) diff --git a/package.json b/package.json index 440612ed44..680503d375 100644 --- a/package.json +++ b/package.json @@ -146,6 +146,7 @@ "multihashes": "~0.4.14", "multihashing-async": "~0.6.0", "node-fetch": "^2.3.0", + "p-queue": "^6.0.2", "peer-book": "~0.9.0", "peer-id": "~0.12.0", "peer-info": "~0.15.0", diff --git a/src/core/config.js b/src/core/config.js index b2493472d9..0c240e1050 100644 --- a/src/core/config.js +++ b/src/core/config.js @@ -67,6 +67,7 @@ const configSchema = s({ })) })), Reprovider: optional(s({ + Delay: 'string?', Interval: 'string?', Strategy: 'string?' })), diff --git a/src/core/provider/index.js b/src/core/provider/index.js index 918a352309..c6cb8a4761 100644 --- a/src/core/provider/index.js +++ b/src/core/provider/index.js @@ -12,12 +12,14 @@ class Provider { /** * Provider goal is to announce blocks to the network. * It keeps track of which blocks are provided, and allow them to be reprovided - * @param {Libp2p} libp2p - * @param {Blockstore} blockstore - * @param {object} options - * @memberof Provider + * @param {Libp2p} libp2p libp2p instance + * @param {Blockstore} blockstore blockstore instance + * @param {object} options reprovider options + * @param {string} options.delay reprovider initial delay in human friendly time + * @param {string} options.interval reprovider interval in human friendly time + * @param {string} options.strategy reprovider strategy */ - constructor (libp2p, blockstore, options) { + constructor (libp2p, blockstore, options = {}) { this._running = false this._contentRouting = libp2p.contentRouting @@ -38,11 +40,14 @@ class Provider { this._running = true - // handle options - const strategy = this._options.strategy || 'all' - const humanInterval = this._options.Interval || '12h' - const interval = await promisify((callback) => human(humanInterval, callback))() + // handle options (config uses uppercase) + const humanDelay = this._options.Delay || this._options.delay || '15s' + const delay = await human(humanDelay) + const humanInterval = this._options.Interval || this._options.interval || '12h' + const interval = await human(humanInterval) + const strategy = this._options.Strategy || this._options.strategy || 'all' const options = { + delay, interval, strategy } @@ -65,7 +70,7 @@ class Provider { } /** - * Announce block to the network and add and entry to the tracker + * Announce block to the network * Takes a cid and makes an attempt to announce it to the network * @param {CID} cid */ @@ -79,6 +84,14 @@ class Provider { })() } + /** + * Find providers of a block in the network + * @param {CID} cid cid of the block + * @param {object} options + * @param {number} options.timeout - how long the query should maximally run, in ms (default: 60000) + * @param {number} options.maxNumProviders - maximum number of providers to find + * @returns {Promise} + */ async findProviders (cid, options) { // eslint-disable-line require-await if (!CID.isCID(cid)) { throw errCode('invalid CID to find', 'ERR_INVALID_CID') diff --git a/src/core/provider/queue.js b/src/core/provider/queue.js index 1a54231eab..52c8492ddc 100644 --- a/src/core/provider/queue.js +++ b/src/core/provider/queue.js @@ -1,6 +1,6 @@ 'use strict' -const queue = require('async/queue') +const { default: PQueue } = require('p-queue') const debug = require('debug') const log = debug('ipfs:provider') @@ -17,71 +17,35 @@ class WorkerQueue { this._concurrency = concurrency this.running = false - this.queue = this._setupQueue() - } - - /** - * Create the underlying async queue. - * @returns {queue} - */ - _setupQueue () { - const q = queue(async (block) => { - await this._processNext(block) - }, this._concurrency) - - // If there is an error, stop the worker - q.error = (err) => { - log.error(err) - this.stop(err) - } - - q.buffer = 0 - - return q + this.queue = new PQueue({ concurrency }) } /** * Use the queue from async to keep `concurrency` amount items running * @param {Block[]} blocks - * @returns {Promise} */ async execute (blocks) { this.running = true - // store the promise resolution functions to be resolved at end of queue - this.execution = {} - const execPromise = new Promise((resolve, reject) => Object.assign(this.execution, { resolve, reject })) - - // When all blocks have been processed, stop the worker - this.queue.drain = () => { - log('queue:drain') - this.stop() - } + // Fill queue with the processing blocks function + this.queue.addAll(blocks.map((block) => async () => this._processNext(block))) // eslint-disable-line require-await - // Fill queue with blocks - this.queue.push(blocks) + // Wait for finishing + await this.queue.onIdle() - await execPromise + this.stop() } /** - * Stop the worker, optionally an error is thrown if received - * - * @param {object} error + * Stop the worker */ - stop (error) { + stop () { if (!this.running) { return } this.running = false - this.queue.kill() - - if (error) { - this.execution && this.execution.reject(error) - } else { - this.execution && this.execution.resolve() - } + this.queue.clear() } /** diff --git a/src/core/provider/reprovider.js b/src/core/provider/reprovider.js index 49c1efc823..a23bf9b995 100644 --- a/src/core/provider/reprovider.js +++ b/src/core/provider/reprovider.js @@ -5,16 +5,15 @@ const WorkerQueue = require('./queue') const { blockKeyToCid } = require('../utils') -// const initialDelay = 15000 -const initialDelay = 3000 - class Reprovider { /** * Reprovider goal is to reannounce blocks to the network. * @param {object} contentRouting * @param {Blockstore} blockstore * @param {object} options - * @memberof Reprovider + * @param {string} options.delay reprovider initial delay in human friendly time + * @param {string} options.interval reprovider interval in human friendly time + * @param {string} options.strategy reprovider strategy */ constructor (contentRouting, blockstore, options) { this._contentRouting = contentRouting @@ -33,7 +32,7 @@ class Reprovider { // Start doing reprovides after the initial delay this._timeoutId = setTimeout(() => { this._runPeriodically() - }, initialDelay) + }, this._options.delay) } /** @@ -49,27 +48,34 @@ class Reprovider { } /** - * Run reprovide on every `options.interval` ms + * Run reprovide on every `options.interval` ms recursively * @returns {void} */ async _runPeriodically () { - while (this._timeoutId) { - const blocks = await promisify((callback) => this._blockstore.query({}, callback))() - - // TODO strategy logic here - if (this._options.strategy === 'pinned') { + // Verify if stopped + if (!this._timeoutId) return - } else if (this._options.strategy === 'pinned') { + // TODO strategy logic here + const blocks = await promisify((callback) => this._blockstore.query({}, callback))() - } + if (this._options.strategy === 'pinned') { - await this._worker.execute(blocks) + } else if (this._options.strategy === 'pinned') { - // Each subsequent walk should run on a `this._options.interval` interval - await new Promise(resolve => { - this._timeoutId = setTimeout(resolve, this._options.interval) - }) } + + // Verify if stopped + if (!this._timeoutId) return + + await this._worker.execute(blocks) + + // Verify if stopped + if (!this._timeoutId) return + + // Each subsequent walk should run on a `this._options.interval` interval + this._timeoutId = setTimeout(() => { + this._runPeriodically() + }, this._options.interval) } /** diff --git a/src/core/runtime/config-browser.js b/src/core/runtime/config-browser.js index 3ec47ef977..bb673907d0 100644 --- a/src/core/runtime/config-browser.js +++ b/src/core/runtime/config-browser.js @@ -28,6 +28,7 @@ module.exports = () => ({ '/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6' ], Reprovider: { + Delay: '15s', Interval: '12h', Strategy: 'all' }, diff --git a/src/core/runtime/config-nodejs.js b/src/core/runtime/config-nodejs.js index 525483a238..45d5a56294 100644 --- a/src/core/runtime/config-nodejs.js +++ b/src/core/runtime/config-nodejs.js @@ -41,6 +41,7 @@ module.exports = () => ({ '/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6' ], Reprovider: { + Delay: '15s', Interval: '12h', Strategy: 'all' }, diff --git a/test/core/provider.spec.js b/test/core/provider.spec.js index ccbe60d193..9f7b5390d4 100644 --- a/test/core/provider.spec.js +++ b/test/core/provider.spec.js @@ -13,12 +13,14 @@ const IPFS = require('../../src') const DaemonFactory = require('ipfsd-ctl') const df = DaemonFactory.create({ type: 'proc' }) +const DELAY = '3s' const INTERVAL = '10s' const STRATEGY = 'all' const config = { Bootstrap: [], Reprovider: { + Delay: DELAY, Interval: INTERVAL, Strategy: STRATEGY } @@ -74,6 +76,7 @@ describe('record provider', () => { expect(err).to.not.exist() ipfsd = _ipfsd node = _ipfsd.api + done() }) })