From a40ea7e4071d512e8056ad60982ea8e1f0066996 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Thu, 28 May 2020 15:05:34 -0700 Subject: [PATCH 1/5] add dlq samples --- pubsub/cloud-client/README.rst | 23 +- pubsub/cloud-client/subscriber.py | 296 +++++++++++++++++++++++-- pubsub/cloud-client/subscriber_test.py | 117 +++++++++- 3 files changed, 398 insertions(+), 38 deletions(-) diff --git a/pubsub/cloud-client/README.rst b/pubsub/cloud-client/README.rst index c30fd190a233..f27f9438ea96 100644 --- a/pubsub/cloud-client/README.rst +++ b/pubsub/cloud-client/README.rst @@ -74,7 +74,7 @@ To run this sample: .. code-block:: bash - $ python publisher.py + $ python publisher.py --help usage: publisher.py [-h] project_id @@ -124,11 +124,11 @@ To run this sample: .. code-block:: bash - $ python subscriber.py + $ python subscriber.py --help usage: subscriber.py [-h] project_id - {list_in_topic,list_in_project,create,create-push,delete,update,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen_for_errors} + {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} ... This application demonstrates how to perform basic operations on @@ -139,15 +139,21 @@ To run this sample: positional arguments: project_id Your Google Cloud project ID - {list_in_topic,list_in_project,create,create-push,delete,update,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen_for_errors} - list_in_topic Lists all subscriptions for a given topic. - list_in_project Lists all subscriptions in the current project. + {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} + list-in-topic Lists all subscriptions for a given topic. + list-in-project Lists all subscriptions in the current project. create Create a new pull subscription on the given topic. + create-with-dead-letter-policy + Create a subscription with dead letter policy. create-push Create a new push subscription on the given topic. delete Deletes an existing Pub/Sub topic. - update Updates an existing Pub/Sub subscription's push + update-push Updates an existing Pub/Sub subscription's push endpoint URL. Note that certain properties of a subscription, such as its topic, are not modifiable. + update-dead-letter-policy + Update a subscription's dead letter policy. + remove-dead-letter-policy + Remove dead letter policy from a subscription. receive Receives messages from a pull subscription. receive-custom-attributes Receives messages from a pull subscription. @@ -158,8 +164,9 @@ To run this sample: Pulling messages synchronously. receive-synchronously-with-lease Pulling messages synchronously with lease management - listen_for_errors Receives messages and catches errors from a pull + listen-for-errors Receives messages and catches errors from a pull subscription. + receive-messages-with-delivery-attempts optional arguments: -h, --help show this help message and exit diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index e22efc7b16f3..d7f3f2b9da32 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -72,9 +72,7 @@ def create_subscription(project_id, topic_name, subscription_name): project_id, subscription_name ) - subscription = subscriber.create_subscription( - subscription_path, topic_path - ) + subscription = subscriber.create_subscription(subscription_path, topic_path) print("Subscription created: {}".format(subscription)) @@ -82,6 +80,57 @@ def create_subscription(project_id, topic_name, subscription_name): # [END pubsub_create_pull_subscription] +def create_subscription_with_dead_letter_topic( + project_id, topic_name, subscription_name, dead_letter_topic_name +): + """Create a subscription with dead letter policy.""" + # [START pubsub_dead_letter_create_subscription] + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.types import DeadLetterPolicy + + # TODO project_id = "Your Google Cloud Project ID" + # This is an existing topic that the subscription with dead letter policy + # is attached to. + # TODO topic_name = "Your Pub/Sub topic name" + # This is an existing subscription with a dead letter policy. + # TODO subscription_name = "Your Pub/Sub subscription name" + # This is an existing dead letter topic that the subscription with dead + # letter policy will forward dead letter messages to. + # TODO dead_letter_topic_name = "Your Pub/Sub dead letter topic name" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_name) + subscription_path = subscriber.subscription_path( + project_id, subscription_name + ) + dead_letter_topic_path = subscriber.topic_path( + project_id, dead_letter_topic_name + ) + + dead_letter_policy = DeadLetterPolicy( + dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=10 + ) + + subscription = subscriber.create_subscription( + subscription_path, topic_path, dead_letter_policy=dead_letter_policy + ) + + print("Subscription created: {}".format(subscription.name)) + print( + "It will forward dead letter messages to: {}".format( + subscription.dead_letter_policy.dead_letter_topic + ) + ) + print( + "After {} delivery attempts.".format( + subscription.dead_letter_policy.max_delivery_attempts + ) + ) + + subscriber.close() + # [END pubsub_dead_letter_create_subscription] + + def create_push_subscription( project_id, topic_name, subscription_name, endpoint ): @@ -134,7 +183,9 @@ def delete_subscription(project_id, subscription_name): # [END pubsub_delete_subscription] -def update_subscription(project_id, subscription_name, endpoint): +def update_push_subscription( + project_id, topic_name, subscription_name, endpoint +): """ Updates an existing Pub/Sub subscription's push endpoint URL. Note that certain properties of a subscription, such as @@ -156,13 +207,12 @@ def update_subscription(project_id, subscription_name, endpoint): push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) subscription = pubsub_v1.types.Subscription( - name=subscription_path, push_config=push_config + name=subscription_path, topic=topic_name, push_config=push_config ) update_mask = {"paths": {"push_config"}} - subscriber.update_subscription(subscription, update_mask) - result = subscriber.get_subscription(subscription_path) + result = subscriber.update_subscription(subscription, update_mask) print("Subscription updated: {}".format(subscription_path)) print("New endpoint for subscription is: {}".format(result.push_config)) @@ -171,6 +221,113 @@ def update_subscription(project_id, subscription_name, endpoint): # [END pubsub_update_push_configuration] +def update_subscription_with_dead_letter_policy( + project_id, topic_name, subscription_name, dead_letter_topic_name +): + """Update a subscription's dead letter policy.""" + # [START pubsub_dead_letter_update_subscription] + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.types import DeadLetterPolicy + + # TODO project_id = "Your Google Cloud Project ID" + # This is an existing topic that the subscription with dead letter policy + # is attached to. + # TODO topic_name = "Your Pub/Sub topic name" + # This is an existing subscription with a dead letter policy. + # TODO subscription_name = "Your Pub/Sub subscription name" + # This is an existing dead letter topic that the subscription with dead + # letter policy will forward dead letter messages to. + # TODO dead_letter_topic_name = "Your Pub/Sub dead letter topic name" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_name) + subscription_path = subscriber.subscription_path( + project_id, subscription_name + ) + dead_letter_topic_path = subscriber.topic_path( + project_id, dead_letter_topic_name + ) + + subscription_before_update = subscriber.get_subscription(subscription_path) + print("Before the update: {}".format(subscription_before_update)) + + subscription_before_update.HasField("dead_letter_policy") + + assert 'dead_letter_policy' in properties + update_mask = {"paths": {"dead_letter_policy": {"max_delivery_attempts"}}} + + # Construct a dead letter policy you expect to have after the update. + dead_letter_policy = DeadLetterPolicy( + dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=20 + ) + + # Construct the subscription with the dead letter policy you expect to have + # after the update. Here, values in the required fields (name, topic) help + # identify the subscription. + subscription = pubsub_v1.types.Subscription( + name=subscription_path, + topic=topic_path, + dead_letter_policy=dead_letter_policy, + ) + + subscription_after_update = subscriber.update_subscription( + subscription, update_mask + ) + + print("After the update: {}".format(subscription_after_update)) + + subscriber.close() + # [END pubsub_dead_letter_update_subscription] + return subscription_after_update + + +def remove_dead_letter_policy(project_id, topic_name, subscription_name): + """Remove dead letter policy from a subscription.""" + # [START pubsub_dead_letter_remove] + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.types import DeadLetterPolicy + + # TODO project_id = "Your Google Cloud Project ID" + # This is an existing topic that the subscription with dead letter policy + # is attached to. + # TODO topic_name = "Your Pub/Sub topic name" + # This is an existing subscription with a dead letter policy. + # TODO subscription_name = "Your Pub/Sub subscription name" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_name) + subscription_path = subscriber.subscription_path( + project_id, subscription_name + ) + + subscription_before_update = subscriber.get_subscription(subscription_path) + print("Before removing the policy: {}".format(subscription_before_update)) + + update_mask = { + "paths": {"dead_letter_policy": {"dead_letter_topic"}}, + "paths": {"dead_letter_policy": {"max_delivery_attempts"}}, + } + + # Construct the subscription with the dead letter policy you expect to have + # after the update. Here, values in the required fields (name, topic) help + # identify the subscription. + subscription = pubsub_v1.types.Subscription( + name=subscription_path, topic=topic_path + ) + + subscription_after_update = subscriber.update_subscription( + subscription, update_mask + ) + + print("After removing the policy: {}".format(subscription_after_update)) + + import inspect + + subscriber.close() + # [END pubsub_dead_letter_remove] + return subscription_after_update + + def receive_messages(project_id, subscription_name, timeout=None): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_async_pull] @@ -461,6 +618,46 @@ def callback(message): # [END pubsub_subscriber_error_listener] +def receive_messages_with_delivery_attempts( + project_id, subscription_name, timeout=None +): + # [START pubsub_dead_letter_delivery_attempt] + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO subscription_name = "Your Pubsub subscription name" + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path( + project_id, subscription_name + ) + + def callback(message): + print("Received message: {}".format(message)) + print("With delivery attempts: {}".format(message.delivery_attempt)) + message.ack() + + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=callback + ) + print("Listening for messages on {}..\n".format(subscription_path)) + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + try: + streaming_pull_future.result(timeout=timeout) + except Exception as e: + streaming_pull_future.cancel() + print( + "Listening for messages on {} threw an exception: {}.".format( + subscription_name, e + ) + ) + # [END pubsub_dead_letter_delivery_attempt] + + if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, @@ -470,12 +667,12 @@ def callback(message): subparsers = parser.add_subparsers(dest="command") list_in_topic_parser = subparsers.add_parser( - "list_in_topic", help=list_subscriptions_in_topic.__doc__ + "list-in-topic", help=list_subscriptions_in_topic.__doc__ ) list_in_topic_parser.add_argument("topic_name") list_in_project_parser = subparsers.add_parser( - "list_in_project", help=list_subscriptions_in_project.__doc__ + "list-in-project", help=list_subscriptions_in_project.__doc__ ) create_parser = subparsers.add_parser( @@ -484,6 +681,14 @@ def callback(message): create_parser.add_argument("topic_name") create_parser.add_argument("subscription_name") + create_with_dead_letter_policy_parser = subparsers.add_parser( + "create-with-dead-letter-policy", + help=create_subscription_with_dead_letter_topic.__doc__, + ) + create_with_dead_letter_policy_parser.add_argument("topic_name") + create_with_dead_letter_policy_parser.add_argument("subscription_name") + create_with_dead_letter_policy_parser.add_argument("dead_letter_topic_name") + create_push_parser = subparsers.add_parser( "create-push", help=create_push_subscription.__doc__ ) @@ -496,11 +701,26 @@ def callback(message): ) delete_parser.add_argument("subscription_name") - update_parser = subparsers.add_parser( - "update", help=update_subscription.__doc__ + update_push_parser = subparsers.add_parser( + "update-push", help=update_push_subscription.__doc__ + ) + update_push_parser.add_argument("topic_name") + update_push_parser.add_argument("subscription_name") + update_push_parser.add_argument("endpoint") + + update_dead_letter_policy_parser = subparsers.add_parser( + "update-dead-letter-policy", + help=update_subscription_with_dead_letter_policy.__doc__, + ) + update_dead_letter_policy_parser.add_argument("topic_name") + update_dead_letter_policy_parser.add_argument("subscription_name") + update_dead_letter_policy_parser.add_argument("dead_letter_topic_name") + + remove_dead_letter_policy_parser = subparsers.add_parser( + "remove-dead-letter-policy", help=remove_dead_letter_policy.__doc__ ) - update_parser.add_argument("subscription_name") - update_parser.add_argument("endpoint") + remove_dead_letter_policy_parser.add_argument("topic_name") + remove_dead_letter_policy_parser.add_argument("subscription_name") receive_parser = subparsers.add_parser( "receive", help=receive_messages.__doc__ @@ -539,23 +759,39 @@ def callback(message): ) listen_for_errors_parser = subparsers.add_parser( - "listen_for_errors", help=listen_for_errors.__doc__ + "listen-for-errors", help=listen_for_errors.__doc__ ) listen_for_errors_parser.add_argument("subscription_name") - listen_for_errors_parser.add_argument( + listen_for_errors_parser.add_argument("--timeout", default=None, type=float) + + receive_messages_with_delivery_attempts_parser = subparsers.add_parser( + "receive-messages-with-delivery-attempts", + help=receive_messages_with_delivery_attempts.__doc__, + ) + receive_messages_with_delivery_attempts_parser.add_argument( + "subscription_name" + ) + receive_messages_with_delivery_attempts_parser.add_argument( "--timeout", default=None, type=float ) args = parser.parse_args() - if args.command == "list_in_topic": + if args.command == "list-in-topic": list_subscriptions_in_topic(args.project_id, args.topic_name) - elif args.command == "list_in_project": + elif args.command == "list-in-project": list_subscriptions_in_project(args.project_id) elif args.command == "create": create_subscription( args.project_id, args.topic_name, args.subscription_name ) + elif args.command == "create-with-dead-letter-policy": + create_subscription_with_dead_letter_topic( + args.project_id, + args.topic_name, + args.subscription_name, + args.dead_letter_topic_name, + ) elif args.command == "create-push": create_push_subscription( args.project_id, @@ -565,9 +801,23 @@ def callback(message): ) elif args.command == "delete": delete_subscription(args.project_id, args.subscription_name) - elif args.command == "update": - update_subscription( - args.project_id, args.subscription_name, args.endpoint + elif args.command == "update-push": + update_push_subscription( + args.project_id, + args.topic_name, + args.subscription_name, + args.endpoint, + ) + elif args.command == "update-dead-letter-policy": + update_subscription_with_dead_letter_policy( + args.project_id, + args.topic_name, + args.subscription_name, + args.dead_letter_topic_name, + ) + elif args.command == "remove-dead-letter-policy": + remove_dead_letter_policy( + args.project_id, args.topic_name, args.subscription_name ) elif args.command == "receive": receive_messages(args.project_id, args.subscription_name, args.timeout) @@ -585,7 +835,9 @@ def callback(message): synchronous_pull_with_lease_management( args.project_id, args.subscription_name ) - elif args.command == "listen_for_errors": - listen_for_errors( + elif args.command == "listen-for-errors": + listen_for_errors(args.project_id, args.subscription_name, args.timeout) + elif args.command == "receive-messages-with-delivery-attempts": + receive_messages_with_delivery_attempts( args.project_id, args.subscription_name, args.timeout ) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 94905d63525d..9b4c948d5446 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -24,9 +24,11 @@ UUID = uuid.uuid4().hex PROJECT = os.environ["GCLOUD_PROJECT"] TOPIC = "subscription-test-topic-" + UUID +DEAD_LETTER_TOPIC = "subscription-test-dead-letter-topic-" + UUID SUBSCRIPTION_ADMIN = "subscription-test-subscription-admin-" + UUID SUBSCRIPTION_ASYNC = "subscription-test-subscription-async-" + UUID SUBSCRIPTION_SYNC = "subscription-test-subscription-sync-" + UUID +SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID ENDPOINT = "https://{}.appspot.com/push".format(PROJECT) NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT) @@ -41,13 +43,27 @@ def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: - subscription = publisher_client.get_topic(topic_path) + topic = publisher_client.get_topic(topic_path) except: # noqa - subscription = publisher_client.create_topic(topic_path) + topic = publisher_client.create_topic(topic_path) - yield subscription.name + yield topic.name + + publisher_client.delete_topic(topic.name) + + +@pytest.fixture(scope="module") +def dead_letter_topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) + + try: + dead_letter_topic = publisher_client.get_topic(topic_path) + except: # noqa + dead_letter_topic = publisher_client.create_topic(topic_path) + + yield dead_letter_topic.name - publisher_client.delete_topic(subscription.name) + publisher_client.delete_topic(dead_letter_topic.name) @pytest.fixture(scope="module") @@ -109,6 +125,24 @@ def subscription_async(subscriber_client, topic): subscriber_client.delete_subscription(subscription.name) +@pytest.fixture(scope="module") +def subscription_dlq(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path( + PROJECT, SUBSCRIPTION_DLQ + ) + + try: + subscription = subscriber_client.get_subscription(subscription_path) + except: # noqa + subscription = subscriber_client.create_subscription( + subscription_path, topic=topic + ) + + yield subscription.name + + subscriber_client.delete_subscription(subscription.name) + + def test_list_in_topic(subscription_admin, capsys): @eventually_consistent.call def _(): @@ -142,6 +176,37 @@ def _(): assert subscriber_client.get_subscription(subscription_path) +def test_create_subscription_with_dead_letter_policy( + subscriber_client, publisher_client, topic, dead_letter_topic, capsys +): + subscription_path = subscriber_client.subscription_path( + PROJECT, SUBSCRIPTION_DLQ + ) + dead_letter_topic_path = publisher_client.topic_path( + PROJECT, DEAD_LETTER_TOPIC + ) + + try: + subscriber_client.delete_subscription(subscription_path) + except Exception: + pass + + subscriber.create_subscription_with_dead_letter_topic( + PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC + ) + + @eventually_consistent.call + def _(): + out, _ = capsys.readouterr() + assert "Subscription created: " + subscription_path in out + assert ( + "It will forward dead letter messages to: " + dead_letter_topic_path + in out + ) + assert "After 10 delivery attempts." in out + assert subscriber_client.get_subscription(subscription_path) + + def test_create_push(subscriber_client): subscription_path = subscriber_client.subscription_path( PROJECT, SUBSCRIPTION_ADMIN @@ -161,12 +226,25 @@ def _(): def test_update(subscriber_client, subscription_admin, capsys): - subscriber.update_subscription(PROJECT, SUBSCRIPTION_ADMIN, NEW_ENDPOINT) + subscriber.update_push_subscription( + PROJECT, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT + ) out, _ = capsys.readouterr() assert "Subscription updated" in out +def test_update_dead_letter_policy( + subscriber_client, topic, subscription_dlq, dead_letter_topic, capsys +): + subscription_after_update = subscriber.update_subscription_with_dead_letter_policy( + PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC + ) + + out, _ = capsys.readouterr() + assert "max_delivery_attempts: 20" in out + + def test_delete(subscriber_client, subscription_admin): subscriber.delete_subscription(PROJECT, SUBSCRIPTION_ADMIN) @@ -252,9 +330,7 @@ def test_receive_synchronously_with_lease( assert "Done." in out -def test_listen_for_errors( - publisher_client, topic, subscription_async, capsys -): +def test_listen_for_errors(publisher_client, topic, subscription_async, capsys): _publish_messages(publisher_client, topic) @@ -264,3 +340,28 @@ def test_listen_for_errors( assert "Listening" in out assert subscription_async in out assert "threw an exception" in out + + +def test_receive_with_delivery_attempts( + publisher_client, topic, subscription_dlq, dead_letter_topic, capsys +): + _publish_messages(publisher_client, topic) + + subscriber.receive_messages_with_delivery_attempts( + PROJECT, SUBSCRIPTION_DLQ, 10 + ) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_dlq in out + assert "Received message: " in out + assert "message 4" in out + assert "With delivery attempts: " in out + + +def test_remove_dead_letter_policy(subscriber_client, subscription_dlq): + subscription_after_update = subscriber.remove_dead_letter_policy( + PROJECT, TOPIC, SUBSCRIPTION_DLQ + ) + + assert not subscription_after_update.HasField("dead_letter_policy") From f954653258e79e1d2389037df53dc7a97e7a39de Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Thu, 28 May 2020 15:43:26 -0700 Subject: [PATCH 2/5] lint & cleanup --- pubsub/cloud-client/subscriber.py | 159 +++++++------------------ pubsub/cloud-client/subscriber_test.py | 76 ++++-------- 2 files changed, 61 insertions(+), 174 deletions(-) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index d7f3f2b9da32..dadbc71e2f18 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -68,9 +68,7 @@ def create_subscription(project_id, topic_name, subscription_name): subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) subscription = subscriber.create_subscription(subscription_path, topic_path) @@ -100,12 +98,8 @@ def create_subscription_with_dead_letter_topic( subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) - dead_letter_topic_path = subscriber.topic_path( - project_id, dead_letter_topic_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) + dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_name) dead_letter_policy = DeadLetterPolicy( dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=10 @@ -131,9 +125,7 @@ def create_subscription_with_dead_letter_topic( # [END pubsub_dead_letter_create_subscription] -def create_push_subscription( - project_id, topic_name, subscription_name, endpoint -): +def create_push_subscription(project_id, topic_name, subscription_name, endpoint): """Create a new push subscription on the given topic.""" # [START pubsub_create_push_subscription] from google.cloud import pubsub_v1 @@ -145,9 +137,7 @@ def create_push_subscription( subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) @@ -171,9 +161,7 @@ def delete_subscription(project_id, subscription_name): # TODO subscription_name = "Your Pub/Sub subscription name" subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) subscriber.delete_subscription(subscription_path) @@ -183,9 +171,7 @@ def delete_subscription(project_id, subscription_name): # [END pubsub_delete_subscription] -def update_push_subscription( - project_id, topic_name, subscription_name, endpoint -): +def update_push_subscription(project_id, topic_name, subscription_name, endpoint): """ Updates an existing Pub/Sub subscription's push endpoint URL. Note that certain properties of a subscription, such as @@ -200,9 +186,7 @@ def update_push_subscription( # TODO endpoint = "https://my-test-project.appspot.com/push" subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) @@ -241,19 +225,14 @@ def update_subscription_with_dead_letter_policy( subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) - dead_letter_topic_path = subscriber.topic_path( - project_id, dead_letter_topic_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) + dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_name) subscription_before_update = subscriber.get_subscription(subscription_path) print("Before the update: {}".format(subscription_before_update)) subscription_before_update.HasField("dead_letter_policy") - assert 'dead_letter_policy' in properties update_mask = {"paths": {"dead_letter_policy": {"max_delivery_attempts"}}} # Construct a dead letter policy you expect to have after the update. @@ -265,9 +244,7 @@ def update_subscription_with_dead_letter_policy( # after the update. Here, values in the required fields (name, topic) help # identify the subscription. subscription = pubsub_v1.types.Subscription( - name=subscription_path, - topic=topic_path, - dead_letter_policy=dead_letter_policy, + name=subscription_path, topic=topic_path, dead_letter_policy=dead_letter_policy, ) subscription_after_update = subscriber.update_subscription( @@ -285,7 +262,6 @@ def remove_dead_letter_policy(project_id, topic_name, subscription_name): """Remove dead letter policy from a subscription.""" # [START pubsub_dead_letter_remove] from google.cloud import pubsub_v1 - from google.cloud.pubsub_v1.types import DeadLetterPolicy # TODO project_id = "Your Google Cloud Project ID" # This is an existing topic that the subscription with dead letter policy @@ -296,16 +272,16 @@ def remove_dead_letter_policy(project_id, topic_name, subscription_name): subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) subscription_before_update = subscriber.get_subscription(subscription_path) print("Before removing the policy: {}".format(subscription_before_update)) update_mask = { - "paths": {"dead_letter_policy": {"dead_letter_topic"}}, - "paths": {"dead_letter_policy": {"max_delivery_attempts"}}, + "paths": { + {"dead_letter_policy": {"dead_letter_topic"}}, + {"dead_letter_policy": {"max_delivery_attempts"}}, + } } # Construct the subscription with the dead letter policy you expect to have @@ -321,8 +297,6 @@ def remove_dead_letter_policy(project_id, topic_name, subscription_name): print("After removing the policy: {}".format(subscription_after_update)) - import inspect - subscriber.close() # [END pubsub_dead_letter_remove] return subscription_after_update @@ -342,17 +316,13 @@ def receive_messages(project_id, subscription_name, timeout=None): subscriber = pubsub_v1.SubscriberClient() # The `subscription_path` method creates a fully qualified identifier # in the form `projects/{project_id}/subscriptions/{subscription_name}` - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) def callback(message): print("Received message: {}".format(message)) message.ack() - streaming_pull_future = subscriber.subscribe( - subscription_path, callback=callback - ) + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) print("Listening for messages on {}..\n".format(subscription_path)) # Wrap subscriber in a 'with' block to automatically call close() when done. @@ -381,9 +351,7 @@ def receive_messages_with_custom_attributes( # messages in seconds" subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) def callback(message): print("Received message: {}".format(message.data)) @@ -394,9 +362,7 @@ def callback(message): print("{}: {}".format(key, value)) message.ack() - streaming_pull_future = subscriber.subscribe( - subscription_path, callback=callback - ) + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) print("Listening for messages on {}..\n".format(subscription_path)) # Wrap subscriber in a 'with' block to automatically call close() when done. @@ -411,9 +377,7 @@ def callback(message): # [END pubsub_subscriber_sync_pull_custom_attributes] -def receive_messages_with_flow_control( - project_id, subscription_name, timeout=None -): +def receive_messages_with_flow_control(project_id, subscription_name, timeout=None): """Receives messages from a pull subscription with flow control.""" # [START pubsub_subscriber_flow_settings] from google.cloud import pubsub_v1 @@ -424,9 +388,7 @@ def receive_messages_with_flow_control( # messages in seconds" subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) def callback(message): print("Received message: {}".format(message.data)) @@ -460,9 +422,7 @@ def synchronous_pull(project_id, subscription_name): # TODO subscription_name = "Your Pub/Sub subscription name" subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) NUM_MESSAGES = 3 @@ -501,9 +461,7 @@ def synchronous_pull_with_lease_management(project_id, subscription_name): # TODO subscription_name = "Your Pub/Sub subscription name" subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) NUM_MESSAGES = 2 ACK_DEADLINE = 30 @@ -542,15 +500,11 @@ def worker(msg): if process.is_alive(): # `ack_deadline_seconds` must be between 10 to 600. subscriber.modify_ack_deadline( - subscription_path, - [ack_id], - ack_deadline_seconds=ACK_DEADLINE, + subscription_path, [ack_id], ack_deadline_seconds=ACK_DEADLINE, ) logger.info( "{}: Reset ack deadline for {} for {}s".format( - time.strftime("%X", time.gmtime()), - msg_data, - ACK_DEADLINE, + time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE, ) ) @@ -589,17 +543,13 @@ def listen_for_errors(project_id, subscription_name, timeout=None): # messages in seconds" subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) def callback(message): print("Received message: {}".format(message)) message.ack() - streaming_pull_future = subscriber.subscribe( - subscription_path, callback=callback - ) + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) print("Listening for messages on {}..\n".format(subscription_path)) # Wrap subscriber in a 'with' block to automatically call close() when done. @@ -628,18 +578,14 @@ def receive_messages_with_delivery_attempts( # TODO subscription_name = "Your Pubsub subscription name" subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) def callback(message): print("Received message: {}".format(message)) print("With delivery attempts: {}".format(message.delivery_attempt)) message.ack() - streaming_pull_future = subscriber.subscribe( - subscription_path, callback=callback - ) + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) print("Listening for messages on {}..\n".format(subscription_path)) # Wrap subscriber in a 'with' block to automatically call close() when done. @@ -660,8 +606,7 @@ def callback(message): if __name__ == "__main__": parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter, + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("project_id", help="Your Google Cloud project ID") @@ -675,9 +620,7 @@ def callback(message): "list-in-project", help=list_subscriptions_in_project.__doc__ ) - create_parser = subparsers.add_parser( - "create", help=create_subscription.__doc__ - ) + create_parser = subparsers.add_parser("create", help=create_subscription.__doc__) create_parser.add_argument("topic_name") create_parser.add_argument("subscription_name") @@ -696,9 +639,7 @@ def callback(message): create_push_parser.add_argument("subscription_name") create_push_parser.add_argument("endpoint") - delete_parser = subparsers.add_parser( - "delete", help=delete_subscription.__doc__ - ) + delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__) delete_parser.add_argument("subscription_name") update_push_parser = subparsers.add_parser( @@ -722,9 +663,7 @@ def callback(message): remove_dead_letter_policy_parser.add_argument("topic_name") remove_dead_letter_policy_parser.add_argument("subscription_name") - receive_parser = subparsers.add_parser( - "receive", help=receive_messages.__doc__ - ) + receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__) receive_parser.add_argument("subscription_name") receive_parser.add_argument("--timeout", default=None, type=float) @@ -741,9 +680,7 @@ def callback(message): "receive-flow-control", help=receive_messages_with_flow_control.__doc__ ) receive_with_flow_control_parser.add_argument("subscription_name") - receive_with_flow_control_parser.add_argument( - "--timeout", default=None, type=float - ) + receive_with_flow_control_parser.add_argument("--timeout", default=None, type=float) synchronous_pull_parser = subparsers.add_parser( "receive-synchronously", help=synchronous_pull.__doc__ @@ -754,9 +691,7 @@ def callback(message): "receive-synchronously-with-lease", help=synchronous_pull_with_lease_management.__doc__, ) - synchronous_pull_with_lease_management_parser.add_argument( - "subscription_name" - ) + synchronous_pull_with_lease_management_parser.add_argument("subscription_name") listen_for_errors_parser = subparsers.add_parser( "listen-for-errors", help=listen_for_errors.__doc__ @@ -768,9 +703,7 @@ def callback(message): "receive-messages-with-delivery-attempts", help=receive_messages_with_delivery_attempts.__doc__, ) - receive_messages_with_delivery_attempts_parser.add_argument( - "subscription_name" - ) + receive_messages_with_delivery_attempts_parser.add_argument("subscription_name") receive_messages_with_delivery_attempts_parser.add_argument( "--timeout", default=None, type=float ) @@ -782,9 +715,7 @@ def callback(message): elif args.command == "list-in-project": list_subscriptions_in_project(args.project_id) elif args.command == "create": - create_subscription( - args.project_id, args.topic_name, args.subscription_name - ) + create_subscription(args.project_id, args.topic_name, args.subscription_name) elif args.command == "create-with-dead-letter-policy": create_subscription_with_dead_letter_topic( args.project_id, @@ -794,19 +725,13 @@ def callback(message): ) elif args.command == "create-push": create_push_subscription( - args.project_id, - args.topic_name, - args.subscription_name, - args.endpoint, + args.project_id, args.topic_name, args.subscription_name, args.endpoint, ) elif args.command == "delete": delete_subscription(args.project_id, args.subscription_name) elif args.command == "update-push": update_push_subscription( - args.project_id, - args.topic_name, - args.subscription_name, - args.endpoint, + args.project_id, args.topic_name, args.subscription_name, args.endpoint, ) elif args.command == "update-dead-letter-policy": update_subscription_with_dead_letter_policy( @@ -832,9 +757,7 @@ def callback(message): elif args.command == "receive-synchronously": synchronous_pull(args.project_id, args.subscription_name) elif args.command == "receive-synchronously-with-lease": - synchronous_pull_with_lease_management( - args.project_id, args.subscription_name - ) + synchronous_pull_with_lease_management(args.project_id, args.subscription_name) elif args.command == "listen-for-errors": listen_for_errors(args.project_id, args.subscription_name, args.timeout) elif args.command == "receive-messages-with-delivery-attempts": diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 8e988cf3f66c..5a79f4e16d09 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -75,9 +75,7 @@ def subscriber_client(): @pytest.fixture(scope="module") def subscription_admin(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ADMIN - ) + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) try: subscription = subscriber_client.get_subscription(subscription_path) @@ -91,9 +89,7 @@ def subscription_admin(subscriber_client, topic): @pytest.fixture(scope="module") def subscription_sync(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_SYNC - ) + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_SYNC) try: subscription = subscriber_client.get_subscription(subscription_path) @@ -109,9 +105,7 @@ def subscription_sync(subscriber_client, topic): @pytest.fixture(scope="module") def subscription_async(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ASYNC - ) + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ASYNC) try: subscription = subscriber_client.get_subscription(subscription_path) @@ -127,9 +121,7 @@ def subscription_async(subscriber_client, topic): @pytest.fixture(scope="module") def subscription_dlq(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_DLQ - ) + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_DLQ) try: subscription = subscriber_client.get_subscription(subscription_path) @@ -164,9 +156,7 @@ def eventually_consistent_test(): def test_create(subscriber_client): - subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ADMIN - ) + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) try: subscriber_client.delete_subscription(subscription_path) @@ -185,12 +175,8 @@ def eventually_consistent_test(): def test_create_subscription_with_dead_letter_policy( subscriber_client, publisher_client, topic, dead_letter_topic, capsys ): - subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_DLQ - ) - dead_letter_topic_path = publisher_client.topic_path( - PROJECT, DEAD_LETTER_TOPIC - ) + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_DLQ) + dead_letter_topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) try: subscriber_client.delete_subscription(subscription_path) @@ -201,30 +187,20 @@ def test_create_subscription_with_dead_letter_policy( PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC ) - @eventually_consistent.call - def _(): - out, _ = capsys.readouterr() - assert "Subscription created: " + subscription_path in out - assert ( - "It will forward dead letter messages to: " + dead_letter_topic_path - in out - ) - assert "After 10 delivery attempts." in out - assert subscriber_client.get_subscription(subscription_path) + out, _ = capsys.readouterr() + assert "Subscription created: " + subscription_path in out + assert "It will forward dead letter messages to: " + dead_letter_topic_path in out + assert "After 10 delivery attempts." in out def test_create_push(subscriber_client): - subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ADMIN - ) + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) try: subscriber_client.delete_subscription(subscription_path) except Exception: pass - subscriber.create_push_subscription( - PROJECT, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT - ) + subscriber.create_push_subscription(PROJECT, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT) @backoff.on_exception(backoff.expo, AssertionError, max_time=60) def eventually_consistent_test(): @@ -245,7 +221,7 @@ def test_update(subscriber_client, subscription_admin, capsys): def test_update_dead_letter_policy( subscriber_client, topic, subscription_dlq, dead_letter_topic, capsys ): - subscription_after_update = subscriber.update_subscription_with_dead_letter_policy( + _ = subscriber.update_subscription_with_dead_letter_policy( PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC ) @@ -290,9 +266,7 @@ def test_receive_with_custom_attributes( _publish_messages(publisher_client, topic) - subscriber.receive_messages_with_custom_attributes( - PROJECT, SUBSCRIPTION_ASYNC, 5 - ) + subscriber.receive_messages_with_custom_attributes(PROJECT, SUBSCRIPTION_ASYNC, 5) out, _ = capsys.readouterr() assert "message" in out @@ -300,15 +274,11 @@ def test_receive_with_custom_attributes( assert "python-sample" in out -def test_receive_with_flow_control( - publisher_client, topic, subscription_async, capsys -): +def test_receive_with_flow_control(publisher_client, topic, subscription_async, capsys): _publish_messages(publisher_client, topic) - subscriber.receive_messages_with_flow_control( - PROJECT, SUBSCRIPTION_ASYNC, 5 - ) + subscriber.receive_messages_with_flow_control(PROJECT, SUBSCRIPTION_ASYNC, 5) out, _ = capsys.readouterr() assert "Listening" in out @@ -316,9 +286,7 @@ def test_receive_with_flow_control( assert "message" in out -def test_receive_synchronously( - publisher_client, topic, subscription_sync, capsys -): +def test_receive_synchronously(publisher_client, topic, subscription_sync, capsys): _publish_messages(publisher_client, topic) subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC) @@ -332,9 +300,7 @@ def test_receive_synchronously_with_lease( ): _publish_messages(publisher_client, topic) - subscriber.synchronous_pull_with_lease_management( - PROJECT, SUBSCRIPTION_SYNC - ) + subscriber.synchronous_pull_with_lease_management(PROJECT, SUBSCRIPTION_SYNC) out, _ = capsys.readouterr() assert "Done." in out @@ -357,9 +323,7 @@ def test_receive_with_delivery_attempts( ): _publish_messages(publisher_client, topic) - subscriber.receive_messages_with_delivery_attempts( - PROJECT, SUBSCRIPTION_DLQ, 10 - ) + subscriber.receive_messages_with_delivery_attempts(PROJECT, SUBSCRIPTION_DLQ, 10) out, _ = capsys.readouterr() assert "Listening" in out From 4e866210c959bdb654709e2ce943ef524d9ad856 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Thu, 28 May 2020 15:57:49 -0700 Subject: [PATCH 3/5] add noqa to use multiple paths --- pubsub/cloud-client/subscriber.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index dadbc71e2f18..c1a1d759233b 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -278,10 +278,8 @@ def remove_dead_letter_policy(project_id, topic_name, subscription_name): print("Before removing the policy: {}".format(subscription_before_update)) update_mask = { - "paths": { - {"dead_letter_policy": {"dead_letter_topic"}}, - {"dead_letter_policy": {"max_delivery_attempts"}}, - } + "paths": {{"dead_letter_policy": {"dead_letter_topic"}}}, # noqa + "paths": {{"dead_letter_policy": {"max_delivery_attempts"}}} # noqa } # Construct the subscription with the dead letter policy you expect to have From 830e4719591c309d50372bfc3b9a838a95cd6846 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Thu, 28 May 2020 16:56:49 -0700 Subject: [PATCH 4/5] kvg's suggestions --- pubsub/cloud-client/publisher.py | 52 ++++--- pubsub/cloud-client/subscriber.py | 208 +++++++++++++------------ pubsub/cloud-client/subscriber_test.py | 2 +- 3 files changed, 140 insertions(+), 122 deletions(-) diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py index 6802ec85fc13..9e7820fbf305 100644 --- a/pubsub/cloud-client/publisher.py +++ b/pubsub/cloud-client/publisher.py @@ -29,7 +29,8 @@ def list_topics(project_id): # [START pubsub_list_topics] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" + # TODO(developer) + # project_id = "your-project-id" publisher = pubsub_v1.PublisherClient() project_path = publisher.project_path(project_id) @@ -45,8 +46,9 @@ def create_topic(project_id, topic_name): # [START pubsub_create_topic] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) @@ -63,8 +65,9 @@ def delete_topic(project_id, topic_name): # [START pubsub_delete_topic] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) @@ -81,8 +84,9 @@ def publish_messages(project_id, topic_name): # [START pubsub_publish] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" publisher = pubsub_v1.PublisherClient() # The `topic_path` method creates a fully qualified identifier @@ -108,8 +112,9 @@ def publish_messages_with_custom_attributes(project_id, topic_name): # [START pubsub_publish_custom_attributes] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) @@ -135,8 +140,9 @@ def publish_messages_with_error_handler(project_id, topic_name): from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) @@ -177,8 +183,9 @@ def publish_messages_with_batch_settings(project_id, topic_name): # [START pubsub_publisher_batch_settings] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" # Configure the batch to publish as soon as there is ten messages, # one kilobyte of data, or one second has passed. @@ -212,8 +219,9 @@ def publish_messages_with_retry_settings(project_id, topic_name): # [START pubsub_publisher_retry_settings] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" # Configure the retry settings. Defaults will be overwritten. retry_settings = { @@ -267,8 +275,7 @@ def publish_messages_with_retry_settings(project_id, topic_name): if __name__ == "__main__": parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter, + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("project_id", help="Your Google Cloud project ID") @@ -281,9 +288,7 @@ def publish_messages_with_retry_settings(project_id, topic_name): delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__) delete_parser.add_argument("topic_name") - publish_parser = subparsers.add_parser( - "publish", help=publish_messages.__doc__ - ) + publish_parser = subparsers.add_parser("publish", help=publish_messages.__doc__) publish_parser.add_argument("topic_name") publish_with_custom_attributes_parser = subparsers.add_parser( @@ -293,8 +298,7 @@ def publish_messages_with_retry_settings(project_id, topic_name): publish_with_custom_attributes_parser.add_argument("topic_name") publish_with_error_handler_parser = subparsers.add_parser( - "publish-with-error-handler", - help=publish_messages_with_error_handler.__doc__, + "publish-with-error-handler", help=publish_messages_with_error_handler.__doc__, ) publish_with_error_handler_parser.add_argument("topic_name") @@ -321,9 +325,7 @@ def publish_messages_with_retry_settings(project_id, topic_name): elif args.command == "publish": publish_messages(args.project_id, args.topic_name) elif args.command == "publish-with-custom-attributes": - publish_messages_with_custom_attributes( - args.project_id, args.topic_name - ) + publish_messages_with_custom_attributes(args.project_id, args.topic_name) elif args.command == "publish-with-error-handler": publish_messages_with_error_handler(args.project_id, args.topic_name) elif args.command == "publish-with-batch-settings": diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index c1a1d759233b..e4e4250721aa 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -29,8 +29,9 @@ def list_subscriptions_in_topic(project_id, topic_name): # [START pubsub_list_topic_subscriptions] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) @@ -45,7 +46,8 @@ def list_subscriptions_in_project(project_id): # [START pubsub_list_subscriptions] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" + # TODO(developer) + # project_id = "your-project-id" subscriber = pubsub_v1.SubscriberClient() project_path = subscriber.project_path(project_id) @@ -62,9 +64,10 @@ def create_subscription(project_id, topic_name, subscription_name): # [START pubsub_create_pull_subscription] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" - # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" + # subscription_name = "your-subscription-id" subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) @@ -86,15 +89,17 @@ def create_subscription_with_dead_letter_topic( from google.cloud import pubsub_v1 from google.cloud.pubsub_v1.types import DeadLetterPolicy - # TODO project_id = "Your Google Cloud Project ID" - # This is an existing topic that the subscription with dead letter policy - # is attached to. - # TODO topic_name = "Your Pub/Sub topic name" - # This is an existing subscription with a dead letter policy. - # TODO subscription_name = "Your Pub/Sub subscription name" - # This is an existing dead letter topic that the subscription with dead - # letter policy will forward dead letter messages to. - # TODO dead_letter_topic_name = "Your Pub/Sub dead letter topic name" + # TODO(developer) + # project_id = "your-project-id" + # endpoint = "https://my-test-project.appspot.com/push" + # TODO(developer): This is an existing topic that the subscription + # with dead letter policy is attached to. + # topic_name = "your-topic-id" + # TODO(developer): This is an existing subscription with a dead letter policy. + # subscription_name = "your-subscription-id" + # TODO(developer): This is an existing dead letter topic that the subscription + # with dead letter policy will forward dead letter messages to. + # dead_letter_topic_name = "your-dead-letter-topic-id" subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) @@ -105,9 +110,10 @@ def create_subscription_with_dead_letter_topic( dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=10 ) - subscription = subscriber.create_subscription( - subscription_path, topic_path, dead_letter_policy=dead_letter_policy - ) + with subscriber: + subscription = subscriber.create_subscription( + subscription_path, topic_path, dead_letter_policy=dead_letter_policy + ) print("Subscription created: {}".format(subscription.name)) print( @@ -120,8 +126,6 @@ def create_subscription_with_dead_letter_topic( subscription.dead_letter_policy.max_delivery_attempts ) ) - - subscriber.close() # [END pubsub_dead_letter_create_subscription] @@ -130,10 +134,11 @@ def create_push_subscription(project_id, topic_name, subscription_name, endpoint # [START pubsub_create_push_subscription] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" - # TODO subscription_name = "Your Pub/Sub subscription name" - # TODO endpoint = "https://my-test-project.appspot.com/push" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" + # subscription_name = "your-subscription-id" + # endpoint = "https://my-test-project.appspot.com/push" subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) @@ -157,8 +162,9 @@ def delete_subscription(project_id, subscription_name): # [START pubsub_delete_subscription] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(project_id, subscription_name) @@ -180,10 +186,11 @@ def update_push_subscription(project_id, topic_name, subscription_name, endpoint # [START pubsub_update_push_configuration] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" - # TODO subscription_name = "Your Pub/Sub subscription name" - # TODO endpoint = "https://my-test-project.appspot.com/push" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" + # subscription_name = "your-subscription-id" + # endpoint = "https://my-test-project.appspot.com/push" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(project_id, subscription_name) @@ -211,17 +218,18 @@ def update_subscription_with_dead_letter_policy( """Update a subscription's dead letter policy.""" # [START pubsub_dead_letter_update_subscription] from google.cloud import pubsub_v1 - from google.cloud.pubsub_v1.types import DeadLetterPolicy - - # TODO project_id = "Your Google Cloud Project ID" - # This is an existing topic that the subscription with dead letter policy - # is attached to. - # TODO topic_name = "Your Pub/Sub topic name" - # This is an existing subscription with a dead letter policy. - # TODO subscription_name = "Your Pub/Sub subscription name" - # This is an existing dead letter topic that the subscription with dead - # letter policy will forward dead letter messages to. - # TODO dead_letter_topic_name = "Your Pub/Sub dead letter topic name" + from google.cloud.pubsub_v1.types import DeadLetterPolicy, FieldMask + + # TODO(developer) + # project_id = "your-project-id" + # TODO(developer): This is an existing topic that the subscription + # with dead letter policy is attached to. + # topic_name = "your-topic-name" + # TODO(developer): This is an existing subscription with a dead letter policy. + # subscription_name = "your-subscription-id" + # TODO(developer): This is an existing dead letter topic that the subscription + # with dead letter policy will forward dead letter messages to. + # dead_letter_topic_name = "your-dead-letter-topic-id" subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) @@ -231,9 +239,8 @@ def update_subscription_with_dead_letter_policy( subscription_before_update = subscriber.get_subscription(subscription_path) print("Before the update: {}".format(subscription_before_update)) - subscription_before_update.HasField("dead_letter_policy") - - update_mask = {"paths": {"dead_letter_policy": {"max_delivery_attempts"}}} + # Indicates which fields in the provided subscription to update. + update_mask = FieldMask(paths=["dead_letter_policy.max_delivery_attempts"]) # Construct a dead letter policy you expect to have after the update. dead_letter_policy = DeadLetterPolicy( @@ -247,13 +254,12 @@ def update_subscription_with_dead_letter_policy( name=subscription_path, topic=topic_path, dead_letter_policy=dead_letter_policy, ) - subscription_after_update = subscriber.update_subscription( - subscription, update_mask - ) + with subscriber: + subscription_after_update = subscriber.update_subscription( + subscription, update_mask + ) print("After the update: {}".format(subscription_after_update)) - - subscriber.close() # [END pubsub_dead_letter_update_subscription] return subscription_after_update @@ -262,13 +268,15 @@ def remove_dead_letter_policy(project_id, topic_name, subscription_name): """Remove dead letter policy from a subscription.""" # [START pubsub_dead_letter_remove] from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.types import FieldMask - # TODO project_id = "Your Google Cloud Project ID" - # This is an existing topic that the subscription with dead letter policy - # is attached to. - # TODO topic_name = "Your Pub/Sub topic name" - # This is an existing subscription with a dead letter policy. - # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO(developer) + # project_id = "your-project-id" + # TODO(developer): This is an existing topic that the subscription + # with dead letter policy is attached to. + # topic_name = "your-topic-name" + # TODO(developer): This is an existing subscription with a dead letter policy. + # subscription_name = "your-subscription-id" subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) @@ -277,25 +285,27 @@ def remove_dead_letter_policy(project_id, topic_name, subscription_name): subscription_before_update = subscriber.get_subscription(subscription_path) print("Before removing the policy: {}".format(subscription_before_update)) - update_mask = { - "paths": {{"dead_letter_policy": {"dead_letter_topic"}}}, # noqa - "paths": {{"dead_letter_policy": {"max_delivery_attempts"}}} # noqa - } + # Indicates which fields in the provided subscription to update. + update_mask = FieldMask( + paths=[ + "dead_letter_policy.dead_letter_topic", + "dead_letter_policy.max_delivery_attempts", + ] + ) - # Construct the subscription with the dead letter policy you expect to have + # Construct the subscription with a dead letter policy you expect to have # after the update. Here, values in the required fields (name, topic) help # identify the subscription. subscription = pubsub_v1.types.Subscription( name=subscription_path, topic=topic_path ) - subscription_after_update = subscriber.update_subscription( - subscription, update_mask - ) + with subscriber: + subscription_after_update = subscriber.update_subscription( + subscription, update_mask + ) print("After removing the policy: {}".format(subscription_after_update)) - - subscriber.close() # [END pubsub_dead_letter_remove] return subscription_after_update @@ -304,12 +314,14 @@ def receive_messages(project_id, subscription_name, timeout=None): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_async_pull] # [START pubsub_quickstart_subscriber] + from concurrent.futures import TimeoutError from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pub/Sub subscription name" - # TODO timeout = 5.0 # "How long the subscriber should listen for - # messages in seconds" + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 subscriber = pubsub_v1.SubscriberClient() # The `subscription_path` method creates a fully qualified identifier @@ -329,7 +341,7 @@ def callback(message): # When `timeout` is not set, result() will block indefinitely, # unless an exception is encountered first. streaming_pull_future.result(timeout=timeout) - except: # noqa + except TimeoutError: streaming_pull_future.cancel() # [END pubsub_subscriber_async_pull] # [END pubsub_quickstart_subscriber] @@ -341,12 +353,14 @@ def receive_messages_with_custom_attributes( """Receives messages from a pull subscription.""" # [START pubsub_subscriber_sync_pull_custom_attributes] # [START pubsub_subscriber_async_pull_custom_attributes] + from concurrent.futures import TimeoutError from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pub/Sub subscription name" - # TODO timeout = 5.0 # "How long the subscriber should listen for - # messages in seconds" + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(project_id, subscription_name) @@ -369,7 +383,7 @@ def callback(message): # When `timeout` is not set, result() will block indefinitely, # unless an exception is encountered first. streaming_pull_future.result(timeout=timeout) - except: # noqa + except TimeoutError: streaming_pull_future.cancel() # [END pubsub_subscriber_async_pull_custom_attributes] # [END pubsub_subscriber_sync_pull_custom_attributes] @@ -378,12 +392,14 @@ def callback(message): def receive_messages_with_flow_control(project_id, subscription_name, timeout=None): """Receives messages from a pull subscription with flow control.""" # [START pubsub_subscriber_flow_settings] + from concurrent.futures import TimeoutError from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pub/Sub subscription name" - # TODO timeout = 5.0 # "How long the subscriber should listen for - # messages in seconds" + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(project_id, subscription_name) @@ -406,7 +422,7 @@ def callback(message): # When `timeout` is not set, result() will block indefinitely, # unless an exception is encountered first. streaming_pull_future.result(timeout=timeout) - except: # noqa + except TimeoutError: streaming_pull_future.cancel() # [END pubsub_subscriber_flow_settings] @@ -416,8 +432,9 @@ def synchronous_pull(project_id, subscription_name): # [START pubsub_subscriber_sync_pull] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(project_id, subscription_name) @@ -455,8 +472,9 @@ def synchronous_pull_with_lease_management(project_id, subscription_name): from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(project_id, subscription_name) @@ -535,10 +553,11 @@ def listen_for_errors(project_id, subscription_name, timeout=None): # [START pubsub_subscriber_error_listener] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pubsub subscription name" - # TODO timeout = 5.0 # "How long the subscriber should listen for - # messages in seconds" + # TODO(developer) + # project_id = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(project_id, subscription_name) @@ -570,10 +589,12 @@ def receive_messages_with_delivery_attempts( project_id, subscription_name, timeout=None ): # [START pubsub_dead_letter_delivery_attempt] + from concurrent.futures import TimeoutError from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pubsub subscription name" + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(project_id, subscription_name) @@ -592,13 +613,8 @@ def callback(message): # unless an exception is encountered first. try: streaming_pull_future.result(timeout=timeout) - except Exception as e: + except TimeoutError: streaming_pull_future.cancel() - print( - "Listening for messages on {} threw an exception: {}.".format( - subscription_name, e - ) - ) # [END pubsub_dead_letter_delivery_attempt] diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 5a79f4e16d09..6b90396f942e 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -338,4 +338,4 @@ def test_remove_dead_letter_policy(subscriber_client, subscription_dlq): PROJECT, TOPIC, SUBSCRIPTION_DLQ ) - assert not subscription_after_update.HasField("dead_letter_policy") + assert subscription_after_update.dead_letter_policy.dead_letter_topic == "" From cb3fe4d37630595a34928d19cf98381bb7259a7a Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Thu, 28 May 2020 17:12:15 -0700 Subject: [PATCH 5/5] modify comment --- pubsub/cloud-client/subscriber.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index e4e4250721aa..b5af760aed49 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -293,9 +293,8 @@ def remove_dead_letter_policy(project_id, topic_name, subscription_name): ] ) - # Construct the subscription with a dead letter policy you expect to have - # after the update. Here, values in the required fields (name, topic) help - # identify the subscription. + # Construct the subscription (without any dead letter policy) that you + # expect to have after the update. subscription = pubsub_v1.types.Subscription( name=subscription_path, topic=topic_path )