Skip to content

Commit

Permalink
feat: Add ability to specify RetryOptions and BigQueryRetryConfig whe…
Browse files Browse the repository at this point in the history
…n create job and waitFor (#3398)

* feat: initial implementation of customizable BigQueryRetryConfig

* Add unit and integration tests

* Fix lint issues

* Revert unintentional changes to testQueryStatistics in ITBigQueryTest

* Revert unintentional changes to testQueryStatistics in ITBigQueryTest

* Revert unintentional changes to testQueryStatistics in ITBigQueryTest

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add additional comments to Job.waitFor() for BigQueryRetryConfig

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
PhongChuong and gcf-owl-bot[bot] committed Jul 24, 2024
1 parent 95c8d6f commit 1f91ae7
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ public static JobListOption fields(JobField... fields) {
/** Class for specifying table get and create options. */
class JobOption extends Option {

private static final long serialVersionUID = -3111736712316353665L;
private static final long serialVersionUID = -3111736712316353664L;

private JobOption(BigQueryRpc.Option option, Object value) {
super(option, value);
Expand All @@ -578,6 +578,16 @@ public static JobOption fields(JobField... fields) {
return new JobOption(
BigQueryRpc.Option.FIELDS, Helper.selector(JobField.REQUIRED_FIELDS, fields));
}

/** Returns an option to specify the job's BigQuery retry configuration. */
public static JobOption bigQueryRetryConfig(BigQueryRetryConfig bigQueryRetryConfig) {
return new JobOption(BigQueryRpc.Option.BIGQUERY_RETRY_CONFIG, bigQueryRetryConfig);
}

/** Returns an option to specify the job's retry options. */
public static JobOption retryOptions(RetryOption... options) {
return new JobOption(BigQueryRpc.Option.RETRY_OPTIONS, options);
}
}

/** Class for specifying query results options. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.cloud.Policy;
import com.google.cloud.RetryHelper;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.RetryOption;
import com.google.cloud.Tuple;
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
import com.google.cloud.bigquery.QueryJobConfiguration.JobCreationMode;
Expand Down Expand Up @@ -415,10 +416,15 @@ public com.google.api.services.bigquery.model.Job call() {
}
}
},
getOptions().getRetrySettings(),
getRetryOptions(optionsMap) != null
? RetryOption.mergeToSettings(
getOptions().getRetrySettings(), getRetryOptions(optionsMap))
: getOptions().getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
getOptions().getClock(),
DEFAULT_RETRY_CONFIG));
getBigQueryRetryConfig(optionsMap) != null
? getBigQueryRetryConfig(optionsMap)
: DEFAULT_RETRY_CONFIG));
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Expand Down Expand Up @@ -1628,4 +1634,13 @@ public com.google.api.services.bigquery.model.TestIamPermissionsResponse call()
}
return optionMap;
}

static BigQueryRetryConfig getBigQueryRetryConfig(Map<BigQueryRpc.Option, ?> options) {
return (BigQueryRetryConfig)
options.getOrDefault(BigQueryRpc.Option.BIGQUERY_RETRY_CONFIG, null);
}

static RetryOption[] getRetryOptions(Map<BigQueryRpc.Option, ?> options) {
return (RetryOption[]) options.getOrDefault(BigQueryRpc.Option.RETRY_OPTIONS, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,21 @@ public boolean isDone() {
Job job = bigquery.getJob(getJobId(), JobOption.fields(BigQuery.JobField.STATUS));
return job == null || JobStatus.State.DONE.equals(job.getStatus().getState());
}

/** See {@link #waitFor(BigQueryRetryConfig, RetryOption...)} */
public Job waitFor(RetryOption... waitOptions) throws InterruptedException {
return waitForInternal(DEFAULT_RETRY_CONFIG, waitOptions);
}

/**
* Blocks until this job completes its execution, either failing or succeeding. This method
* returns current job's latest information. If the job no longer exists, this method returns
* {@code null}. By default, the job status is checked using jittered exponential backoff with 1
* second as an initial delay, 2.0 as a backoff factor, 1 minute as maximum delay between polls,
* 12 hours as a total timeout and unlimited number of attempts.
* 12 hours as a total timeout and unlimited number of attempts. For query jobs, the job status
* check can be configured to retry on specific BigQuery error messages using {@link
* BigQueryRetryConfig}. This {@link BigQueryRetryConfig} configuration is not available for
* non-query jobs.
*
* <p>Example usage of {@code waitFor()}.
*
Expand Down Expand Up @@ -232,18 +241,46 @@ public boolean isDone() {
* }
* }</pre>
*
* <p>Example usage of {@code waitFor()} with BigQuery retry configuration to retry on rate limit
* exceeded error messages for query jobs.
*
* <pre>{@code
* Job completedJob =
* job.waitFor(
* BigQueryRetryConfig.newBuilder()
* .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG)
* .retryOnMessage(BigQueryErrorMessages.JOB_RATE_LIMIT_EXCEEDED_MSG)
* .retryOnRegEx(BigQueryErrorMessages.RetryRegExPatterns.RATE_LIMIT_EXCEEDED_REGEX)
* .build());
* if (completedJob == null) {
* // job no longer exists
* } else if (completedJob.getStatus().getError() != null) {
* // job failed, handle error
* } else {
* // job completed successfully
* }
* }</pre>
*
* @param bigQueryRetryConfig configures retries for query jobs for BigQuery failures
* @param waitOptions options to configure checking period and timeout
* @throws BigQueryException upon failure, check {@link BigQueryException#getCause()} for details
* @throws InterruptedException if the current thread gets interrupted while waiting for the job
* to complete
*/
public Job waitFor(RetryOption... waitOptions) throws InterruptedException {
public Job waitFor(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions)
throws InterruptedException {
return waitForInternal(bigQueryRetryConfig, waitOptions);
}

private Job waitForInternal(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions)
throws InterruptedException {
checkNotDryRun("waitFor");
Object completedJobResponse;
if (getConfiguration().getType() == Type.QUERY) {
completedJobResponse =
waitForQueryResults(
RetryOption.mergeToSettings(DEFAULT_JOB_WAIT_SETTINGS, waitOptions),
bigQueryRetryConfig,
DEFAULT_QUERY_WAIT_OPTIONS);
} else {
completedJobResponse =
Expand Down Expand Up @@ -294,7 +331,9 @@ public TableResult getQueryResults(QueryResultsOption... options)

QueryResponse response =
waitForQueryResults(
DEFAULT_JOB_WAIT_SETTINGS, waitOptions.toArray(new QueryResultsOption[0]));
DEFAULT_JOB_WAIT_SETTINGS,
DEFAULT_RETRY_CONFIG,
waitOptions.toArray(new QueryResultsOption[0]));

// Get the job resource to determine if it has errored.
Job job = this;
Expand Down Expand Up @@ -334,7 +373,9 @@ public TableResult getQueryResults(QueryResultsOption... options)
}

private QueryResponse waitForQueryResults(
RetrySettings retrySettings, final QueryResultsOption... resultsOptions)
RetrySettings retrySettings,
BigQueryRetryConfig bigQueryRetryConfig,
final QueryResultsOption... resultsOptions)
throws InterruptedException {
if (getConfiguration().getType() != Type.QUERY) {
throw new UnsupportedOperationException(
Expand All @@ -360,7 +401,7 @@ public boolean shouldRetry(
}
},
options.getClock(),
DEFAULT_RETRY_CONFIG);
bigQueryRetryConfig);
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ enum Option {
STATE_FILTER("stateFilter"),
TIMEOUT("timeoutMs"),
REQUESTED_POLICY_VERSION("requestedPolicyVersion"),
TABLE_METADATA_VIEW("view");
TABLE_METADATA_VIEW("view"),
RETRY_OPTIONS("retryOptions"),
BIGQUERY_RETRY_CONFIG("bigQueryRetryConfig");

private final String value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.services.bigquery.model.*;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.cloud.Policy;
import com.google.cloud.RetryOption;
import com.google.cloud.ServiceOptions;
import com.google.cloud.Tuple;
import com.google.cloud.bigquery.BigQuery.JobOption;
Expand Down Expand Up @@ -1594,6 +1595,119 @@ public void testCreateJobFailureShouldRetry() {
verify(bigqueryRpcMock, times(6)).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS));
}

@Test
public void testCreateJobWithBigQueryRetryConfigFailureShouldRetry() {
// Validate create job with BigQueryRetryConfig that retries on rate limit error message.
JobOption bigQueryRetryConfigOption =
JobOption.bigQueryRetryConfig(
BigQueryRetryConfig.newBuilder()
.retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG)
.retryOnMessage(BigQueryErrorMessages.JOB_RATE_LIMIT_EXCEEDED_MSG)
.retryOnRegEx(BigQueryErrorMessages.RetryRegExPatterns.RATE_LIMIT_EXCEEDED_REGEX)
.build());

Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption);
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
.thenThrow(
new BigQueryException(
400, RATE_LIMIT_ERROR_MSG)) // retrial on based on RATE_LIMIT_EXCEEDED_MSG
.thenThrow(new BigQueryException(200, RATE_LIMIT_ERROR_MSG))
.thenReturn(newJobPb());

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

((BigQueryImpl) bigquery)
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption);
verify(bigqueryRpcMock, times(3)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
}

@Test
public void testCreateJobWithBigQueryRetryConfigFailureShouldNotRetry() {
// Validate create job with BigQueryRetryConfig that does not retry on rate limit error message.
JobOption bigQueryRetryConfigOption =
JobOption.bigQueryRetryConfig(BigQueryRetryConfig.newBuilder().build());

Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption);
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
.thenThrow(new BigQueryException(400, RATE_LIMIT_ERROR_MSG));

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

try {
((BigQueryImpl) bigquery)
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption);
fail("JobException expected");
} catch (BigQueryException e) {
assertNotNull(e.getMessage());
}
// Verify that getQueryResults is attempted only once and not retried since the error message
// does not match.
verify(bigqueryRpcMock, times(1)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
}

@Test
public void testCreateJobWithRetryOptionsFailureShouldRetry() {
// Validate create job with RetryOptions.
JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(4));
Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(retryOptions);
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
.thenThrow(new BigQueryException(503, "Service Unavailable"))
.thenReturn(newJobPb());

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

((BigQueryImpl) bigquery)
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions);
verify(bigqueryRpcMock, times(4)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
}

@Test
public void testCreateJobWithRetryOptionsFailureShouldNotRetry() {
// Validate create job with RetryOptions that only attempts once (no retry).
JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(1));
Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(retryOptions);
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenReturn(newJobPb());

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

try {
((BigQueryImpl) bigquery)
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions);
fail("JobException expected");
} catch (BigQueryException e) {
assertNotNull(e.getMessage());
}
verify(bigqueryRpcMock, times(1)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
}

@Test
public void testCreateJobWithSelectedFields() {
when(bigqueryRpcMock.create(
Expand Down
Loading

0 comments on commit 1f91ae7

Please sign in to comment.