Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat(client): Add Telemetry APIs #15047

Merged
merged 11 commits into from
Sep 11, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ PagedFlux<String> listRelationships(String digitalTwinId, String relationshipNam
* @param modelId The Id of the model to decommission.
* @return an empty Mono
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> decommissionModel(String modelId) {
return decommissionModelWithResponse(modelId)
.flatMap(voidResponse -> Mono.empty());
Expand All @@ -947,6 +948,7 @@ public Mono<Void> decommissionModel(String modelId) {
* @param modelId The Id of the model to decommission.
* @return The http response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> decommissionModelWithResponse(String modelId) {
return withContext(context -> decommissionModelWithResponse(modelId, context));
}
Expand Down Expand Up @@ -1338,4 +1340,106 @@ Mono<PagedResponse<EventRoute>> listEventRoutesNextPage(String nextLink, Context
}

//endregion Event Route APIs

//region Telemetry APIs

/**
* Publishes telemetry from a digital twin
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param payload The application/json telemetry payload to be sent.
* @return An empty mono.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> publishTelemetry(String digitalTwinId, String payload) {
PublishTelemetryRequestOptions publishTelemetryRequestOptions = new PublishTelemetryRequestOptions();
return withContext(context -> publishTelemetryWithResponse(digitalTwinId, payload, publishTelemetryRequestOptions, context))
.flatMap(voidResponse -> Mono.empty());
}

/**
* Publishes telemetry from a digital twin
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param payload The application/json telemetry payload to be sent.
* @param publishTelemetryRequestOptions The additional information to be used when processing a telemetry request.
* @return A {@link Response} containing an empty mono.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> publishTelemetryWithResponse(String digitalTwinId, String payload, PublishTelemetryRequestOptions publishTelemetryRequestOptions) {
return withContext(context -> publishTelemetryWithResponse(digitalTwinId, payload, publishTelemetryRequestOptions, context));
}

Mono<Response<Void>> publishTelemetryWithResponse(String digitalTwinId, String payload, PublishTelemetryRequestOptions publishTelemetryRequestOptions, Context context) {
Object payloadObject = null;
try {
payloadObject = mapper.readValue(payload, Object.class);
}
catch (JsonProcessingException e) {
logger.error("Could not parse the payload [%s]: %s", payload, e);
return Mono.error(e);
}

return protocolLayer.getDigitalTwins().sendTelemetryWithResponseAsync(
digitalTwinId,
publishTelemetryRequestOptions.getMessageId(),
payloadObject,
publishTelemetryRequestOptions.getTimestamp().toString(),
context);
}

/**
* Publishes telemetry from a digital twin's component
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param componentName The name of the DTDL component.
* @param payload The application/json telemetry payload to be sent.
* @return An empty mono.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> publishComponentTelemetry(String digitalTwinId, String componentName, String payload) {
PublishTelemetryRequestOptions publishTelemetryRequestOptions = new PublishTelemetryRequestOptions();
return withContext(context -> publishComponentTelemetryWithResponse(digitalTwinId, componentName, payload, publishTelemetryRequestOptions, context))
.flatMap(voidResponse -> Mono.empty());
}

/**
* Publishes telemetry from a digital twin's component
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param componentName The name of the DTDL component.
* @param payload The application/json telemetry payload to be sent.
* @param publishTelemetryRequestOptions The additional information to be used when processing a telemetry request.
* @return A {@link Response} containing an empty mono.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> publishComponentTelemetryWithResponse(String digitalTwinId, String componentName, String payload, PublishTelemetryRequestOptions publishTelemetryRequestOptions) {
return withContext(context -> publishComponentTelemetryWithResponse(digitalTwinId, componentName, payload, publishTelemetryRequestOptions, context));
}

Mono<Response<Void>> publishComponentTelemetryWithResponse(String digitalTwinId, String componentName, String payload, PublishTelemetryRequestOptions publishTelemetryRequestOptions, Context context) {

Object payloadObject = null;
try {
payloadObject = mapper.readValue(payload, Object.class);
}
catch (JsonProcessingException e) {
logger.error("Could not parse the payload [%s]: %s", payload, e);
return Mono.error(e);
}

return protocolLayer.getDigitalTwins().sendComponentTelemetryWithResponseAsync(
digitalTwinId,
componentName,
publishTelemetryRequestOptions.getMessageId(),
payloadObject,
publishTelemetryRequestOptions.getTimestamp().toString(),
context);
}

//endregion Telemetry APIs
}
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ public Response<Void> deleteModelWithResponse(String modelId, Context context) {
* Decommissions a model.
* @param modelId The Id of the model to decommission.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void decommissionModel(String modelId) {
decommissionModelWithResponse(modelId, Context.NONE);
}
Expand All @@ -560,6 +561,7 @@ public void decommissionModel(String modelId) {
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return The http response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> decommissionModelWithResponse(String modelId, Context context) {
return digitalTwinsAsyncClient.decommissionModelWithResponse(modelId, context).block();
}
Expand Down Expand Up @@ -717,10 +719,11 @@ public void createEventRoute(String eventRouteId, EventRoute eventRoute) {
* @param eventRouteId The id of the event route to create.
* @param eventRoute The event route to create.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A {@link Response}.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void createEventRouteWithResponse(String eventRouteId, EventRoute eventRoute, Context context) {
this.digitalTwinsAsyncClient.createEventRouteWithResponse(eventRouteId, eventRoute, context).block();
public Response<Void> createEventRouteWithResponse(String eventRouteId, EventRoute eventRoute, Context context) {
return this.digitalTwinsAsyncClient.createEventRouteWithResponse(eventRouteId, eventRoute, context).block();
}

/**
Expand Down Expand Up @@ -768,4 +771,66 @@ public PagedIterable<EventRoute> listEventRoutes(EventRoutesListOptions options,
}

//endregion Event Route APIs

//region Telemetry APIs

/**
* Publishes telemetry from a digital twin
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param payload The application/json telemetry payload to be sent.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void publishTelemetry(String digitalTwinId, String payload) {
PublishTelemetryRequestOptions publishTelemetryRequestOptions = new PublishTelemetryRequestOptions();
publishTelemetryWithResponse(digitalTwinId, payload, publishTelemetryRequestOptions, Context.NONE);
}

/**
* Publishes telemetry from a digital twin
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param payload The application/json telemetry payload to be sent.
* @param publishTelemetryRequestOptions The additional information to be used when processing a telemetry request.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A {@link Response}.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> publishTelemetryWithResponse(String digitalTwinId, String payload, PublishTelemetryRequestOptions publishTelemetryRequestOptions, Context context) {
return digitalTwinsAsyncClient.publishTelemetryWithResponse(digitalTwinId, payload, publishTelemetryRequestOptions, context).block();
}

/**
* Publishes telemetry from a digital twin's component
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param componentName The name of the DTDL component.
* @param payload The application/json telemetry payload to be sent.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void publishComponentTelemetry(String digitalTwinId, String componentName, String payload) {
PublishTelemetryRequestOptions publishTelemetryRequestOptions = new PublishTelemetryRequestOptions();
publishComponentTelemetryWithResponse(digitalTwinId, componentName, payload, publishTelemetryRequestOptions, Context.NONE);
}

/**
* Publishes telemetry from a digital twin's component
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param componentName The name of the DTDL component.
* @param payload The application/json telemetry payload to be sent.
* @param publishTelemetryRequestOptions The additional information to be used when processing a telemetry request.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A {@link Response}.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> publishComponentTelemetryWithResponse(String digitalTwinId, String componentName, String payload, PublishTelemetryRequestOptions publishTelemetryRequestOptions, Context context) {
return digitalTwinsAsyncClient.publishComponentTelemetryWithResponse(digitalTwinId, componentName, payload, publishTelemetryRequestOptions, context).block();
}

//endregion TelemetryAPIs
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.azure.digitaltwins.core.util;

import com.azure.core.annotation.Fluent;

import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.UUID;

/**
* The additional information to be used when processing a telemetry request.
*/
@Fluent
public final class PublishTelemetryRequestOptions {

/**
* A unique message identifier (within the scope of the digital twin id) that is commonly used for de-duplicating messages.
* Defaults to a random guid.
*/
private String messageId = UUID.randomUUID().toString();

/**
* An RFC 3339 timestamp that identifies the time the telemetry was measured.
* It defaults to the current date/time UTC.
*/
private OffsetDateTime timestamp = OffsetDateTime.now(ZoneOffset.UTC);

/**
* Gets the message Id.
* @return A unique message identifier (within the scope of the digital twin id) that is commonly used for de-duplicating messages.
*/
public String getMessageId() {
return this.messageId;
}

/**
* Gets the timestamp.
* @return The timestamp that identifies the time the telemetry was measured.
*/
public OffsetDateTime getTimestamp() {
return this.timestamp;
}

/**
* Set the message Id
* @param messageId A unique message identifier (within the scope of the digital twin id) that is commonly used for de-duplicating messages.
* @return The PublishTelemetryRequestOptions object itself.
*/
public PublishTelemetryRequestOptions setMessageId(String messageId) {
this.messageId = messageId;
return this;
}

/**
* Set the timestamp
* @param timestamp The timestamp that identifies the time the telemetry was measured.
* @return The PublishTelemetryRequestOptions object itself.
*/
public PublishTelemetryRequestOptions setTimestamp(OffsetDateTime timestamp) {
this.timestamp = timestamp;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void componentLifecycleTest(HttpClient httpClient, DigitalTwinsServiceVer
StepVerifier.create(asyncClient.updateComponentWithResponse(roomWithWifiTwinId, wifiComponentName, TestAssetsHelper.getWifiComponentUpdatePayload(), new UpdateComponentRequestOptions()))
.assertNext(updateResponse -> {
assertEquals(updateResponse.getStatusCode(), HttpURLConnection.HTTP_NO_CONTENT);
logger.info("Updated component successfully");
logger.info("Updated the component successfully");
})
.verifyComplete();
}
Expand Down