Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable write retry and nack pending writes on reconnect #443

Merged
merged 26 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3cd69e9
fix: nack pending writes on reconnect
alvarowolfx Apr 22, 2024
7ba612c
fix: lint issues
alvarowolfx Apr 22, 2024
70f6972
fix: lint issues
alvarowolfx Apr 22, 2024
a751d77
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Apr 22, 2024
527ab79
Merge branch 'fix-ack-on-reconnect' of https://github.com/alvarowolfx…
gcf-owl-bot[bot] Apr 22, 2024
4cfe085
feat: enable write retries on failed pending writes
alvarowolfx Apr 23, 2024
474e237
fix: lint issue
alvarowolfx Apr 23, 2024
50f2ae7
fix: only reconnect on server-side close event
alvarowolfx Apr 24, 2024
63ac809
fix: only reconnect on close if there are pending writes
alvarowolfx Apr 24, 2024
b60db53
feat: resend on retryable error
alvarowolfx Apr 25, 2024
254d96b
fix: do not emit error if is retryable and no listerners are set up
alvarowolfx Apr 25, 2024
fb9f85f
fix: let permanent errors to nack individual pending writes
alvarowolfx Apr 25, 2024
94241d7
fix: remove unused import
alvarowolfx Apr 25, 2024
b51ea18
fix: grpc conn.write sequence is not stable
alvarowolfx Apr 26, 2024
4e7bcf3
feat: handle in stream response error and retry RESOURCE_EXAUSTED
alvarowolfx Apr 26, 2024
17c0e71
fix: lint issue
alvarowolfx Apr 26, 2024
841d174
fix: remove in stream handling and RE as retryable error
alvarowolfx Apr 26, 2024
762f5d6
fix: rollback changes to deleteDatasets method
alvarowolfx Apr 26, 2024
ac7af32
fix: reconnect only on retryable errors and handle in stream errors
alvarowolfx Apr 26, 2024
2f2e600
feat: log number of pending writes on reconnect
alvarowolfx Apr 29, 2024
f35bc48
fix: reconnect trace msg
alvarowolfx Apr 29, 2024
62ff5dd
fix: don't close conn on flush
alvarowolfx Apr 30, 2024
a921b9f
fix: rename var/properties for clarity
alvarowolfx Apr 30, 2024
1e4e851
fix: address review comments
alvarowolfx May 2, 2024
4c01180
fix: return after nack pending write due to max retries
alvarowolfx May 2, 2024
de07922
fix: change connect error msg and code
alvarowolfx May 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/managedwriter/pending_write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,41 @@ type AppendRowRequest =
export class PendingWrite {
private request: AppendRowRequest;
private response?: AppendRowsResponse;
private attempts: number;
private promise: Promise<AppendRowsResponse>;
private resolveFunc?: (response: AppendRowsResponse) => void;
private rejectFunc?: (reason?: protos.google.rpc.IStatus) => void;

constructor(request: AppendRowRequest) {
this.request = request;
this.attempts = 0;
this.promise = new Promise((resolve, reject) => {
this.resolveFunc = resolve;
this.rejectFunc = reject;
});
}

/**
* Increase number of attempts and return current value.
*
* @private
* @internal
* @returns {number} current number of attempts
*/
_increaseAttempts(): number {
return this.attempts++;
}

/**
* Resolve pending write with error or AppendRowResponse.
* This resolves the promise accessed via GetResult()
*
* @see GetResult
*
* @private
* @internal
* @returns {number} current number of attempts
*/
_markDone(err: Error | null, response?: AppendRowsResponse) {
if (err) {
this.rejectFunc && this.rejectFunc(err);
Expand Down
157 changes: 102 additions & 55 deletions src/managedwriter/stream_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import * as protos from '../../protos/protos';
import {WriterClient} from './writer_client';
import {PendingWrite} from './pending_write';
import {logger} from './logger';
import {parseStorageErrors} from './error';

type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema;
type IInt64Value = protos.google.protobuf.IInt64Value;
Expand Down Expand Up @@ -56,6 +55,7 @@ export class StreamConnection extends EventEmitter {
private _streamId: string;
private _writeClient: WriterClient;
private _connection?: gax.CancellableStream | null;
private _lastConnectionError?: gax.GoogleError | null;
private _callOptions?: gax.CallOptions;
private _pendingWrites: PendingWrite[];

Expand All @@ -76,6 +76,7 @@ export class StreamConnection extends EventEmitter {
if (this.isOpen()) {
this.close();
}
this._lastConnectionError = null;
const callOptions = this.resolveCallOptions(
this._streamId,
this._callOptions
Expand All @@ -86,7 +87,23 @@ export class StreamConnection extends EventEmitter {
this._connection.on('data', this.handleData);
this._connection.on('error', this.handleError);
this._connection.on('close', () => {
this.trace('connection closed');
this.trace('connection closed', this._lastConnectionError);
if (this.hasPendingWrites()) {
const retrySettings = this._writeClient._retrySettings;
if (
retrySettings.enableWriteRetries &&
this.isRetryableError(this._lastConnectionError)
) {
this.reconnect();
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
this.resendAllPendingWrites();
} else {
const err = new gax.GoogleError(
'Connection failure, please retry the request'
);
err.code = gax.Status.UNAVAILABLE;
this.ackAllPendingWrites(err);
}
}
});
this._connection.on('pause', () => {
this.trace('connection paused');
Expand All @@ -106,62 +123,53 @@ export class StreamConnection extends EventEmitter {

private handleError = (err: gax.GoogleError) => {
this.trace('on error', err, JSON.stringify(err));
if (this.shouldReconnect(err)) {
this.reconnect();
return;
}
let nextPendingWrite = this.getNextPendingWrite();
if (this.isPermanentError(err)) {
this.trace('found permanent error', err);
while (nextPendingWrite) {
this.ackNextPendingWrite(err);
nextPendingWrite = this.getNextPendingWrite();
}
this.emit('error', err);
return;
}
if (this.isRequestError(err) && nextPendingWrite) {
this._lastConnectionError = err;
const nextPendingWrite = this.getNextPendingWrite();
if (nextPendingWrite) {
this.trace(
'found request error with pending write',
err,
nextPendingWrite
);
this.ackNextPendingWrite(err);
this.handleRetry(err);
}
if (this.listenerCount('error') === 0 && this.isRetryableError(err)) {
return;
}
this.emit('error', err);
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
};

private shouldReconnect(err: gax.GoogleError): boolean {
const reconnectionErrorCodes = [
gax.Status.UNAVAILABLE,
gax.Status.RESOURCE_EXHAUSTED,
private handleRetry(err: gax.GoogleError) {
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
const retrySettings = this._writeClient._retrySettings;
if (retrySettings.enableWriteRetries && this.isRetryableError(err)) {
if (!this.isConnectionClosed()) {
const pw = this._pendingWrites.pop()!;
this.send(pw);
}
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
} else {
this.ackNextPendingWrite(err);
}
}

private isRetryableError(err?: gax.GoogleError | null): boolean {
if (!err) {
return false;
}
const errorCodes = [
gax.Status.ABORTED,
gax.Status.UNAVAILABLE,
gax.Status.CANCELLED,
gax.Status.DEADLINE_EXCEEDED,
gax.Status.INTERNAL,
gax.Status.DEADLINE_EXCEEDED,
];
return !!err.code && reconnectionErrorCodes.includes(err.code);
return !!err.code && errorCodes.includes(err.code);
}

private isPermanentError(err: gax.GoogleError): boolean {
if (err.code === gax.Status.INVALID_ARGUMENT) {
const storageErrors = parseStorageErrors(err);
for (const storageError of storageErrors) {
if (
storageError.errorMessage?.includes(
'Schema mismatch due to extra fields in user schema'
)
) {
return true;
}
}
private isConnectionClosed() {
if (this._connection) {
return this._connection.destroyed || this._connection.closed;
}
return false;
}

private isRequestError(err: gax.GoogleError): boolean {
return err.code === gax.Status.INVALID_ARGUMENT;
return true;
}

private resolveCallOptions(
Expand All @@ -183,15 +191,23 @@ export class StreamConnection extends EventEmitter {
}

private handleData = (response: AppendRowsResponse) => {
this.trace('data arrived', response);
const pw = this.getNextPendingWrite();
if (!pw) {
this.trace('data arrived', response, this._pendingWrites.length);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably too much logging, it will log one line per request. If it is debug logging then it is fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those trace calls are only on DEBUG mode.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a LOG mode? I think those reconnect errors need to be in LOG mode. Even for debug mode, per request logging might be too much, but I am less concerned.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a place that could benefit from the logger I'm working on elsewhere. (Probably said that already, but specifically here...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gonna be the first user of that new logger @feywind 🚀

if (!this.hasPendingWrites()) {
this.trace('data arrived with no pending write available', response);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

return;
}
if (response.updatedSchema) {
this.emit('schemaUpdated', response.updatedSchema);
}
const responseErr = response.error;
if (responseErr) {
const gerr = new gax.GoogleError(responseErr.message!);
gerr.code = responseErr.code!;
if (this.isRetryableError(gerr)) {
this.handleRetry(gerr);
return;
}
}
this.ackNextPendingWrite(null, response);
};

Expand Down Expand Up @@ -238,13 +254,38 @@ export class StreamConnection extends EventEmitter {
return this._streamId;
};

private hasPendingWrites(): boolean {
return this._pendingWrites.length > 0;
}

private getNextPendingWrite(): PendingWrite | null {
if (this._pendingWrites.length > 0) {
return this._pendingWrites[0];
return this._pendingWrites[this._pendingWrites.length - 1];
}
return null;
}

private resendAllPendingWrites() {
const pendingWritesToRetry = [...this._pendingWrites]; // copy array;
let pw = pendingWritesToRetry.pop();
while (pw) {
this._pendingWrites.pop(); // remove from real queue
this.send(pw); // .send immediately adds to the queue
pw = pendingWritesToRetry.pop();
}
}

private ackAllPendingWrites(
err: Error | null,
result?:
| protos.google.cloud.bigquery.storage.v1.IAppendRowsResponse
| undefined
) {
while (this.hasPendingWrites()) {
this.ackNextPendingWrite(err, result);
}
}

private ackNextPendingWrite(
err: Error | null,
result?:
Expand All @@ -253,6 +294,7 @@ export class StreamConnection extends EventEmitter {
) {
const pw = this._pendingWrites.pop();
if (pw) {
this.trace('ack pending write:', pw, err, result);
pw._markDone(err, result);
}
}
Expand All @@ -279,23 +321,27 @@ export class StreamConnection extends EventEmitter {
}

private send(pw: PendingWrite) {
const request = pw.getRequest();
if (!this._connection) {
pw._markDone(new Error('connection closed'));
const retrySettings = this._writeClient._retrySettings;
const tries = pw._increaseAttempts();
if (tries > retrySettings.maxRetryAttempts) {
pw._markDone(
new Error(`pending write max retries reached: ${tries} attempts`)
);
return;
}
if (this._connection.destroyed || this._connection.closed) {
if (this.isConnectionClosed()) {
this.reconnect();
}
this.trace('sending pending write', pw);
try {
this._connection.write(request, err => {
const request = pw.getRequest();
this._pendingWrites.unshift(pw);
this._connection?.write(request, err => {
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
this.trace('wrote pending write', err, this._pendingWrites.length);
if (err) {
pw._markDone(err); //TODO: add retries
return;
}
this._pendingWrites.unshift(pw);
});
} catch (err) {
pw._markDone(err as Error);
Expand All @@ -306,14 +352,16 @@ export class StreamConnection extends EventEmitter {
* Check if connection is open and ready to send requests.
*/
isOpen(): boolean {
return !!this._connection;
return !this.isConnectionClosed();
}

/**
* Reconnect and re send inflight requests.
* Re open appendRows BiDi gRPC connection.
*/
reconnect() {
this.trace('reconnect called');
this.trace(
`reconnect called with ${this._pendingWrites.length} pending writes`
);
this.close();
this.open();
}
Expand Down Expand Up @@ -347,7 +395,6 @@ export class StreamConnection extends EventEmitter {
async flushRows(request?: {
offset?: IInt64Value['value'];
}): Promise<FlushRowsResponse | null> {
this.close();
if (this.isDefaultStream()) {
return null;
}
Expand Down
2 changes: 1 addition & 1 deletion src/managedwriter/writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ export class Writer {
offsetValue?: IInt64Value['value']
): PendingWrite {
let offset: AppendRowRequest['offset'];
if (offsetValue) {
if (offsetValue !== undefined && offsetValue !== null) {
offset = {
value: offsetValue,
};
Expand Down
37 changes: 37 additions & 0 deletions src/managedwriter/writer_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import {StreamConnection} from './stream_connection';
type StreamConnections = {
connectionList: StreamConnection[];
};
type RetrySettings = {
enableWriteRetries: boolean;
maxRetryAttempts: number;
};
type CreateWriteStreamRequest =
protos.google.cloud.bigquery.storage.v1.ICreateWriteStreamRequest;
type BatchCommitWriteStreamsRequest =
Expand Down Expand Up @@ -55,6 +59,12 @@ export class WriterClient {
private _client: BigQueryWriteClient;
private _connections: StreamConnections;
private _open: boolean;
/**
* Retry settings, only internal for now.
* @private
* @internal
*/
_retrySettings: RetrySettings;

constructor(opts?: ClientOptions) {
const baseOptions = {
Expand All @@ -69,6 +79,10 @@ export class WriterClient {
connectionList: [],
};
this._open = false;
this._retrySettings = {
enableWriteRetries: false,
maxRetryAttempts: 4,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why these values? Can they be overridden?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they can be overwritten via the enableWriteRetries and setMaxRetryAttempts methods. The default values came from the Go ManagedWriter - https://github.com/googleapis/google-cloud-go/blob/5a8c88956ea11eaf3e8aa8cffadfca06bf58c51d/bigquery/storage/managedwriter/retry.go#L31

};
}

/**
Expand Down Expand Up @@ -102,6 +116,29 @@ export class WriterClient {
return this._open;
}

/**
* Enables StreamConnections to automatically retry failed appends.
*
* Enabling retries is best suited for cases where users want to achieve at-least-once
* append semantics. Use of automatic retries may complicate patterns where the user
* is designing for exactly-once append semantics.
*/
enableWriteRetries(enable: boolean) {
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
this._retrySettings.enableWriteRetries = enable;
}

/**
* Change max retries attempts on child StreamConnections.
*
* The default valuen is to retry 4 times.
*
* Only valid right now when write retries are enabled.
* @see enableWriteRetries.
*/
setMaxRetryAttempts(retryAttempts: number) {
this._retrySettings.maxRetryAttempts = retryAttempts;
}

/**
* Creates a write stream to the given table.
* Additionally, every table has a special stream named DefaultStream
Expand Down
Loading
Loading