Skip to content

Commit

Permalink
fix: add read/write tests for it-byte-stream (#119)
Browse files Browse the repository at this point in the history
To prevent regresssions and ensure the contract is respected.
  • Loading branch information
achingbrain committed Feb 7, 2024
1 parent 6956c38 commit 7dbdd72
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 10 deletions.
1 change: 1 addition & 0 deletions packages/it-byte-stream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
},
"devDependencies": {
"aegir": "^41.1.9",
"delay": "^6.0.0",
"it-pair": "^2.0.2",
"uint8arrays": "^5.0.0"
}
Expand Down
24 changes: 14 additions & 10 deletions packages/it-byte-stream/src/pushable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ export interface Pushable<T> extends AsyncGenerator<T, void, unknown> {
}

class QueuelessPushable <T> implements Pushable<T> {
private needNext: DeferredPromise<void>
private readNext: DeferredPromise<void>
private haveNext: DeferredPromise<void>
private ended: boolean
private nextResult: IteratorResult<T> | undefined

constructor () {
this.ended = false

this.needNext = deferred()
this.readNext = deferred()
this.haveNext = deferred()
}

Expand All @@ -41,15 +41,15 @@ class QueuelessPushable <T> implements Pushable<T> {
}

if (this.nextResult == null) {
throw new Error('Have next but next was undefined')
throw new Error('HaveNext promise resolved but nextResult was undefined')
}

const nextResult = this.nextResult
this.nextResult = undefined

// signal to the supplier that we have read the value
this.needNext.resolve()
this.needNext = deferred()
// signal to the supplier that we read the value
this.readNext.resolve()
this.readNext = deferred()

return nextResult
}
Expand Down Expand Up @@ -100,7 +100,11 @@ class QueuelessPushable <T> implements Pushable<T> {

// already have a value, wait for it to be read
if (this.nextResult != null) {
await this.needNext.promise
await this.readNext.promise

if (this.nextResult != null) {
throw new Error('NeedNext promise resolved but nextResult was not consumed')
}
}

if (value != null) {
Expand All @@ -114,10 +118,10 @@ class QueuelessPushable <T> implements Pushable<T> {
this.haveNext.resolve()
this.haveNext = deferred()

// wait for the consumer to read the value or for the passed signal
// to abort the waiting
// wait for the consumer to have finished processing the value and requested
// the next one or for the passed signal to abort the waiting
await raceSignal(
this.needNext.promise,
this.readNext.promise,
options?.signal,
options
)
Expand Down
51 changes: 51 additions & 0 deletions packages/it-byte-stream/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { Buffer } from 'buffer'
import { expect } from 'aegir/chai'
import delay from 'delay'
import { pair } from 'it-pair'
import { Uint8ArrayList } from 'uint8arraylist'
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
Expand Down Expand Up @@ -86,5 +87,55 @@ Object.keys(tests).forEach(key => {
expect(r.subarray()).to.equalBytes(r1.subarray())
expect(r.subarray()).to.equalBytes(r2.subarray())
})

it('should not resolve write promise until data is read when writing first', async () => {
let firstWritePromiseResolved = false
void b.write(test.from('11'))
.then(() => {
firstWritePromiseResolved = true
})
await delay(10)
expect(firstWritePromiseResolved).to.be.false()

let secondWritePromiseResolved = false
void b.write(test.from('22'))
.then(() => {
secondWritePromiseResolved = true
})
await delay(10)
expect(secondWritePromiseResolved).to.be.false()

await expect(b.read()).to.eventually.deep.equal(test.from('11').subarray())
await delay(10)
expect(firstWritePromiseResolved).to.be.true()
expect(secondWritePromiseResolved).to.be.false()

await expect(b.read()).to.eventually.deep.equal(test.from('22').subarray())
await delay(10)
expect(secondWritePromiseResolved).to.be.true()
})

it('should not resolve write promise until data is read when reading first', async () => {
const firstReadPromise = b.read()
let firstWritePromiseResolved = false
void b.write(test.from('11'))
.then(() => {
firstWritePromiseResolved = true
})
await delay(10)
expect(firstWritePromiseResolved).to.be.true()

const secondReadPromise = b.read()
let secondWritePromiseResolved = false
void b.write(test.from('22'))
.then(() => {
secondWritePromiseResolved = true
})
await delay(10)
expect(secondWritePromiseResolved).to.be.true()

await expect(firstReadPromise).to.eventually.deep.equal(test.from('11').subarray())
await expect(secondReadPromise).to.eventually.deep.equal(test.from('22').subarray())
})
})
})

0 comments on commit 7dbdd72

Please sign in to comment.