From f913231ec997f62ff18e610c9df4541f72922b38 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 19 Sep 2020 15:24:16 +0200 Subject: [PATCH 1/4] add proxy example --- examples/proxy/index.js | 49 ++++++++ examples/proxy/proxy.js | 262 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 311 insertions(+) create mode 100644 examples/proxy/index.js create mode 100644 examples/proxy/proxy.js diff --git a/examples/proxy/index.js b/examples/proxy/index.js new file mode 100644 index 00000000000..ed13b26d34d --- /dev/null +++ b/examples/proxy/index.js @@ -0,0 +1,49 @@ +const { Pool, Client } = require('../../') +const http = require('http') +const proxy = require('./proxy') + +const pool = new Pool('http://localhost:4001', { + connections: 256, + pipelining: 1 +}) + +async function run () { + await Promise.all([ + new Promise(resolve => { + // Proxy + http.createServer((req, res) => { + proxy({ req, res, proxyName: 'example' }, pool).catch(err => { + if (res.headersSent) { + res.destroy(err) + } else { + for (const name of res.getHeaderNames()) { + res.removeHeader(name) + } + res.writeHead(err.statusCode || 500) + res.end() + } + }) + }).listen(4000, resolve) + }), + new Promise(resolve => { + // Upstream + http.createServer((req, res) => { + res.end('hello world') + }).listen(4001, resolve) + }) + ]) + + const client = new Client('http://localhost:4000') + const { body } = await client.request({ + method: 'GET', + path: '/' + }) + + for await (const chunk of body) { + console.log(String(chunk)) + } +} + +run() + +// TODO: Add websocket example. diff --git a/examples/proxy/proxy.js b/examples/proxy/proxy.js new file mode 100644 index 00000000000..571683b945d --- /dev/null +++ b/examples/proxy/proxy.js @@ -0,0 +1,262 @@ +const net = require('net') +const { pipeline } = require('stream') +const createError = require('http-errors') + +module.exports = async function proxy (ctx, client) { + const { req, socket } = ctx + + if (socket) { + const handler = new WSHandler(ctx) + client.dispatch({ + method: req.method, + path: req.url, + headers: getHeaders(ctx), + upgrade: 'Websocket' + }, handler) + return handler.promise + } else { + const handler = new HTTPHandler(ctx) + client.dispatch({ + method: req.method, + path: req.url, + headers: getHeaders(ctx), + body: req + }, handler) + return handler.promise + } +} + +class HTTPHandler { + constructor (ctx) { + const { req, res } = ctx + + this.req = req + this.res = res + this.resume = null + this.abort = null + this.promise = new Promise((resolve, reject) => { + this.callback = err => err ? reject(err) : resolve() + }) + } + + onConnect (abort) { + if (this.req.aborted) { + abort() + } else { + this.abort = abort + this.res.on('close', abort) + } + } + + onHeaders (statusCode, headers, resume) { + if (statusCode < 200) { + return + } + + this.resume = resume + this.res.on('drain', resume) + writeHead(this.res, statusCode, getHeaders({ headers })) + } + + onData (chunk) { + return this.res.write(chunk) + } + + onComplete () { + this.res.end() + this.callback() + } + + onError (err) { + this.res.off('close', this.abort) + this.res.off('drain', this.resume) + this.callback(err) + } +} + +class WSHandler { + constructor (ctx) { + const { socket, head } = ctx + + setupSocket(socket) + + this.socket = socket + this.head = head + this.abort = null + this.promise = new Promise((resolve, reject) => { + this.callback = err => err ? reject(err) : resolve() + }) + } + + onConnect (abort) { + if (this.socket.destroyed) { + abort() + } else { + this.abort = abort + this.socket.on('close', abort) + } + } + + onUpgrade (statusCode, headers, socket) { + if (this.head && this.head.length) { + socket.unshift(this.head) + } + + setupSocket(socket) + + let head = 'HTTP/1.1 101 Switching Protocols\r\nconnection: upgrade\r\nupgrade: websocket' + + headers = getHeaders({ headers }) + for (let n = 0; n < headers.length; n += 2) { + const key = headers[n + 0] + const val = headers[n + 1] + + if (!Array.isArray(val)) { + head += `\r\n${key}: ${val}` + } else { + for (let i = 0; i < val.length; i++) { + head += `\r\n${key}: ${val[i]}` + } + } + } + head += '\r\n\r\n' + + this.socket.write(head) + + pipeline(socket, this.socket, socket, this.callback) + } + + onError (err) { + this.socket.off('close', this.abort) + this.callback(err) + } +} + +// This expression matches hop-by-hop headers. +// These headers are meaningful only for a single transport-level connection, +// and must not be retransmitted by proxies or cached. +const HOP_EXPR = /^(te|host|upgrade|trailers|connection|keep-alive|http2-settings|transfer-encoding|proxy-connection|proxy-authenticate|proxy-authorization)$/i + +// Removes hop-by-hop and pseudo headers. +// Updates via and forwarded headers. +// Only hop-by-hop headers may be set using the Connection general header. +function getHeaders (ctx) { + const { + proxyName, + req, + headers = req ? req.rawHeaders : undefined + } = ctx + + let via = '' + let forwarded = '' + let host = '' + let authority = '' + let connection = '' + + for (let n = 0; n < headers.length; n += 2) { + const key = headers[n + 0] + const val = headers[n + 1] + + if (!via && key.length === 3 && key.toLowerCase() === 'via') { + via = val + } else if (!host && key.length === 4 && key.toLowerCase() === 'host') { + host = val + } else if (!forwarded && key.length === 9 && key.toLowerCase() === 'forwarded') { + forwarded = val + } else if (!connection && key.length === 10 && key.toLowerCase() === 'connection') { + connection = val + } else if (!authority && key.length === 10 && key === ':authority') { + authority = val + } + } + + let remove + if (connection && !HOP_EXPR.test(connection)) { + remove = connection.split(/,\s*/) + } + + const result = [] + for (let n = 0; n < headers.length; n += 2) { + const key = headers[n + 0] + const val = headers[n + 1] + + if ( + key.charAt(0) !== ':' && + !HOP_EXPR.test(key) && + (!remove || !remove.includes(key)) + ) { + result.push(key, val) + } + } + + if (req) { + const { socket, httpVersion } = req + + result.push('forwarded', (forwarded ? forwarded + ', ' : '') + [ + `by=${printIp(socket.localAddress, socket.localPort)}`, + `for=${printIp(socket.remoteAddress, socket.remotePort)}`, + `proto=${socket.encrypted ? 'https' : 'http'}`, + `host=${printIp(authority || host || '')}` + ].join(';')) + + // TODO (fix): Should also apply to responses. + if (proxyName) { + if (via) { + if (via.split(',').some(name => name.endsWith(proxyName))) { + throw new createError.LoopDetected() + } + via += ', ' + } + via += `${httpVersion} ${proxyName}` + } + } + + if (via) { + result.push('via', via) + } + + return result +} + +function setupSocket (socket) { + socket.setTimeout(0) + socket.setNoDelay(true) + socket.setKeepAlive(true, 0) +} + +function printIp (address, port) { + const isIPv6 = net.isIPv6(address) + let str = `${address}` + if (isIPv6) { + str = `[${str}]` + } + if (port) { + str = `${str}:${port}` + } + if (isIPv6 || port) { + str = `"${str}"` + } + return str +} + +function writeHead (res, statusCode, headers) { + // TODO (perf): res.writeHead should support Array and/or string. + const obj = {} + for (var i = 0; i < headers.length; i += 2) { + var key = headers[i] + var val = obj[key] + if (!val) { + obj[key] = headers[i + 1] + } else { + if (!Array.isArray(val)) { + val = [val] + obj[key] = val + } + val.push(headers[i + 1]) + } + } + + res.writeHead(statusCode, obj) + + return res +} From 362ab709f24381e7c931eb56566ca753945aed92 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 20 Sep 2020 16:09:22 +0200 Subject: [PATCH 2/4] fixup --- examples/proxy/proxy.js | 62 +++++++++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/examples/proxy/proxy.js b/examples/proxy/proxy.js index 571683b945d..52ebf382418 100644 --- a/examples/proxy/proxy.js +++ b/examples/proxy/proxy.js @@ -5,12 +5,19 @@ const createError = require('http-errors') module.exports = async function proxy (ctx, client) { const { req, socket } = ctx + const headers = getHeaders({ + headers: req.rawHeaders, + httpVersion: req.httpVersion, + socket: req.socket, + proxyName: ctx.proxyName + }) + if (socket) { const handler = new WSHandler(ctx) client.dispatch({ method: req.method, path: req.url, - headers: getHeaders(ctx), + headers, upgrade: 'Websocket' }, handler) return handler.promise @@ -19,7 +26,7 @@ module.exports = async function proxy (ctx, client) { client.dispatch({ method: req.method, path: req.url, - headers: getHeaders(ctx), + headers, body: req }, handler) return handler.promise @@ -28,8 +35,9 @@ module.exports = async function proxy (ctx, client) { class HTTPHandler { constructor (ctx) { - const { req, res } = ctx + const { req, res, proxyName } = ctx + this.proxyName = proxyName this.req = req this.res = res this.resume = null @@ -55,7 +63,11 @@ class HTTPHandler { this.resume = resume this.res.on('drain', resume) - writeHead(this.res, statusCode, getHeaders({ headers })) + writeHead(this.res, statusCode, getHeaders({ + headers, + proxyName: this.proxyName, + httpVersion: this.httpVersion + })) } onData (chunk) { @@ -76,10 +88,12 @@ class HTTPHandler { class WSHandler { constructor (ctx) { - const { socket, head } = ctx + const { req, socket, proxyName, head } = ctx setupSocket(socket) + this.proxyName = proxyName + this.httpVersion = req.httpVersion this.socket = socket this.head = head this.abort = null @@ -106,7 +120,11 @@ class WSHandler { let head = 'HTTP/1.1 101 Switching Protocols\r\nconnection: upgrade\r\nupgrade: websocket' - headers = getHeaders({ headers }) + headers = getHeaders({ + headers, + proxyName: this.proxyName, + httpVersion: this.httpVersion + }) for (let n = 0; n < headers.length; n += 2) { const key = headers[n + 0] const val = headers[n + 1] @@ -140,13 +158,12 @@ const HOP_EXPR = /^(te|host|upgrade|trailers|connection|keep-alive|http2-setting // Removes hop-by-hop and pseudo headers. // Updates via and forwarded headers. // Only hop-by-hop headers may be set using the Connection general header. -function getHeaders (ctx) { - const { - proxyName, - req, - headers = req ? req.rawHeaders : undefined - } = ctx - +function getHeaders ({ + headers, + proxyName, + httpVersion, + socket +}) { let via = '' let forwarded = '' let host = '' @@ -189,26 +206,23 @@ function getHeaders (ctx) { } } - if (req) { - const { socket, httpVersion } = req - + if (socket) { result.push('forwarded', (forwarded ? forwarded + ', ' : '') + [ `by=${printIp(socket.localAddress, socket.localPort)}`, `for=${printIp(socket.remoteAddress, socket.remotePort)}`, `proto=${socket.encrypted ? 'https' : 'http'}`, `host=${printIp(authority || host || '')}` ].join(';')) + } - // TODO (fix): Should also apply to responses. - if (proxyName) { - if (via) { - if (via.split(',').some(name => name.endsWith(proxyName))) { - throw new createError.LoopDetected() - } - via += ', ' + if (httpVersion && proxyName) { + if (via) { + if (via.split(',').some(name => name.endsWith(proxyName))) { + throw new createError.LoopDetected() } - via += `${httpVersion} ${proxyName}` + via += ', ' } + via += `${httpVersion} ${proxyName}` } if (via) { From 656c28c0c95152cc47c26e77c50a5e01a1593e4d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 20 Sep 2020 16:14:27 +0200 Subject: [PATCH 3/4] fixup --- examples/proxy/proxy.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/proxy/proxy.js b/examples/proxy/proxy.js index 52ebf382418..3bce64af59d 100644 --- a/examples/proxy/proxy.js +++ b/examples/proxy/proxy.js @@ -213,6 +213,8 @@ function getHeaders ({ `proto=${socket.encrypted ? 'https' : 'http'}`, `host=${printIp(authority || host || '')}` ].join(';')) + } else { + result.push('forwarded', forwarded) } if (httpVersion && proxyName) { From b56794fb9dd24e7295e8a9a427cc7c89757e5277 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 20 Sep 2020 16:15:07 +0200 Subject: [PATCH 4/4] fixup --- examples/proxy/index.js | 2 +- examples/proxy/proxy.js | 22 ++++++++++------------ 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/examples/proxy/index.js b/examples/proxy/index.js index ed13b26d34d..5f35049203e 100644 --- a/examples/proxy/index.js +++ b/examples/proxy/index.js @@ -19,7 +19,7 @@ async function run () { for (const name of res.getHeaderNames()) { res.removeHeader(name) } - res.writeHead(err.statusCode || 500) + res.statusCode = err.statusCode || 500 res.end() } }) diff --git a/examples/proxy/proxy.js b/examples/proxy/proxy.js index 3bce64af59d..b0f85cf210e 100644 --- a/examples/proxy/proxy.js +++ b/examples/proxy/proxy.js @@ -3,13 +3,13 @@ const { pipeline } = require('stream') const createError = require('http-errors') module.exports = async function proxy (ctx, client) { - const { req, socket } = ctx + const { req, socket, proxyName } = ctx const headers = getHeaders({ headers: req.rawHeaders, httpVersion: req.httpVersion, socket: req.socket, - proxyName: ctx.proxyName + proxyName }) if (socket) { @@ -112,6 +112,8 @@ class WSHandler { } onUpgrade (statusCode, headers, socket) { + // TODO: Check statusCode? + if (this.head && this.head.length) { socket.unshift(this.head) } @@ -125,17 +127,12 @@ class WSHandler { proxyName: this.proxyName, httpVersion: this.httpVersion }) + for (let n = 0; n < headers.length; n += 2) { const key = headers[n + 0] const val = headers[n + 1] - if (!Array.isArray(val)) { - head += `\r\n${key}: ${val}` - } else { - for (let i = 0; i < val.length; i++) { - head += `\r\n${key}: ${val[i]}` - } - } + head += `\r\n${key}: ${val}` } head += '\r\n\r\n' @@ -213,11 +210,12 @@ function getHeaders ({ `proto=${socket.encrypted ? 'https' : 'http'}`, `host=${printIp(authority || host || '')}` ].join(';')) - } else { - result.push('forwarded', forwarded) + } else if (forwarded) { + // The forwarded header should not be included in response. + throw new createError.BadGateway() } - if (httpVersion && proxyName) { + if (proxyName) { if (via) { if (via.split(',').some(name => name.endsWith(proxyName))) { throw new createError.LoopDetected()