Skip to content

Commit

Permalink
Complete IAsyncLogEventSinkMonitor impl
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 25, 2018
1 parent 6546958 commit ab156fe
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 81 deletions.
45 changes: 27 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,38 @@ Because the memory buffer may contain events that have not yet been written to t
The default memory buffer feeding the worker thread is capped to 10,000 items, after which arriving events will be dropped. To increase or decrease this limit, specify it when configuring the async sink. One can determine whether events have been dropped via `Serilog.Async.IAsyncLogEventSinkState.DroppedMessagesCount` (see Sink State Inspection interface below).

```csharp
// Reduce the buffer to 500 events
.WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500)
// Reduce the buffer to 500 events
.WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500)
```

### Health Monitoring via the Sink State Inspection interface

The `Async` wrapper is primarily intended to allow one to achieve minimal logging latency at all times, even when writing to sinks that may momentarily block during the course of their processing (e.g., a `File` Sink might block for a low number of ms while flushing). The dropping behavior is an important failsafe; it avoids having an unbounded buffering behaviour should logging frequency overwhelm the sink, or the sink ingestion throughput degrade to a major degree.

In practice, this configuration (assuming one provisions an adequate `bufferSize`) achieves an efficient and resilient logging configuration that can handle load safely. The key risk is of course that events get be dropped when the buffer threshold gets breached. The `inspector` allows one to arrange for your Application's health monitoring mechanism to actively validate that the buffer allocation is not being exceeded in practice.
In practice, this configuration (assuming one provisions an adequate `bufferSize`) achieves an efficient and resilient logging configuration that can handle load safely. The key risk is of course that events get be dropped when the buffer threshold gets breached. The inspection interface, `IAsyncLogEventSinkState` together with the monitor configuration interface, `IAsyncLogEventSinkMonitor`, allows one to arrange for your Application's health monitoring mechanism to actively validate that the buffer allocation is not being exceeded in practice.

```csharp
// Example check: log message to an out of band alarm channel if logging is showing signs of getting overwhelmed
void PeriodicMonitorCheck(IQueueState inspector)
{
var usagePct = inspector.Count * 100 / inspector.BoundedCapacity;
if (usagePct > 50) SelfLog.WriteLine("Log buffer exceeded {0:p0} usage (limit: {1})", usagePct, inspector.BoundedCapacity);
}

// Allow a backlog of up to 10,000 items to be maintained (dropping extras if full)
.WriteTo.Async(a => a.File("logs/myapp.log"), inspector: out Async.IAsyncLogEventSinkState inspector) ...

// Wire the inspector through to application health monitoring and/or metrics in order to periodically emit a metric, raise an alarm, etc.
... HealthMonitor.RegisterAsyncLogSink(inspector);
// Example check: log message to an out of band alarm channel if logging is showing signs of getting overwhelmed
void PeriodicMonitorCheck(IAsyncLogEventSinkState inspector)
{
var usagePct = inspector.Count * 100 / inspector.BoundedCapacity;
if (usagePct > 50) SelfLog.WriteLine("Log buffer exceeded {0:p0} usage (limit: {1})", usagePct, inspector.BoundedCapacity);
}

class MonitorConfiguration : IAsyncLogEventSinkMonitor
{
// Wire the inspector through to application health monitoring and/or metrics
// in order to periodically emit a metric, raise an alarm, etc.
public void StartMonitoring(IAsyncLogEventSinkState inspector) =>
HealthMonitor.AddPeriodicCheck(() => ExecuteAsyncBufferCheck(inspector));

public void StopMonitoring(IAsyncLogEventSinkState inspector)
{ /* reverse of StartMonitoring */ }
}

// Configure to drop events if > 10,000 backlog; request supplying of inspector to Monitor
var monitor = new MonitorConfiguration();
.WriteTo.Async(a => a.File("logs/myapp.log"), monitor: monitor) ...
```

### Blocking
Expand All @@ -67,9 +76,9 @@ Warning: For the same reason one typically does not want exceptions from logging
When the buffer size limit is reached, the default behavior is to drop any further attempted writes until the queue abates, reporting each such failure to the `Serilog.Debugging.SelfLog`. To replace this with a blocking behaviour, set `blockWhenFull` to `true`.

```csharp
// Wait for any queued event to be accepted by the `File` log before allowing the calling thread
// to resume its application work after a logging call when there are 10,000 LogEvents waiting
.WriteTo.Async(a => a.File("logs/myapp.log"), blockWhenFull: true)
// Wait for any queued event to be accepted by the `File` log before allowing the calling thread
// to resume its application work after a logging call when there are 10,000 LogEvents waiting
.WriteTo.Async(a => a.File("logs/myapp.log"), blockWhenFull: true)
```

### XML `<appSettings>` and JSON configuration
Expand Down
46 changes: 3 additions & 43 deletions src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,59 +59,19 @@ public static LoggerConfiguration Async(
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
/// room is made in the queue.</param>
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
/// <param name="monitor">Monitor to supply buffer information to. If the monitor implements <see cref="IDisposable"/>, <c>Dispose()</c> will be called to advise of the Sink being <c>Dispose()</c>d.</param>
/// <param name="monitor">Monitor to supply buffer information to.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
IAsyncLogEventSinkMonitor monitor,
int bufferSize,
bool blockWhenFull)
{
return LoggerSinkConfiguration.Wrap(
loggerSinkConfiguration,
wrappedSink =>
{
var sink = new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, monitor);
monitor?.MonitorState(sink);
return sink;
},
configure);
}

/// <summary>
/// Configure a sink to be invoked asynchronously, on a background worker thread.
/// Provides an <paramref name="inspector"/> that can be used to check the live state of the buffer for health monitoring purposes.
/// </summary>
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
/// <param name="configure">An action that configures the wrapped sink.</param>
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
/// the thread is unable to process events quickly enough and the queue is filled, depending on
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
/// room is made in the queue.</param>
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
/// <param name="inspector">Provides a way to inspect the state of the queue for health monitoring purposes.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
out IAsyncLogEventSinkState inspector,
int bufferSize = 10000,
bool blockWhenFull = false)
{
// Cannot assign directly to the out param from within the lambda, so we need a temp
IAsyncLogEventSinkState stateLens = null;
var result = LoggerSinkConfiguration.Wrap(
return LoggerSinkConfiguration.Wrap(
loggerSinkConfiguration,
wrappedSink =>
{
var sink = new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, null);
stateLens = sink;
return sink;
},
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, monitor),
configure);
inspector = stateLens;
return result;
}
}
}
13 changes: 7 additions & 6 deletions src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Serilog.Sinks.Async
{
sealed class BackgroundWorkerSink : ILogEventSink, IAsyncLogEventSinkState, IDisposable
sealed class BackgroundWorkerSink : ILogEventSink, IAsyncLogEventSinkInspector, IDisposable
{
readonly ILogEventSink _pipeline;
readonly bool _blockWhenFull;
Expand All @@ -26,6 +26,7 @@ public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blo
_queue = new BlockingCollection<LogEvent>(bufferCapacity);
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
_monitor = monitor;
monitor?.StartMonitoring(this);
}

public void Emit(LogEvent logEvent)
Expand Down Expand Up @@ -65,7 +66,7 @@ public void Dispose()

(_pipeline as IDisposable)?.Dispose();

(_monitor as IDisposable)?.Dispose();
_monitor?.StopMonitoring(this);
}

void Pump()
Expand All @@ -83,10 +84,10 @@ void Pump()
}
}

int IAsyncLogEventSinkState.BufferSize => _queue.BoundedCapacity;
int IAsyncLogEventSinkInspector.BufferSize => _queue.BoundedCapacity;

int IAsyncLogEventSinkState.Count => _queue.Count;
int IAsyncLogEventSinkInspector.Count => _queue.Count;

long IAsyncLogEventSinkState.DroppedMessagesCount => _droppedMessages;
long IAsyncLogEventSinkInspector.DroppedMessagesCount => _droppedMessages;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
namespace Serilog.Sinks.Async
{
/// <summary>
/// Provides a way to monitor the state of Async wrapper's ingestion queue.
/// Provides a way to inspect the state of Async wrapper's ingestion queue.
/// </summary>
public interface IAsyncLogEventSinkState
public interface IAsyncLogEventSinkInspector
{
/// <summary>
/// Configured maximum number of items permitted to be held in the buffer awaiting ingestion.
Expand All @@ -18,7 +18,7 @@ public interface IAsyncLogEventSinkState
int Count { get; }

/// <summary>
/// Accumulated number of messages dropped due to breach of <see cref="BufferSize"/> limit.
/// Accumulated number of messages dropped due to breaches of <see cref="BufferSize"/> limit.
/// </summary>
long DroppedMessagesCount { get; }
}
Expand Down
15 changes: 10 additions & 5 deletions src/Serilog.Sinks.Async/Sinks/Async/IAsyncLogEventSinkMonitor.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
namespace Serilog.Sinks.Async
{
/// <summary>
/// Defines a mechanism for the Async Sink to provide buffer metadata to facilitate integration into system health checking.
/// Defines a mechanism for the Async Sink to afford Health Checks a buffer metadata inspection mechanism.
/// </summary>
/// <remarks>If the instance implements <see cref="System.IDisposable"/>, it will be <c>Dispose()</c>d at then time the Sink is.</remarks>
public interface IAsyncLogEventSinkMonitor
{
/// <summary>
/// Invoked by Sink to supply the buffer state hook to the monitor.
/// Invoked by Sink to supply the inspector to the monitor.
/// </summary>
/// <param name="state">The Async Sink's state information interface.</param>
void MonitorState(IAsyncLogEventSinkState state);
/// <param name="inspector">The Async Sink's inspector.</param>
void StartMonitoring(IAsyncLogEventSinkInspector inspector);

/// <summary>
/// Invoked by Sink to indicate that it is being Disposed.
/// </summary>
/// <param name="inspector">The Async Sink's inspector.</param>
void StopMonitoring(IAsyncLogEventSinkInspector inspector);
}
}
16 changes: 11 additions & 5 deletions test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Serilog.Core;
using Serilog.Events;
Expand Down Expand Up @@ -106,7 +107,7 @@ public async Task GivenDefaultConfig_WhenRequestsExceedCapacity_DoesNotBlock()

// Allow at least one to propagate
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
Assert.NotEqual(0, ((IAsyncLogEventSinkState)sink).DroppedMessagesCount);
Assert.NotEqual(0, ((IAsyncLogEventSinkInspector)sink).DroppedMessagesCount);
}
// Sanity check the overall timing
batchTiming.Stop();
Expand Down Expand Up @@ -145,7 +146,7 @@ from e in _innerSink.Events
Assert.InRange(2, 2 * 3 / 2 - 1, propagatedExcludingFinal.Count());
// Final event should have made it through
Assert.Contains(_innerSink.Events, x => Object.ReferenceEquals(finalEvent, x));
Assert.NotEqual(0, ((IAsyncLogEventSinkState)sink).DroppedMessagesCount);
Assert.NotEqual(0, ((IAsyncLogEventSinkInspector)sink).DroppedMessagesCount);
}
}

Expand Down Expand Up @@ -184,20 +185,23 @@ public async Task GivenConfiguredToBlock_WhenQueueFilled_ThenBlocks()

// No events should be dropped
Assert.Equal(3, _innerSink.Events.Count);
Assert.Equal(0, ((IAsyncLogEventSinkState)sink).DroppedMessagesCount);
Assert.Equal(0, ((IAsyncLogEventSinkInspector)sink).DroppedMessagesCount);
}
}

[Fact]
public void InspectorOutParameterAffordsHealthMonitoringHook()
public void MonitorParameterAffordsSinkStateInspectorSuitableForHealthChecking()
{
var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(2) };
// 2 spaces in queue; 1 would make the second log entry eligible for dropping if consumer does not activate instantaneously
var bufferSize = 2;
var monitor = new DummyMonitor();
using (var logger = new LoggerConfiguration()
.WriteTo.Async(w => w.Sink(collector), bufferSize: 2, inspector: out IAsyncLogEventSinkState inspector)
.WriteTo.Async(w => w.Sink(collector), bufferSize: 2, monitor: monitor)
.CreateLogger())
{
// Construction of BackgroundWorkerSink triggers StartMonitoring
var inspector = monitor.Inspector;
Assert.Equal(bufferSize, inspector.BufferSize);
Assert.Equal(0, inspector.Count);
Assert.Equal(0, inspector.DroppedMessagesCount);
Expand All @@ -222,6 +226,8 @@ public void InspectorOutParameterAffordsHealthMonitoringHook()
// Because messages wait 2 seconds, the only real way to get one into the buffer is with a debugger breakpoint or a sleep
Assert.InRange(collector.Events.Count, 0, 3);
}
// Dispose should trigger a StopMonitoring call
Assert.Null(monitor.Inspector);
}

private BackgroundWorkerSink CreateSinkWithDefaultOptions()
Expand Down
18 changes: 17 additions & 1 deletion test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,21 @@ public void DisposeCompletesWithoutWorkPerformed()

Assert.Empty(collector.Events);
}

[Fact]
public void CtorAndDisposeInformMonitor()
{
var collector = new MemorySink();
var monitor = new DummyMonitor();
using (new LoggerConfiguration()
.WriteTo.Async(w => w.Sink(collector), monitor: monitor)
.CreateLogger())
{
Assert.NotNull(monitor.Inspector);
}

Assert.Empty(collector.Events);
Assert.Null(monitor.Inspector);
}
}
}
}
14 changes: 14 additions & 0 deletions test/Serilog.Sinks.Async.Tests/Support/DummyMonitor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace Serilog.Sinks.Async.Tests.Support
{
class DummyMonitor : IAsyncLogEventSinkMonitor
{
IAsyncLogEventSinkInspector inspector;
public IAsyncLogEventSinkInspector Inspector => inspector;

void IAsyncLogEventSinkMonitor.StartMonitoring(IAsyncLogEventSinkInspector inspector) =>
this.inspector = inspector;

void IAsyncLogEventSinkMonitor.StopMonitoring(IAsyncLogEventSinkInspector inspector) =>
System.Threading.Interlocked.CompareExchange(ref this.inspector, null, inspector);
}
}

0 comments on commit ab156fe

Please sign in to comment.