From 9bd2256a28d2aa7e746960ee9a1bdfee5ab1c879 Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Tue, 4 Apr 2023 00:07:29 +0530 Subject: [PATCH] refactporing --- .../src/Telemetry/ClientTelemetry.cs | 52 ++++++++----------- .../src/Telemetry/ClientTelemetryOptions.cs | 4 +- .../src/Telemetry/ClientTelemetryProcessor.cs | 10 ++-- 3 files changed, 31 insertions(+), 35 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs index 90ffa2e750..8e08b9421d 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs @@ -28,8 +28,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry /// internal class ClientTelemetry : IDisposable { - private const int allowedNumberOfFailures = 3; - private static readonly TimeSpan observingWindow = ClientTelemetryOptions.GetScheduledTimeSpan(); private readonly ClientTelemetryProperties clientTelemetryInfo; @@ -37,8 +35,8 @@ internal class ClientTelemetry : IDisposable private readonly DiagnosticsHandlerHelper diagnosticsHelper; private readonly NetworkDataRecorder networkDataRecorder; - private readonly CancellationTokenSource cancellationTokenSource; - private readonly CancellationTokenSource processorCancellationTokenSource = new CancellationTokenSource(ClientTelemetryOptions.ProcessorTimeOutInMs); // 5 min + private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); + private readonly CancellationTokenSource processorCancellationTokenSource = new CancellationTokenSource(ClientTelemetryOptions.ProcessorTimeOut); // 5 min private readonly GlobalEndpointManager globalEndpointManager; @@ -57,7 +55,6 @@ private ConcurrentDictionary cacheRef /// internal ClientTelemetry() { - this.cancellationTokenSource = new CancellationTokenSource(); } /// @@ -123,8 +120,6 @@ internal ClientTelemetry( aggregationIntervalInSec: (int)observingWindow.TotalSeconds); this.networkDataRecorder = new NetworkDataRecorder(); - - this.cancellationTokenSource = new CancellationTokenSource(); } /// @@ -137,9 +132,9 @@ private void StartObserverTask() /// /// Task which does below operations , periodically - /// 1. Set Account information (one time at the time of initialization) - /// 2. Load VM metedata information (one time at the time of initialization) - /// 3. Calculate and Send telemetry Information to juno service (never ending task)/// + /// 1. Set Account information (one time during initialization) + /// 2. Load VM metedata information (one time during initialization) + /// 3. Calculate and Send telemetry Information to Client Telemetry Service (never ending task)/// /// Async Task private async Task EnrichAndSendAsync() { @@ -179,28 +174,27 @@ ConcurrentDictionary cacheRefreshInfo try { - // Use the Wait method with a CancellationToken to wait for the task to complete. - // If anyhow prev task was not finished sucessfully in 5 min then ,Do not trigger a new task. - this.processorTask?.Wait(this.processorCancellationTokenSource.Token); - - Task tempProcessorTask = Task.Run(() => this.processor - .ProcessAndSendAsync( - clientTelemetryInfo: this.clientTelemetryInfo, - operationInfoSnapshot: operationInfoSnapshot, - cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, - requestInfoSnapshot: requestInfoSnapshot, - cancellationToken: this.cancellationTokenSource.Token)); - tempProcessorTask.Start(); // run it in background - - this.processorTask = tempProcessorTask; - } - catch (OperationCanceledException) - { - DefaultTrace.TraceError("Telemetry Job Processor failed due to timeout in 5 min"); + // Initiating Telemetry Data Processor task which will serialize and send telemetry information to Client Telemetry Service service + this.processorTask = Task.Run( + function: () => this.processor + .ProcessAndSendAsync( + clientTelemetryInfo: this.clientTelemetryInfo, + operationInfoSnapshot: operationInfoSnapshot, + cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, + requestInfoSnapshot: requestInfoSnapshot), + cancellationToken: this.processorCancellationTokenSource.Token) + .ContinueWith((i) => + { + Exception ex = i.Exception?.GetBaseException(); + if (ex != null) + { + DefaultTrace.TraceError($"Client Telemetry data processing task faulted and stopped running. ErrorType={ex.GetType()} ErrorMessage={ex.Message}"); + } + }, TaskContinuationOptions.OnlyOnFaulted); } catch (Exception ex) { - DefaultTrace.TraceError("Telemetry Job Processor failed with error : {0}", ex); + DefaultTrace.TraceError("Telemetry Job Processor failed to run with error : {0}", ex); } } } diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs index 6341836e79..2ff4f0a309 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs @@ -79,13 +79,15 @@ internal static class ClientTelemetryOptions internal const double Percentile99 = 99.0; internal const double Percentile999 = 99.9; internal const string DateFormat = "yyyy-MM-ddTHH:mm:ssZ"; - internal const int ProcessorTimeOutInMs = 5 * 60 * 1000; // 5 minutes + internal const string EnvPropsClientTelemetrySchedulingInSeconds = "COSMOS.CLIENT_TELEMETRY_SCHEDULING_IN_SECONDS"; internal const string EnvPropsClientTelemetryEnabled = "COSMOS.CLIENT_TELEMETRY_ENABLED"; internal const string EnvPropsClientTelemetryVmMetadataUrl = "COSMOS.VM_METADATA_URL"; internal const string EnvPropsClientTelemetryEndpoint = "COSMOS.CLIENT_TELEMETRY_ENDPOINT"; internal const string EnvPropsClientTelemetryEnvironmentName = "COSMOS.ENVIRONMENT_NAME"; + internal static readonly TimeSpan ClientTelemetryServiceTimeOut = TimeSpan.FromMinutes(1); + internal static readonly TimeSpan ProcessorTimeOut = TimeSpan.FromMinutes(5); // 5 minutes internal static readonly ResourceType AllowedResourceTypes = ResourceType.Document; // Why 5 sec? As of now, if any network request is taking more than 5 millisecond sec, we will consider it slow request this value can be revisited in future internal static readonly TimeSpan NetworkLatencyThreshold = TimeSpan.FromMilliseconds(5); diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs index c9511cb4e4..5f790f9b72 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs @@ -23,7 +23,8 @@ internal class ClientTelemetryProcessor private readonly AuthorizationTokenProvider tokenProvider; private readonly CosmosHttpClient httpClient; - + private readonly CancellationTokenSource serviceCancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryServiceTimeOut); + internal ClientTelemetryProcessor(CosmosHttpClient httpClient, AuthorizationTokenProvider tokenProvider) { this.httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); @@ -38,8 +39,7 @@ internal async Task ProcessAndSendAsync( ClientTelemetryProperties clientTelemetryInfo, ConcurrentDictionary operationInfoSnapshot, ConcurrentDictionary cacheRefreshInfoSnapshot, - IReadOnlyList requestInfoSnapshot, - CancellationToken cancellationToken) + IReadOnlyList requestInfoSnapshot) { try { @@ -48,7 +48,7 @@ await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync( operationInfoSnapshot: operationInfoSnapshot, cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, sampledRequestInfo: requestInfoSnapshot, - callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, cancellationToken)); + callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, this.serviceCancellationToken.Token)); } catch (Exception ex) { @@ -59,7 +59,7 @@ await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync( } /// - /// Task to send telemetry information to configured Juno endpoint. + /// Task to send telemetry information to configured Client Telemetry Service endpoint. /// If endpoint is not configured then it won't even try to send information. It will just trace an error message. /// In any case it resets the telemetry information to collect the latest one. ///