
feat(node-http-handler): improve stream collection performance #1272

Merged
kuhe merged 3 commits into main from feat/stream on May 10, 2024

Conversation

@kuhe (Contributor) commented May 9, 2024

I noticed the stream collectors were doing wasteful copy operations during the stream -> Uint8Array collection process.
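
For context, the grow-by-copy pattern reallocates and recopies everything collected so far on every chunk, which is quadratic in the total size. A minimal sketch of that shape (an illustration, not the package's exact code):

// Hypothetical sketch of the grow-by-copy anti-pattern (assumption, not the
// package's exact code): each appended chunk forces a reallocation plus a
// full copy of all previously collected bytes.
function appendChunk(collected, chunk) {
  const next = new Uint8Array(collected.length + chunk.length);
  next.set(collected);               // re-copies everything collected so far
  next.set(chunk, collected.length); // then appends the new chunk
  return next;                       // the old buffer is discarded
}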

kuhe requested review from a team as code owners May 9, 2024 20:32
kuhe requested a review from JordonPhillips May 9, 2024 20:32
@kuhe (Contributor, Author) commented May 9, 2024

Doing some more testing of practical scenarios.

The existing algorithm pulls ahead when the chunk size is very large, like 100k, because there are relatively few Uint8Array copies in that case (with chunk size c and n total bytes, the grow-by-copy approach moves on the order of n^2 / (2c) bytes overall, so larger chunks mean proportionally less copying).

@kuhe (Contributor, Author) commented May 10, 2024

In practical terms, the S3::getObject response stream seems to alternate between a chunk size of 16384 (the default ReadableStream highWaterMark) and 1024, for a reason I don't fully understand; it could be the difference in data rates between the stream source and the reader.

At these chunk sizes, the Array buffering is not faster until approximately 1 million bytes, at which point it suddenly becomes several times (3x to 6x) faster. Perhaps a breakpoint in the engine's internal behavior?

Anyway, I've modified the algorithm to use an Array, but only to collect the chunks (collectStream3 in the test code below). The chunks are then copied once into a single Uint8Array of known final length, rather than repeatedly growing and discarding an intermediate Uint8Array.

benchmark results (durations in milliseconds):

collect stream of 1 elements 0.5354740619659424
collect stream (alternate algorithm) of 1 elements 0.0736701488494873
collect stream (alternate algorithm 2) of 1 elements 0.08908200263977051

collect stream of 10 elements 0.5985689163208008
collect stream (alternate algorithm) of 10 elements 0.10771608352661133
collect stream (alternate algorithm 2) of 10 elements 0.1118621826171875

collect stream of 100 elements 0.5355470180511475
collect stream (alternate algorithm) of 100 elements 0.0846397876739502
collect stream (alternate algorithm 2) of 100 elements 0.09865403175354004

collect stream of 1000 elements 0.5386970043182373
collect stream (alternate algorithm) of 1000 elements 0.17470288276672363
collect stream (alternate algorithm 2) of 1000 elements 0.0959329605102539

collect stream of 10000 elements 0.5814781188964844
collect stream (alternate algorithm) of 10000 elements 1.519124984741211
collect stream (alternate algorithm 2) of 10000 elements 0.12869906425476074

collect stream of 100000 elements 1.1589949131011963
collect stream (alternate algorithm) of 100000 elements 11.25875186920166
collect stream (alternate algorithm 2) of 100000 elements 0.3245420455932617

collect stream of 1000000 elements 34.94601106643677
collect stream (alternate algorithm) of 1000000 elements 95.69593620300293
collect stream (alternate algorithm 2) of 1000000 elements 1.0913150310516357

collect stream of 10000000 elements 3237.118015050888
collect stream (alternate algorithm) of 10000000 elements 891.9919929504395
collect stream (alternate algorithm 2) of 10000000 elements 7.6842029094696045
test code:

const assert = require("assert");

const length = 100_000_000;
const chunk_size = [1024, 16384];

const streams = [0, 0, 0].map(
  () =>
    new ReadableStream({
      start(controller) {
        let j = 0;
        for (let i = 0; i < length; ) {
          const size = chunk_size[j++ % chunk_size.length];
          controller.enqueue(new Uint8Array(Math.min(size, length - i)).fill(i));
          i += size;
        }
        controller.close();
      },
    })
);

// existing algorithm: reallocate and recopy the accumulated bytes on every chunk
async function collectStream(stream) {
  let res = new Uint8Array(0);
  const reader = stream.getReader();
  let isDone = false;
  while (!isDone) {
    const { done, value } = await reader.read();
    if (value) {
      const prior = res;
      res = new Uint8Array(prior.length + value.length);
      res.set(prior);
      res.set(value, prior.length);
    }
    isDone = done;
  }
  return res;
}

// alternate algorithm: spread each chunk's bytes into a plain Array, then
// convert once at the end; spreading copies byte by byte and scales poorly
async function collectStream2(stream) {
  const buffer = [];
  const reader = stream.getReader();
  let isDone = false;
  while (!isDone) {
    const { done, value } = await reader.read();
    if (value) {
      buffer.push(...value);
    }
    isDone = done;
  }
  return new Uint8Array(buffer);
}

// alternate algorithm 2 (this PR): collect chunk references only, then copy
// each chunk once into a single Uint8Array of known final length
async function collectStream3(stream) {
  const chunks = [];
  const reader = stream.getReader();
  let isDone = false;
  let length = 0;

  while (!isDone) {
    const { done, value } = await reader.read();
    if (value) {
      chunks.push(value);
      length += value.length;
    }
    isDone = done;
  }

  const collected = new Uint8Array(length);
  let offset = 0;
  for (const chunk of chunks) {
    collected.set(chunk, offset);
    offset += chunk.length;
  }

  return collected;
}

(async () => {
  performance.mark("A");
  const collected = await collectStream(streams[0]);
  performance.mark("B");

  performance.mark("C");
  const collected2 = await collectStream2(streams[1]);
  performance.mark("D");

  performance.mark("E");
  const collected3 = await collectStream3(streams[2]);
  performance.mark("F");

  const m1 = performance.measure(`collect stream of ${length} elements`, "A", "B");
  const m2 = performance.measure(`collect stream (alternate algorithm) of ${length} elements`, "C", "D");
  const m3 = performance.measure(`collect stream (alternate algorithm 2) of ${length} elements`, "E", "F");

  console.log(m1.name, m1.duration);
  console.log(m2.name, m2.duration);
  console.log(m3.name, m3.duration);

  for (let i = 0; i < Math.max(collected.length, collected2.length, collected3.length); ++i) {
    const a = collected[i];
    const b = collected2[i];
    const c = collected3[i];
    assert.strictEqual(a, b, `inequality 1x2 ${a}!=${b} at index ${i}`);
    assert.strictEqual(a, c, `inequality 1x3 ${a}!=${c} at index ${i}`);
  }

  assert.strictEqual(collected.length, length);
  assert.strictEqual(collected2.length, length);
  assert.strictEqual(collected3.length, length);

})();
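
Since node-http-handler consumes Node.js Readable streams, the same collect-then-copy pattern can also be expressed with Buffer.concat. A minimal sketch under that assumption (not the PR's exact implementation):

const { Readable } = require("stream");

// Minimal sketch (assumption, not the PR's exact code): gather chunk
// references, then let Buffer.concat perform the single known-length copy,
// mirroring collectStream3 above.
async function collectReadable(readable) {
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks); // one final allocation, one copy per chunk
}

// usage:
// const bytes = await collectReadable(Readable.from([Buffer.from("ab"), Buffer.from("cd")]));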

kuhe merged commit e76e736 into smithy-lang:main on May 10, 2024
10 checks passed
kuhe deleted the feat/stream branch May 10, 2024 15:21
@trivikr (Contributor) commented May 10, 2024

Algorithm 2 is faster in all cases in Node.js 20 (tested with v20.13.1):

collect stream of 1 elements 0.3272499999999994
collect stream (alternate algorithm) of 1 elements 0.16454099999999983
collect stream (alternate algorithm 2) of 1 elements 0.052832999999999686

collect stream of 10 elements 0.30933299999999875
collect stream (alternate algorithm) of 10 elements 0.14699999999999847
collect stream (alternate algorithm 2) of 10 elements 0.04962500000000247

collect stream of 100 elements 0.3576250000000023
collect stream (alternate algorithm) of 100 elements 0.15420899999999804
collect stream (alternate algorithm 2) of 100 elements 0.053958000000001505

collect stream of 1000 elements 0.35991700000000293
collect stream (alternate algorithm) of 1000 elements 0.21433299999999988
collect stream (alternate algorithm 2) of 1000 elements 0.05191600000000207

collect stream of 10000 elements 0.39937499999999915
collect stream (alternate algorithm) of 10000 elements 0.8763749999999995
collect stream (alternate algorithm 2) of 10000 elements 0.07458300000000051

I could reproduce the difference described in the previous comment only in Node.js 18 (tested with v18.20.2).

@kuhe (Contributor, Author) commented May 10, 2024

Note: this PR's implementation temporarily increases memory usage when collecting a stream: every chunk is retained until the final Uint8Array has been filled, so peak usage is roughly twice the payload size while the chunk list and the final array coexist.
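
A quick way to observe the overhead (a hypothetical harness, not part of the PR; process.memoryUsage().arrayBuffers requires Node.js 13.9+):

// Hypothetical harness: measures the extra ArrayBuffer memory held while the
// chunk list and the final Uint8Array coexist during the final copy.
const chunks = Array.from({ length: 1000 }, () => new Uint8Array(16384));
const total = chunks.reduce((n, c) => n + c.length, 0);

const before = process.memoryUsage().arrayBuffers;
const collected = new Uint8Array(total); // allocated while every chunk is still referenced
let offset = 0;
for (const chunk of chunks) {
  collected.set(chunk, offset);
  offset += chunk.length;
}
const after = process.memoryUsage().arrayBuffers;

console.log(`extra bytes held at peak: ${after - before}`); // roughly `total`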

kuhe added a commit to kuhe/smithy-typescript that referenced this pull request on May 28, 2024:

feat(node-http-handler): improve stream collection performance (smithy-lang#1272)

* feat(node-http-handler): improve stream collection performance

* changeset

* feat: update stream-collector implementation