From 5ca361b1cf4d6e43491cd996fa4992febc9e53bb Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Wed, 12 Feb 2020 09:57:33 -0800 Subject: [PATCH] dataflow: wait for jobs to be cancellable in tests for run_template. (#2829) Co-authored-by: Leah E. Cole <6719667+leahecole@users.noreply.github.com> --- dataflow/run_template/main_test.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/dataflow/run_template/main_test.py b/dataflow/run_template/main_test.py index 07dc903dde2a..413f98917558 100644 --- a/dataflow/run_template/main_test.py +++ b/dataflow/run_template/main_test.py @@ -22,6 +22,7 @@ import os import pytest import time +import uuid from datetime import datetime from googleapiclient.discovery import build @@ -41,9 +42,14 @@ def app(): return flask.Flask(__name__) +def unique_job_name(label): + return datetime.now().strftime('{}-%Y%m%d-%H%M%S-{}'.format( + label, uuid.uuid4().hex)) + + def test_run_template_python_empty_args(app): project = PROJECT - job = datetime.now().strftime('test_run_template_python-%Y%m%d-%H%M%S') + job = unique_job_name('test_run_template_empty') template = 'gs://dataflow-templates/latest/Word_Count' with pytest.raises(HttpError): main.run(project, job, template) @@ -51,7 +57,7 @@ def test_run_template_python_empty_args(app): def test_run_template_python(app): project = PROJECT - job = datetime.now().strftime('test_run_template_python-%Y%m%d-%H%M%S') + job = unique_job_name('test_run_template_python') template = 'gs://dataflow-templates/latest/Word_Count' parameters = { 'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt', @@ -70,7 +76,7 @@ def test_run_template_http_empty_args(app): def test_run_template_http_url(app): args = { 'project': PROJECT, - 'job': datetime.now().strftime('test_run_template_url-%Y%m%d-%H%M%S'), + 'job': unique_job_name('test_run_template_url'), 'template': 'gs://dataflow-templates/latest/Word_Count', 'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt', 'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET), @@ -84,7 +90,7 @@ def test_run_template_http_url(app): def test_run_template_http_data(app): args = { 'project': PROJECT, - 'job': datetime.now().strftime('test_run_template_data-%Y%m%d-%H%M%S'), + 'job': unique_job_name('test_run_template_data'), 'template': 'gs://dataflow-templates/latest/Word_Count', 'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt', 'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET), @@ -98,7 +104,7 @@ def test_run_template_http_data(app): def test_run_template_http_json(app): args = { 'project': PROJECT, - 'job': datetime.now().strftime('test_run_template_json-%Y%m%d-%H%M%S'), + 'job': unique_job_name('test_run_template_json'), 'template': 'gs://dataflow-templates/latest/Word_Count', 'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt', 'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET), @@ -110,9 +116,17 @@ def test_run_template_http_json(app): def dataflow_jobs_cancel(job_id): - # Wait time until a job can be cancelled, as a best effort. - # If it fails to be cancelled, the job will run for ~8 minutes. - time.sleep(5) # seconds + # Wait time until the job can be cancelled. + state = None + while state != 'JOB_STATE_RUNNING': + job = dataflow.projects().jobs().get( + projectId=PROJECT, + jobId=job_id + ).execute() + state = job['currentState'] + time.sleep(1) + + # Cancel the Dataflow job. request = dataflow.projects().jobs().update( projectId=PROJECT, jobId=job_id,