Skip to content

Commit

Permalink
fix: simplify lp stream read/write
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain committed Dec 5, 2023
1 parent 808bd33 commit f99501a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 12 deletions.
1 change: 0 additions & 1 deletion packages/it-length-prefixed-stream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@
},
"dependencies": {
"it-byte-stream": "^1.0.0",
"it-length-prefixed": "^9.0.1",
"it-stream-types": "^2.0.1",
"uint8-varint": "^2.0.1",
"uint8arraylist": "^2.4.1"
Expand Down
18 changes: 7 additions & 11 deletions packages/it-length-prefixed-stream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
* ```
*/
import { byteStream, type ByteStreamOpts } from 'it-byte-stream'
import * as lp from 'it-length-prefixed'
import * as varint from 'uint8-varint'
import { Uint8ArrayList } from 'uint8arraylist'
import type { Duplex } from 'it-stream-types'
Expand Down Expand Up @@ -66,19 +65,14 @@ export interface LengthPrefixedStream <Stream = unknown> {

export interface LengthPrefixedStreamOpts extends ByteStreamOpts {
// encoding opts
lengthEncoder: lp.LengthEncoderFunction
lengthEncoder (value: number): Uint8ArrayList | Uint8Array

// decoding opts
lengthDecoder: lp.LengthDecoderFunction
lengthDecoder (data: Uint8ArrayList): number
maxLengthLength: number
maxDataLength: number
}

const defaultLengthDecoder: lp.LengthDecoderFunction = (buf) => {
return varint.decode(buf)
}
defaultLengthDecoder.bytes = 0

export function lpStream <Stream extends Duplex<any, any, any>> (duplex: Stream, opts: Partial<LengthPrefixedStreamOpts> = {}): LengthPrefixedStream<Stream> {
const bytes = byteStream(duplex, opts)

Expand All @@ -88,11 +82,13 @@ export function lpStream <Stream extends Duplex<any, any, any>> (duplex: Stream,
opts.maxLengthLength = varint.encodingLength(opts.maxDataLength)
}

const decodeLength = opts?.lengthDecoder ?? varint.decode
const encodeLength = opts?.lengthEncoder ?? varint.encode

const W: LengthPrefixedStream<Stream> = {
read: async (options?: AbortOptions) => {
let dataLength: number = -1
const lengthBuffer = new Uint8ArrayList()
const decodeLength = opts?.lengthDecoder ?? defaultLengthDecoder

while (true) {
// read one byte at a time until we can decode a varint
Expand Down Expand Up @@ -125,11 +121,11 @@ export function lpStream <Stream extends Duplex<any, any, any>> (duplex: Stream,
},
write: async (data, options?: AbortOptions) => {
// encode, write
await bytes.write(lp.encode.single(data, opts), options)
await bytes.write(new Uint8ArrayList(encodeLength(data.byteLength), data), options)
},
writeV: async (data, options?: AbortOptions) => {
const list = new Uint8ArrayList(
...data.map(buf => lp.encode.single(buf, opts))
...data.flatMap(buf => ([encodeLength(buf.byteLength), buf]))
)

// encode, write
Expand Down

0 comments on commit f99501a

Please sign in to comment.