Skip to content

Commit

Permalink
refactporing
Browse files Browse the repository at this point in the history
  • Loading branch information
sourabh1007 committed Apr 3, 2023
1 parent 4144509 commit 9bd2256
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 35 deletions.
52 changes: 23 additions & 29 deletions Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@ namespace Microsoft.Azure.Cosmos.Telemetry
/// </summary>
internal class ClientTelemetry : IDisposable
{
private const int allowedNumberOfFailures = 3;

private static readonly TimeSpan observingWindow = ClientTelemetryOptions.GetScheduledTimeSpan();

private readonly ClientTelemetryProperties clientTelemetryInfo;
private readonly ClientTelemetryProcessor processor;
private readonly DiagnosticsHandlerHelper diagnosticsHelper;
private readonly NetworkDataRecorder networkDataRecorder;

private readonly CancellationTokenSource cancellationTokenSource;
private readonly CancellationTokenSource processorCancellationTokenSource = new CancellationTokenSource(ClientTelemetryOptions.ProcessorTimeOutInMs); // 5 min
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly CancellationTokenSource processorCancellationTokenSource = new CancellationTokenSource(ClientTelemetryOptions.ProcessorTimeOut); // 5 min

private readonly GlobalEndpointManager globalEndpointManager;

Expand All @@ -57,7 +55,6 @@ private ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRef
/// </summary>
internal ClientTelemetry()
{
this.cancellationTokenSource = new CancellationTokenSource();
}

/// <summary>
Expand Down Expand Up @@ -123,8 +120,6 @@ internal ClientTelemetry(
aggregationIntervalInSec: (int)observingWindow.TotalSeconds);

this.networkDataRecorder = new NetworkDataRecorder();

this.cancellationTokenSource = new CancellationTokenSource();
}

/// <summary>
Expand All @@ -137,9 +132,9 @@ private void StartObserverTask()

/// <summary>
/// Task which does below operations , periodically
/// 1. Set Account information (one time at the time of initialization)
/// 2. Load VM metedata information (one time at the time of initialization)
/// 3. Calculate and Send telemetry Information to juno service (never ending task)/// </summary>
/// 1. Set Account information (one time during initialization)
/// 2. Load VM metedata information (one time during initialization)
/// 3. Calculate and Send telemetry Information to Client Telemetry Service (never ending task)/// </summary>
/// <returns>Async Task</returns>
private async Task EnrichAndSendAsync()
{
Expand Down Expand Up @@ -179,28 +174,27 @@ ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfo

try
{
// Use the Wait method with a CancellationToken to wait for the task to complete.
// If anyhow prev task was not finished sucessfully in 5 min then ,Do not trigger a new task.
this.processorTask?.Wait(this.processorCancellationTokenSource.Token);

Task tempProcessorTask = Task.Run(() => this.processor
.ProcessAndSendAsync(
clientTelemetryInfo: this.clientTelemetryInfo,
operationInfoSnapshot: operationInfoSnapshot,
cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot,
requestInfoSnapshot: requestInfoSnapshot,
cancellationToken: this.cancellationTokenSource.Token));
tempProcessorTask.Start(); // run it in background

this.processorTask = tempProcessorTask;
}
catch (OperationCanceledException)
{
DefaultTrace.TraceError("Telemetry Job Processor failed due to timeout in 5 min");
// Initiating Telemetry Data Processor task which will serialize and send telemetry information to Client Telemetry Service service
this.processorTask = Task.Run(
function: () => this.processor
.ProcessAndSendAsync(
clientTelemetryInfo: this.clientTelemetryInfo,
operationInfoSnapshot: operationInfoSnapshot,
cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot,
requestInfoSnapshot: requestInfoSnapshot),
cancellationToken: this.processorCancellationTokenSource.Token)
.ContinueWith((i) =>
{
Exception ex = i.Exception?.GetBaseException();
if (ex != null)
{
DefaultTrace.TraceError($"Client Telemetry data processing task faulted and stopped running. ErrorType={ex.GetType()} ErrorMessage={ex.Message}");
}
}, TaskContinuationOptions.OnlyOnFaulted);
}
catch (Exception ex)
{
DefaultTrace.TraceError("Telemetry Job Processor failed with error : {0}", ex);
DefaultTrace.TraceError("Telemetry Job Processor failed to run with error : {0}", ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,15 @@ internal static class ClientTelemetryOptions
internal const double Percentile99 = 99.0;
internal const double Percentile999 = 99.9;
internal const string DateFormat = "yyyy-MM-ddTHH:mm:ssZ";
internal const int ProcessorTimeOutInMs = 5 * 60 * 1000; // 5 minutes

internal const string EnvPropsClientTelemetrySchedulingInSeconds = "COSMOS.CLIENT_TELEMETRY_SCHEDULING_IN_SECONDS";
internal const string EnvPropsClientTelemetryEnabled = "COSMOS.CLIENT_TELEMETRY_ENABLED";
internal const string EnvPropsClientTelemetryVmMetadataUrl = "COSMOS.VM_METADATA_URL";
internal const string EnvPropsClientTelemetryEndpoint = "COSMOS.CLIENT_TELEMETRY_ENDPOINT";
internal const string EnvPropsClientTelemetryEnvironmentName = "COSMOS.ENVIRONMENT_NAME";

internal static readonly TimeSpan ClientTelemetryServiceTimeOut = TimeSpan.FromMinutes(1);
internal static readonly TimeSpan ProcessorTimeOut = TimeSpan.FromMinutes(5); // 5 minutes
internal static readonly ResourceType AllowedResourceTypes = ResourceType.Document;
// Why 5 sec? As of now, if any network request is taking more than 5 millisecond sec, we will consider it slow request this value can be revisited in future
internal static readonly TimeSpan NetworkLatencyThreshold = TimeSpan.FromMilliseconds(5);
Expand Down
10 changes: 5 additions & 5 deletions Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ internal class ClientTelemetryProcessor

private readonly AuthorizationTokenProvider tokenProvider;
private readonly CosmosHttpClient httpClient;

private readonly CancellationTokenSource serviceCancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryServiceTimeOut);

internal ClientTelemetryProcessor(CosmosHttpClient httpClient, AuthorizationTokenProvider tokenProvider)
{
this.httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
Expand All @@ -38,8 +39,7 @@ internal async Task ProcessAndSendAsync(
ClientTelemetryProperties clientTelemetryInfo,
ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharget)> operationInfoSnapshot,
ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoSnapshot,
IReadOnlyList<RequestInfo> requestInfoSnapshot,
CancellationToken cancellationToken)
IReadOnlyList<RequestInfo> requestInfoSnapshot)
{
try
{
Expand All @@ -48,7 +48,7 @@ await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync(
operationInfoSnapshot: operationInfoSnapshot,
cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot,
sampledRequestInfo: requestInfoSnapshot,
callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, cancellationToken));
callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, this.serviceCancellationToken.Token));
}
catch (Exception ex)
{
Expand All @@ -59,7 +59,7 @@ await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync(
}

/// <summary>
/// Task to send telemetry information to configured Juno endpoint.
/// Task to send telemetry information to configured Client Telemetry Service endpoint.
/// If endpoint is not configured then it won't even try to send information. It will just trace an error message.
/// In any case it resets the telemetry information to collect the latest one.
/// </summary>
Expand Down

0 comments on commit 9bd2256

Please sign in to comment.