Minor cleanup of azure-monitor-ingestion (#40925)
alzimmermsft committed Jul 9, 2024
1 parent 3b7e612 commit 6b78a86
Showing 7 changed files with 120 additions and 107 deletions.
LogsIngestionAsyncClient.java
@@ -35,14 +35,14 @@
import java.util.function.Consumer;

import static com.azure.core.util.FluxUtil.withContext;
- import static com.azure.monitor.ingestion.implementation.Utils.CONTENT_ENCODING;
import static com.azure.monitor.ingestion.implementation.Utils.GZIP;
import static com.azure.monitor.ingestion.implementation.Utils.getConcurrency;
import static com.azure.monitor.ingestion.implementation.Utils.gzipRequest;

/**
* <p>This class provides an asynchronous client for uploading custom logs to an Azure Monitor Log Analytics workspace.
- * This client encapsulates REST API calls, used to send data to a Log Analytics workspace, into a set of asynchronous operations.</p>
+ * This client encapsulates REST API calls, used to send data to a Log Analytics workspace, into a set of asynchronous
+ * operations.</p>
*
* <h2>Getting Started</h2>
*
@@ -52,9 +52,11 @@
* <ol>
* <li>{@code endpoint} - The <a href="https://learn.microsoft.com/azure/azure-monitor/essentials/data-collection-endpoint-overview?tabs=portal#create-a-data-collection-endpoint">data collection endpoint</a>.
* See {@link LogsIngestionClientBuilder#endpoint(String) endpoint} method for more details.</li>
- * <li>{@code credential} - The AAD authentication credential that has the "Monitoring Metrics Publisher" role assigned to it.
+ * <li>{@code credential} - The AAD authentication credential that has the "Monitoring Metrics Publisher" role assigned
+ * to it.
* <a href="https://learn.microsoft.com/java/api/overview/azure/identity-readme?view=azure-java-stable">Azure Identity</a>
- * provides a variety of AAD credential types that can be used. See {@link LogsIngestionClientBuilder#credential(TokenCredential) credential } method for more details.</li>
+ * provides a variety of AAD credential types that can be used. See
+ * {@link LogsIngestionClientBuilder#credential(TokenCredential) credential} method for more details.</li>
* </ol>
*
* <p><strong>Instantiating an asynchronous Logs ingestion client</strong></p>
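Usage note: wiring up the two required builder properties described above can look like the following minimal sketch. This is illustrative, not part of the diff: the endpoint URL is a placeholder, and DefaultAzureCredential (from azure-identity) stands in for any TokenCredential that carries the "Monitoring Metrics Publisher" role.

import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.monitor.ingestion.LogsIngestionAsyncClient;
import com.azure.monitor.ingestion.LogsIngestionClientBuilder;

public class CreateClientSample {
    public static void main(String[] args) {
        LogsIngestionAsyncClient client = new LogsIngestionClientBuilder()
            // Placeholder data collection endpoint (DCE) URL.
            .endpoint("https://my-dce.eastus-1.ingest.monitor.azure.com")
            // Any TokenCredential with the "Monitoring Metrics Publisher" role works here.
            .credential(new DefaultAzureCredentialBuilder().build())
            .buildAsyncClient();
    }
}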
@@ -75,13 +77,16 @@
*
* <ul>
* <li>
- * {@link LogsIngestionAsyncClient#upload(String, String, Iterable) upload(String, String, Iterable)} - Uploads logs to a Log Analytics workspace.
+ * {@link #upload(String, String, Iterable) upload(String, String, Iterable)} - Uploads
+ * logs to a Log Analytics workspace.
* </li>
* <li>
- * {@link LogsIngestionAsyncClient#upload(String, String, Iterable, LogsUploadOptions) upload(String, String, Iterable, LogsUploadOptions)} - Uploads logs to a Log Analytics workspace with options to configure the upload request.
+ * {@link #upload(String, String, Iterable, LogsUploadOptions) upload(String, String, Iterable, LogsUploadOptions)}
+ * - Uploads logs to a Log Analytics workspace with options to configure the upload request.
* </li>
* <li>
- * {@link LogsIngestionAsyncClient#uploadWithResponse(String, String, BinaryData, RequestOptions) uploadWithResponse(String, String, BinaryData, RequestOptions)} - Uploads logs to a Log Analytics workspace with options to configure the HTTP request.
+ * {@link #uploadWithResponse(String, String, BinaryData, RequestOptions) uploadWithResponse(String, String, BinaryData, RequestOptions)}
+ * - Uploads logs to a Log Analytics workspace with options to configure the HTTP request.
* </li>
* </ul>
*
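Usage note: a sketch of the simplest overload listed above, reusing the client from the earlier snippet. The rule id, stream name, and log shape are placeholders; any objects matching the stream's declared schema are accepted.

import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Placeholder log records shaped like the stream's schema.
List<Object> logs = Arrays.<Object>asList(
    Map.of("Time", "2024-07-09T00:00:00Z", "Computer", "vm-1"),
    Map.of("Time", "2024-07-09T00:00:01Z", "Computer", "vm-2"));

// Placeholder rule id and stream name.
client.upload("<data-collection-rule-id>", "<stream-name>", logs)
    .subscribe(unused -> { }, error -> System.err.println("Upload failed: " + error));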
@@ -95,6 +100,7 @@ public final class LogsIngestionAsyncClient {

/**
* Creates a {@link LogsIngestionAsyncClient} that sends requests to the data collection endpoint.
+ *
* @param service The {@link IngestionUsingDataCollectionRulesClient} that the client routes its request through.
*/
LogsIngestionAsyncClient(IngestionUsingDataCollectionRulesAsyncClient service) {
@@ -169,22 +175,22 @@ public Mono<Void> upload(String ruleId, String streamName, Iterable<Object> logs
* @throws IllegalArgumentException if {@code logs} is empty.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
- public Mono<Void> upload(String ruleId, String streamName,
-     Iterable<Object> logs, LogsUploadOptions options) {
+ public Mono<Void> upload(String ruleId, String streamName, Iterable<Object> logs, LogsUploadOptions options) {
return withContext(context -> upload(ruleId, streamName, logs, options, context));
}

/**
- * This method is used to upload logs to Azure Monitor Log Analytics with specified data collection rule id and stream name. This
- * upload method provides a more granular control of the HTTP request sent to the service. Use {@link RequestOptions}
- * to configure the HTTP request.
+ * This method is used to upload logs to Azure Monitor Log Analytics with specified data collection rule id and
+ * stream name. This upload method provides a more granular control of the HTTP request sent to the service. Use
+ * {@link RequestOptions} to configure the HTTP request.
*
* <p>
* The input logs should be a JSON array with each element in the array
* matching the <a href="https://learn.microsoft.com/azure/azure-monitor/essentials/data-collection-rule-structure#streamdeclarations">schema defined
- * by the stream name</a>. The stream's schema can be found in the Azure portal. This content will be gzipped before sending to the service.
- * If the content is already gzipped, then set the {@code Content-Encoding} header to {@code gzip} using {@link RequestOptions#setHeader(HttpHeaderName, String) requestOptions}
- * and pass the content as is.
+ * by the stream name</a>. The stream's schema can be found in the Azure portal. This content will be gzipped before
+ * sending to the service. If the content is already gzipped, then set the {@code Content-Encoding} header to
+ * {@code gzip} using {@link RequestOptions#setHeader(HttpHeaderName, String) requestOptions} and pass the content
+ * as is.
* </p>
*
* <p><strong>Header Parameters</strong>
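Usage note: the pre-compressed path described in the Javadoc above can be exercised as in this sketch. Setting Content-Encoding to gzip on the RequestOptions makes the client pass the body through instead of compressing it again; the rule id and stream name are placeholders, and the client is reused from the first snippet.

import com.azure.core.http.HttpHeaderName;
import com.azure.core.http.rest.RequestOptions;
import com.azure.core.util.BinaryData;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Gzip a JSON array of log records up front.
byte[] json = "[{\"Time\":\"2024-07-09T00:00:00Z\",\"Computer\":\"vm-1\"}]"
    .getBytes(StandardCharsets.UTF_8);
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
    gzip.write(json);
} catch (IOException e) {
    throw new UncheckedIOException(e);
}

// The explicit header tells the client the body is already compressed.
RequestOptions requestOptions = new RequestOptions()
    .setHeader(HttpHeaderName.CONTENT_ENCODING, "gzip");

client.uploadWithResponse("<data-collection-rule-id>", "<stream-name>",
        BinaryData.fromBytes(buffer.toByteArray()), requestOptions)
    .subscribe(response -> System.out.println("Status: " + response.getStatusCode()));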
@@ -208,15 +214,15 @@ public Mono<Void> upload(String ruleId, String streamName,
* @param streamName The streamDeclaration name as defined in the Data Collection Rule.
* @param logs An array of objects matching the schema defined by the provided stream.
* @param requestOptions The options to configure the HTTP request before HTTP client sends it.
- * @return the {@link Response} on successful completion of {@link Mono}.
* @throws HttpResponseException thrown if the request is rejected by server.
* @throws ClientAuthenticationException thrown if the request is rejected by server on status code 401.
* @throws ResourceNotFoundException thrown if the request is rejected by server on status code 404.
* @throws ResourceModifiedException thrown if the request is rejected by server on status code 409.
+ * @return the {@link Response} on successful completion of {@link Mono}.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
- public Mono<Response<Void>> uploadWithResponse(
-     String ruleId, String streamName, BinaryData logs, RequestOptions requestOptions) {
+ public Mono<Response<Void>> uploadWithResponse(String ruleId, String streamName, BinaryData logs,
+     RequestOptions requestOptions) {
Objects.requireNonNull(ruleId, "'ruleId' cannot be null.");
Objects.requireNonNull(streamName, "'streamName' cannot be null.");
Objects.requireNonNull(logs, "'logs' cannot be null.");
@@ -225,44 +231,42 @@ public Mono<Response<Void>> uploadWithResponse(
requestOptions = new RequestOptions();
}
requestOptions.addRequestCallback(request -> {
- HttpHeader httpHeader = request.getHeaders().get(CONTENT_ENCODING);
+ HttpHeader httpHeader = request.getHeaders().get(HttpHeaderName.CONTENT_ENCODING);
if (httpHeader == null) {
BinaryData gzippedRequest = BinaryData.fromBytes(gzipRequest(logs.toBytes()));
request.setBody(gzippedRequest);
- request.setHeader(CONTENT_ENCODING, GZIP);
+ request.setHeader(HttpHeaderName.CONTENT_ENCODING, GZIP);
}
});
return service.uploadWithResponse(ruleId, streamName, logs, requestOptions);
}

- Mono<Void> upload(String ruleId, String streamName,
-     Iterable<Object> logs, LogsUploadOptions options,
-     Context context) {
+ Mono<Void> upload(String ruleId, String streamName, Iterable<Object> logs, LogsUploadOptions options,
+     Context context) {
return Mono.defer(() -> splitAndUpload(ruleId, streamName, logs, options, context));
}

/**
* This method splits the input logs into < 1MB HTTP requests and uploads to the Azure Monitor service.
*
* @param ruleId The data collection rule id.
* @param streamName The stream name configured in the data collection rule.
* @param logs The input logs to upload.
* @param options The options to configure the upload request.
* @param context additional context that is passed through the Http pipeline during the service call. If no
* additional context is required, pass {@link Context#NONE} instead.
* @return the {@link Mono} that completes on completion of the upload request.
*/
private Mono<Void> splitAndUpload(String ruleId, String streamName, Iterable<Object> logs,
LogsUploadOptions options, Context context) {

int concurrency = getConcurrency(options);

- return new Batcher(options, logs)
-     .toFlux()
+ return new Batcher(options, logs).toFlux()
.flatMapSequential(request -> uploadToService(ruleId, streamName, context, request), concurrency)
.<LogsUploadException>handle((responseHolder, sink) -> processResponse(options, responseHolder, sink))
.collectList()
- .handle((result, sink) -> processExceptions(result, sink));
+ .handle(this::processExceptions);
}

private void processExceptions(List<LogsUploadException> result, SynchronousSink<Void> sink) {
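Usage note: splitAndUpload above batches the input into requests under 1MB and uploads them with bounded parallelism via flatMapSequential. Callers can raise that bound through LogsUploadOptions; a sketch, assuming setMaxConcurrency is the option that getConcurrency(options) reads, with client and logs reused from the earlier snippets:

import com.azure.monitor.ingestion.models.LogsUploadOptions;

// Allow up to four batch uploads in flight at once.
LogsUploadOptions options = new LogsUploadOptions().setMaxConcurrency(4);

client.upload("<data-collection-rule-id>", "<stream-name>", logs, options)
    .subscribe(unused -> { }, error -> System.err.println("Upload failed: " + error));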
@@ -279,33 +283,33 @@ private void processExceptions(List<LogsUploadException> result, SynchronousSink
}
}

- private void processResponse(LogsUploadOptions options, UploadLogsResponseHolder responseHolder, SynchronousSink<LogsUploadException> sink) {
+ private void processResponse(LogsUploadOptions options, UploadLogsResponseHolder responseHolder,
+     SynchronousSink<LogsUploadException> sink) {
if (responseHolder.getException() != null) {
Consumer<LogsUploadError> uploadLogsErrorConsumer = null;
if (options != null) {
uploadLogsErrorConsumer = options.getLogsUploadErrorConsumer();
}
if (uploadLogsErrorConsumer != null) {
- uploadLogsErrorConsumer.accept(new LogsUploadError(responseHolder.getException(), responseHolder.getRequest().getLogs()));
+ uploadLogsErrorConsumer.accept(
+     new LogsUploadError(responseHolder.getException(), responseHolder.getRequest().getLogs()));
return;
}
// emit the responseHolder without the original logs only if there's an error and there's no
// error consumer
sink.next(new LogsUploadException(Collections.singletonList(responseHolder.getException()),
responseHolder.getRequest().getLogs().size()));
}
}

- private Mono<UploadLogsResponseHolder> uploadToService(String ruleId, String streamName,
-     Context context,
-     LogsIngestionRequest request) {
-     RequestOptions requestOptions = new RequestOptions()
-         .addHeader(CONTENT_ENCODING, GZIP)
-         .setContext(context);
-     return service.uploadWithResponse(ruleId, streamName,
-         BinaryData.fromBytes(request.getRequestBody()), requestOptions)
-         .map(response -> new UploadLogsResponseHolder(null, null))
-         .onErrorResume(HttpResponseException.class,
-             ex -> Mono.fromSupplier(() -> new UploadLogsResponseHolder(request, ex)));
+ private Mono<UploadLogsResponseHolder> uploadToService(String ruleId, String streamName, Context context,
+     LogsIngestionRequest request) {
+     RequestOptions requestOptions = new RequestOptions().addHeader(HttpHeaderName.CONTENT_ENCODING, GZIP)
+         .setContext(context);
+     return service.uploadWithResponse(ruleId, streamName, BinaryData.fromBytes(request.getRequestBody()),
+         requestOptions)
+         .map(response -> new UploadLogsResponseHolder(null, null))
+         .onErrorResume(HttpResponseException.class,
+             ex -> Mono.fromSupplier(() -> new UploadLogsResponseHolder(request, ex)));
}
}
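Usage note: processResponse above routes each failed batch to the registered consumer and only aggregates a LogsUploadException when no consumer is set, so the overall upload keeps going past individual failures. Registering a consumer looks roughly like this sketch; getFailedLogs and getResponseException are assumed here to be the accessors LogsUploadError exposes, and client and logs are reused from the earlier snippets.

import com.azure.monitor.ingestion.models.LogsUploadOptions;

LogsUploadOptions options = new LogsUploadOptions()
    .setLogsUploadErrorConsumer(uploadError -> {
        // Invoked once per failed batch; the remaining batches still upload.
        System.err.println("Failed to upload " + uploadError.getFailedLogs().size()
            + " logs: " + uploadError.getResponseException().getMessage());
    });

client.upload("<data-collection-rule-id>", "<stream-name>", logs, options)
    .subscribe();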