Skip to content

Commit

Permalink
stream: support Uint8Array input to methods
Browse files Browse the repository at this point in the history
PR-URL: #11608
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
addaleax authored and jasnell committed May 28, 2017
1 parent 9d922c6 commit 220186c
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 23 deletions.
69 changes: 49 additions & 20 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ There are four fundamental stream types within Node.js:
### Object Mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
objects. It is possible, however, for stream implementations to work with other
types of JavaScript values (with the exception of `null`, which serves a special
purpose within streams). Such streams are considered to operate in "object
mode".
(or `Uint8Array`) objects. It is possible, however, for stream implementations
to work with other types of JavaScript values (with the exception of `null`,
which serves a special purpose within streams). Such streams are considered to
operate in "object mode".

Stream instances are switched into object mode using the `objectMode` option
when the stream is created. Attempting to switch an existing stream into
Expand Down Expand Up @@ -352,12 +352,17 @@ See also: [`writable.uncork()`][].
##### writable.end([chunk][, encoding][, callback])
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {string|Buffer|any} Optional data to write. For streams not operating
in object mode, `chunk` must be a string or a `Buffer`. For object mode
streams, `chunk` may be any JavaScript value other than `null`.
* `encoding` {string} The encoding, if `chunk` is a String
* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
not operating in object mode, `chunk` must be a string, `Buffer` or
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `encoding` {string} The encoding, if `chunk` is a string
* `callback` {Function} Optional callback for when the stream is finished

Calling the `writable.end()` method signals that no more data will be written
Expand Down Expand Up @@ -434,14 +439,20 @@ See also: [`writable.cork()`][].
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
- version: v6.0.0
pr-url: https://github.com/nodejs/node/pull/6170
description: Passing `null` as the `chunk` parameter will always be
considered invalid now, even in object mode.
-->

* `chunk` {string|Buffer} The data to write
* `encoding` {string} The encoding, if `chunk` is a String
* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
not operating in object mode, `chunk` must be a string, `Buffer` or
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `encoding` {string} The encoding, if `chunk` is a string
* `callback` {Function} Callback for when this chunk of data is flushed
* Returns: {boolean} `false` if the stream wishes for the calling code to
wait for the `'drain'` event to be emitted before continuing to write
Expand Down Expand Up @@ -985,9 +996,16 @@ setTimeout(() => {
##### readable.unshift(chunk)
<!-- YAML
added: v0.9.11
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {Buffer|string|any} Chunk of data to unshift onto the read queue
* `chunk` {Buffer|Uint8Array|string|any} Chunk of data to unshift onto the
read queue. For streams not operating in object mode, `chunk` must be a
string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
any JavaScript value other than `null`.

The `readable.unshift()` method pushes a chunk of data back into the internal
buffer. This is useful in certain situations where a stream is being consumed by
Expand Down Expand Up @@ -1274,8 +1292,9 @@ constructor and implement the `writable._write()` method. The
Defaults to `true`
* `objectMode` {boolean} Whether or not the
[`stream.write(anyObj)`][stream-write] is a valid operation. When set,
it becomes possible to write JavaScript values other than string or
`Buffer` if supported by the stream implementation. Defaults to `false`
it becomes possible to write JavaScript values other than string,
`Buffer` or `Uint8Array` if supported by the stream implementation.
Defaults to `false`
* `write` {Function} Implementation for the
[`stream._write()`][stream-_write] method.
* `writev` {Function} Implementation for the
Expand Down Expand Up @@ -1564,16 +1583,26 @@ internal to the class that defines it, and should never be called directly by
user programs.

#### readable.push(chunk[, encoding])
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {Buffer|null|string|any} Chunk of data to push into the read queue
* `encoding` {string} Encoding of String chunks. Must be a valid
* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the
read queue. For streams not operating in object mode, `chunk` must be a
string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
any JavaScript value.
* `encoding` {string} Encoding of string chunks. Must be a valid
Buffer encoding, such as `'utf8'` or `'ascii'`
* Returns {boolean} `true` if additional chunks of data may continued to be
pushed; `false` otherwise.

When `chunk` is not `null`, the `chunk` of data will be added to the
internal queue for users of the stream to consume. Passing `chunk` as `null`
signals the end of the stream (EOF), after which no more data can be written.
When `chunk` is a `Buffer`, `Uint8Array` or `string`, the `chunk` of data will
be added to the internal queue for users of the stream to consume.
Passing `chunk` as `null` signals the end of the stream (EOF), after which no
more data can be written.

When the Readable is operating in paused mode, the data added with
`readable.push()` can be read out by calling the
Expand Down Expand Up @@ -2088,8 +2117,8 @@ Readable stream class internals.

Use of `readable.push('')` is not recommended.

Pushing a zero-byte string or `Buffer` to a stream that is not in object mode
has an interesting side effect. Because it *is* a call to
Pushing a zero-byte string, `Buffer` or `Uint8Array` to a stream that is not in
object mode has an interesting side effect. Because it *is* a call to
[`readable.push()`][stream-push], the call will end the reading process.
However, because the argument is an empty string, no data is added to the
readable buffer so there is nothing for a user to consume.
Expand Down
8 changes: 7 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
if (er) {
stream.emit('error', er);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (typeof chunk !== 'string' &&
Object.getPrototypeOf(chunk) !== Buffer.prototype &&
!state.objectMode) {
chunk = Stream._uint8ArrayToBuffer(chunk);
}

if (addToFront) {
if (state.endEmitted)
stream.emit('error', new Error('stream.unshift() after end event'));
Expand Down Expand Up @@ -259,7 +265,7 @@ function addChunk(stream, state, chunk, addToFront) {

function chunkInvalid(state, chunk) {
var er;
if (!(chunk instanceof Buffer) &&
if (!Stream._isUint8Array(chunk) &&
typeof chunk !== 'string' &&
chunk !== undefined &&
!state.objectMode) {
Expand Down
6 changes: 5 additions & 1 deletion lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,11 @@ function validChunk(stream, state, chunk, cb) {
Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState;
var ret = false;
var isBuf = (chunk instanceof Buffer);
var isBuf = Stream._isUint8Array(chunk) && !state.objectMode;

if (isBuf && Object.getPrototypeOf(chunk) !== Buffer.prototype) {
chunk = Stream._uint8ArrayToBuffer(chunk);
}

if (typeof encoding === 'function') {
cb = encoding;
Expand Down
6 changes: 5 additions & 1 deletion lib/internal/streams/BufferList.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

const Buffer = require('buffer').Buffer;

function copyBuffer(src, target, offset) {
Buffer.prototype.copy.call(src, target, offset);
}

module.exports = class BufferList {
constructor() {
this.head = null;
Expand Down Expand Up @@ -63,7 +67,7 @@ module.exports = class BufferList {
var p = this.head;
var i = 0;
while (p) {
p.data.copy(ret, i);
copyBuffer(p.data, ret, i);
i += p.data.length;
p = p.next;
}
Expand Down
37 changes: 37 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

'use strict';

const Buffer = require('buffer').Buffer;

// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');
Expand All @@ -33,3 +35,38 @@ Stream.PassThrough = require('_stream_passthrough');

// Backwards-compat with node 0.4.x
Stream.Stream = Stream;

// Internal utilities
try {
Stream._isUint8Array = process.binding('util').isUint8Array;
} catch (e) {
// This throws for Node < 4.2.0 because there’s no util binding and
// returns undefined for Node < 7.4.0.
}

if (!Stream._isUint8Array) {
Stream._isUint8Array = function _isUint8Array(obj) {
return Object.prototype.toString.call(obj) === '[object Uint8Array]';
};
}

const version = process.version.substr(1).split('.');
if (version[0] === 0 && version[1] < 12) {
Stream._uint8ArrayToBuffer = Buffer;
} else {
try {
const internalBuffer = require('internal/buffer');
Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) {
return new internalBuffer.FastBuffer(chunk.buffer,
chunk.byteOffset,
chunk.byteLength);
};
} catch (e) {
}

if (!Stream._uint8ArrayToBuffer) {
Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) {
return Buffer.prototype.slice.call(chunk);
};
}
}
102 changes: 102 additions & 0 deletions test/parallel/test-stream-uint8array.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const Buffer = require('buffer').Buffer;

const { Readable, Writable } = require('stream');

const ABC = new Uint8Array([0x41, 0x42, 0x43]);
const DEF = new Uint8Array([0x44, 0x45, 0x46]);
const GHI = new Uint8Array([0x47, 0x48, 0x49]);

{
// Simple Writable test.

let n = 0;
const writable = new Writable({
write: common.mustCall((chunk, encoding, cb) => {
assert(chunk instanceof Buffer);
if (n++ === 0) {
assert.strictEqual(String(chunk), 'ABC');
} else {
assert.strictEqual(String(chunk), 'DEF');
}

cb();
}, 2)
});

writable.write(ABC);
writable.end(DEF);
}

{
// Writable test, pass in Uint8Array in object mode.

const writable = new Writable({
objectMode: true,
write: common.mustCall((chunk, encoding, cb) => {
assert(!(chunk instanceof Buffer));
assert(chunk instanceof Uint8Array);
assert.strictEqual(chunk, ABC);
assert.strictEqual(encoding, 'utf8');
cb();
})
});

writable.end(ABC);
}

{
// Writable test, multiple writes carried out via writev.
let callback;

const writable = new Writable({
write: common.mustCall((chunk, encoding, cb) => {
assert(chunk instanceof Buffer);
assert.strictEqual(encoding, 'buffer');
assert.strictEqual(String(chunk), 'ABC');
callback = cb;
}),
writev: common.mustCall((chunks, cb) => {
assert.strictEqual(chunks.length, 2);
assert.strictEqual(chunks[0].encoding, 'buffer');
assert.strictEqual(chunks[1].encoding, 'buffer');
assert.strictEqual(chunks[0].chunk + chunks[1].chunk, 'DEFGHI');
})
});

writable.write(ABC);
writable.write(DEF);
writable.end(GHI);
callback();
}

{
// Simple Readable test.
const readable = new Readable({
read() {}
});

readable.push(DEF);
readable.unshift(ABC);

const buf = readable.read();
assert(buf instanceof Buffer);
assert.deepStrictEqual([...buf], [...ABC, ...DEF]);
}

{
// Readable test, setEncoding.
const readable = new Readable({
read() {}
});

readable.setEncoding('utf8');

readable.push(DEF);
readable.unshift(ABC);

const out = readable.read();
assert.strictEqual(out, 'ABCDEF');
}

0 comments on commit 220186c

Please sign in to comment.