diff --git a/scripts/devops_tasks/test_run_samples.py b/scripts/devops_tasks/test_run_samples.py index 79bc4552dd58..21cb87e43827 100644 --- a/scripts/devops_tasks/test_run_samples.py +++ b/scripts/devops_tasks/test_run_samples.py @@ -42,6 +42,7 @@ "recv_with_checkpoint_store.py": (10), "recv_with_custom_starting_position.py": (10), "sample_code_eventhub.py": (10), + "iot_hub_connection_string_receive_async.py": (10), "receive_batch_with_checkpoint_async.py": (10), "recv_async.py": (10), "recv_track_last_enqueued_event_prop_async.py": (10), @@ -84,7 +85,6 @@ "connection_to_custom_endpoint_address.py", "proxy.py", "connection_to_custom_endpoint_address_async.py", - "iot_hub_connection_string_receive_async.py", "proxy_async.py" ], "azure-servicebus": [ diff --git a/sdk/eventhub/azure-eventhub/samples/async_samples/iot_hub_connection_string_receive_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/iot_hub_connection_string_receive_async.py index 9a42e6cb7f07..c4c7616044ff 100644 --- a/sdk/eventhub/azure-eventhub/samples/async_samples/iot_hub_connection_string_receive_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/iot_hub_connection_string_receive_async.py @@ -15,6 +15,7 @@ """ import os +import re import time from base64 import b64encode, b64decode from hashlib import sha256 @@ -24,7 +25,10 @@ from uamqp import ReceiveClient, Source from uamqp.errors import LinkRedirect -from azure.eventhub.aio import EventHubConsumerClient +from azure.iot.device.aio import IoTHubDeviceClient +from azure.iot.device import Message +from azure.eventhub import EventData +from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient def generate_sas_token(uri, policy, key, expiry=None): @@ -46,6 +50,20 @@ def generate_sas_token(uri, policy, key, expiry=None): result['skn'] = policy return 'SharedAccessSignature ' + urlencode(result) +async def send_messages_with_provisioned_iot_device(device_conn_str): + device_client = IoTHubDeviceClient.create_from_connection_string(device_conn_str) + # Connect the client. + await device_client.connect() + + for i in range(3): + message = Message("Sending to IotHub, message {}".format(i)) + print("Sending message: {}".format(message)) + await device_client.send_message(message) + print("Message sent succesfully.") + time.sleep(1) + + # finally, disconnect + await device_client.disconnect() def parse_iot_conn_str(iothub_conn_str): hostname = None @@ -63,7 +81,6 @@ def parse_iot_conn_str(iothub_conn_str): raise ValueError("Invalid connection string") return hostname, shared_access_key_name, shared_access_key - def convert_iothub_to_eventhub_conn_str(iothub_conn_str): hostname, shared_access_key_name, shared_access_key = parse_iot_conn_str(iothub_conn_str) iot_hub_name = hostname.split(".")[0] @@ -80,12 +97,15 @@ def convert_iothub_to_eventhub_conn_str(iothub_conn_str): # Once a redirect error is received, close the original client and recreate a new one to the re-directed address receive_client.close() fully_qualified_name = redirect.hostname.decode("utf-8") - return "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format( + # Use regular expression to parse the Event Hub name from the IoT Hub redirection address + iot_hub_name = re.search(":\d+\/.*/ConsumerGroups", str(redirect.address)).group(0).split("/")[1] + conn_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format( fully_qualified_name, shared_access_key_name, shared_access_key, iot_hub_name ) + return conn_str except Exception as exp: raise ValueError( "{} is not an invalid IoT Hub connection string. The underlying exception is {}".format( @@ -94,7 +114,6 @@ def convert_iothub_to_eventhub_conn_str(iothub_conn_str): ) ) - async def receive_events_from_iothub(iothub_conn_str): """Convert the iot hub connection string to the built-in eventhub connection string and receive events from the eventhub @@ -112,8 +131,11 @@ async def on_event_batch(partition_context, events): starting_position=-1 # "-1" is from the beginning of the partition. ) +async def main(): + iothub_conn_str = os.environ["IOTHUB_CONNECTION_STR"] + device_conn_str = os.environ["IOT_DEVICE_CONNECTION_STR"] + await send_messages_with_provisioned_iot_device(device_conn_str) + await receive_events_from_iothub(iothub_conn_str) -if __name__ == '__main__': - iothub_conn_str = os.environ["IOTHUB_CONN_STR"] - loop = asyncio.get_event_loop() - loop.run_until_complete(receive_events_from_iothub(iothub_conn_str)) +loop = asyncio.get_event_loop() +loop.run_until_complete(main()) diff --git a/sdk/eventhub/azure-eventhub/samples/sample_dev_requirements.txt b/sdk/eventhub/azure-eventhub/samples/sample_dev_requirements.txt index 1f546dd19488..4b05ace892dd 100644 --- a/sdk/eventhub/azure-eventhub/samples/sample_dev_requirements.txt +++ b/sdk/eventhub/azure-eventhub/samples/sample_dev_requirements.txt @@ -1,2 +1,4 @@ azure-eventhub-checkpointstoreblob -azure-eventhub-checkpointstoreblob-aio; python_version >= '3.6' \ No newline at end of file +azure-eventhub-checkpointstoreblob-aio; python_version >= '3.6' +azure-iot-device +azure-iothub-provisioningserviceclient \ No newline at end of file diff --git a/sdk/eventhub/test-resources.json b/sdk/eventhub/test-resources.json index 3566eb9745f9..03df342c55c2 100644 --- a/sdk/eventhub/test-resources.json +++ b/sdk/eventhub/test-resources.json @@ -61,6 +61,27 @@ "metadata": { "description": "The maximum duration, in minutes, that a single test is permitted to run before it is considered at-risk for being hung." } + }, + "iotSkuName": { + "type": "string", + "defaultValue": "S1", + "metadata": { + "description": "The SKU to use for the IoT Hub." + } + }, + "iotSkuUnits": { + "type": "string", + "defaultValue": "1", + "metadata": { + "description": "The number of IoT Hub units." + } + }, + "iotEHd2cPartitions": { + "type": "string", + "defaultValue": "4", + "metadata": { + "description": "Partitions used for the event stream." + } } }, "variables": { @@ -75,6 +96,8 @@ "defaultSASKeyName": "RootManageSharedAccessKey", "eventHubsAuthRuleResourceId": "[resourceId('Microsoft.EventHub/namespaces/authorizationRules', variables('eventHubsNamespace'), variables('defaultSASKeyName'))]", "storageAccountId": "[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccount'))]", + "iotHubName": "[concat('eh-', parameters('baseName'), '-iot-hub')]", + "iotHubDeviceName": "[concat('eh-', parameters('baseName'), '-device-iot-hub')]" }, "resources": [ { @@ -159,6 +182,149 @@ "principalId": "[parameters('testApplicationOid')]", "scope": "[resourceGroup().id]" } + }, + { + "name": "[variables('iotHubName')]", + "type": "Microsoft.Devices/IotHubs", + "apiVersion": "2020-08-01", + "location": "[parameters('location')]", + "tags": {}, + "properties": { + "authorizationPolicies": [ + { + "keyName": "string", + "primaryKey": "string", + "secondaryKey": "string", + "rights": "string" + } + ], + "publicNetworkAccess": "string", + "ipFilterRules": [ + { + "filterName": "string", + "action": "string", + "ipMask": "string" + } + ], + "minTlsVersion": "string", + "privateEndpointConnections": [ + { + "properties": { + "privateEndpoint": {}, + "privateLinkServiceConnectionState": { + "status": "string", + "description": "string", + "actionsRequired": "string" + } + } + } + ], + "eventHubEndpoints": {}, + "routing": { + "endpoints": { + "serviceBusQueues": [ + { + "id": "string", + "connectionString": "string", + "endpointUri": "string", + "entityPath": "string", + "authenticationType": "string", + "name": "string", + "subscriptionId": "string", + "resourceGroup": "string" + } + ], + "serviceBusTopics": [ + { + "id": "string", + "connectionString": "string", + "endpointUri": "string", + "entityPath": "string", + "authenticationType": "string", + "name": "string", + "subscriptionId": "string", + "resourceGroup": "string" + } + ], + "eventHubs": [ + { + "id": "string", + "connectionString": "string", + "endpointUri": "string", + "entityPath": "string", + "authenticationType": "string", + "name": "string", + "subscriptionId": "string", + "resourceGroup": "string" + } + ], + "storageContainers": [ + { + "id": "string", + "connectionString": "string", + "endpointUri": "string", + "authenticationType": "string", + "name": "string", + "subscriptionId": "string", + "resourceGroup": "string", + "containerName": "string", + "fileNameFormat": "string", + "batchFrequencyInSeconds": "integer", + "maxChunkSizeInBytes": "integer", + "encoding": "string" + } + ] + }, + "routes": [ + { + "name": "string", + "source": "string", + "condition": "string", + "endpointNames": [ + "string" + ], + "isEnabled": "boolean" + } + ], + "fallbackRoute": { + "name": "string", + "source": "DeviceMessages", + "condition": "string", + "endpointNames": [ + "string" + ], + "isEnabled": "boolean" + }, + "enrichments": [ + { + "key": "string", + "value": "string", + "endpointNames": [ + "string" + ] + } + ] + }, + "storageEndpoints": {}, + "messagingEndpoints": {}, + "enableFileUploadNotifications": "boolean", + "cloudToDevice": { + "maxDeliveryCount": "integer", + "defaultTtlAsIso8601": "string", + "feedback": { + "lockDurationAsIso8601": "string", + "ttlAsIso8601": "string", + "maxDeliveryCount": "integer" + } + }, + "comments": "string", + "features": "string" + }, + "sku": { + "name": "S1", + "capacity": 1 + }, + "resources": [] } ], "outputs": {