diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/CTLConfig.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/CTLConfig.cs
index ae930dd7ba..05064003ce 100644
--- a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/CTLConfig.cs
+++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/CTLConfig.cs
@@ -11,6 +11,7 @@ namespace CosmosCTL
using CommandLine.Text;
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json;
+ using static CosmosCTL.ReservoirProvider;
public class CTLConfig
{
@@ -76,7 +77,7 @@ public string DiagnosticsThresholdDuration
}
[Option("ctl_content_response_on_write", Required = false, HelpText = "Should return content response on writes")]
- public bool IsContentResponseOnWriteEnabled { get; set; } = true;
+ public bool? IsContentResponseOnWriteEnabled { get; set; } = true;
[Option("ctl_output_event_traces", Required = false, HelpText = "Outputs TraceSource to console")]
public bool OutputEventTraces { get; set; } = false;
@@ -102,6 +103,12 @@ public string DiagnosticsThresholdDuration
[Option("ctl_telemetry_schedule_in_sec", Required = false, HelpText = "telemetry task schedule time in sec")]
public string TelemetryScheduleInSeconds { get; set; }
+ [Option("ctl_reservoir_type", Required = false, HelpText = "Defines the reservoir type. Valid values are: Uniform, SlidingWindow and ExponentialDecay. The default value is SlidingWindow.")]
+ public ReservoirTypes ReservoirType { get; set; } = ReservoirTypes.SlidingWindow;
+
+ [Option("ctl_reservoir_sample_size", Required = false, HelpText = "The reservoir sample size.")]
+ public int ReservoirSampleSize { get; set; } = 1028;
+
internal TimeSpan RunningTimeDurationAsTimespan { get; private set; } = TimeSpan.FromHours(10);
internal TimeSpan DiagnosticsThresholdDurationAsTimespan { get; private set; } = TimeSpan.FromSeconds(60);
diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/ReservoirProvider.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/ReservoirProvider.cs
new file mode 100644
index 0000000000..9f655bbd0d
--- /dev/null
+++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/ReservoirProvider.cs
@@ -0,0 +1,49 @@
+//------------------------------------------------------------
+// Copyright (c) Microsoft Corporation. All rights reserved.
+//------------------------------------------------------------
+
+namespace CosmosCTL
+{
+ using System;
+ using App.Metrics.ReservoirSampling;
+
+ ///
+ /// Returns the based on the CTL configuration.
+ ///
+ public class ReservoirProvider
+ {
+ ///
+ /// Create and returns a new instance of the based on the CTL configuration.
+ ///
+ /// An instance of containing the CTL config params.
+ /// An implementation of .
+ public static IReservoir GetReservoir(CTLConfig ctlConfig)
+ {
+ return ctlConfig.ReservoirType switch
+ {
+ ReservoirTypes.Uniform => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir(
+ sampleSize: ctlConfig.ReservoirSampleSize),
+
+ ReservoirTypes.SlidingWindow => new App.Metrics.ReservoirSampling.SlidingWindow.DefaultSlidingWindowReservoir(
+ sampleSize: ctlConfig.ReservoirSampleSize),
+
+ ReservoirTypes.ExponentialDecay => new App.Metrics.ReservoirSampling.ExponentialDecay.DefaultForwardDecayingReservoir(
+ sampleSize: ctlConfig.ReservoirSampleSize,
+ alpha: 0.015),
+
+ _ => throw new ArgumentException(
+ message: "Invalid ReservoirType Specified."),
+ };
+ }
+
+ ///
+ /// An enum containing different reservoir types.
+ ///
+ public enum ReservoirTypes
+ {
+ Uniform,
+ SlidingWindow,
+ ExponentialDecay
+ }
+ }
+}
diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedPullScenario.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedPullScenario.cs
index d549f398d1..65c40864f7 100644
--- a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedPullScenario.cs
+++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedPullScenario.cs
@@ -64,7 +64,7 @@ public async Task RunAsync(
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds,
Context = loggingContextIdentifier,
- Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir()
+ Reservoir = () => ReservoirProvider.GetReservoir(config)
};
Container container = cosmosClient.GetContainer(config.Database, config.Collection);
diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadManyScenario.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadManyScenario.cs
index ae53acb07f..09a84530a6 100644
--- a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadManyScenario.cs
+++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadManyScenario.cs
@@ -76,7 +76,7 @@ public async Task RunAsync(
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds,
Context = loggingContextIdentifier,
- Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir()
+ Reservoir = () => ReservoirProvider.GetReservoir(config)
};
Container container = cosmosClient.GetContainer(config.Database, config.Collection);
diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadWriteQueryScenario.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadWriteQueryScenario.cs
index fe15e4efaa..db85445bef 100644
--- a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadWriteQueryScenario.cs
+++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadWriteQueryScenario.cs
@@ -105,34 +105,34 @@ private async Task ExecuteOperationsAsync(
CounterOptions querySuccessMeter = new CounterOptions { Name = "#Query Successful Operations", Context = loggingContextIdentifier };
CounterOptions queryFailureMeter = new CounterOptions { Name = "#Query Unsuccessful Operations", Context = loggingContextIdentifier };
- TimerOptions readLatencyTimer = new TimerOptions
+ TimerOptions readLatencyTimer = new()
{
Name = "Read latency",
MeasurementUnit = Unit.Requests,
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds,
Context = loggingContextIdentifier,
- Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir()
+ Reservoir = () => ReservoirProvider.GetReservoir(config)
};
- TimerOptions writeLatencyTimer = new TimerOptions
+ TimerOptions writeLatencyTimer = new ()
{
Name = "Write latency",
MeasurementUnit = Unit.Requests,
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds,
Context = loggingContextIdentifier,
- Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir()
+ Reservoir = () => ReservoirProvider.GetReservoir(config)
};
- TimerOptions queryLatencyTimer = new TimerOptions
+ TimerOptions queryLatencyTimer = new ()
{
Name = "Query latency",
MeasurementUnit = Unit.Requests,
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds,
Context = loggingContextIdentifier,
- Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir()
+ Reservoir = () => ReservoirProvider.GetReservoir(config)
};
SemaphoreSlim concurrencyControlSemaphore = new SemaphoreSlim(config.Concurrency);
@@ -178,7 +178,7 @@ private async Task ExecuteOperationsAsync(
operation: i,
partitionKeyAttributeName: config.CollectionPartitionKey,
containers: initializationResult.Containers,
- isContentResponseOnWriteEnabled: config.IsContentResponseOnWriteEnabled)),
+ isContentResponseOnWriteEnabled: config.IsContentResponseOnWriteEnabled.Value)),
onSuccess: () =>
{
concurrencyControlSemaphore.Release();