From b257f8e9b14ee1eae243b2c462d7582dbf5fb07f Mon Sep 17 00:00:00 2001 From: Debdatta Kunda <87335885+kundadebdatta@users.noreply.github.com> Date: Fri, 17 Feb 2023 15:26:51 -0800 Subject: [PATCH] [Internal] CTL: Fixes Reservoir Sampling Logic (#3712) * Code changes to fix the reservoir sampling logic in CTL * Code changes to modify help text on reservoir type. * Code changes to address minor code refactor. --- .../Tools/CTL/CTLConfig.cs | 9 +++- .../Tools/CTL/ReservoirProvider.cs | 49 +++++++++++++++++++ .../CTL/Scenarios/ChangeFeedPullScenario.cs | 2 +- .../Tools/CTL/Scenarios/ReadManyScenario.cs | 2 +- .../CTL/Scenarios/ReadWriteQueryScenario.cs | 14 +++--- 5 files changed, 66 insertions(+), 10 deletions(-) create mode 100644 Microsoft.Azure.Cosmos.Samples/Tools/CTL/ReservoirProvider.cs diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/CTLConfig.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/CTLConfig.cs index ae930dd7ba..05064003ce 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/CTLConfig.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/CTLConfig.cs @@ -11,6 +11,7 @@ namespace CosmosCTL using CommandLine.Text; using Microsoft.Azure.Cosmos; using Newtonsoft.Json; + using static CosmosCTL.ReservoirProvider; public class CTLConfig { @@ -76,7 +77,7 @@ public string DiagnosticsThresholdDuration } [Option("ctl_content_response_on_write", Required = false, HelpText = "Should return content response on writes")] - public bool IsContentResponseOnWriteEnabled { get; set; } = true; + public bool? IsContentResponseOnWriteEnabled { get; set; } = true; [Option("ctl_output_event_traces", Required = false, HelpText = "Outputs TraceSource to console")] public bool OutputEventTraces { get; set; } = false; @@ -102,6 +103,12 @@ public string DiagnosticsThresholdDuration [Option("ctl_telemetry_schedule_in_sec", Required = false, HelpText = "telemetry task schedule time in sec")] public string TelemetryScheduleInSeconds { get; set; } + [Option("ctl_reservoir_type", Required = false, HelpText = "Defines the reservoir type. Valid values are: Uniform, SlidingWindow and ExponentialDecay. The default value is SlidingWindow.")] + public ReservoirTypes ReservoirType { get; set; } = ReservoirTypes.SlidingWindow; + + [Option("ctl_reservoir_sample_size", Required = false, HelpText = "The reservoir sample size.")] + public int ReservoirSampleSize { get; set; } = 1028; + internal TimeSpan RunningTimeDurationAsTimespan { get; private set; } = TimeSpan.FromHours(10); internal TimeSpan DiagnosticsThresholdDurationAsTimespan { get; private set; } = TimeSpan.FromSeconds(60); diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/ReservoirProvider.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/ReservoirProvider.cs new file mode 100644 index 0000000000..9f655bbd0d --- /dev/null +++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/ReservoirProvider.cs @@ -0,0 +1,49 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace CosmosCTL +{ + using System; + using App.Metrics.ReservoirSampling; + + /// + /// Returns the based on the CTL configuration. + /// + public class ReservoirProvider + { + /// + /// Create and returns a new instance of the based on the CTL configuration. + /// + /// An instance of containing the CTL config params. + /// An implementation of . + public static IReservoir GetReservoir(CTLConfig ctlConfig) + { + return ctlConfig.ReservoirType switch + { + ReservoirTypes.Uniform => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir( + sampleSize: ctlConfig.ReservoirSampleSize), + + ReservoirTypes.SlidingWindow => new App.Metrics.ReservoirSampling.SlidingWindow.DefaultSlidingWindowReservoir( + sampleSize: ctlConfig.ReservoirSampleSize), + + ReservoirTypes.ExponentialDecay => new App.Metrics.ReservoirSampling.ExponentialDecay.DefaultForwardDecayingReservoir( + sampleSize: ctlConfig.ReservoirSampleSize, + alpha: 0.015), + + _ => throw new ArgumentException( + message: "Invalid ReservoirType Specified."), + }; + } + + /// + /// An enum containing different reservoir types. + /// + public enum ReservoirTypes + { + Uniform, + SlidingWindow, + ExponentialDecay + } + } +} diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedPullScenario.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedPullScenario.cs index d549f398d1..65c40864f7 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedPullScenario.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedPullScenario.cs @@ -64,7 +64,7 @@ public async Task RunAsync( DurationUnit = TimeUnit.Milliseconds, RateUnit = TimeUnit.Seconds, Context = loggingContextIdentifier, - Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir() + Reservoir = () => ReservoirProvider.GetReservoir(config) }; Container container = cosmosClient.GetContainer(config.Database, config.Collection); diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadManyScenario.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadManyScenario.cs index ae53acb07f..09a84530a6 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadManyScenario.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadManyScenario.cs @@ -76,7 +76,7 @@ public async Task RunAsync( DurationUnit = TimeUnit.Milliseconds, RateUnit = TimeUnit.Seconds, Context = loggingContextIdentifier, - Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir() + Reservoir = () => ReservoirProvider.GetReservoir(config) }; Container container = cosmosClient.GetContainer(config.Database, config.Collection); diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadWriteQueryScenario.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadWriteQueryScenario.cs index fe15e4efaa..db85445bef 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadWriteQueryScenario.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadWriteQueryScenario.cs @@ -105,34 +105,34 @@ private async Task ExecuteOperationsAsync( CounterOptions querySuccessMeter = new CounterOptions { Name = "#Query Successful Operations", Context = loggingContextIdentifier }; CounterOptions queryFailureMeter = new CounterOptions { Name = "#Query Unsuccessful Operations", Context = loggingContextIdentifier }; - TimerOptions readLatencyTimer = new TimerOptions + TimerOptions readLatencyTimer = new() { Name = "Read latency", MeasurementUnit = Unit.Requests, DurationUnit = TimeUnit.Milliseconds, RateUnit = TimeUnit.Seconds, Context = loggingContextIdentifier, - Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir() + Reservoir = () => ReservoirProvider.GetReservoir(config) }; - TimerOptions writeLatencyTimer = new TimerOptions + TimerOptions writeLatencyTimer = new () { Name = "Write latency", MeasurementUnit = Unit.Requests, DurationUnit = TimeUnit.Milliseconds, RateUnit = TimeUnit.Seconds, Context = loggingContextIdentifier, - Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir() + Reservoir = () => ReservoirProvider.GetReservoir(config) }; - TimerOptions queryLatencyTimer = new TimerOptions + TimerOptions queryLatencyTimer = new () { Name = "Query latency", MeasurementUnit = Unit.Requests, DurationUnit = TimeUnit.Milliseconds, RateUnit = TimeUnit.Seconds, Context = loggingContextIdentifier, - Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir() + Reservoir = () => ReservoirProvider.GetReservoir(config) }; SemaphoreSlim concurrencyControlSemaphore = new SemaphoreSlim(config.Concurrency); @@ -178,7 +178,7 @@ private async Task ExecuteOperationsAsync( operation: i, partitionKeyAttributeName: config.CollectionPartitionKey, containers: initializationResult.Containers, - isContentResponseOnWriteEnabled: config.IsContentResponseOnWriteEnabled)), + isContentResponseOnWriteEnabled: config.IsContentResponseOnWriteEnabled.Value)), onSuccess: () => { concurrencyControlSemaphore.Release();