From 7dbdd728ec1277acd92e1d381a1bb73797b8670d Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 7 Feb 2024 13:36:00 +0100 Subject: [PATCH] fix: add read/write tests for it-byte-stream (#119) To prevent regresssions and ensure the contract is respected. --- packages/it-byte-stream/package.json | 1 + packages/it-byte-stream/src/pushable.ts | 24 +++++----- packages/it-byte-stream/test/index.spec.ts | 51 ++++++++++++++++++++++ 3 files changed, 66 insertions(+), 10 deletions(-) diff --git a/packages/it-byte-stream/package.json b/packages/it-byte-stream/package.json index 2bae6c0d..b395b39f 100644 --- a/packages/it-byte-stream/package.json +++ b/packages/it-byte-stream/package.json @@ -139,6 +139,7 @@ }, "devDependencies": { "aegir": "^41.1.9", + "delay": "^6.0.0", "it-pair": "^2.0.2", "uint8arrays": "^5.0.0" } diff --git a/packages/it-byte-stream/src/pushable.ts b/packages/it-byte-stream/src/pushable.ts index cbc50b66..b55d2a28 100644 --- a/packages/it-byte-stream/src/pushable.ts +++ b/packages/it-byte-stream/src/pushable.ts @@ -18,7 +18,7 @@ export interface Pushable extends AsyncGenerator { } class QueuelessPushable implements Pushable { - private needNext: DeferredPromise + private readNext: DeferredPromise private haveNext: DeferredPromise private ended: boolean private nextResult: IteratorResult | undefined @@ -26,7 +26,7 @@ class QueuelessPushable implements Pushable { constructor () { this.ended = false - this.needNext = deferred() + this.readNext = deferred() this.haveNext = deferred() } @@ -41,15 +41,15 @@ class QueuelessPushable implements Pushable { } if (this.nextResult == null) { - throw new Error('Have next but next was undefined') + throw new Error('HaveNext promise resolved but nextResult was undefined') } const nextResult = this.nextResult this.nextResult = undefined - // signal to the supplier that we have read the value - this.needNext.resolve() - this.needNext = deferred() + // signal to the supplier that we read the value + this.readNext.resolve() + this.readNext = deferred() return nextResult } @@ -100,7 +100,11 @@ class QueuelessPushable implements Pushable { // already have a value, wait for it to be read if (this.nextResult != null) { - await this.needNext.promise + await this.readNext.promise + + if (this.nextResult != null) { + throw new Error('NeedNext promise resolved but nextResult was not consumed') + } } if (value != null) { @@ -114,10 +118,10 @@ class QueuelessPushable implements Pushable { this.haveNext.resolve() this.haveNext = deferred() - // wait for the consumer to read the value or for the passed signal - // to abort the waiting + // wait for the consumer to have finished processing the value and requested + // the next one or for the passed signal to abort the waiting await raceSignal( - this.needNext.promise, + this.readNext.promise, options?.signal, options ) diff --git a/packages/it-byte-stream/test/index.spec.ts b/packages/it-byte-stream/test/index.spec.ts index 7ef389e2..39319dd9 100644 --- a/packages/it-byte-stream/test/index.spec.ts +++ b/packages/it-byte-stream/test/index.spec.ts @@ -3,6 +3,7 @@ import { Buffer } from 'buffer' import { expect } from 'aegir/chai' +import delay from 'delay' import { pair } from 'it-pair' import { Uint8ArrayList } from 'uint8arraylist' import { concat as uint8ArrayConcat } from 'uint8arrays/concat' @@ -86,5 +87,55 @@ Object.keys(tests).forEach(key => { expect(r.subarray()).to.equalBytes(r1.subarray()) expect(r.subarray()).to.equalBytes(r2.subarray()) }) + + it('should not resolve write promise until data is read when writing first', async () => { + let firstWritePromiseResolved = false + void b.write(test.from('11')) + .then(() => { + firstWritePromiseResolved = true + }) + await delay(10) + expect(firstWritePromiseResolved).to.be.false() + + let secondWritePromiseResolved = false + void b.write(test.from('22')) + .then(() => { + secondWritePromiseResolved = true + }) + await delay(10) + expect(secondWritePromiseResolved).to.be.false() + + await expect(b.read()).to.eventually.deep.equal(test.from('11').subarray()) + await delay(10) + expect(firstWritePromiseResolved).to.be.true() + expect(secondWritePromiseResolved).to.be.false() + + await expect(b.read()).to.eventually.deep.equal(test.from('22').subarray()) + await delay(10) + expect(secondWritePromiseResolved).to.be.true() + }) + + it('should not resolve write promise until data is read when reading first', async () => { + const firstReadPromise = b.read() + let firstWritePromiseResolved = false + void b.write(test.from('11')) + .then(() => { + firstWritePromiseResolved = true + }) + await delay(10) + expect(firstWritePromiseResolved).to.be.true() + + const secondReadPromise = b.read() + let secondWritePromiseResolved = false + void b.write(test.from('22')) + .then(() => { + secondWritePromiseResolved = true + }) + await delay(10) + expect(secondWritePromiseResolved).to.be.true() + + await expect(firstReadPromise).to.eventually.deep.equal(test.from('11').subarray()) + await expect(secondReadPromise).to.eventually.deep.equal(test.from('22').subarray()) + }) }) })