Skip to content

Commit

Permalink
Add IQueueState inspector
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 20, 2018
1 parent 579a0cf commit dcd24ac
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 71 deletions.
21 changes: 13 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,25 @@ The default memory buffer feeding the worker thread is capped to 10,000 items, a
.WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500)
```

### Monitoring
### Health Monitoring via the Buffer Inspection interface

Typically, one should assign adequate buffer capacity to enable the wrapped sinks to ingest the events as they are processed without ever approaching the limit. In order to gain awareness of the processing backlog becoming abnormal, it's possible to instrument the Async sink by suppling a `monitor` callback that allows for periodic inspection of the backlog
The Async wrapper is primarily intended to allow one to achieve minimal logging latency at all times, even when writing to sinks that may momentarily block during the course of their processing (e.g., a File sink might block for a low number of ms while flushing). The dropping behavior is an important failsafe in that it avoids having an unbounded buffering behaviour should logging frequency overwhelm the sink, or the sink ingestion throughput degrades to a major degree.

In practice, this configuration (assuming one provisions an adequate `bufferSize`) achieves an efficient and resilient logging configuration that can handle load gracefully. The key risk is of course that events may be dropped when the buffer threshold gets breached. The `inspector` allows one to arrange for your Application's health monitoring mechanism to actively validate that the buffer allocation is not being exceeded in practice.

```csharp
void LogBufferMonitor(buffer : BlockingQueue<Serilog.Events.LogEvent> queue)
// Example check: log message to an out of band alarm channel if logging is showing signs of getting overwhelmed
void PeriodicMonitorCheck(IQueueState inspector)
{
var usagePct = queue.Count * 100 / queue.BoundedCapacity;
if (usagePct > 50) SelfLog.WriteLine("Log buffer exceeded {0:p0} usage (limit: {1})", usage, queue.BoundedCapacity);
var usagePct = inspector.Count * 100 / inspector.BoundedCapacity;
if (usagePct > 50) SelfLog.WriteLine("Log buffer exceeded {0:p0} usage (limit: {1})", usagePct, inspector.BoundedCapacity);
}

// Wait for any queued event to be accepted by the `File` log before allowing the calling thread
// to resume its application work after a logging call when there are 10,000 LogEvents waiting
.WriteTo.Async(a => a.File("logs/myapp.log"), monitorIntervalSeconds: 60, monitor: LogBufferMonitor)
// Allow a backlog of up to 10,000 items to be maintained (dropping extras if full)
.WriteTo.Async(a => a.File("logs/myapp.log"), inspector: out IQueueState inspector) ...

// Wire the inspector through to health monitoring and/or metrics in order to periodically emit a metric, raise an alarm, etc.
... healthMonitoring.RegisterCheck(() => new PeriodicMonitorCheck(inspector));
```

### Blocking
Expand Down
60 changes: 54 additions & 6 deletions src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,70 @@ public static LoggerConfiguration Async(
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
/// room is made in the queue.</param>
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
/// <param name="monitorIntervalSeconds">Interval between invocations of <paramref name="monitor"/>.</param>
/// <param name="monitor">Callback to facilitate health checking the internal queue. Frequency is controlled by <paramref name="monitorIntervalSeconds"/>.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
int bufferSize = 10000,
bool blockWhenFull = false,
int monitorIntervalSeconds = 10,
Action<System.Collections.Concurrent.BlockingCollection<Events.LogEvent>> monitor = null)
bool blockWhenFull = false)
{
return LoggerSinkConfiguration.Wrap(
loggerSinkConfiguration,
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, monitorIntervalSeconds, monitor),
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull),
configure);
}

/// <summary>
/// Configure a sink to be invoked asynchronously, on a background worker thread.
/// Provides an <paramref name="inspector"/> that can be used to check the live state of the buffer for health monitoring purposes.
/// </summary>
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
/// <param name="configure">An action that configures the wrapped sink.</param>
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
/// the thread is unable to process events quickly enough and the queue is filled, depending on
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
/// room is made in the queue.</param>
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
/// <param name="inspector">Provides a way to inspect the state of the queue for health monitoring purposes.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
out IQueueState inspector,
int bufferSize = 10000,
bool blockWhenFull = false)
{
// Cannot assign directly to the out param from within the lambda, so we need a temp
IQueueState stateLens = null;
var result = LoggerSinkConfiguration.Wrap(
loggerSinkConfiguration,
wrappedSink =>
{
var sink = new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull);
stateLens = sink;
return sink;
},
configure);
inspector = stateLens;
return result;
}
}

/// <summary>
/// Provides a way to inspect the current state of Async wrapper's ingestion queue.
/// </summary>
public interface IQueueState
{
/// <summary>
/// Count of items currently awaiting ingestion.
/// </summary>
/// <exception cref="T:System.ObjectDisposedException">The Sink has been disposed.</exception>
int Count { get; }

/// <summary>
/// Maximum number of items permitted to be held in the buffer awaiting ingestion.
/// </summary>
/// <exception cref="T:System.ObjectDisposedException">The Sink has been disposed.</exception>
int BufferSize { get; }
}
}
11 changes: 1 addition & 10 deletions src/Serilog.Sinks.Async/Serilog.Sinks.Async.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<AssemblyVersion>1.0.0</AssemblyVersion>
<VersionPrefix>1.2.0</VersionPrefix>
<Authors>Jezz Santos;Serilog Contributors</Authors>
<TargetFrameworks>net45;netstandard1.1;netstandard1.2</TargetFrameworks>
<TargetFrameworks>net45;netstandard1.1</TargetFrameworks>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<AssemblyName>Serilog.Sinks.Async</AssemblyName>
Expand All @@ -26,10 +26,6 @@
<PackageReference Include="Serilog" Version="2.5.0" />
</ItemGroup>

<PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard1.1' ">
<DefineConstants>$(DefineConstants);NETSTANDARD_NO_TIMER</DefineConstants>
</PropertyGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'net45' ">
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
Expand All @@ -39,9 +35,4 @@
<PackageReference Include="System.Collections.Concurrent" Version="4.0.12" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.2' ">
<PackageReference Include="System.Collections.Concurrent" Version="4.0.12" />
<PackageReference Include="System.Threading.Timer" Version="4.0.1" />
</ItemGroup>

</Project>
35 changes: 8 additions & 27 deletions src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,20 @@

namespace Serilog.Sinks.Async
{
sealed class BackgroundWorkerSink : ILogEventSink, IDisposable
sealed class BackgroundWorkerSink : ILogEventSink, IQueueState, IDisposable
{
readonly ILogEventSink _pipeline;
readonly bool _blockWhenFull;
readonly BlockingCollection<LogEvent> _queue;
readonly Task _worker;
#if! NETSTANDARD_NO_TIMER
readonly Timer _monitorCallbackInvocationTimer;
#endif
public BackgroundWorkerSink(
ILogEventSink pipeline, int bufferCapacity,
bool blockWhenFull,
int monitorIntervalSeconds = 0, Action<BlockingCollection<LogEvent>> monitor = null)

public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull)
{
if (pipeline == null) throw new ArgumentNullException(nameof(pipeline));
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity));
if (monitorIntervalSeconds < 0) throw new ArgumentOutOfRangeException(nameof(monitorIntervalSeconds));
_pipeline = pipeline;
_pipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline));
_blockWhenFull = blockWhenFull;
_queue = new BlockingCollection<LogEvent>(bufferCapacity);
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

if (monitor != null)
{
if (monitorIntervalSeconds < 1) throw new ArgumentOutOfRangeException(nameof(monitorIntervalSeconds), "must be >=1");
#if! NETSTANDARD_NO_TIMER
var interval = TimeSpan.FromSeconds(monitorIntervalSeconds);
_monitorCallbackInvocationTimer = new Timer(queue => monitor((BlockingCollection<LogEvent>)queue), _queue, interval, interval);
#else
throw new PlatformNotSupportedException($"Please use a platform supporting .netstandard1.2 or later to avail of the ${nameof(monitor)} facility.");
#endif
}
}

public void Emit(LogEvent logEvent)
Expand Down Expand Up @@ -74,11 +56,6 @@ public void Dispose()
// Allow queued events to be flushed
_worker.Wait();

#if! NETSTANDARD_NO_TIMER
// Only stop monitoring when we've actually completed flushing
_monitorCallbackInvocationTimer?.Dispose();
#endif

(_pipeline as IDisposable)?.Dispose();
}

Expand All @@ -96,5 +73,9 @@ void Pump()
SelfLog.WriteLine("{0} fatal error in worker thread: {1}", typeof(BackgroundWorkerSink), ex);
}
}

int IQueueState.Count => _queue.Count;

int IQueueState.BufferSize => _queue.BoundedCapacity;
}
}
41 changes: 21 additions & 20 deletions test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
Expand Down Expand Up @@ -85,12 +84,12 @@ public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink()
}

[Fact]
public async Task GivenDefaultConfig_WhenQueueOverCapacity_DoesNotBlock()
public async Task GivenDefaultConfig_WhenRequestsExceedCapacity_DoesNotBlock()
{
var batchTiming = Stopwatch.StartNew();
using (var sink = new BackgroundWorkerSink(_logger, 1, blockWhenFull: false /*default*/))
{
// Cause a delay when emmitting to the inner sink, allowing us to easily fill the queue to capacity
// Cause a delay when emitting to the inner sink, allowing us to easily fill the queue to capacity
// while the first event is being propagated
var acceptInterval = TimeSpan.FromMilliseconds(500);
_innerSink.DelayEmit = acceptInterval;
Expand All @@ -115,7 +114,7 @@ public async Task GivenDefaultConfig_WhenQueueOverCapacity_DoesNotBlock()
}

[Fact]
public async Task GivenDefaultConfig_WhenRequestsOverCapacity_ThenDropsEventsAndRecovers()
public async Task GivenDefaultConfig_WhenRequestsExceedCapacity_ThenDropsEventsAndRecovers()
{
using (var sink = new BackgroundWorkerSink(_logger, 1, blockWhenFull: false /*default*/))
{
Expand Down Expand Up @@ -186,29 +185,31 @@ public async Task GivenConfiguredToBlock_WhenQueueFilled_ThenBlocks()
}
}

#if !NETSTANDARD_NO_TIMER
[Fact]
public void MonitorArgumentAffordsBacklogHealthMonitoringFacility()
public async Task InspectorOutParameterAffordsHealthMonitoringHook()
{
bool logWasObservedToHaveReachedHalfFull = false;
void inspectBuffer(BlockingCollection<LogEvent> queue) =>

logWasObservedToHaveReachedHalfFull = logWasObservedToHaveReachedHalfFull
|| queue.Count * 100 / queue.BoundedCapacity >= 50;

var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(3) };
var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(2) };
// 2 spaces in queue; 1 would make the second log entry eligible for dropping if consumer does not activate instantaneously
var bufferSize = 2;
using (var logger = new LoggerConfiguration()
.WriteTo.Async(w => w.Sink(collector), bufferSize: 2, monitorIntervalSeconds: 1, monitor: inspectBuffer)
.WriteTo.Async(w => w.Sink(collector), bufferSize: 2, inspector: out IQueueState inspector)
.CreateLogger())
{
logger.Information("Something to block the pipe");
logger.Information("I'll just leave this here pending for a few seconds so I can observe it");
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(2));
Assert.Equal(bufferSize, inspector.BufferSize);
Assert.Equal(0, inspector.Count);
logger.Information("Something to freeze the processing for 2s");
await Task.Delay(TimeSpan.FromMilliseconds(200));
// Can be taken from queue either instantanously or be awaiting consumer to take
Assert.InRange(inspector.Count, 0, 1);
logger.Information("Something that will sit in the queue");
// Unless we are put to sleep for a Rip Van Winkle period, either:
// a) the BackgroundWorker will be emitting the item [and incurring the 2s delay we established], leaving a single item in the buffer
// or b) neither will have been picked out of the buffer yet.
await Task.Delay(TimeSpan.FromMilliseconds(200));
Assert.InRange(inspector.Count, 1, 2);
Assert.Equal(bufferSize, inspector.BufferSize);
}

Assert.True(logWasObservedToHaveReachedHalfFull);
}
#endif

private BackgroundWorkerSink CreateSinkWithDefaultOptions()
{
Expand Down

0 comments on commit dcd24ac

Please sign in to comment.