Skip to content

Commit

Permalink
[Internal] Client Telemetry: Refactors code to run client telemetry d…
Browse files Browse the repository at this point in the history
…ata processing task in background. (#3783)

* first draft

* remove failure count test

* refactoring

* code refactor

* create task with timeout

* fix test

* code refactoring

* fix timeout code

* space fix

* not failing if processor is taking time

* fix processor test

* code refactor

* refactor and test fix
  • Loading branch information
sourabh1007 committed Apr 10, 2023
1 parent 789f701 commit 2b6fdb7
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 64 deletions.
78 changes: 48 additions & 30 deletions Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry
using Microsoft.Azure.Cosmos.Tracing.TraceData;
using Microsoft.Azure.Documents;
using Util;
using static Microsoft.Azure.Cosmos.Tracing.TraceData.ClientSideRequestStatisticsTraceDatum;

/// <summary>
/// This class collects and send all the telemetry information.
Expand All @@ -29,8 +28,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry
/// </summary>
internal class ClientTelemetry : IDisposable
{
private const int allowedNumberOfFailures = 3;

private static readonly TimeSpan observingWindow = ClientTelemetryOptions.GetScheduledTimeSpan();

private readonly ClientTelemetryProperties clientTelemetryInfo;
Expand All @@ -39,7 +36,6 @@ internal class ClientTelemetry : IDisposable
private readonly NetworkDataRecorder networkDataRecorder;

private readonly CancellationTokenSource cancellationTokenSource;

private readonly GlobalEndpointManager globalEndpointManager;

private Task telemetryTask;
Expand All @@ -50,8 +46,6 @@ internal class ClientTelemetry : IDisposable
private ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoMap
= new ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram>();

private int numberOfFailures = 0;

/// <summary>
/// Only for Mocking in tests
/// </summary>
Expand Down Expand Up @@ -123,7 +117,6 @@ internal ClientTelemetry(
aggregationIntervalInSec: (int)observingWindow.TotalSeconds);

this.networkDataRecorder = new NetworkDataRecorder();

this.cancellationTokenSource = new CancellationTokenSource();
}

Expand All @@ -137,9 +130,9 @@ private void StartObserverTask()

/// <summary>
/// Task which does below operations , periodically
/// 1. Set Account information (one time at the time of initialization)
/// 2. Load VM metedata information (one time at the time of initialization)
/// 3. Calculate and Send telemetry Information to juno service (never ending task)/// </summary>
/// 1. Set Account information (one time during initialization)
/// 2. Load VM metadata information (one time during initialization)
/// 3. Calculate and send telemetry information to the Client Telemetry Service (never-ending task)
/// </summary>
/// <returns>Async Task</returns>
private async Task EnrichAndSendAsync()
{
Expand All @@ -149,18 +142,12 @@ private async Task EnrichAndSendAsync()
{
while (!this.cancellationTokenSource.IsCancellationRequested)
{
if (this.numberOfFailures == allowedNumberOfFailures)
{
this.Dispose();
break;
}

if (string.IsNullOrEmpty(this.clientTelemetryInfo.GlobalDatabaseAccountName))
{
AccountProperties accountProperties = await ClientTelemetryHelper.SetAccountNameAsync(this.globalEndpointManager);
this.clientTelemetryInfo.GlobalDatabaseAccountName = accountProperties.Id;
}

await Task.Delay(observingWindow, this.cancellationTokenSource.Token);

this.clientTelemetryInfo.DateTimeUtc = DateTime.UtcNow.ToString(ClientTelemetryOptions.DateFormat);
Expand All @@ -180,24 +167,28 @@ private async Task EnrichAndSendAsync()

ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoSnapshot
= Interlocked.Exchange(ref this.cacheRefreshInfoMap, new ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram>());


List<RequestInfo> requestInfoSnapshot = this.networkDataRecorder.GetRequests();

try
{
await this.processor
.ProcessAndSendAsync(
clientTelemetryInfo: this.clientTelemetryInfo,
operationInfoSnapshot: operationInfoSnapshot,
cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot,
requestInfoSnapshot: this.networkDataRecorder.GetRequests(),
cancellationToken: this.cancellationTokenSource.Token);

this.numberOfFailures = 0;
CancellationTokenSource cancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut);
Task processorTask = Task.Run(() => this.processor
.ProcessAndSendAsync(
clientTelemetryInfo: this.clientTelemetryInfo,
operationInfoSnapshot: operationInfoSnapshot,
cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot,
requestInfoSnapshot: requestInfoSnapshot,
cancellationToken: cancellationToken.Token), cancellationToken.Token);

// Initiating Telemetry Data Processor task which will serialize and send telemetry information to Client Telemetry Service
// Not disposing this task. If we dispose a client then, telemetry job(telemetryTask) should stop but processor task(processorTask) should make best effort to finish the job in background.
_ = ClientTelemetry.RunProcessorTaskAsync(this.clientTelemetryInfo.DateTimeUtc, processorTask, ClientTelemetryOptions.ClientTelemetryProcessorTimeOut);

}
catch (Exception ex)
{
this.numberOfFailures++;

DefaultTrace.TraceError("Telemetry Job Processor failed with error : {0}", ex);
DefaultTrace.TraceError("Exception while initiating processing task : {0} with telemetry date as {1}", ex.Message, this.clientTelemetryInfo.DateTimeUtc);
}
}
}
Expand All @@ -209,6 +200,33 @@ await this.processor
DefaultTrace.TraceInformation("Telemetry Job Stopped.");
}

/// <summary>
/// Waits for the telemetry processing task to complete, but no longer than <paramref name="timeout"/>.
/// This method only stops waiting when the timeout elapses; it does not itself cancel
/// <paramref name="processingTask"/>.
/// </summary>
/// <param name="telemetryDate">Timestamp of the telemetry payload; used only in trace and exception messages.</param>
/// <param name="processingTask">Task that processes and sends the telemetry data.</param>
/// <param name="timeout">Maximum time to wait for <paramref name="processingTask"/> to complete.</param>
/// <returns>A task that completes once <paramref name="processingTask"/> has completed within the timeout.</returns>
/// <exception cref="OperationCanceledException">
/// Thrown when <paramref name="processingTask"/> does not complete before <paramref name="timeout"/> elapses.
/// </exception>
internal static async Task RunProcessorTaskAsync(string telemetryDate, Task processingTask, TimeSpan timeout)
{
    using (CancellationTokenSource tokenForDelayTask = new CancellationTokenSource())
    {
        // The delay task acts as the timer that the processing task races against.
        Task delayTask = Task.Delay(timeout, tokenForDelayTask.Token);

        Task resultTask = await Task.WhenAny(processingTask, delayTask);
        if (resultTask == delayTask)
        {
            string message = $"Processor task with date as {telemetryDate} is canceled as it did not finish in {timeout}";
            DefaultTrace.TraceVerbose(message);

            // Operation cancelled
            throw new OperationCanceledException(message);
        }

        // Cancel the timer task so that it does not fire
        tokenForDelayTask.Cancel();

        if (processingTask.IsFaulted)
        {
            // Task.WhenAny does not surface the failure of the task it returns. Reading Exception here
            // marks the fault as observed and leaves a trace instead of dropping the failure silently.
            DefaultTrace.TraceError("Processor task with date as {0} failed with error : {1}", telemetryDate, processingTask.Exception);
        }
    }
}

/// <summary>
/// Collects Cache Telemetry Information.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,12 @@ internal static class ClientTelemetryOptions
internal const double Percentile99 = 99.0;
internal const double Percentile999 = 99.9;
internal const string DateFormat = "yyyy-MM-ddTHH:mm:ssZ";

internal const string EnvPropsClientTelemetrySchedulingInSeconds = "COSMOS.CLIENT_TELEMETRY_SCHEDULING_IN_SECONDS";
internal const string EnvPropsClientTelemetryEnabled = "COSMOS.CLIENT_TELEMETRY_ENABLED";
internal const string EnvPropsClientTelemetryVmMetadataUrl = "COSMOS.VM_METADATA_URL";
internal const string EnvPropsClientTelemetryEndpoint = "COSMOS.CLIENT_TELEMETRY_ENDPOINT";
internal const string EnvPropsClientTelemetryEnvironmentName = "COSMOS.ENVIRONMENT_NAME";

internal static readonly ResourceType AllowedResourceTypes = ResourceType.Document;
// Why 5 ms? As of now, any network request taking more than 5 milliseconds is considered a slow request. This value can be revisited in the future.
internal static readonly TimeSpan NetworkLatencyThreshold = TimeSpan.FromMilliseconds(5);
Expand All @@ -101,7 +100,8 @@ internal static class ClientTelemetryOptions

internal static readonly int NetworkTelemetrySampleSize = 200;
internal static int PayloadSizeThreshold = 1024 * 1024 * 2; // 2MB

internal static TimeSpan ClientTelemetryProcessorTimeOut = TimeSpan.FromMinutes(5);

private static Uri clientTelemetryEndpoint;
private static string environmentName;
private static TimeSpan scheduledTimeSpan = TimeSpan.Zero;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1075,33 +1075,6 @@ private static void AssertCacheRefreshInfoInformation(
}
}

[TestMethod]
public async Task CheckMisconfiguredTelemetryEndpoint_should_stop_the_job()
{
    // Counts the requests that were addressed to the client telemetry endpoint.
    int retryCounter = 0;

    HttpClientHandlerHelper customHttpHandler = new HttpClientHandlerHelper
    {
        RequestCallBack = (request, cancellation) =>
        {
            string requestedUri = request.RequestUri.AbsoluteUri;
            string telemetryUri = ClientTelemetryOptions.GetClientTelemetryEndpoint().AbsoluteUri;

            // Requests that do not target the telemetry endpoint get no response from this callback.
            if (!requestedUri.Equals(telemetryUri))
            {
                return null;
            }

            // Every telemetry request is recorded and then made to fail.
            retryCounter++;
            throw new Exception("Exception while sending telemetry");
        }
    };

    _ = await this.CreateClientAndContainer(
        mode: ConnectionMode.Direct,
        customHttpHandler: customHttpHandler);

    // Wait for 5 seconds. The telemetry endpoint always fails in this test, so the job is expected
    // to try 3 times at most and then stop.
    await Task.Delay(TimeSpan.FromSeconds(5));

    const int expectedAttempts = 3;
    Assert.AreEqual(expectedAttempts, retryCounter);
}

private static ItemBatchOperation CreateItem(string itemId)
{
var testItem = new { id = itemId, Status = itemId };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class ClientTelemetryTests
public void Cleanup()
{
    // Reset the client telemetry environment variables by setting each one to null.
    string[] telemetryVariables =
    {
        ClientTelemetryOptions.EnvPropsClientTelemetryEnabled,
        ClientTelemetryOptions.EnvPropsClientTelemetryEndpoint
    };

    foreach (string variableName in telemetryVariables)
    {
        Environment.SetEnvironmentVariable(variableName, null);
    }
}

[TestMethod]
Expand Down Expand Up @@ -147,7 +148,6 @@ public async Task CheckIfPayloadIsDividedCorrectlyAsync(int expectedOperationInf
string payloadJson = request.Content.ReadAsStringAsync().Result;
Assert.IsTrue(payloadJson.Length <= ClientTelemetryOptions.PayloadSizeThreshold, "Payload Size is " + payloadJson.Length);
Console.WriteLine(payloadJson);
ClientTelemetryProperties propertiesToSend = JsonConvert.DeserializeObject<ClientTelemetryProperties>(payloadJson);
Assert.AreEqual(7, propertiesToSend.SystemInfo.Count, "System Info is not correct");
Expand Down Expand Up @@ -245,14 +245,114 @@ await processor.ProcessAndSendAsync(
clientTelemetryProperties,
operationInfoSnapshot,
cacheRefreshInfoSnapshot,
requestInfoList,
new CancellationToken());
requestInfoList,
new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut).Token);

Assert.AreEqual(expectedOperationInfoSize, actualOperationInfoSize, "Operation Info is not correct");
Assert.AreEqual(expectedCacheRefreshInfoSize, actualCacheRefreshInfoSize, "Cache Refresh Info is not correct");
Assert.AreEqual(expectedRequestInfoSize, actualRequestInfoSize, "Request Info is not correct");
}


/// <summary>
/// Verifies that <c>ClientTelemetry.RunProcessorTaskAsync</c> throws an
/// <see cref="OperationCanceledException"/> when the processor task has not completed
/// before the supplied timeout (one tick here) elapses.
/// </summary>
[TestMethod]
public async Task ClientTelmetryProcessor_should_timeout()
{
    Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEndpoint, "http://dummy.telemetry.endpoint/");

    string data = File.ReadAllText("Telemetry/ClientTelemetryPayloadWithoutMetrics.json", Encoding.UTF8);
    ClientTelemetryProperties clientTelemetryProperties = JsonConvert.DeserializeObject<ClientTelemetryProperties>(data);

    int actualOperationInfoSize = 0;
    int actualCacheRefreshInfoSize = 0;

    // HTTP handler that checks the payload size of any telemetry request it receives and answers 200 OK.
    Mock<IHttpHandler> mockHttpHandler = new Mock<IHttpHandler>();
    _ = mockHttpHandler.Setup(x => x.SendAsync(
            It.IsAny<HttpRequestMessage>(),
            It.IsAny<CancellationToken>()))
        .Callback<HttpRequestMessage, CancellationToken>(
            (request, cancellationToken) =>
            {
                string payloadJson = request.Content.ReadAsStringAsync().Result;
                Assert.IsTrue(payloadJson.Length <= ClientTelemetryOptions.PayloadSizeThreshold, "Payload Size is " + payloadJson.Length);
                ClientTelemetryProperties propertiesToSend = JsonConvert.DeserializeObject<ClientTelemetryProperties>(payloadJson);
                actualOperationInfoSize += propertiesToSend.OperationInfo?.Count ?? 0;
                actualCacheRefreshInfoSize += propertiesToSend.CacheRefreshInfo?.Count ?? 0;
            })
        .Returns(Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK)));

    ClientTelemetryProcessor processor = new ClientTelemetryProcessor(
        MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object))),
        Mock.Of<AuthorizationTokenProvider>());

    // 20 operation entries, each with one recorded latency and one recorded request charge.
    ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)> operationInfoSnapshot
        = new ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)>();

    for (int i = 0; i < 20; i++)
    {
        OperationInfo opeInfo = new OperationInfo(Regions.WestUS,
            0,
            Documents.ConsistencyLevel.Session.ToString(),
            "databaseName" + i,
            "containerName",
            Documents.OperationType.Read,
            Documents.ResourceType.Document,
            200,
            0);

        LongConcurrentHistogram latency = new LongConcurrentHistogram(ClientTelemetryOptions.RequestLatencyMin,
            ClientTelemetryOptions.RequestLatencyMax,
            ClientTelemetryOptions.RequestLatencyPrecision);
        latency.RecordValue(10);

        LongConcurrentHistogram requestcharge = new LongConcurrentHistogram(ClientTelemetryOptions.RequestChargeMin,
            ClientTelemetryOptions.RequestChargeMax,
            ClientTelemetryOptions.RequestChargePrecision);
        requestcharge.RecordValue(11);

        operationInfoSnapshot.TryAdd(opeInfo, (latency, requestcharge));
    }

    // 10 cache refresh entries, each with one recorded latency.
    ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoSnapshot
        = new ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram>();
    for (int i = 0; i < 10; i++)
    {
        CacheRefreshInfo crInfo = new CacheRefreshInfo(Regions.WestUS,
            10,
            Documents.ConsistencyLevel.Session.ToString(),
            "databaseName" + i,
            "containerName",
            Documents.OperationType.Read,
            Documents.ResourceType.Document,
            200,
            1002,
            "dummycache");

        LongConcurrentHistogram latency = new LongConcurrentHistogram(ClientTelemetryOptions.RequestLatencyMin,
            ClientTelemetryOptions.RequestLatencyMax,
            ClientTelemetryOptions.RequestLatencyPrecision);
        latency.RecordValue(10);

        cacheRefreshInfoSnapshot.TryAdd(crInfo, latency);
    }

    Task processorTask = Task.Run(async () =>
    {
        // Dispose the token source once the processor task is done with it.
        using (CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(1)))
        {
            await Task.Delay(1000, cts.Token); // Making this task wait to ensure that the processor is taking more time.
            await processor.ProcessAndSendAsync(clientTelemetryProperties,
                operationInfoSnapshot,
                cacheRefreshInfoSnapshot,
                default,
                cts.Token);
        }
    });

    // The telemetry date is only used in messages; format it the same way the telemetry job does.
    await Assert.ThrowsExceptionAsync<OperationCanceledException>(() => ClientTelemetry.RunProcessorTaskAsync(
        telemetryDate: DateTime.UtcNow.ToString(ClientTelemetryOptions.DateFormat),
        processingTask: processorTask,
        timeout: TimeSpan.FromTicks(1)));
}

[TestMethod]
[ExpectedException(typeof(FormatException))]
public void CheckMisconfiguredTelemetry_should_fail()
Expand Down

0 comments on commit 2b6fdb7

Please sign in to comment.