Implement streamingUpload flag
COMPANION_STREAMING_UPLOAD
Defaults to false for backward compatibility.
If set to true, Companion starts uploading each file while it is still downloading, by piping the streams.

- Also implement progress reporting for downloads
- Fix progress deduplication logic
- Fix test that assumed the file was fully downloaded after the first progress event
mifi committed Sep 7, 2021
1 parent 2f7fc2d commit 50cd8c3
Showing 8 changed files with 77 additions and 36 deletions.
1 change: 1 addition & 0 deletions packages/@uppy/companion/KUBERNETES.md
@@ -26,6 +26,7 @@ data:
COMPANION_DOMAIN: "YOUR SERVER DOMAIN"
COMPANION_DOMAINS: "sub1.domain.com,sub2.domain.com,sub3.domain.com"
COMPANION_PROTOCOL: "YOUR SERVER PROTOCOL"
+COMPANION_STREAMING_UPLOAD: true
COMPANION_REDIS_URL: redis://:superSecretPassword@uppy-redis.uppy.svc.cluster.local:6379
COMPANION_SECRET: "shh!Issa Secret!"
COMPANION_DROPBOX_KEY: "YOUR DROPBOX KEY"
1 change: 1 addition & 0 deletions packages/@uppy/companion/env_example
@@ -4,6 +4,7 @@ COMPANION_DOMAIN=uppy.xxxx.com
COMPANION_SELF_ENDPOINT=uppy.xxxx.com
COMPANION_HIDE_METRICS=false
COMPANION_HIDE_WELCOME=false
+COMPANION_STREAMING_UPLOAD=true

COMPANION_PROTOCOL=https
COMPANION_DATADIR=/mnt/uppy-server-data
1 change: 1 addition & 0 deletions packages/@uppy/companion/src/companion.js
@@ -39,6 +39,7 @@ const defaultOptions = {
},
debug: true,
logClientVersion: true,
+streamingUpload: false,
}

// make the errors available publicly for custom providers
86 changes: 57 additions & 29 deletions packages/@uppy/companion/src/server/Uploader.js
@@ -72,18 +72,19 @@ class Uploader {
this.token = uuid.v4()
this.options.metadata = this.options.metadata || {}
this.options.fieldname = this.options.fieldname || DEFAULT_FIELD_NAME
-this.uploadSize = options.size
+this.size = options.size
this.uploadFileName = this.options.metadata.name
? this.options.metadata.name.substring(0, MAX_FILENAME_LENGTH)
: `${Uploader.FILE_NAME_PREFIX}-${this.token}`

this.uploadStopped = false

-/** @type {number} */
-this.emittedProgress = 0
+this.emittedProgress = {}
this.storage = options.storage
this._paused = false

+this.downloadedBytes = 0
+
this.readStream = null

if (this.options.protocol === PROTOCOLS.tus) {
@@ -132,23 +133,33 @@ class Uploader {
}
}

-// Some streams need to be downloaded entirely first, because we don't know their size from the provider
-// This is true for zoom and drive (exported files) or some URL downloads.
-// The stream will then typically come from a "Transfer-Encoding: chunked" response
-async _prepareUnknownLengthStream (stream) {
+async _downloadStreamAsFile (stream) {
this.tmpPath = join(this.options.pathPrefix, this.uploadFileName)

logger.debug('fully downloading file', 'uploader.download', this.shortToken)
// TODO limit to 10MB? (which is max for google drive and zoom export)
-// TODO progress? Maybe OK with no progress because files are small
const writeStream = createWriteStream(this.tmpPath)
+
+const onData = (chunk) => {
+this.downloadedBytes += chunk.length
+this.onProgress(0, undefined)
+}
+
+stream.on('data', onData)
+
await pipeline(stream, writeStream)
-logger.debug('fully downloaded file', 'uploader.download', this.shortToken)
+logger.debug('finished fully downloading file', 'uploader.download', this.shortToken)

const { size } = await stat(this.tmpPath)
-this.uploadSize = size

-const readStream = createReadStream(this.tmpPath)
-this.readStream = readStream
+this.size = size
+
+const fileStream = createReadStream(this.tmpPath)
+this.readStream = fileStream
}
+
+_needDownloadFirst () {
+return !this.options.size || !this.options.companionOptions.streamingUpload
+}

/**
@@ -161,9 +172,12 @@ class Uploader {
if (this.readStream) throw new Error('Already uploading')

this.readStream = stream
-if (!this.uploadSize) {
-logger.debug('unable to determine file size, need to download the whole file first', 'controller.get.provider.size', this.shortToken)
-await this._prepareUnknownLengthStream(this.readStream)
+if (this._needDownloadFirst()) {
+logger.debug('need to download the whole file first', 'controller.get.provider.size', this.shortToken)
+// Some streams need to be downloaded entirely first, because we don't know their size from the provider
+// This is true for zoom and drive (exported files) or some URL downloads.
+// The stream will then typically come from a "Transfer-Encoding: chunked" response
+await this._downloadStreamAsFile(this.readStream)
}
if (this.uploadStopped) return
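
Note (illustrative, not part of the commit): the condition in `_needDownloadFirst` reads as a small truth table. The sketch below uses a hypothetical standalone function, `needDownloadFirst`, that takes its two inputs as arguments instead of reading them from `this.options`. It assumes only the logic visible in this diff: a missing or zero size, or a disabled `streamingUpload` option, forces a full download before the upload starts.

```javascript
// Hypothetical standalone version of the check made by _needDownloadFirst().
// `size` is the byte size reported by the provider (may be undefined or 0);
// `streamingUpload` mirrors the Companion option of the same name.
function needDownloadFirst ({ size, streamingUpload }) {
  return !size || !streamingUpload
}

// Known size and streaming enabled: the streams can be piped directly.
console.log(needDownloadFirst({ size: 1024, streamingUpload: true })) // false
// Unknown size: download first, even though streaming is enabled.
console.log(needDownloadFirst({ size: undefined, streamingUpload: true })) // true
// Streaming disabled (the default): download first.
console.log(needDownloadFirst({ size: 1024, streamingUpload: false })) // true
```

One consequence of writing the check as `!size` is that a reported size of `0` is treated the same as an unknown size.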

@@ -356,30 +370,44 @@ class Uploader {
* @param {number | null} bytesTotalIn
*/
onProgress (bytesUploaded, bytesTotalIn) {
-const bytesTotal = bytesTotalIn || this.uploadSize
+const bytesTotal = bytesTotalIn || this.size || 0

+// If fully downloading before uploading, combine downloaded and uploaded bytes
+// This will make sure that the user sees half of the progress before upload starts (while downloading)
+let combinedBytes = bytesUploaded
+if (this._needDownloadFirst()) {
+combinedBytes = Math.floor((combinedBytes + (this.downloadedBytes || 0)) / 2)
+}
+
+// Prevent divide by zero
+let percentage = 0
+if (bytesTotal > 0) percentage = Math.min(Math.max(0, ((combinedBytes / bytesTotal) * 100)), 100)
+
-const percentage = Math.min(Math.max(0, ((bytesUploaded / bytesTotal) * 100)), 100)
-const formatPercentage = percentage.toFixed(2)
+const formattedPercentage = percentage.toFixed(2)
logger.debug(
-`${bytesUploaded} ${bytesTotal} ${formatPercentage}%`,
-'uploader.upload.progress',
+`${combinedBytes} ${bytesTotal} ${formattedPercentage}%`,
+'uploader.total.progress',
this.shortToken
)

if (this._paused || this.uploadStopped) {
return
}

+const payload = { progress: formattedPercentage, bytesUploaded: combinedBytes, bytesTotal }
const dataToEmit = {
action: 'progress',
-payload: { progress: formatPercentage, bytesUploaded, bytesTotal },
+payload,
}
this.saveState(dataToEmit)

+const isEqual = (p1, p2) => (p1.progress === p2.progress
+&& p1.bytesUploaded === p2.bytesUploaded
+&& p1.bytesTotal === p2.bytesTotal)
+
// avoid flooding the client with progress events.
-const roundedPercentage = Math.floor(percentage)
-if (this.emittedProgress !== roundedPercentage) {
-this.emittedProgress = roundedPercentage
+if (!isEqual(this.emittedProgress, payload)) {
+this.emittedProgress = payload
emitter().emit(this.token, dataToEmit)
}
}
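
Note (illustrative, not part of the commit): the progress math and the duplicate check in `onProgress` can be tried in isolation. In the sketch below, `computeProgress` and `createProgressEmitter` are hypothetical helper names, and `downloadFirst` stands in for the result of `this._needDownloadFirst()`. The arithmetic and the `isEqual` comparison are copied from the hunk above; everything else is an assumption made to keep the example self-contained.

```javascript
// Hypothetical pure version of the progress calculation in onProgress().
function computeProgress ({ bytesUploaded, downloadedBytes, bytesTotal, downloadFirst }) {
  let combinedBytes = bytesUploaded
  if (downloadFirst) {
    // Download and upload each account for half of the reported progress.
    combinedBytes = Math.floor((bytesUploaded + (downloadedBytes || 0)) / 2)
  }

  // Guard against dividing by zero when the total size is not yet known.
  let percentage = 0
  if (bytesTotal > 0) percentage = Math.min(Math.max(0, (combinedBytes / bytesTotal) * 100), 100)

  return { progress: percentage.toFixed(2), bytesUploaded: combinedBytes, bytesTotal }
}

const isEqual = (p1, p2) => (p1.progress === p2.progress
  && p1.bytesUploaded === p2.bytesUploaded
  && p1.bytesTotal === p2.bytesTotal)

// Hypothetical wrapper: forwards a payload to `emit` only if it differs from the last one sent.
function createProgressEmitter (emit) {
  let lastEmitted = {}
  return (payload) => {
    if (isEqual(lastEmitted, payload)) return false
    lastEmitted = payload
    emit(payload)
    return true
  }
}

// Download finished, upload not started: half way.
console.log(computeProgress({ bytesUploaded: 0, downloadedBytes: 1000, bytesTotal: 1000, downloadFirst: true }).progress) // "50.00"
// Streaming mode: progress follows the uploaded bytes only.
console.log(computeProgress({ bytesUploaded: 250, downloadedBytes: 0, bytesTotal: 1000, downloadFirst: false }).progress) // "25.00"
```

Compared with the previous check on the rounded percentage, comparing the whole payload lets events through whenever the byte counts change, even if the rounded percentage does not.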
@@ -427,7 +455,7 @@ class Uploader {
uploadUrl: this.options.uploadUrl,
uploadLengthDeferred: false,
retryDelays: [0, 1000, 3000, 5000],
-uploadSize: this.uploadSize,
+uploadSize: this.size,
chunkSize: this.options.chunkSize || 50e6,
headers: headerSanitize(this.options.headers),
addRequestId: true,
@@ -499,12 +527,12 @@ class Uploader {
options: {
filename: this.uploadFileName,
contentType: this.options.metadata.type,
-knownLength: this.uploadSize,
+knownLength: this.size,
},
},
}
} else {
-reqOptions.headers['content-length'] = this.uploadSize
+reqOptions.headers['content-length'] = this.size
reqOptions.body = stream
}

@@ -539,8 +567,8 @@ class Uploader {
throw err
}

-if (bytesUploaded !== this.uploadSize) {
-const errMsg = `uploaded only ${bytesUploaded} of ${this.uploadSize} with status: ${response.statusCode}`
+if (bytesUploaded !== this.size) {
+const errMsg = `uploaded only ${bytesUploaded} of ${this.size} with status: ${response.statusCode}`
logger.error(errMsg, 'upload.multipart.mismatch.error')
throw new Error(errMsg)
}
1 change: 1 addition & 0 deletions packages/@uppy/companion/src/standalone/helper.js
@@ -102,6 +102,7 @@ const getConfigFromEnv = () => {
// cookieDomain is kind of a hack to support distributed systems. This should be improved but we never got so far.
cookieDomain: process.env.COMPANION_COOKIE_DOMAIN,
multipleInstances: true,
+streamingUpload: process.env.COMPANION_STREAMING_UPLOAD === 'true',
}
}
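
Note (illustrative, not part of the commit): because the standalone helper compares the variable against the exact string `'true'`, other common spellings leave the option disabled. The sketch below mirrors that comparison in a hypothetical function, `parseStreamingUpload`, which takes an environment object as an argument so it can be run without touching `process.env`.

```javascript
// Hypothetical helper mirroring `process.env.COMPANION_STREAMING_UPLOAD === 'true'`.
function parseStreamingUpload (env) {
  return env.COMPANION_STREAMING_UPLOAD === 'true'
}

console.log(parseStreamingUpload({ COMPANION_STREAMING_UPLOAD: 'true' })) // true
console.log(parseStreamingUpload({ COMPANION_STREAMING_UPLOAD: 'TRUE' })) // false
console.log(parseStreamingUpload({ COMPANION_STREAMING_UPLOAD: '1' })) // false
console.log(parseStreamingUpload({})) // false
```

An unset variable therefore keeps the backward-compatible default of `false`.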

12 changes: 7 additions & 5 deletions packages/@uppy/companion/test/__tests__/uploader.js
@@ -92,12 +92,14 @@ describe('uploader with tus protocol', () => {
socketClient.onProgress(uploadToken, (message) => {
// validate that the file has been downloaded and saved into the file path
try {
-const fileInfo = fs.statSync(uploader.tmpPath)
-expect(fileInfo.isFile()).toBe(true)
-expect(fileInfo.size).toBe(fileContent.length)
-
progressReceived = message.payload.bytesUploaded
-expect(message.payload.bytesTotal).toBe(fileContent.length)
+
+if (progressReceived === fileContent.length) {
+const fileInfo = fs.statSync(uploader.tmpPath)
+expect(fileInfo.isFile()).toBe(true)
+expect(fileInfo.size).toBe(fileContent.length)
+expect(message.payload.bytesTotal).toBe(fileContent.length)
+}
} catch (err) {
reject(err)
}
1 change: 1 addition & 0 deletions test/endtoend/utils.js
@@ -76,6 +76,7 @@ class CompanionService {
COMPANION_DOMAIN: 'localhost:3030',
COMPANION_PROTOCOL: 'http',
COMPANION_PORT: 3030,
+COMPANION_STREAMING_UPLOAD: true,
COMPANION_SECRET: process.env.TEST_COMPANION_SECRET,
COMPANION_DROPBOX_KEY: process.env.TEST_COMPANION_DROPBOX_KEY,
COMPANION_DROPBOX_SECRET: process.env.TEST_COMPANION_DROPBOX_SECRET,
10 changes: 8 additions & 2 deletions website/src/docs/companion.md
@@ -36,7 +36,7 @@ Install from NPM:
npm install @uppy/companion
```

-If you don't have a Node.js project with a `package.json` you might want to install/run Companion globally like so: `[sudo] npm install -g @uppy/companion@2.x`.
+If you don't have a Node.js project with a `package.json` you might want to install/run Companion globally like so: `npm install -g @uppy/companion`.

### Prerequisite

@@ -48,7 +48,7 @@ Unfortunately, Windows is not a supported platform right now. It may work, and w

Companion may either be used as a pluggable express app, which you plug into your already existing server, or it may simply be run as a standalone server:

-### Plugging into an already existing server
+### Plugging into an existing express server

To plug Companion into an existing server, call its `.app` method, passing in an [options](#Options) object as a parameter. This returns a server instance that you can mount on a subpath in your Express or app.

@@ -238,6 +238,9 @@ export COMPANION_SELF_ENDPOINT="THIS SHOULD BE SAME AS YOUR DOMAIN + PATH"
# comma-separated URLs
# corresponds to the uploadUrls option
export COMPANION_UPLOAD_URLS="http://tusd.tusdemo.net/files/,https://tusd.tusdemo.net/files/"
+
+# corresponds to the streamingUpload option
+export COMPANION_STREAMING_UPLOAD=true
```

See [env.example.sh](https://github.com/transloadit/uppy/blob/master/env.example.sh) for an example configuration script.
@@ -288,6 +291,7 @@ const options = {
uploadUrls: ['https://myuploadurl.com', 'http://myuploadurl2.com'],
debug: true,
metrics: false,
+streamingUpload: true,
}
```

@@ -324,6 +328,8 @@ const options = {

13. **metrics(optional)** - A boolean flag to tell Companion whether or not to provide an endpoint `/metrics` with Prometheus metrics.

+14. **streamingUpload(optional)** - A boolean flag to tell Companion whether or not to enable streaming uploads. If enabled, it will lead to **faster uploads** because Companion will start uploading at the same time as downloading using `stream.pipe`. If `false`, files will be fully downloaded first, then uploaded. Defaults to `false`.
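
Note (illustrative, not part of the commit): the difference between the two modes can be sketched with plain Node.js streams. Everything named here is hypothetical: `createFakeUpload` stands in for the upload destination, and the download-first path buffers in memory only to keep the example self-contained, whereas Companion writes the download to a temporary file. The sketch uses `stream.pipeline` rather than `stream.pipe` so that errors and completion surface as a promise.

```javascript
const { Readable, Writable, pipeline: pipelineCallback } = require('stream')
const { promisify } = require('util')

const pipeline = promisify(pipelineCallback)

// Hypothetical stand-in for the upload destination: collects the chunks it receives.
function createFakeUpload (received) {
  return new Writable({
    write (chunk, _encoding, callback) {
      received.push(chunk)
      callback()
    },
  })
}

// streamingUpload: true -> pipe the download straight into the upload.
async function streamingTransfer (download, received) {
  await pipeline(download, createFakeUpload(received))
}

// streamingUpload: false -> read the whole download first, then upload it.
// (In-memory buffering is an assumption of this sketch, not what Companion does.)
async function downloadFirstTransfer (download, received) {
  const chunks = []
  for await (const chunk of download) chunks.push(chunk)
  const file = Buffer.concat(chunks)
  await pipeline(Readable.from([file]), createFakeUpload(received))
}

async function demo () {
  const makeDownload = () => Readable.from([Buffer.from('hello '), Buffer.from('world')])

  const streamed = []
  await streamingTransfer(makeDownload(), streamed)

  const buffered = []
  await downloadFirstTransfer(makeDownload(), buffered)

  return {
    streamed: Buffer.concat(streamed).toString(),
    buffered: Buffer.concat(buffered).toString(),
  }
}

demo().then((result) => console.log(result))
```

Both paths deliver the same bytes. The streaming path can begin sending the first chunk before the last one has arrived, which is where the speedup described above would come from.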

### Provider Redirect URIs

When generating your provider API keys on their corresponding developer platforms (e.g [Google Developer Console](https://console.developers.google.com/)), you'd need to provide a `redirect URI` for the OAuth authorization process. In general the redirect URI for each provider takes the format: