Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] Get Iot Hub Name from Redirect Address #19313

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts/devops_tasks/test_run_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"recv_with_checkpoint_store.py": (10),
"recv_with_custom_starting_position.py": (10),
"sample_code_eventhub.py": (10),
"iot_hub_connection_string_receive_async.py": (10),
"receive_batch_with_checkpoint_async.py": (10),
"recv_async.py": (10),
"recv_track_last_enqueued_event_prop_async.py": (10),
Expand Down Expand Up @@ -84,7 +85,6 @@
"connection_to_custom_endpoint_address.py",
"proxy.py",
"connection_to_custom_endpoint_address_async.py",
"iot_hub_connection_string_receive_async.py",
"proxy_async.py"
],
"azure-servicebus": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""

import os
import re
import time
from base64 import b64encode, b64decode
from hashlib import sha256
Expand All @@ -24,7 +25,10 @@
from uamqp import ReceiveClient, Source
from uamqp.errors import LinkRedirect

from azure.eventhub.aio import EventHubConsumerClient
from azure.iot.device.aio import IoTHubDeviceClient
from azure.iot.device import Message
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient


def generate_sas_token(uri, policy, key, expiry=None):
Expand All @@ -46,6 +50,20 @@ def generate_sas_token(uri, policy, key, expiry=None):
result['skn'] = policy
return 'SharedAccessSignature ' + urlencode(result)

async def send_messages_with_provisioned_iot_device(device_conn_str):
device_client = IoTHubDeviceClient.create_from_connection_string(device_conn_str)
# Connect the client.
await device_client.connect()

for i in range(3):
message = Message("Sending to IotHub, message {}".format(i))
print("Sending message: {}".format(message))
await device_client.send_message(message)
print("Message sent succesfully.")
time.sleep(1)

# finally, disconnect
await device_client.disconnect()

def parse_iot_conn_str(iothub_conn_str):
hostname = None
Expand All @@ -63,7 +81,6 @@ def parse_iot_conn_str(iothub_conn_str):
raise ValueError("Invalid connection string")
return hostname, shared_access_key_name, shared_access_key


def convert_iothub_to_eventhub_conn_str(iothub_conn_str):
hostname, shared_access_key_name, shared_access_key = parse_iot_conn_str(iothub_conn_str)
iot_hub_name = hostname.split(".")[0]
Expand All @@ -80,12 +97,15 @@ def convert_iothub_to_eventhub_conn_str(iothub_conn_str):
# Once a redirect error is received, close the original client and recreate a new one to the re-directed address
receive_client.close()
fully_qualified_name = redirect.hostname.decode("utf-8")
return "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
# Use regular expression to parse the Event Hub name from the IoT Hub redirection address
iot_hub_name = re.search(":\d+\/.*/ConsumerGroups", str(redirect.address)).group(0).split("/")[1]
conn_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
fully_qualified_name,
shared_access_key_name,
shared_access_key,
iot_hub_name
)
return conn_str
except Exception as exp:
raise ValueError(
"{} is not an invalid IoT Hub connection string. The underlying exception is {}".format(
Expand All @@ -94,7 +114,6 @@ def convert_iothub_to_eventhub_conn_str(iothub_conn_str):
)
)


async def receive_events_from_iothub(iothub_conn_str):
"""Convert the iot hub connection string to the built-in eventhub connection string
and receive events from the eventhub
Expand All @@ -112,8 +131,11 @@ async def on_event_batch(partition_context, events):
starting_position=-1 # "-1" is from the beginning of the partition.
)

async def main():
iothub_conn_str = os.environ["IOTHUB_CONNECTION_STR"]
device_conn_str = os.environ["IOT_DEVICE_CONNECTION_STR"]
await send_messages_with_provisioned_iot_device(device_conn_str)
await receive_events_from_iothub(iothub_conn_str)

if __name__ == '__main__':
iothub_conn_str = os.environ["IOTHUB_CONN_STR"]
loop = asyncio.get_event_loop()
loop.run_until_complete(receive_events_from_iothub(iothub_conn_str))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
azure-eventhub-checkpointstoreblob
azure-eventhub-checkpointstoreblob-aio; python_version >= '3.6'
azure-eventhub-checkpointstoreblob-aio; python_version >= '3.6'
azure-iot-device
azure-iothub-provisioningserviceclient
166 changes: 166 additions & 0 deletions sdk/eventhub/test-resources.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,27 @@
"metadata": {
"description": "The maximum duration, in minutes, that a single test is permitted to run before it is considered at-risk for being hung."
}
},
"iotSkuName": {
"type": "string",
"defaultValue": "S1",
"metadata": {
"description": "The SKU to use for the IoT Hub."
}
},
"iotSkuUnits": {
"type": "string",
"defaultValue": "1",
"metadata": {
"description": "The number of IoT Hub units."
}
},
"iotEHd2cPartitions": {
"type": "string",
"defaultValue": "4",
"metadata": {
"description": "Partitions used for the event stream."
}
}
},
"variables": {
Expand All @@ -75,6 +96,8 @@
"defaultSASKeyName": "RootManageSharedAccessKey",
"eventHubsAuthRuleResourceId": "[resourceId('Microsoft.EventHub/namespaces/authorizationRules', variables('eventHubsNamespace'), variables('defaultSASKeyName'))]",
"storageAccountId": "[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccount'))]",
"iotHubName": "[concat('eh-', parameters('baseName'), '-iot-hub')]",
"iotHubDeviceName": "[concat('eh-', parameters('baseName'), '-device-iot-hub')]"
},
"resources": [
{
Expand Down Expand Up @@ -159,6 +182,149 @@
"principalId": "[parameters('testApplicationOid')]",
"scope": "[resourceGroup().id]"
}
},
{
"name": "[variables('iotHubName')]",
"type": "Microsoft.Devices/IotHubs",
"apiVersion": "2020-08-01",
"location": "[parameters('location')]",
"tags": {},
"properties": {
"authorizationPolicies": [
{
"keyName": "string",
"primaryKey": "string",
"secondaryKey": "string",
"rights": "string"
}
],
"publicNetworkAccess": "string",
"ipFilterRules": [
{
"filterName": "string",
"action": "string",
"ipMask": "string"
}
],
"minTlsVersion": "string",
"privateEndpointConnections": [
{
"properties": {
"privateEndpoint": {},
"privateLinkServiceConnectionState": {
"status": "string",
"description": "string",
"actionsRequired": "string"
}
}
}
],
"eventHubEndpoints": {},
"routing": {
"endpoints": {
"serviceBusQueues": [
{
"id": "string",
"connectionString": "string",
"endpointUri": "string",
"entityPath": "string",
"authenticationType": "string",
"name": "string",
"subscriptionId": "string",
"resourceGroup": "string"
}
],
"serviceBusTopics": [
{
"id": "string",
"connectionString": "string",
"endpointUri": "string",
"entityPath": "string",
"authenticationType": "string",
"name": "string",
"subscriptionId": "string",
"resourceGroup": "string"
}
],
"eventHubs": [
{
"id": "string",
"connectionString": "string",
"endpointUri": "string",
"entityPath": "string",
"authenticationType": "string",
"name": "string",
"subscriptionId": "string",
"resourceGroup": "string"
}
],
"storageContainers": [
{
"id": "string",
"connectionString": "string",
"endpointUri": "string",
"authenticationType": "string",
"name": "string",
"subscriptionId": "string",
"resourceGroup": "string",
"containerName": "string",
"fileNameFormat": "string",
"batchFrequencyInSeconds": "integer",
"maxChunkSizeInBytes": "integer",
"encoding": "string"
}
]
},
"routes": [
{
"name": "string",
"source": "string",
"condition": "string",
"endpointNames": [
"string"
],
"isEnabled": "boolean"
}
],
"fallbackRoute": {
"name": "string",
"source": "DeviceMessages",
"condition": "string",
"endpointNames": [
"string"
],
"isEnabled": "boolean"
},
"enrichments": [
{
"key": "string",
"value": "string",
"endpointNames": [
"string"
]
}
]
},
"storageEndpoints": {},
"messagingEndpoints": {},
"enableFileUploadNotifications": "boolean",
"cloudToDevice": {
"maxDeliveryCount": "integer",
"defaultTtlAsIso8601": "string",
"feedback": {
"lockDurationAsIso8601": "string",
"ttlAsIso8601": "string",
"maxDeliveryCount": "integer"
}
},
"comments": "string",
"features": "string"
},
"sku": {
"name": "S1",
"capacity": 1
},
"resources": []
}
],
"outputs": {
Expand Down