From 1f28af4309465b796e14e4c749bb03b636a6b188 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 7 May 2024 10:16:03 +0200 Subject: [PATCH] fixup --- lib/api/api-request.js | 15 +- test/client-request.js | 78 +++++----- test/pool.js | 280 +++++++++++++++++------------------ test/request-timeout.js | 318 ++++++++++++++++++++-------------------- 4 files changed, 348 insertions(+), 343 deletions(-) diff --git a/lib/api/api-request.js b/lib/api/api-request.js index 553bc615dcc..f70f351f2dc 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -2,7 +2,7 @@ const assert = require('node:assert') const { Readable } = require('./readable') -const { InvalidArgumentError, AbortError } = require('../core/errors') +const { InvalidArgumentError, RequestAbortedError } = require('../core/errors') const util = require('../core/util') const { getResolveErrorBodyCallback } = require('./util') const { AsyncResource } = require('node:async_hooks') @@ -57,6 +57,7 @@ class RequestHandler extends AsyncResource { this.highWaterMark = highWaterMark this.signal = signal this.reason = null + this.removeAbortListener = null if (util.isStream(body)) { body.on('error', (err) => { @@ -66,10 +67,13 @@ class RequestHandler extends AsyncResource { if (this.signal) { if (this.signal.aborted) { - this.reason = this.signal.reason ?? new AbortError() + this.reason = this.signal.reason ?? new RequestAbortedError() } else { this.removeAbortListener = util.addAbortListener(this.signal, () => { - this.reason = this.signal.reason ?? new AbortError() + this.removeAbortListener?.() + this.removeAbortListener = null + + this.reason = this.signal.reason ?? new RequestAbortedError() if (this.res) { util.destroy(this.res, this.reason) } else if (this.abort) { @@ -152,6 +156,9 @@ class RequestHandler extends AsyncResource { onError (err) { const { res, callback, body, opaque } = this + this.removeAbortListener?.() + this.removeAbortListener = null + if (callback) { // TODO: Does this need queueMicrotask? this.callback = null @@ -166,8 +173,6 @@ class RequestHandler extends AsyncResource { queueMicrotask(() => { util.destroy(res, err) }) - } else if (this.removeAbortListener) { - this.removeAbortListener() } if (body) { diff --git a/test/client-request.js b/test/client-request.js index 116fd649af7..2bd38201a72 100644 --- a/test/client-request.js +++ b/test/client-request.js @@ -135,45 +135,45 @@ test('request hwm', async (t) => { await t.completed }) -// test('request abort before headers', async (t) => { -// t = tspl(t, { plan: 6 }) - -// const signal = new EE() -// const server = createServer((req, res) => { -// res.end('hello') -// signal.emit('abort') -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const client = new Client(`http://localhost:${server.address().port}`) -// after(() => client.destroy()) - -// client[kConnect](() => { -// client.request({ -// path: '/', -// method: 'GET', -// signal -// }, (err) => { -// t.ok(err instanceof errors.RequestAbortedError) -// t.strictEqual(signal.listenerCount('abort'), 0) -// }) -// t.strictEqual(signal.listenerCount('abort'), 1) - -// client.request({ -// path: '/', -// method: 'GET', -// signal -// }, (err) => { -// t.ok(err instanceof errors.RequestAbortedError) -// t.strictEqual(signal.listenerCount('abort'), 0) -// }) -// t.strictEqual(signal.listenerCount('abort'), 2) -// }) -// }) - -// await t.completed -// }) +test('request abort before headers', async (t) => { + t = tspl(t, { plan: 6 }) + + const signal = new EE() + const server = createServer((req, res) => { + res.end('hello') + signal.emit('abort') + }) + after(() => server.close()) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + after(() => client.destroy()) + + client[kConnect](() => { + client.request({ + path: '/', + method: 'GET', + signal + }, (err) => { + t.ok(err instanceof errors.RequestAbortedError) + t.strictEqual(signal.listenerCount('abort'), 0) + }) + t.strictEqual(signal.listenerCount('abort'), 1) + + client.request({ + path: '/', + method: 'GET', + signal + }, (err) => { + t.ok(err instanceof errors.RequestAbortedError) + t.strictEqual(signal.listenerCount('abort'), 0) + }) + t.strictEqual(signal.listenerCount('abort'), 2) + }) + }) + + await t.completed +}) test('request body destroyed on invalid callback', async (t) => { t = tspl(t, { plan: 1 }) diff --git a/test/pool.js b/test/pool.js index d9e1eefd6b3..b75cd530d43 100644 --- a/test/pool.js +++ b/test/pool.js @@ -781,146 +781,146 @@ test('pool dispatch error', async (t) => { await t.completed }) -// test('pool request abort in queue', async (t) => { -// t = tspl(t, { plan: 3 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.close()) - -// client.dispatch({ -// path: '/', -// method: 'GET' -// }, { -// onConnect () { -// }, -// onHeaders (statusCode, headers) { -// t.strictEqual(statusCode, 200) -// }, -// onData (chunk) { -// }, -// onComplete () { -// t.ok(true, 'pass') -// }, -// onError () { -// } -// }) - -// const signal = new EventEmitter() -// client.request({ -// path: '/', -// method: 'GET', -// signal -// }, (err) => { -// t.strictEqual(err.code, 'UND_ERR_ABORTED') -// }) -// signal.emit('abort') -// }) - -// await t.completed -// }) - -// test('pool stream abort in queue', async (t) => { -// t = tspl(t, { plan: 3 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.close()) - -// client.dispatch({ -// path: '/', -// method: 'GET' -// }, { -// onConnect () { -// }, -// onHeaders (statusCode, headers) { -// t.strictEqual(statusCode, 200) -// }, -// onData (chunk) { -// }, -// onComplete () { -// t.ok(true, 'pass') -// }, -// onError () { -// } -// }) - -// const signal = new EventEmitter() -// client.stream({ -// path: '/', -// method: 'GET', -// signal -// }, ({ body }) => body, (err) => { -// t.strictEqual(err.code, 'UND_ERR_ABORTED') -// }) -// signal.emit('abort') -// }) - -// await t.completed -// }) - -// test('pool pipeline abort in queue', async (t) => { -// t = tspl(t, { plan: 3 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.close()) - -// client.dispatch({ -// path: '/', -// method: 'GET' -// }, { -// onConnect () { -// }, -// onHeaders (statusCode, headers) { -// t.strictEqual(statusCode, 200) -// }, -// onData (chunk) { -// }, -// onComplete () { -// t.ok(true, 'pass') -// }, -// onError () { -// } -// }) - -// const signal = new EventEmitter() -// client.pipeline({ -// path: '/', -// method: 'GET', -// signal -// }, ({ body }) => body).end().on('error', (err) => { -// t.strictEqual(err.code, 'UND_ERR_ABORTED') -// }) -// signal.emit('abort') -// }) - -// await t.completed -// }) +test('pool request abort in queue', async (t) => { + t = tspl(t, { plan: 3 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.close()) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.strictEqual(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.ok(true, 'pass') + }, + onError () { + } + }) + + const signal = new EventEmitter() + client.request({ + path: '/', + method: 'GET', + signal + }, (err) => { + t.strictEqual(err.code, 'UND_ERR_ABORTED') + }) + signal.emit('abort') + }) + + await t.completed +}) + +test('pool stream abort in queue', async (t) => { + t = tspl(t, { plan: 3 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.close()) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.strictEqual(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.ok(true, 'pass') + }, + onError () { + } + }) + + const signal = new EventEmitter() + client.stream({ + path: '/', + method: 'GET', + signal + }, ({ body }) => body, (err) => { + t.strictEqual(err.code, 'UND_ERR_ABORTED') + }) + signal.emit('abort') + }) + + await t.completed +}) + +test('pool pipeline abort in queue', async (t) => { + t = tspl(t, { plan: 3 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.close()) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.strictEqual(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.ok(true, 'pass') + }, + onError () { + } + }) + + const signal = new EventEmitter() + client.pipeline({ + path: '/', + method: 'GET', + signal + }, ({ body }) => body).end().on('error', (err) => { + t.strictEqual(err.code, 'UND_ERR_ABORTED') + }) + signal.emit('abort') + }) + + await t.completed +}) test('pool stream constructor error destroy body', async (t) => { t = tspl(t, { plan: 4 }) diff --git a/test/request-timeout.js b/test/request-timeout.js index 34fe48c3ee5..03d34c9bef5 100644 --- a/test/request-timeout.js +++ b/test/request-timeout.js @@ -178,165 +178,165 @@ test('overridden body timeout', async (t) => { await t.completed }) -// test('With EE signal', async (t) => { -// t = tspl(t, { plan: 1 }) - -// const clock = FakeTimers.install({ -// shouldClearNativeTimers: true, -// toFake: ['setTimeout', 'clearTimeout'] -// }) -// after(() => clock.uninstall()) - -// const orgTimers = { ...timers } -// Object.assign(timers, { setTimeout, clearTimeout }) -// after(() => { -// Object.assign(timers, orgTimers) -// }) - -// const server = createServer((req, res) => { -// setTimeout(() => { -// res.end('hello') -// }, 100) -// clock.tick(100) -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const client = new Client(`http://localhost:${server.address().port}`, { -// headersTimeout: 50 -// }) -// const ee = new EventEmitter() -// after(() => client.destroy()) - -// client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { -// t.ok(err instanceof errors.HeadersTimeoutError) -// }) - -// clock.tick(50) -// }) - -// await t.completed -// }) - -// test('With abort-controller signal', async (t) => { -// t = tspl(t, { plan: 1 }) - -// const clock = FakeTimers.install({ -// shouldClearNativeTimers: true, -// toFake: ['setTimeout', 'clearTimeout'] -// }) -// after(() => clock.uninstall()) - -// const orgTimers = { ...timers } -// Object.assign(timers, { setTimeout, clearTimeout }) -// after(() => { -// Object.assign(timers, orgTimers) -// }) - -// const server = createServer((req, res) => { -// setTimeout(() => { -// res.end('hello') -// }, 100) -// clock.tick(100) -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const client = new Client(`http://localhost:${server.address().port}`, { -// headersTimeout: 50 -// }) -// const abortController = new AbortController() -// after(() => client.destroy()) - -// client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { -// t.ok(err instanceof errors.HeadersTimeoutError) -// }) - -// clock.tick(50) -// }) - -// await t.completed -// }) - -// test('Abort before timeout (EE)', async (t) => { -// t = tspl(t, { plan: 1 }) - -// const clock = FakeTimers.install({ -// shouldClearNativeTimers: true, -// toFake: ['setTimeout', 'clearTimeout'] -// }) -// after(() => clock.uninstall()) - -// const orgTimers = { ...timers } -// Object.assign(timers, { setTimeout, clearTimeout }) -// after(() => { -// Object.assign(timers, orgTimers) -// }) - -// const ee = new EventEmitter() -// const server = createServer((req, res) => { -// setTimeout(() => { -// res.end('hello') -// }, 100) -// ee.emit('abort') -// clock.tick(50) -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const client = new Client(`http://localhost:${server.address().port}`, { -// headersTimeout: 50 -// }) -// after(() => client.destroy()) - -// client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { -// t.ok(err instanceof errors.RequestAbortedError) -// clock.tick(100) -// }) -// }) - -// await t.completed -// }) - -// test('Abort before timeout (abort-controller)', async (t) => { -// t = tspl(t, { plan: 1 }) - -// const clock = FakeTimers.install({ -// shouldClearNativeTimers: true, -// toFake: ['setTimeout', 'clearTimeout'] -// }) -// after(() => clock.uninstall()) - -// const orgTimers = { ...timers } -// Object.assign(timers, { setTimeout, clearTimeout }) -// after(() => { -// Object.assign(timers, orgTimers) -// }) - -// const abortController = new AbortController() -// const server = createServer((req, res) => { -// setTimeout(() => { -// res.end('hello') -// }, 100) -// abortController.abort() -// clock.tick(50) -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const client = new Client(`http://localhost:${server.address().port}`, { -// headersTimeout: 50 -// }) -// after(() => client.destroy()) - -// client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { -// t.ok(err instanceof errors.RequestAbortedError) -// clock.tick(100) -// }) -// }) - -// await t.completed -// }) +test('With EE signal', async (t) => { + t = tspl(t, { plan: 1 }) + + const clock = FakeTimers.install({ + shouldClearNativeTimers: true, + toFake: ['setTimeout', 'clearTimeout'] + }) + after(() => clock.uninstall()) + + const orgTimers = { ...timers } + Object.assign(timers, { setTimeout, clearTimeout }) + after(() => { + Object.assign(timers, orgTimers) + }) + + const server = createServer((req, res) => { + setTimeout(() => { + res.end('hello') + }, 100) + clock.tick(100) + }) + after(() => server.close()) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + headersTimeout: 50 + }) + const ee = new EventEmitter() + after(() => client.destroy()) + + client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { + t.ok(err instanceof errors.HeadersTimeoutError) + }) + + clock.tick(50) + }) + + await t.completed +}) + +test('With abort-controller signal', async (t) => { + t = tspl(t, { plan: 1 }) + + const clock = FakeTimers.install({ + shouldClearNativeTimers: true, + toFake: ['setTimeout', 'clearTimeout'] + }) + after(() => clock.uninstall()) + + const orgTimers = { ...timers } + Object.assign(timers, { setTimeout, clearTimeout }) + after(() => { + Object.assign(timers, orgTimers) + }) + + const server = createServer((req, res) => { + setTimeout(() => { + res.end('hello') + }, 100) + clock.tick(100) + }) + after(() => server.close()) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + headersTimeout: 50 + }) + const abortController = new AbortController() + after(() => client.destroy()) + + client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { + t.ok(err instanceof errors.HeadersTimeoutError) + }) + + clock.tick(50) + }) + + await t.completed +}) + +test('Abort before timeout (EE)', async (t) => { + t = tspl(t, { plan: 1 }) + + const clock = FakeTimers.install({ + shouldClearNativeTimers: true, + toFake: ['setTimeout', 'clearTimeout'] + }) + after(() => clock.uninstall()) + + const orgTimers = { ...timers } + Object.assign(timers, { setTimeout, clearTimeout }) + after(() => { + Object.assign(timers, orgTimers) + }) + + const ee = new EventEmitter() + const server = createServer((req, res) => { + setTimeout(() => { + res.end('hello') + }, 100) + ee.emit('abort') + clock.tick(50) + }) + after(() => server.close()) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + headersTimeout: 50 + }) + after(() => client.destroy()) + + client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { + t.ok(err instanceof errors.RequestAbortedError) + clock.tick(100) + }) + }) + + await t.completed +}) + +test('Abort before timeout (abort-controller)', async (t) => { + t = tspl(t, { plan: 1 }) + + const clock = FakeTimers.install({ + shouldClearNativeTimers: true, + toFake: ['setTimeout', 'clearTimeout'] + }) + after(() => clock.uninstall()) + + const orgTimers = { ...timers } + Object.assign(timers, { setTimeout, clearTimeout }) + after(() => { + Object.assign(timers, orgTimers) + }) + + const abortController = new AbortController() + const server = createServer((req, res) => { + setTimeout(() => { + res.end('hello') + }, 100) + abortController.abort() + clock.tick(50) + }) + after(() => server.close()) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + headersTimeout: 50 + }) + after(() => client.destroy()) + + client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { + t.ok(err instanceof errors.RequestAbortedError) + clock.tick(100) + }) + }) + + await t.completed +}) test('Timeout with pipelining', async (t) => { t = tspl(t, { plan: 3 })