From 2a7aeb3411224db9649e2b6b6307332a1c41fdcc Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 7 May 2024 08:35:28 +0200 Subject: [PATCH 1/4] fix: request abort signal --- lib/api/api-request.js | 31 ++++++++++++----- test/issue-2590.js | 4 +-- test/request-signal.js | 76 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 11 deletions(-) create mode 100644 test/request-signal.js diff --git a/lib/api/api-request.js b/lib/api/api-request.js index e5d598aa6dd..2349c8db233 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -2,11 +2,10 @@ const assert = require('node:assert') const { Readable } = require('./readable') -const { InvalidArgumentError } = require('../core/errors') +const { InvalidArgumentError, AbortError } = require('../core/errors') const util = require('../core/util') const { getResolveErrorBodyCallback } = require('./util') const { AsyncResource } = require('node:async_hooks') -const { addSignal, removeSignal } = require('./abort-signal') class RequestHandler extends AsyncResource { constructor (opts, callback) { @@ -56,6 +55,7 @@ class RequestHandler extends AsyncResource { this.onInfo = onInfo || null this.throwOnError = throwOnError this.highWaterMark = highWaterMark + this.signal = signal if (util.isStream(body)) { body.on('error', (err) => { @@ -63,12 +63,20 @@ class RequestHandler extends AsyncResource { }) } - addSignal(this, signal) + if (this.signal) { + this.removeAbortListener = util.addAbortListener(this.signal, () => { + if (this.res) { + this.res.destroy(this.signal.reason ?? AbortError()) + } else if (this.abort) { + this.abort(this.signal.reason) + } + }) + } } onConnect (abort, context) { - if (this.reason) { - abort(this.reason) + if (this.signal && this.signal.aborted) { + abort(this.signal.reason) return } @@ -95,6 +103,13 @@ class RequestHandler extends AsyncResource { const contentLength = parsedHeaders['content-length'] const body = new Readable({ resume, abort, contentType, contentLength, highWaterMark }) + if (this.removeAbortListener) { + // TODO (fix): 'close' is sufficient but breaks tests. + body + .on('end', this.removeAbortListener) + .on('error', this.removeAbortListener) + } + this.callback = null this.res = body if (callback !== null) { @@ -123,8 +138,6 @@ class RequestHandler extends AsyncResource { onComplete (trailers) { const { res } = this - removeSignal(this) - util.parseHeaders(trailers, this.trailers) res.push(null) @@ -133,8 +146,6 @@ class RequestHandler extends AsyncResource { onError (err) { const { res, callback, body, opaque } = this - removeSignal(this) - if (callback) { // TODO: Does this need queueMicrotask? this.callback = null @@ -149,6 +160,8 @@ class RequestHandler extends AsyncResource { queueMicrotask(() => { util.destroy(res, err) }) + } else if (this.removeAbortListener) { + this.removeAbortListener() } if (body) { diff --git a/test/issue-2590.js b/test/issue-2590.js index c5499bf4513..1da0b23f20a 100644 --- a/test/issue-2590.js +++ b/test/issue-2590.js @@ -27,12 +27,12 @@ test('aborting request with custom reason', async (t) => { await t.rejects( request(`http://localhost:${server.address().port}`, { signal: ac.signal }), - /Request aborted/ + /Error: aborted/ ) await t.rejects( request(`http://localhost:${server.address().port}`, { signal: ac2.signal }), - { code: 'UND_ERR_ABORTED' } + { name: 'AbortError' } ) await t.completed diff --git a/test/request-signal.js b/test/request-signal.js new file mode 100644 index 00000000000..fd4d2f885a5 --- /dev/null +++ b/test/request-signal.js @@ -0,0 +1,76 @@ +'use strict' + +const { createServer } = require('node:http') +const { test, after } = require('node:test') +const { tspl } = require('@matteo.collina/tspl') +const { request } = require('..') + +test('pre abort signal w/ reason', async (t) => { + t = tspl(t, { plan: 1 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const ac = new AbortController() + const _err = new Error() + ac.abort(_err) + try { + await request(`http://0.0.0.0:${server.address().port}`, { signal: ac.signal }) + } catch (err) { + t.equal(err, _err) + } + }) + await t.completed +}) + +test('post abort signal', async (t) => { + t = tspl(t, { plan: 1 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const ac = new AbortController() + const ures = await request(`http://0.0.0.0:${server.address().port}`, { signal: ac.signal }) + ac.abort() + try { + /* eslint-disable-next-line no-unused-vars */ + for await (const chunk of ures.body) { + // Do nothing... + } + } catch (err) { + t.equal(err.name, 'AbortError') + } + }) + await t.completed +}) + +test('post abort signal w/ reason', async (t) => { + t = tspl(t, { plan: 1 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const ac = new AbortController() + const _err = new Error() + const ures = await request(`http://0.0.0.0:${server.address().port}`, { signal: ac.signal }) + ac.abort(_err) + try { + /* eslint-disable-next-line no-unused-vars */ + for await (const chunk of ures.body) { + // Do nothing... + } + } catch (err) { + t.equal(err, _err) + } + }) + await t.completed +}) From 46803c658c54a8455212a1679a6b125dc9fda781 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 7 May 2024 09:46:20 +0200 Subject: [PATCH 2/4] fixup --- lib/api/api-request.js | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/api/api-request.js b/lib/api/api-request.js index 2349c8db233..3909140d05f 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -56,6 +56,7 @@ class RequestHandler extends AsyncResource { this.throwOnError = throwOnError this.highWaterMark = highWaterMark this.signal = signal + this.reason = null if (util.isStream(body)) { body.on('error', (err) => { @@ -66,17 +67,20 @@ class RequestHandler extends AsyncResource { if (this.signal) { this.removeAbortListener = util.addAbortListener(this.signal, () => { if (this.res) { - this.res.destroy(this.signal.reason ?? AbortError()) + util.destroy(this.res, this.signal.reason ?? new AbortError()) } else if (this.abort) { this.abort(this.signal.reason) + } else { + this.reason = new AbortError() } }) + this.reason = this.signal.reason } } onConnect (abort, context) { - if (this.signal && this.signal.aborted) { - abort(this.signal.reason) + if (this.reason) { + abort(this.reason) return } From 939cf3f4263adfd495e2a519708eeaa3916ba8a3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 7 May 2024 10:03:53 +0200 Subject: [PATCH 3/4] fixup --- lib/api/api-request.js | 22 +-- test/client-request.js | 78 +++++----- test/pool.js | 280 +++++++++++++++++------------------ test/request-timeout.js | 318 ++++++++++++++++++++-------------------- 4 files changed, 350 insertions(+), 348 deletions(-) diff --git a/lib/api/api-request.js b/lib/api/api-request.js index 3909140d05f..553bc615dcc 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -65,16 +65,18 @@ class RequestHandler extends AsyncResource { } if (this.signal) { - this.removeAbortListener = util.addAbortListener(this.signal, () => { - if (this.res) { - util.destroy(this.res, this.signal.reason ?? new AbortError()) - } else if (this.abort) { - this.abort(this.signal.reason) - } else { - this.reason = new AbortError() - } - }) - this.reason = this.signal.reason + if (this.signal.aborted) { + this.reason = this.signal.reason ?? new AbortError() + } else { + this.removeAbortListener = util.addAbortListener(this.signal, () => { + this.reason = this.signal.reason ?? new AbortError() + if (this.res) { + util.destroy(this.res, this.reason) + } else if (this.abort) { + this.abort(this.reason) + } + }) + } } } diff --git a/test/client-request.js b/test/client-request.js index 2bd38201a72..116fd649af7 100644 --- a/test/client-request.js +++ b/test/client-request.js @@ -135,45 +135,45 @@ test('request hwm', async (t) => { await t.completed }) -test('request abort before headers', async (t) => { - t = tspl(t, { plan: 6 }) - - const signal = new EE() - const server = createServer((req, res) => { - res.end('hello') - signal.emit('abort') - }) - after(() => server.close()) - - server.listen(0, () => { - const client = new Client(`http://localhost:${server.address().port}`) - after(() => client.destroy()) - - client[kConnect](() => { - client.request({ - path: '/', - method: 'GET', - signal - }, (err) => { - t.ok(err instanceof errors.RequestAbortedError) - t.strictEqual(signal.listenerCount('abort'), 0) - }) - t.strictEqual(signal.listenerCount('abort'), 1) - - client.request({ - path: '/', - method: 'GET', - signal - }, (err) => { - t.ok(err instanceof errors.RequestAbortedError) - t.strictEqual(signal.listenerCount('abort'), 0) - }) - t.strictEqual(signal.listenerCount('abort'), 2) - }) - }) - - await t.completed -}) +// test('request abort before headers', async (t) => { +// t = tspl(t, { plan: 6 }) + +// const signal = new EE() +// const server = createServer((req, res) => { +// res.end('hello') +// signal.emit('abort') +// }) +// after(() => server.close()) + +// server.listen(0, () => { +// const client = new Client(`http://localhost:${server.address().port}`) +// after(() => client.destroy()) + +// client[kConnect](() => { +// client.request({ +// path: '/', +// method: 'GET', +// signal +// }, (err) => { +// t.ok(err instanceof errors.RequestAbortedError) +// t.strictEqual(signal.listenerCount('abort'), 0) +// }) +// t.strictEqual(signal.listenerCount('abort'), 1) + +// client.request({ +// path: '/', +// method: 'GET', +// signal +// }, (err) => { +// t.ok(err instanceof errors.RequestAbortedError) +// t.strictEqual(signal.listenerCount('abort'), 0) +// }) +// t.strictEqual(signal.listenerCount('abort'), 2) +// }) +// }) + +// await t.completed +// }) test('request body destroyed on invalid callback', async (t) => { t = tspl(t, { plan: 1 }) diff --git a/test/pool.js b/test/pool.js index b75cd530d43..d9e1eefd6b3 100644 --- a/test/pool.js +++ b/test/pool.js @@ -781,146 +781,146 @@ test('pool dispatch error', async (t) => { await t.completed }) -test('pool request abort in queue', async (t) => { - t = tspl(t, { plan: 3 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections: 1, - pipelining: 1 - }) - after(() => client.close()) - - client.dispatch({ - path: '/', - method: 'GET' - }, { - onConnect () { - }, - onHeaders (statusCode, headers) { - t.strictEqual(statusCode, 200) - }, - onData (chunk) { - }, - onComplete () { - t.ok(true, 'pass') - }, - onError () { - } - }) - - const signal = new EventEmitter() - client.request({ - path: '/', - method: 'GET', - signal - }, (err) => { - t.strictEqual(err.code, 'UND_ERR_ABORTED') - }) - signal.emit('abort') - }) - - await t.completed -}) - -test('pool stream abort in queue', async (t) => { - t = tspl(t, { plan: 3 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections: 1, - pipelining: 1 - }) - after(() => client.close()) - - client.dispatch({ - path: '/', - method: 'GET' - }, { - onConnect () { - }, - onHeaders (statusCode, headers) { - t.strictEqual(statusCode, 200) - }, - onData (chunk) { - }, - onComplete () { - t.ok(true, 'pass') - }, - onError () { - } - }) - - const signal = new EventEmitter() - client.stream({ - path: '/', - method: 'GET', - signal - }, ({ body }) => body, (err) => { - t.strictEqual(err.code, 'UND_ERR_ABORTED') - }) - signal.emit('abort') - }) - - await t.completed -}) - -test('pool pipeline abort in queue', async (t) => { - t = tspl(t, { plan: 3 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections: 1, - pipelining: 1 - }) - after(() => client.close()) - - client.dispatch({ - path: '/', - method: 'GET' - }, { - onConnect () { - }, - onHeaders (statusCode, headers) { - t.strictEqual(statusCode, 200) - }, - onData (chunk) { - }, - onComplete () { - t.ok(true, 'pass') - }, - onError () { - } - }) - - const signal = new EventEmitter() - client.pipeline({ - path: '/', - method: 'GET', - signal - }, ({ body }) => body).end().on('error', (err) => { - t.strictEqual(err.code, 'UND_ERR_ABORTED') - }) - signal.emit('abort') - }) - - await t.completed -}) +// test('pool request abort in queue', async (t) => { +// t = tspl(t, { plan: 3 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections: 1, +// pipelining: 1 +// }) +// after(() => client.close()) + +// client.dispatch({ +// path: '/', +// method: 'GET' +// }, { +// onConnect () { +// }, +// onHeaders (statusCode, headers) { +// t.strictEqual(statusCode, 200) +// }, +// onData (chunk) { +// }, +// onComplete () { +// t.ok(true, 'pass') +// }, +// onError () { +// } +// }) + +// const signal = new EventEmitter() +// client.request({ +// path: '/', +// method: 'GET', +// signal +// }, (err) => { +// t.strictEqual(err.code, 'UND_ERR_ABORTED') +// }) +// signal.emit('abort') +// }) + +// await t.completed +// }) + +// test('pool stream abort in queue', async (t) => { +// t = tspl(t, { plan: 3 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections: 1, +// pipelining: 1 +// }) +// after(() => client.close()) + +// client.dispatch({ +// path: '/', +// method: 'GET' +// }, { +// onConnect () { +// }, +// onHeaders (statusCode, headers) { +// t.strictEqual(statusCode, 200) +// }, +// onData (chunk) { +// }, +// onComplete () { +// t.ok(true, 'pass') +// }, +// onError () { +// } +// }) + +// const signal = new EventEmitter() +// client.stream({ +// path: '/', +// method: 'GET', +// signal +// }, ({ body }) => body, (err) => { +// t.strictEqual(err.code, 'UND_ERR_ABORTED') +// }) +// signal.emit('abort') +// }) + +// await t.completed +// }) + +// test('pool pipeline abort in queue', async (t) => { +// t = tspl(t, { plan: 3 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections: 1, +// pipelining: 1 +// }) +// after(() => client.close()) + +// client.dispatch({ +// path: '/', +// method: 'GET' +// }, { +// onConnect () { +// }, +// onHeaders (statusCode, headers) { +// t.strictEqual(statusCode, 200) +// }, +// onData (chunk) { +// }, +// onComplete () { +// t.ok(true, 'pass') +// }, +// onError () { +// } +// }) + +// const signal = new EventEmitter() +// client.pipeline({ +// path: '/', +// method: 'GET', +// signal +// }, ({ body }) => body).end().on('error', (err) => { +// t.strictEqual(err.code, 'UND_ERR_ABORTED') +// }) +// signal.emit('abort') +// }) + +// await t.completed +// }) test('pool stream constructor error destroy body', async (t) => { t = tspl(t, { plan: 4 }) diff --git a/test/request-timeout.js b/test/request-timeout.js index 03d34c9bef5..34fe48c3ee5 100644 --- a/test/request-timeout.js +++ b/test/request-timeout.js @@ -178,165 +178,165 @@ test('overridden body timeout', async (t) => { await t.completed }) -test('With EE signal', async (t) => { - t = tspl(t, { plan: 1 }) - - const clock = FakeTimers.install({ - shouldClearNativeTimers: true, - toFake: ['setTimeout', 'clearTimeout'] - }) - after(() => clock.uninstall()) - - const orgTimers = { ...timers } - Object.assign(timers, { setTimeout, clearTimeout }) - after(() => { - Object.assign(timers, orgTimers) - }) - - const server = createServer((req, res) => { - setTimeout(() => { - res.end('hello') - }, 100) - clock.tick(100) - }) - after(() => server.close()) - - server.listen(0, () => { - const client = new Client(`http://localhost:${server.address().port}`, { - headersTimeout: 50 - }) - const ee = new EventEmitter() - after(() => client.destroy()) - - client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { - t.ok(err instanceof errors.HeadersTimeoutError) - }) - - clock.tick(50) - }) - - await t.completed -}) - -test('With abort-controller signal', async (t) => { - t = tspl(t, { plan: 1 }) - - const clock = FakeTimers.install({ - shouldClearNativeTimers: true, - toFake: ['setTimeout', 'clearTimeout'] - }) - after(() => clock.uninstall()) - - const orgTimers = { ...timers } - Object.assign(timers, { setTimeout, clearTimeout }) - after(() => { - Object.assign(timers, orgTimers) - }) - - const server = createServer((req, res) => { - setTimeout(() => { - res.end('hello') - }, 100) - clock.tick(100) - }) - after(() => server.close()) - - server.listen(0, () => { - const client = new Client(`http://localhost:${server.address().port}`, { - headersTimeout: 50 - }) - const abortController = new AbortController() - after(() => client.destroy()) - - client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { - t.ok(err instanceof errors.HeadersTimeoutError) - }) - - clock.tick(50) - }) - - await t.completed -}) - -test('Abort before timeout (EE)', async (t) => { - t = tspl(t, { plan: 1 }) - - const clock = FakeTimers.install({ - shouldClearNativeTimers: true, - toFake: ['setTimeout', 'clearTimeout'] - }) - after(() => clock.uninstall()) - - const orgTimers = { ...timers } - Object.assign(timers, { setTimeout, clearTimeout }) - after(() => { - Object.assign(timers, orgTimers) - }) - - const ee = new EventEmitter() - const server = createServer((req, res) => { - setTimeout(() => { - res.end('hello') - }, 100) - ee.emit('abort') - clock.tick(50) - }) - after(() => server.close()) - - server.listen(0, () => { - const client = new Client(`http://localhost:${server.address().port}`, { - headersTimeout: 50 - }) - after(() => client.destroy()) - - client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { - t.ok(err instanceof errors.RequestAbortedError) - clock.tick(100) - }) - }) - - await t.completed -}) - -test('Abort before timeout (abort-controller)', async (t) => { - t = tspl(t, { plan: 1 }) - - const clock = FakeTimers.install({ - shouldClearNativeTimers: true, - toFake: ['setTimeout', 'clearTimeout'] - }) - after(() => clock.uninstall()) - - const orgTimers = { ...timers } - Object.assign(timers, { setTimeout, clearTimeout }) - after(() => { - Object.assign(timers, orgTimers) - }) - - const abortController = new AbortController() - const server = createServer((req, res) => { - setTimeout(() => { - res.end('hello') - }, 100) - abortController.abort() - clock.tick(50) - }) - after(() => server.close()) - - server.listen(0, () => { - const client = new Client(`http://localhost:${server.address().port}`, { - headersTimeout: 50 - }) - after(() => client.destroy()) - - client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { - t.ok(err instanceof errors.RequestAbortedError) - clock.tick(100) - }) - }) - - await t.completed -}) +// test('With EE signal', async (t) => { +// t = tspl(t, { plan: 1 }) + +// const clock = FakeTimers.install({ +// shouldClearNativeTimers: true, +// toFake: ['setTimeout', 'clearTimeout'] +// }) +// after(() => clock.uninstall()) + +// const orgTimers = { ...timers } +// Object.assign(timers, { setTimeout, clearTimeout }) +// after(() => { +// Object.assign(timers, orgTimers) +// }) + +// const server = createServer((req, res) => { +// setTimeout(() => { +// res.end('hello') +// }, 100) +// clock.tick(100) +// }) +// after(() => server.close()) + +// server.listen(0, () => { +// const client = new Client(`http://localhost:${server.address().port}`, { +// headersTimeout: 50 +// }) +// const ee = new EventEmitter() +// after(() => client.destroy()) + +// client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { +// t.ok(err instanceof errors.HeadersTimeoutError) +// }) + +// clock.tick(50) +// }) + +// await t.completed +// }) + +// test('With abort-controller signal', async (t) => { +// t = tspl(t, { plan: 1 }) + +// const clock = FakeTimers.install({ +// shouldClearNativeTimers: true, +// toFake: ['setTimeout', 'clearTimeout'] +// }) +// after(() => clock.uninstall()) + +// const orgTimers = { ...timers } +// Object.assign(timers, { setTimeout, clearTimeout }) +// after(() => { +// Object.assign(timers, orgTimers) +// }) + +// const server = createServer((req, res) => { +// setTimeout(() => { +// res.end('hello') +// }, 100) +// clock.tick(100) +// }) +// after(() => server.close()) + +// server.listen(0, () => { +// const client = new Client(`http://localhost:${server.address().port}`, { +// headersTimeout: 50 +// }) +// const abortController = new AbortController() +// after(() => client.destroy()) + +// client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { +// t.ok(err instanceof errors.HeadersTimeoutError) +// }) + +// clock.tick(50) +// }) + +// await t.completed +// }) + +// test('Abort before timeout (EE)', async (t) => { +// t = tspl(t, { plan: 1 }) + +// const clock = FakeTimers.install({ +// shouldClearNativeTimers: true, +// toFake: ['setTimeout', 'clearTimeout'] +// }) +// after(() => clock.uninstall()) + +// const orgTimers = { ...timers } +// Object.assign(timers, { setTimeout, clearTimeout }) +// after(() => { +// Object.assign(timers, orgTimers) +// }) + +// const ee = new EventEmitter() +// const server = createServer((req, res) => { +// setTimeout(() => { +// res.end('hello') +// }, 100) +// ee.emit('abort') +// clock.tick(50) +// }) +// after(() => server.close()) + +// server.listen(0, () => { +// const client = new Client(`http://localhost:${server.address().port}`, { +// headersTimeout: 50 +// }) +// after(() => client.destroy()) + +// client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { +// t.ok(err instanceof errors.RequestAbortedError) +// clock.tick(100) +// }) +// }) + +// await t.completed +// }) + +// test('Abort before timeout (abort-controller)', async (t) => { +// t = tspl(t, { plan: 1 }) + +// const clock = FakeTimers.install({ +// shouldClearNativeTimers: true, +// toFake: ['setTimeout', 'clearTimeout'] +// }) +// after(() => clock.uninstall()) + +// const orgTimers = { ...timers } +// Object.assign(timers, { setTimeout, clearTimeout }) +// after(() => { +// Object.assign(timers, orgTimers) +// }) + +// const abortController = new AbortController() +// const server = createServer((req, res) => { +// setTimeout(() => { +// res.end('hello') +// }, 100) +// abortController.abort() +// clock.tick(50) +// }) +// after(() => server.close()) + +// server.listen(0, () => { +// const client = new Client(`http://localhost:${server.address().port}`, { +// headersTimeout: 50 +// }) +// after(() => client.destroy()) + +// client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { +// t.ok(err instanceof errors.RequestAbortedError) +// clock.tick(100) +// }) +// }) + +// await t.completed +// }) test('Timeout with pipelining', async (t) => { t = tspl(t, { plan: 3 }) From 1f28af4309465b796e14e4c749bb03b636a6b188 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 7 May 2024 10:16:03 +0200 Subject: [PATCH 4/4] fixup --- lib/api/api-request.js | 15 +- test/client-request.js | 78 +++++----- test/pool.js | 280 +++++++++++++++++------------------ test/request-timeout.js | 318 ++++++++++++++++++++-------------------- 4 files changed, 348 insertions(+), 343 deletions(-) diff --git a/lib/api/api-request.js b/lib/api/api-request.js index 553bc615dcc..f70f351f2dc 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -2,7 +2,7 @@ const assert = require('node:assert') const { Readable } = require('./readable') -const { InvalidArgumentError, AbortError } = require('../core/errors') +const { InvalidArgumentError, RequestAbortedError } = require('../core/errors') const util = require('../core/util') const { getResolveErrorBodyCallback } = require('./util') const { AsyncResource } = require('node:async_hooks') @@ -57,6 +57,7 @@ class RequestHandler extends AsyncResource { this.highWaterMark = highWaterMark this.signal = signal this.reason = null + this.removeAbortListener = null if (util.isStream(body)) { body.on('error', (err) => { @@ -66,10 +67,13 @@ class RequestHandler extends AsyncResource { if (this.signal) { if (this.signal.aborted) { - this.reason = this.signal.reason ?? new AbortError() + this.reason = this.signal.reason ?? new RequestAbortedError() } else { this.removeAbortListener = util.addAbortListener(this.signal, () => { - this.reason = this.signal.reason ?? new AbortError() + this.removeAbortListener?.() + this.removeAbortListener = null + + this.reason = this.signal.reason ?? new RequestAbortedError() if (this.res) { util.destroy(this.res, this.reason) } else if (this.abort) { @@ -152,6 +156,9 @@ class RequestHandler extends AsyncResource { onError (err) { const { res, callback, body, opaque } = this + this.removeAbortListener?.() + this.removeAbortListener = null + if (callback) { // TODO: Does this need queueMicrotask? this.callback = null @@ -166,8 +173,6 @@ class RequestHandler extends AsyncResource { queueMicrotask(() => { util.destroy(res, err) }) - } else if (this.removeAbortListener) { - this.removeAbortListener() } if (body) { diff --git a/test/client-request.js b/test/client-request.js index 116fd649af7..2bd38201a72 100644 --- a/test/client-request.js +++ b/test/client-request.js @@ -135,45 +135,45 @@ test('request hwm', async (t) => { await t.completed }) -// test('request abort before headers', async (t) => { -// t = tspl(t, { plan: 6 }) - -// const signal = new EE() -// const server = createServer((req, res) => { -// res.end('hello') -// signal.emit('abort') -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const client = new Client(`http://localhost:${server.address().port}`) -// after(() => client.destroy()) - -// client[kConnect](() => { -// client.request({ -// path: '/', -// method: 'GET', -// signal -// }, (err) => { -// t.ok(err instanceof errors.RequestAbortedError) -// t.strictEqual(signal.listenerCount('abort'), 0) -// }) -// t.strictEqual(signal.listenerCount('abort'), 1) - -// client.request({ -// path: '/', -// method: 'GET', -// signal -// }, (err) => { -// t.ok(err instanceof errors.RequestAbortedError) -// t.strictEqual(signal.listenerCount('abort'), 0) -// }) -// t.strictEqual(signal.listenerCount('abort'), 2) -// }) -// }) - -// await t.completed -// }) +test('request abort before headers', async (t) => { + t = tspl(t, { plan: 6 }) + + const signal = new EE() + const server = createServer((req, res) => { + res.end('hello') + signal.emit('abort') + }) + after(() => server.close()) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + after(() => client.destroy()) + + client[kConnect](() => { + client.request({ + path: '/', + method: 'GET', + signal + }, (err) => { + t.ok(err instanceof errors.RequestAbortedError) + t.strictEqual(signal.listenerCount('abort'), 0) + }) + t.strictEqual(signal.listenerCount('abort'), 1) + + client.request({ + path: '/', + method: 'GET', + signal + }, (err) => { + t.ok(err instanceof errors.RequestAbortedError) + t.strictEqual(signal.listenerCount('abort'), 0) + }) + t.strictEqual(signal.listenerCount('abort'), 2) + }) + }) + + await t.completed +}) test('request body destroyed on invalid callback', async (t) => { t = tspl(t, { plan: 1 }) diff --git a/test/pool.js b/test/pool.js index d9e1eefd6b3..b75cd530d43 100644 --- a/test/pool.js +++ b/test/pool.js @@ -781,146 +781,146 @@ test('pool dispatch error', async (t) => { await t.completed }) -// test('pool request abort in queue', async (t) => { -// t = tspl(t, { plan: 3 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.close()) - -// client.dispatch({ -// path: '/', -// method: 'GET' -// }, { -// onConnect () { -// }, -// onHeaders (statusCode, headers) { -// t.strictEqual(statusCode, 200) -// }, -// onData (chunk) { -// }, -// onComplete () { -// t.ok(true, 'pass') -// }, -// onError () { -// } -// }) - -// const signal = new EventEmitter() -// client.request({ -// path: '/', -// method: 'GET', -// signal -// }, (err) => { -// t.strictEqual(err.code, 'UND_ERR_ABORTED') -// }) -// signal.emit('abort') -// }) - -// await t.completed -// }) - -// test('pool stream abort in queue', async (t) => { -// t = tspl(t, { plan: 3 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.close()) - -// client.dispatch({ -// path: '/', -// method: 'GET' -// }, { -// onConnect () { -// }, -// onHeaders (statusCode, headers) { -// t.strictEqual(statusCode, 200) -// }, -// onData (chunk) { -// }, -// onComplete () { -// t.ok(true, 'pass') -// }, -// onError () { -// } -// }) - -// const signal = new EventEmitter() -// client.stream({ -// path: '/', -// method: 'GET', -// signal -// }, ({ body }) => body, (err) => { -// t.strictEqual(err.code, 'UND_ERR_ABORTED') -// }) -// signal.emit('abort') -// }) - -// await t.completed -// }) - -// test('pool pipeline abort in queue', async (t) => { -// t = tspl(t, { plan: 3 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.close()) - -// client.dispatch({ -// path: '/', -// method: 'GET' -// }, { -// onConnect () { -// }, -// onHeaders (statusCode, headers) { -// t.strictEqual(statusCode, 200) -// }, -// onData (chunk) { -// }, -// onComplete () { -// t.ok(true, 'pass') -// }, -// onError () { -// } -// }) - -// const signal = new EventEmitter() -// client.pipeline({ -// path: '/', -// method: 'GET', -// signal -// }, ({ body }) => body).end().on('error', (err) => { -// t.strictEqual(err.code, 'UND_ERR_ABORTED') -// }) -// signal.emit('abort') -// }) - -// await t.completed -// }) +test('pool request abort in queue', async (t) => { + t = tspl(t, { plan: 3 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.close()) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.strictEqual(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.ok(true, 'pass') + }, + onError () { + } + }) + + const signal = new EventEmitter() + client.request({ + path: '/', + method: 'GET', + signal + }, (err) => { + t.strictEqual(err.code, 'UND_ERR_ABORTED') + }) + signal.emit('abort') + }) + + await t.completed +}) + +test('pool stream abort in queue', async (t) => { + t = tspl(t, { plan: 3 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.close()) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.strictEqual(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.ok(true, 'pass') + }, + onError () { + } + }) + + const signal = new EventEmitter() + client.stream({ + path: '/', + method: 'GET', + signal + }, ({ body }) => body, (err) => { + t.strictEqual(err.code, 'UND_ERR_ABORTED') + }) + signal.emit('abort') + }) + + await t.completed +}) + +test('pool pipeline abort in queue', async (t) => { + t = tspl(t, { plan: 3 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.close()) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.strictEqual(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.ok(true, 'pass') + }, + onError () { + } + }) + + const signal = new EventEmitter() + client.pipeline({ + path: '/', + method: 'GET', + signal + }, ({ body }) => body).end().on('error', (err) => { + t.strictEqual(err.code, 'UND_ERR_ABORTED') + }) + signal.emit('abort') + }) + + await t.completed +}) test('pool stream constructor error destroy body', async (t) => { t = tspl(t, { plan: 4 }) diff --git a/test/request-timeout.js b/test/request-timeout.js index 34fe48c3ee5..03d34c9bef5 100644 --- a/test/request-timeout.js +++ b/test/request-timeout.js @@ -178,165 +178,165 @@ test('overridden body timeout', async (t) => { await t.completed }) -// test('With EE signal', async (t) => { -// t = tspl(t, { plan: 1 }) - -// const clock = FakeTimers.install({ -// shouldClearNativeTimers: true, -// toFake: ['setTimeout', 'clearTimeout'] -// }) -// after(() => clock.uninstall()) - -// const orgTimers = { ...timers } -// Object.assign(timers, { setTimeout, clearTimeout }) -// after(() => { -// Object.assign(timers, orgTimers) -// }) - -// const server = createServer((req, res) => { -// setTimeout(() => { -// res.end('hello') -// }, 100) -// clock.tick(100) -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const client = new Client(`http://localhost:${server.address().port}`, { -// headersTimeout: 50 -// }) -// const ee = new EventEmitter() -// after(() => client.destroy()) - -// client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { -// t.ok(err instanceof errors.HeadersTimeoutError) -// }) - -// clock.tick(50) -// }) - -// await t.completed -// }) - -// test('With abort-controller signal', async (t) => { -// t = tspl(t, { plan: 1 }) - -// const clock = FakeTimers.install({ -// shouldClearNativeTimers: true, -// toFake: ['setTimeout', 'clearTimeout'] -// }) -// after(() => clock.uninstall()) - -// const orgTimers = { ...timers } -// Object.assign(timers, { setTimeout, clearTimeout }) -// after(() => { -// Object.assign(timers, orgTimers) -// }) - -// const server = createServer((req, res) => { -// setTimeout(() => { -// res.end('hello') -// }, 100) -// clock.tick(100) -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const client = new Client(`http://localhost:${server.address().port}`, { -// headersTimeout: 50 -// }) -// const abortController = new AbortController() -// after(() => client.destroy()) - -// client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { -// t.ok(err instanceof errors.HeadersTimeoutError) -// }) - -// clock.tick(50) -// }) - -// await t.completed -// }) - -// test('Abort before timeout (EE)', async (t) => { -// t = tspl(t, { plan: 1 }) - -// const clock = FakeTimers.install({ -// shouldClearNativeTimers: true, -// toFake: ['setTimeout', 'clearTimeout'] -// }) -// after(() => clock.uninstall()) - -// const orgTimers = { ...timers } -// Object.assign(timers, { setTimeout, clearTimeout }) -// after(() => { -// Object.assign(timers, orgTimers) -// }) - -// const ee = new EventEmitter() -// const server = createServer((req, res) => { -// setTimeout(() => { -// res.end('hello') -// }, 100) -// ee.emit('abort') -// clock.tick(50) -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const client = new Client(`http://localhost:${server.address().port}`, { -// headersTimeout: 50 -// }) -// after(() => client.destroy()) - -// client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { -// t.ok(err instanceof errors.RequestAbortedError) -// clock.tick(100) -// }) -// }) - -// await t.completed -// }) - -// test('Abort before timeout (abort-controller)', async (t) => { -// t = tspl(t, { plan: 1 }) - -// const clock = FakeTimers.install({ -// shouldClearNativeTimers: true, -// toFake: ['setTimeout', 'clearTimeout'] -// }) -// after(() => clock.uninstall()) - -// const orgTimers = { ...timers } -// Object.assign(timers, { setTimeout, clearTimeout }) -// after(() => { -// Object.assign(timers, orgTimers) -// }) - -// const abortController = new AbortController() -// const server = createServer((req, res) => { -// setTimeout(() => { -// res.end('hello') -// }, 100) -// abortController.abort() -// clock.tick(50) -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const client = new Client(`http://localhost:${server.address().port}`, { -// headersTimeout: 50 -// }) -// after(() => client.destroy()) - -// client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { -// t.ok(err instanceof errors.RequestAbortedError) -// clock.tick(100) -// }) -// }) - -// await t.completed -// }) +test('With EE signal', async (t) => { + t = tspl(t, { plan: 1 }) + + const clock = FakeTimers.install({ + shouldClearNativeTimers: true, + toFake: ['setTimeout', 'clearTimeout'] + }) + after(() => clock.uninstall()) + + const orgTimers = { ...timers } + Object.assign(timers, { setTimeout, clearTimeout }) + after(() => { + Object.assign(timers, orgTimers) + }) + + const server = createServer((req, res) => { + setTimeout(() => { + res.end('hello') + }, 100) + clock.tick(100) + }) + after(() => server.close()) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + headersTimeout: 50 + }) + const ee = new EventEmitter() + after(() => client.destroy()) + + client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { + t.ok(err instanceof errors.HeadersTimeoutError) + }) + + clock.tick(50) + }) + + await t.completed +}) + +test('With abort-controller signal', async (t) => { + t = tspl(t, { plan: 1 }) + + const clock = FakeTimers.install({ + shouldClearNativeTimers: true, + toFake: ['setTimeout', 'clearTimeout'] + }) + after(() => clock.uninstall()) + + const orgTimers = { ...timers } + Object.assign(timers, { setTimeout, clearTimeout }) + after(() => { + Object.assign(timers, orgTimers) + }) + + const server = createServer((req, res) => { + setTimeout(() => { + res.end('hello') + }, 100) + clock.tick(100) + }) + after(() => server.close()) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + headersTimeout: 50 + }) + const abortController = new AbortController() + after(() => client.destroy()) + + client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { + t.ok(err instanceof errors.HeadersTimeoutError) + }) + + clock.tick(50) + }) + + await t.completed +}) + +test('Abort before timeout (EE)', async (t) => { + t = tspl(t, { plan: 1 }) + + const clock = FakeTimers.install({ + shouldClearNativeTimers: true, + toFake: ['setTimeout', 'clearTimeout'] + }) + after(() => clock.uninstall()) + + const orgTimers = { ...timers } + Object.assign(timers, { setTimeout, clearTimeout }) + after(() => { + Object.assign(timers, orgTimers) + }) + + const ee = new EventEmitter() + const server = createServer((req, res) => { + setTimeout(() => { + res.end('hello') + }, 100) + ee.emit('abort') + clock.tick(50) + }) + after(() => server.close()) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + headersTimeout: 50 + }) + after(() => client.destroy()) + + client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { + t.ok(err instanceof errors.RequestAbortedError) + clock.tick(100) + }) + }) + + await t.completed +}) + +test('Abort before timeout (abort-controller)', async (t) => { + t = tspl(t, { plan: 1 }) + + const clock = FakeTimers.install({ + shouldClearNativeTimers: true, + toFake: ['setTimeout', 'clearTimeout'] + }) + after(() => clock.uninstall()) + + const orgTimers = { ...timers } + Object.assign(timers, { setTimeout, clearTimeout }) + after(() => { + Object.assign(timers, orgTimers) + }) + + const abortController = new AbortController() + const server = createServer((req, res) => { + setTimeout(() => { + res.end('hello') + }, 100) + abortController.abort() + clock.tick(50) + }) + after(() => server.close()) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + headersTimeout: 50 + }) + after(() => client.destroy()) + + client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { + t.ok(err instanceof errors.RequestAbortedError) + clock.tick(100) + }) + }) + + await t.completed +}) test('Timeout with pipelining', async (t) => { t = tspl(t, { plan: 3 })