Skip to content

Commit

Permalink
[Internal] CTL: Fixes Reservoir Sampling Logic (#3712)
Browse files Browse the repository at this point in the history
* Code changes to fix the reservoir sampling logic in CTL

* Code changes to modify help text on reservoir type.

* Code changes to address minor code refactor.
  • Loading branch information
kundadebdatta committed Feb 17, 2023
1 parent 67e1a90 commit b257f8e
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 10 deletions.
9 changes: 8 additions & 1 deletion Microsoft.Azure.Cosmos.Samples/Tools/CTL/CTLConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace CosmosCTL
using CommandLine.Text;
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json;
using static CosmosCTL.ReservoirProvider;

public class CTLConfig
{
Expand Down Expand Up @@ -76,7 +77,7 @@ public string DiagnosticsThresholdDuration
}

[Option("ctl_content_response_on_write", Required = false, HelpText = "Should return content response on writes")]
public bool IsContentResponseOnWriteEnabled { get; set; } = true;
public bool? IsContentResponseOnWriteEnabled { get; set; } = true;

[Option("ctl_output_event_traces", Required = false, HelpText = "Outputs TraceSource to console")]
public bool OutputEventTraces { get; set; } = false;
Expand All @@ -102,6 +103,12 @@ public string DiagnosticsThresholdDuration
[Option("ctl_telemetry_schedule_in_sec", Required = false, HelpText = "telemetry task schedule time in sec")]
public string TelemetryScheduleInSeconds { get; set; }

[Option("ctl_reservoir_type", Required = false, HelpText = "Defines the reservoir type. Valid values are: Uniform, SlidingWindow and ExponentialDecay. The default value is SlidingWindow.")]
public ReservoirTypes ReservoirType { get; set; } = ReservoirTypes.SlidingWindow;

[Option("ctl_reservoir_sample_size", Required = false, HelpText = "The reservoir sample size.")]
public int ReservoirSampleSize { get; set; } = 1028;

internal TimeSpan RunningTimeDurationAsTimespan { get; private set; } = TimeSpan.FromHours(10);
internal TimeSpan DiagnosticsThresholdDurationAsTimespan { get; private set; } = TimeSpan.FromSeconds(60);

Expand Down
49 changes: 49 additions & 0 deletions Microsoft.Azure.Cosmos.Samples/Tools/CTL/ReservoirProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace CosmosCTL
{
using System;
using App.Metrics.ReservoirSampling;

/// <summary>
/// Returns the <see cref="IReservoir"/> based on the CTL configuration.
/// </summary>
public class ReservoirProvider
{
/// <summary>
/// Create and returns a new instance of the <see cref="IReservoir"/> based on the CTL configuration.
/// </summary>
/// <param name="ctlConfig">An instance of <see cref="CTLConfig"/> containing the CTL config params.</param>
/// <returns>An implementation of <see cref="IReservoir"/>.</returns>
public static IReservoir GetReservoir(CTLConfig ctlConfig)
{
return ctlConfig.ReservoirType switch
{
ReservoirTypes.Uniform => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir(
sampleSize: ctlConfig.ReservoirSampleSize),

ReservoirTypes.SlidingWindow => new App.Metrics.ReservoirSampling.SlidingWindow.DefaultSlidingWindowReservoir(
sampleSize: ctlConfig.ReservoirSampleSize),

ReservoirTypes.ExponentialDecay => new App.Metrics.ReservoirSampling.ExponentialDecay.DefaultForwardDecayingReservoir(
sampleSize: ctlConfig.ReservoirSampleSize,
alpha: 0.015),

_ => throw new ArgumentException(
message: "Invalid ReservoirType Specified."),
};
}

/// <summary>
/// An enum containing different reservoir types.
/// </summary>
public enum ReservoirTypes
{
Uniform,
SlidingWindow,
ExponentialDecay
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public async Task RunAsync(
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds,
Context = loggingContextIdentifier,
Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir()
Reservoir = () => ReservoirProvider.GetReservoir(config)
};

Container container = cosmosClient.GetContainer(config.Database, config.Collection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public async Task RunAsync(
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds,
Context = loggingContextIdentifier,
Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir()
Reservoir = () => ReservoirProvider.GetReservoir(config)
};

Container container = cosmosClient.GetContainer(config.Database, config.Collection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,34 +105,34 @@ private async Task ExecuteOperationsAsync(
CounterOptions querySuccessMeter = new CounterOptions { Name = "#Query Successful Operations", Context = loggingContextIdentifier };
CounterOptions queryFailureMeter = new CounterOptions { Name = "#Query Unsuccessful Operations", Context = loggingContextIdentifier };

TimerOptions readLatencyTimer = new TimerOptions
TimerOptions readLatencyTimer = new()
{
Name = "Read latency",
MeasurementUnit = Unit.Requests,
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds,
Context = loggingContextIdentifier,
Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir()
Reservoir = () => ReservoirProvider.GetReservoir(config)
};

TimerOptions writeLatencyTimer = new TimerOptions
TimerOptions writeLatencyTimer = new ()
{
Name = "Write latency",
MeasurementUnit = Unit.Requests,
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds,
Context = loggingContextIdentifier,
Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir()
Reservoir = () => ReservoirProvider.GetReservoir(config)
};

TimerOptions queryLatencyTimer = new TimerOptions
TimerOptions queryLatencyTimer = new ()
{
Name = "Query latency",
MeasurementUnit = Unit.Requests,
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds,
Context = loggingContextIdentifier,
Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir()
Reservoir = () => ReservoirProvider.GetReservoir(config)
};

SemaphoreSlim concurrencyControlSemaphore = new SemaphoreSlim(config.Concurrency);
Expand Down Expand Up @@ -178,7 +178,7 @@ private async Task ExecuteOperationsAsync(
operation: i,
partitionKeyAttributeName: config.CollectionPartitionKey,
containers: initializationResult.Containers,
isContentResponseOnWriteEnabled: config.IsContentResponseOnWriteEnabled)),
isContentResponseOnWriteEnabled: config.IsContentResponseOnWriteEnabled.Value)),
onSuccess: () =>
{
concurrencyControlSemaphore.Release();
Expand Down

0 comments on commit b257f8e

Please sign in to comment.