diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs index 8a83778fdb..dde7d5a7c5 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs @@ -19,7 +19,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry using Microsoft.Azure.Cosmos.Tracing.TraceData; using Microsoft.Azure.Documents; using Util; - using static Microsoft.Azure.Cosmos.Tracing.TraceData.ClientSideRequestStatisticsTraceDatum; /// /// This class collects and send all the telemetry information. @@ -29,8 +28,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry /// internal class ClientTelemetry : IDisposable { - private const int allowedNumberOfFailures = 3; - private static readonly TimeSpan observingWindow = ClientTelemetryOptions.GetScheduledTimeSpan(); private readonly ClientTelemetryProperties clientTelemetryInfo; @@ -39,7 +36,6 @@ internal class ClientTelemetry : IDisposable private readonly NetworkDataRecorder networkDataRecorder; private readonly CancellationTokenSource cancellationTokenSource; - private readonly GlobalEndpointManager globalEndpointManager; private Task telemetryTask; @@ -50,8 +46,6 @@ internal class ClientTelemetry : IDisposable private ConcurrentDictionary cacheRefreshInfoMap = new ConcurrentDictionary(); - private int numberOfFailures = 0; - /// /// Only for Mocking in tests /// @@ -123,7 +117,6 @@ internal ClientTelemetry( aggregationIntervalInSec: (int)observingWindow.TotalSeconds); this.networkDataRecorder = new NetworkDataRecorder(); - this.cancellationTokenSource = new CancellationTokenSource(); } @@ -137,9 +130,9 @@ private void StartObserverTask() /// /// Task which does below operations , periodically - /// 1. Set Account information (one time at the time of initialization) - /// 2. Load VM metedata information (one time at the time of initialization) - /// 3. Calculate and Send telemetry Information to juno service (never ending task)/// + /// 1. Set Account information (one time during initialization) + /// 2. Load VM metedata information (one time during initialization) + /// 3. Calculate and Send telemetry Information to Client Telemetry Service (never ending task)/// /// Async Task private async Task EnrichAndSendAsync() { @@ -149,18 +142,12 @@ private async Task EnrichAndSendAsync() { while (!this.cancellationTokenSource.IsCancellationRequested) { - if (this.numberOfFailures == allowedNumberOfFailures) - { - this.Dispose(); - break; - } - if (string.IsNullOrEmpty(this.clientTelemetryInfo.GlobalDatabaseAccountName)) { AccountProperties accountProperties = await ClientTelemetryHelper.SetAccountNameAsync(this.globalEndpointManager); this.clientTelemetryInfo.GlobalDatabaseAccountName = accountProperties.Id; } - + await Task.Delay(observingWindow, this.cancellationTokenSource.Token); this.clientTelemetryInfo.DateTimeUtc = DateTime.UtcNow.ToString(ClientTelemetryOptions.DateFormat); @@ -180,24 +167,28 @@ private async Task EnrichAndSendAsync() ConcurrentDictionary cacheRefreshInfoSnapshot = Interlocked.Exchange(ref this.cacheRefreshInfoMap, new ConcurrentDictionary()); - + + List requestInfoSnapshot = this.networkDataRecorder.GetRequests(); + try { - await this.processor - .ProcessAndSendAsync( - clientTelemetryInfo: this.clientTelemetryInfo, - operationInfoSnapshot: operationInfoSnapshot, - cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, - requestInfoSnapshot: this.networkDataRecorder.GetRequests(), - cancellationToken: this.cancellationTokenSource.Token); - - this.numberOfFailures = 0; + CancellationTokenSource cancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut); + Task processorTask = Task.Run(() => this.processor + .ProcessAndSendAsync( + clientTelemetryInfo: this.clientTelemetryInfo, + operationInfoSnapshot: operationInfoSnapshot, + cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, + requestInfoSnapshot: requestInfoSnapshot, + cancellationToken: cancellationToken.Token), cancellationToken.Token); + + // Initiating Telemetry Data Processor task which will serialize and send telemetry information to Client Telemetry Service + // Not disposing this task. If we dispose a client then, telemetry job(telemetryTask) should stop but processor task(processorTask) should make best effort to finish the job in background. + _ = ClientTelemetry.RunProcessorTaskAsync(this.clientTelemetryInfo.DateTimeUtc, processorTask, ClientTelemetryOptions.ClientTelemetryProcessorTimeOut); + } catch (Exception ex) { - this.numberOfFailures++; - - DefaultTrace.TraceError("Telemetry Job Processor failed with error : {0}", ex); + DefaultTrace.TraceError("Exception while initiating processing task : {0} with telemetry date as {1}", ex.Message, this.clientTelemetryInfo.DateTimeUtc); } } } @@ -209,6 +200,33 @@ await this.processor DefaultTrace.TraceInformation("Telemetry Job Stopped."); } + /// + /// This Task makes sure, processing task is timing out after 5 minute of timeout + /// + /// + /// + /// + internal static async Task RunProcessorTaskAsync(string telemetryDate, Task processingTask, TimeSpan timeout) + { + using (CancellationTokenSource tokenForDelayTask = new CancellationTokenSource()) + { + Task delayTask = Task.Delay(timeout, tokenForDelayTask.Token); + + Task resultTask = await Task.WhenAny(processingTask, delayTask); + if (resultTask == delayTask) + { + DefaultTrace.TraceVerbose($"Processor task with date as {telemetryDate} is canceled as it did not finish in {timeout}"); + // Operation cancelled + throw new OperationCanceledException($"Processor task with date as {telemetryDate} is canceled as it did not finish in {timeout}"); + } + else + { + // Cancel the timer task so that it does not fire + tokenForDelayTask.Cancel(); + } + } + } + /// /// Collects Cache Telemetry Information. /// diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs index 47232a187e..2aeaadca63 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs @@ -79,13 +79,12 @@ internal static class ClientTelemetryOptions internal const double Percentile99 = 99.0; internal const double Percentile999 = 99.9; internal const string DateFormat = "yyyy-MM-ddTHH:mm:ssZ"; - internal const string EnvPropsClientTelemetrySchedulingInSeconds = "COSMOS.CLIENT_TELEMETRY_SCHEDULING_IN_SECONDS"; internal const string EnvPropsClientTelemetryEnabled = "COSMOS.CLIENT_TELEMETRY_ENABLED"; internal const string EnvPropsClientTelemetryVmMetadataUrl = "COSMOS.VM_METADATA_URL"; internal const string EnvPropsClientTelemetryEndpoint = "COSMOS.CLIENT_TELEMETRY_ENDPOINT"; internal const string EnvPropsClientTelemetryEnvironmentName = "COSMOS.ENVIRONMENT_NAME"; - + internal static readonly ResourceType AllowedResourceTypes = ResourceType.Document; // Why 5 sec? As of now, if any network request is taking more than 5 millisecond sec, we will consider it slow request this value can be revisited in future internal static readonly TimeSpan NetworkLatencyThreshold = TimeSpan.FromMilliseconds(5); @@ -101,7 +100,8 @@ internal static class ClientTelemetryOptions internal static readonly int NetworkTelemetrySampleSize = 200; internal static int PayloadSizeThreshold = 1024 * 1024 * 2; // 2MB - + internal static TimeSpan ClientTelemetryProcessorTimeOut = TimeSpan.FromMinutes(5); + private static Uri clientTelemetryEndpoint; private static string environmentName; private static TimeSpan scheduledTimeSpan = TimeSpan.Zero; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTelemetryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTelemetryTests.cs index 7e924ffd53..30edc7ebf7 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTelemetryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTelemetryTests.cs @@ -1075,33 +1075,6 @@ private static void AssertCacheRefreshInfoInformation( } } - [TestMethod] - public async Task CheckMisconfiguredTelemetryEndpoint_should_stop_the_job() - { - int retryCounter = 0; - HttpClientHandlerHelper customHttpHandler = new HttpClientHandlerHelper - { - RequestCallBack = (request, cancellation) => - { - if (request.RequestUri.AbsoluteUri.Equals(ClientTelemetryOptions.GetClientTelemetryEndpoint().AbsoluteUri)) - { - retryCounter++; - throw new Exception("Exception while sending telemetry"); - } - - return null; - } - }; - - Container container = await this.CreateClientAndContainer( - mode: ConnectionMode.Direct, - customHttpHandler: customHttpHandler); - - await Task.Delay(TimeSpan.FromMilliseconds(5000)); // wait for 5 sec, ideally telemetry would be sent 5 times but client telemetry endpoint is not functional (in this test), it should try 3 times maximum and after that client telemetry job should be stopped. - - Assert.AreEqual(3, retryCounter); - } - private static ItemBatchOperation CreateItem(string itemId) { var testItem = new { id = itemId, Status = itemId }; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs index 4a6b077baa..9308d22806 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs @@ -30,6 +30,7 @@ public class ClientTelemetryTests public void Cleanup() { Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEnabled, null); + Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEndpoint, null); } [TestMethod] @@ -147,7 +148,6 @@ public async Task CheckIfPayloadIsDividedCorrectlyAsync(int expectedOperationInf string payloadJson = request.Content.ReadAsStringAsync().Result; Assert.IsTrue(payloadJson.Length <= ClientTelemetryOptions.PayloadSizeThreshold, "Payload Size is " + payloadJson.Length); - Console.WriteLine(payloadJson); ClientTelemetryProperties propertiesToSend = JsonConvert.DeserializeObject(payloadJson); Assert.AreEqual(7, propertiesToSend.SystemInfo.Count, "System Info is not correct"); @@ -245,14 +245,114 @@ await processor.ProcessAndSendAsync( clientTelemetryProperties, operationInfoSnapshot, cacheRefreshInfoSnapshot, - requestInfoList, - new CancellationToken()); + requestInfoList, + new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut).Token); Assert.AreEqual(expectedOperationInfoSize, actualOperationInfoSize, "Operation Info is not correct"); Assert.AreEqual(expectedCacheRefreshInfoSize, actualCacheRefreshInfoSize, "Cache Refresh Info is not correct"); Assert.AreEqual(expectedRequestInfoSize, actualRequestInfoSize, "Request Info is not correct"); } - + + [TestMethod] + public async Task ClientTelmetryProcessor_should_timeout() + { + Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEndpoint, "http://dummy.telemetry.endpoint/"); + + string data = File.ReadAllText("Telemetry/ClientTelemetryPayloadWithoutMetrics.json", Encoding.UTF8); + ClientTelemetryProperties clientTelemetryProperties = JsonConvert.DeserializeObject(data); + + int actualOperationInfoSize = 0; + int actualCacheRefreshInfoSize = 0; + + Mock mockHttpHandler = new Mock(); + _ = mockHttpHandler.Setup(x => x.SendAsync( + It.IsAny(), + It.IsAny())) + .Callback( + (request, cancellationToken) => + { + string payloadJson = request.Content.ReadAsStringAsync().Result; + Assert.IsTrue(payloadJson.Length <= ClientTelemetryOptions.PayloadSizeThreshold, "Payload Size is " + payloadJson.Length); + + ClientTelemetryProperties propertiesToSend = JsonConvert.DeserializeObject(payloadJson); + + actualOperationInfoSize += propertiesToSend.OperationInfo?.Count ?? 0; + actualCacheRefreshInfoSize += propertiesToSend.CacheRefreshInfo?.Count ?? 0; + }) + .Returns(Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK))); + + ClientTelemetryProcessor processor = new ClientTelemetryProcessor( + MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object))), + Mock.Of()); + + ConcurrentDictionary operationInfoSnapshot + = new ConcurrentDictionary(); + + for (int i = 0; i < 20; i++) + { + OperationInfo opeInfo = new OperationInfo(Regions.WestUS, + 0, + Documents.ConsistencyLevel.Session.ToString(), + "databaseName" + i, + "containerName", + Documents.OperationType.Read, + Documents.ResourceType.Document, + 200, + 0); + + LongConcurrentHistogram latency = new LongConcurrentHistogram(ClientTelemetryOptions.RequestLatencyMin, + ClientTelemetryOptions.RequestLatencyMax, + ClientTelemetryOptions.RequestLatencyPrecision); + latency.RecordValue(10); + + LongConcurrentHistogram requestcharge = new LongConcurrentHistogram(ClientTelemetryOptions.RequestChargeMin, + ClientTelemetryOptions.RequestChargeMax, + ClientTelemetryOptions.RequestChargePrecision); + requestcharge.RecordValue(11); + + operationInfoSnapshot.TryAdd(opeInfo, (latency, requestcharge)); + } + + ConcurrentDictionary cacheRefreshInfoSnapshot + = new ConcurrentDictionary(); + for (int i = 0; i < 10; i++) + { + CacheRefreshInfo crInfo = new CacheRefreshInfo(Regions.WestUS, + 10, + Documents.ConsistencyLevel.Session.ToString(), + "databaseName" + i, + "containerName", + Documents.OperationType.Read, + Documents.ResourceType.Document, + 200, + 1002, + "dummycache"); + + LongConcurrentHistogram latency = new LongConcurrentHistogram(ClientTelemetryOptions.RequestLatencyMin, + ClientTelemetryOptions.RequestLatencyMax, + ClientTelemetryOptions.RequestLatencyPrecision); + latency.RecordValue(10); + + cacheRefreshInfoSnapshot.TryAdd(crInfo, latency); + } + + Task processorTask = Task.Run(async () => + { + CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(1)); + await Task.Delay(1000, cts.Token); // Making this task wait to ensure that processir is taking more time. + await processor.ProcessAndSendAsync(clientTelemetryProperties, + operationInfoSnapshot, + cacheRefreshInfoSnapshot, + default, + cts.Token); + }); + + await Assert.ThrowsExceptionAsync(() => ClientTelemetry.RunProcessorTaskAsync( + telemetryDate: DateTime.Now.ToString(), + processingTask: processorTask, + timeout: TimeSpan.FromTicks(1))); + } + [TestMethod] [ExpectedException(typeof(FormatException))] public void CheckMisconfiguredTelemetry_should_fail()