Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Progress events without breaking pub-sub #60

Merged
merged 13 commits into from
Nov 13, 2020
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"dist"
],
"browser": {
"./src/http/fetch.js": "./src/http/fetch.browser.js",
"./src/text-encoder.js": "./src/text-encoder.browser.js",
"./src/text-decoder.js": "./src/text-decoder.browser.js",
"./src/temp-dir.js": "./src/temp-dir.browser.js",
Expand Down Expand Up @@ -48,15 +49,16 @@
"native-abort-controller": "0.0.3",
"native-fetch": "^2.0.0",
"node-fetch": "^2.6.0",
"stream-to-it": "^0.2.0"
"stream-to-it": "^0.2.0",
"it-to-stream": "^0.1.2"
},
"devDependencies": {
"aegir": "^28.1.0",
"delay": "^4.3.0",
"it-all": "^1.0.2",
"it-drain": "^1.0.1",
"it-last": "^1.0.2",
"it-to-stream": "^0.1.2"
"uint8arrays": "^1.1.0"
},
"contributors": [
"Hugo Dias <hugomrdias@gmail.com>",
Expand Down
24 changes: 4 additions & 20 deletions src/http.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,14 @@
/* eslint-disable no-undef */
'use strict'

const {
default: fetch,
Request,
Headers
} = require('./fetch')
const { fetch, Request, Headers } = require('./http/fetch')
const { TimeoutError, HTTPError } = require('./http/error')
const merge = require('merge-options').bind({ ignoreUndefined: true })
const { URL, URLSearchParams } = require('iso-url')
const TextDecoder = require('./text-decoder')
const AbortController = require('native-abort-controller')
const anySignal = require('any-signal')

class TimeoutError extends Error {
constructor () {
super('Request timed out')
this.name = 'TimeoutError'
}
}

class HTTPError extends Error {
constructor (response) {
super(response.statusText)
this.name = 'HTTPError'
this.response = response
}
}

const timeout = (promise, ms, abortController) => {
if (ms === undefined) {
return promise
Expand Down Expand Up @@ -88,6 +70,8 @@ const defaults = {
* @property {function(URLSearchParams): URLSearchParams } [transformSearchParams]
* @property {function(any): any} [transform] - When iterating the response body, transform each chunk with this function.
* @property {function(Response): Promise<void>} [handleError] - Handle errors
* @property {function({total:number, loaded:number, lengthComputable:boolean}):void} [onUploadProgress] - Can be passed to track upload progress.
* Note that if this option in passed underlying request will be performed using `XMLHttpRequest` and response will not be streamed.
*/

class HTTP {
Expand Down
26 changes: 26 additions & 0 deletions src/http/error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict'

class TimeoutError extends Error {
constructor (message = 'Request timed out') {
super(message)
this.name = 'TimeoutError'
}
}
exports.TimeoutError = TimeoutError

class AbortError extends Error {
constructor (message = 'The operation was aborted.') {
super(message)
this.name = 'AbortError'
}
}
exports.AbortError = AbortError

class HTTPError extends Error {
constructor (response) {
super(response.statusText)
this.name = 'HTTPError'
this.response = response
}
}
exports.HTTPError = HTTPError
135 changes: 135 additions & 0 deletions src/http/fetch.browser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
'use strict'
/* eslint-env browser */

const { TimeoutError, AbortError } = require('./error')
const { Request, Response, Headers } = require('../fetch')

/**
* @typedef {RequestInit & ExtraFetchOptions} FetchOptions
* @typedef {Object} ExtraFetchOptions
* @property {number} [timeout]
* @property {URLSearchParams} [searchParams]
* @property {function({total:number, loaded:number, lengthComputable:boolean}):void} [onUploadProgress]
* @property {string} [overrideMimeType]
* @returns {Promise<Response>}
*/

/**
* @param {string|URL} url
* @param {FetchOptions} [options]
* @returns {Promise<Response>}
*/
const fetchWithProgress = (url, options = {}) => {
const request = new XMLHttpRequest()
request.open(options.method || 'GET', url.toString(), true)

const { timeout } = options
if (timeout > 0 && timeout < Infinity) {
request.timeout = options.timeout
}

if (options.overrideMimeType != null) {
request.overrideMimeType(options.overrideMimeType)
}

if (options.headers) {
for (const [name, value] of options.headers.entries()) {
request.setRequestHeader(name, value)
}
}

if (options.signal) {
options.signal.onabort = () => request.abort()
}

if (options.onUploadProgress) {
request.upload.onprogress = options.onUploadProgress
}

// Note: Need to use `arraybuffer` here instead of `blob` because `Blob`
// instances coming from JSDOM are not compatible with `Response` from
// node-fetch (which is the setup we get when testing with jest because
// it uses JSDOM which does not provide a global fetch
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really necessary? We don't use jest and this file should only be used in the browser which wouldn't be using node-fetch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You told me in the earlier comment to:

Ensure it doesn't cause a regression for ipfs/js-ipfs#3238

Which is why this done here.

// https://github.com/jsdom/jsdom/issues/1724)
request.responseType = 'arraybuffer'

return new Promise((resolve, reject) => {
/**
* @param {Event} event
*/
const handleEvent = (event) => {
switch (event.type) {
case 'error': {
resolve(Response.error())
break
}
case 'load': {
resolve(
new ResponseWithURL(request.responseURL, request.response, {
status: request.status,
statusText: request.statusText,
headers: parseHeaders(request.getAllResponseHeaders())
})
)
break
}
case 'timeout': {
reject(new TimeoutError())
break
}
case 'abort': {
reject(new AbortError())
break
}
default: {
break
}
}
}
request.onerror = handleEvent
request.onload = handleEvent
request.ontimeout = handleEvent
request.onabort = handleEvent

request.send(options.body)
})
}

const fetchWithStreaming = fetch

const fetchWith = (url, options = {}) =>
(options.onUploadProgress != null)
? fetchWithProgress(url, options)
: fetchWithStreaming(url, options)

exports.fetch = fetchWith
exports.Request = Request
exports.Headers = Headers

/**
* @param {string} input
* @returns {Headers}
*/
const parseHeaders = (input) => {
const headers = new Headers()
for (const line of input.trim().split(/[\r\n]+/)) {
const index = line.indexOf(': ')
if (index > 0) {
headers.set(line.slice(0, index), line.slice(index + 1))
}
}

return headers
}

class ResponseWithURL extends Response {
/**
* @param {string} url
* @param {string|Blob|ArrayBufferView|ArrayBuffer|FormData|ReadableStream<Uint8Array>} body
* @param {ResponseInit} options
*/
constructor (url, body, options) {
super(body, options)
Object.defineProperty(this, 'url', { value: url })
}
}
9 changes: 9 additions & 0 deletions src/http/fetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
'use strict'

// Electron has `XMLHttpRequest` and should get the browser implementation
// instead of node.
if (typeof XMLHttpRequest === 'function') {
module.exports = require('./fetch.browser')
} else {
module.exports = require('./fetch.node')
}
126 changes: 126 additions & 0 deletions src/http/fetch.node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// @ts-check
'use strict'

const { Request, Response, Headers, default: nodeFetch } = require('../fetch')
const toStream = require('it-to-stream')
const { Buffer } = require('buffer')

/**
* @typedef {RequestInit & ExtraFetchOptions} FetchOptions
*
* @typedef {import('stream').Readable} Readable
* @typedef {Object} LoadProgress
* @property {number} total
* @property {number} loaded
* @property {boolean} lengthComputable
* @typedef {Object} ExtraFetchOptions
* @property {number} [timeout]
* @property {URLSearchParams} [searchParams]
* @property {function(LoadProgress):void} [onUploadProgress]
* @property {function(LoadProgress):void} [onDownloadProgress]
* @property {string} [overrideMimeType]
* @returns {Promise<Response>}
*/

/**
* @param {string|URL} url
* @param {FetchOptions} [options]
* @returns {Promise<Response>}
*/
const fetch = (url, options = {}) =>
// @ts-ignore
nodeFetch(url, withUploadProgress(options))

exports.fetch = fetch
exports.Request = Request
exports.Headers = Headers

/**
* Takes fetch options and wraps request body to track upload progress if
* `onUploadProgress` is supplied. Otherwise returns options as is.
*
* @param {FetchOptions} options
* @returns {FetchOptions}
*/
const withUploadProgress = (options) => {
const { onUploadProgress } = options
if (onUploadProgress) {
return {
...options,
// @ts-ignore
body: bodyWithUploadProgress(options, onUploadProgress)
}
} else {
return options
}
}

/**
* Takes request `body` and `onUploadProgress` handler and returns wrapped body
* that as consumed will report progress to supplied `onUploadProgress` handler.
*
* @param {FetchOptions} init
* @param {function(LoadProgress):void} onUploadProgress
* @returns {Readable}
*/
const bodyWithUploadProgress = (init, onUploadProgress) => {
// This works around the fact that electron-fetch serializes `Uint8Array`s
// and `ArrayBuffer`s to strings.
const content = normalizeBody(init.body)

// @ts-ignore - Response does not accept node `Readable` streams.
const { body } = new Response(content, init)
// @ts-ignore - Unlike standard Response, node-fetch `body` has a differnt
// type see: see https://github.com/node-fetch/node-fetch/blob/master/src/body.js
const source = iterateBodyWithProgress(body, onUploadProgress)
return toStream.readable(source)
}

/**
* @param {BodyInit} [input]
* @returns {Buffer|Readable|Blob|null}
*/
const normalizeBody = (input = null) => {
if (input instanceof ArrayBuffer) {
return Buffer.from(input)
} else if (ArrayBuffer.isView(input)) {
return Buffer.from(input.buffer, input.byteOffset, input.byteLength)
} else if (typeof input === 'string') {
return Buffer.from(input)
} else {
// @ts-ignore - Could be FormData|URLSearchParams|ReadableStream<Uint8Array>
// however electron-fetch does not support either of those types and
// node-fetch normalizes those to node streams.
return input
}
}

/**
* Takes body from native-fetch response as body and `onUploadProgress` handler
* and returns async iterable that emits body chunks and emits
* `onUploadProgress`.
*
* @param {Buffer|null|Readable} body
* @param {function(LoadProgress):void} onUploadProgress
* @returns {AsyncIterable<Buffer>}
*/
const iterateBodyWithProgress = async function * (body, onUploadProgress) {
/** @type {Buffer|null|Readable} */
if (body == null) {
onUploadProgress({ total: 0, loaded: 0, lengthComputable: true })
} else if (Buffer.isBuffer(body)) {
const total = body.byteLength
const lengthComputable = true
yield body
onUploadProgress({ total, loaded: total, lengthComputable })
} else {
const total = 0
const lengthComputable = false
let loaded = 0
for await (const chunk of body) {
loaded += chunk.byteLength
yield chunk
onUploadProgress({ total, loaded, lengthComputable })
}
}
}
Loading