
Add queue inspector facility #29

Merged · 6 commits · Apr 25, 2018 · Changes from 3 commits
25 changes: 23 additions & 2 deletions README.md
@@ -30,15 +30,36 @@ The wrapped sink (`File` in this case) will be invoked on a worker thread while

Because the memory buffer may contain events that have not yet been written to the target sink, it is important to call `Log.CloseAndFlush()` or `Logger.Dispose()` when the application exits.

-### Buffering
+### Buffering & Dropping

-The default memory buffer feeding the worker thread is capped to 10,000 items, after which arriving events will be dropped. To increase or decrease this limit, specify it when configuring the async sink.
+The default memory buffer feeding the worker thread is capped to 10,000 items, after which arriving events will be dropped. To increase or decrease this limit, specify it when configuring the async sink. One can determine whether events have been dropped via `IQueueState.DroppedMessagesCount` (see the Buffer Inspection interface below).

```csharp
// Reduce the buffer to 500 events
.WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500)
```

### Health Monitoring via the Buffer Inspection interface

The Async wrapper is primarily intended to achieve minimal logging latency at all times, even when writing to sinks that may momentarily block during their processing (e.g., a File sink might block for a few milliseconds while flushing). The dropping behavior is an important failsafe: it avoids unbounded buffering should logging frequency overwhelm the sink, or should the sink's ingestion throughput degrade severely.

In practice, this configuration (assuming one provisions an adequate `bufferSize`) yields an efficient and resilient logging setup that handles load gracefully. The key risk, of course, is that events may be dropped when the buffer threshold is breached. The `inspector` allows the application's health-monitoring mechanism to actively validate that the buffer allocation is not being exceeded in practice.

```csharp
// Example check: write a message to an out-of-band alarm channel if logging shows signs of getting overwhelmed
void PeriodicMonitorCheck(IQueueState inspector)
{
    var usagePct = inspector.Count * 100 / inspector.BufferSize;
    if (usagePct > 50) SelfLog.WriteLine("Log buffer exceeded {0}% usage (limit: {1})", usagePct, inspector.BufferSize);
}

// Allow a backlog of up to 10,000 items to be maintained (dropping extras if full)
.WriteTo.Async(a => a.File("logs/myapp.log"), inspector: out IQueueState inspector) ...

// Wire the inspector through to health monitoring and/or metrics in order to periodically emit a metric, raise an alarm, etc.
... healthMonitoring.RegisterCheck(() => PeriodicMonitorCheck(inspector));
```
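Where no health-monitoring framework is available, the same check can be driven by a plain timer. A minimal sketch, assuming only the `IQueueState` interface from this PR; the `BufferMonitor` name, the 10-second interval, and the 50% threshold are illustrative choices, not part of the library:

```csharp
using System;
using System.Threading;
using Serilog.Debugging;

// Hypothetical helper: polls the inspector periodically and reports via SelfLog
class BufferMonitor : IDisposable
{
    readonly Timer _timer;

    public BufferMonitor(IQueueState inspector)
    {
        _timer = new Timer(_ =>
        {
            var usagePct = inspector.Count * 100 / inspector.BufferSize;
            if (usagePct > 50)
                SelfLog.WriteLine("Log buffer at {0}% of capacity; {1} events dropped so far",
                    usagePct, inspector.DroppedMessagesCount);
        }, null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
    }

    public void Dispose() => _timer.Dispose();
}
```

Dispose such a monitor before disposing the logger, since `IQueueState.Count` throws `ObjectDisposedException` once the sink has been disposed.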

### Blocking

Warning: for the same reason one typically does not want exceptions from logging to leak into the execution path, one typically does not want a logger to have the side-effect of blocking application processing until log propagation has been unblocked.
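If completeness nonetheless matters more than latency, the wrapper can instead be configured to block the caller when the buffer is full, via the `blockWhenFull` parameter visible in the extension-method signature in this PR. A sketch (the buffer size shown is arbitrary):

```csharp
// Trade latency for completeness: Emit blocks when the 500-slot buffer is full, instead of dropping
.WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500, blockWhenFull: true)
```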
57 changes: 57 additions & 0 deletions src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs
@@ -51,5 +51,62 @@ public static LoggerConfiguration Async(
configure);
}

/// <summary>
/// Configure a sink to be invoked asynchronously, on a background worker thread.
/// Provides an <paramref name="inspector"/> that can be used to check the live state of the buffer for health monitoring purposes.
/// </summary>
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
/// <param name="configure">An action that configures the wrapped sink.</param>
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
/// the thread is unable to process events quickly enough and the queue is filled, depending on
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
/// room is made in the queue.</param>
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
/// <param name="inspector">Provides a way to inspect the state of the queue for health monitoring purposes.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
out IQueueState inspector,
int bufferSize = 10000,
bool blockWhenFull = false)
{
// Cannot assign directly to the out param from within the lambda, so we need a temp
IQueueState stateLens = null;
var result = LoggerSinkConfiguration.Wrap(
loggerSinkConfiguration,
wrappedSink =>
{
var sink = new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull);
stateLens = sink;
return sink;
},
configure);
inspector = stateLens;
return result;
}
}

/// <summary>
/// Provides a way to inspect the current state of Async wrapper's ingestion queue.
/// </summary>
public interface IQueueState
> **Review comment (Member):** Nit: it would be preferable to have this in its own file.
>
> Another nit :-) ... it's closer to being the sink state than the queue state, since the dropped count isn't logically a property of the queue..?
>
> Just looking at namespacing etc., could we tuck this away under `Serilog.Sinks.Async.IAsyncLogEventSinkState` or something along those lines, so that it's less likely to conflict with other types? I can't imagine there are too many `IQueueState` interfaces out there, but since it's not something a user will type regularly, a more descriptive name might not hurt.
>
> **Reply (Member, Author):** In my language of choice it should not be a separate file ;) Agree and will do.
> The specific name is way better (the original was a speculative generality land grab); dropped events are not something one wants as a universal 'feature'.

{
/// <summary>
/// Count of items currently awaiting ingestion.
/// </summary>
/// <exception cref="T:System.ObjectDisposedException">The Sink has been disposed.</exception>
int Count { get; }

/// <summary>
/// Accumulated number of messages dropped due to attempted submissions having breached the <see cref="BufferSize"/> limit.
/// </summary>
long DroppedMessagesCount { get; }
> **Review comment (Member, Author):** Any advance on this name?

/// <summary>
/// Maximum number of items permitted to be held in the buffer awaiting ingestion.
/// </summary>
/// <exception cref="T:System.ObjectDisposedException">The Sink has been disposed.</exception>
int BufferSize { get; }
}
}
20 changes: 15 additions & 5 deletions src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs
@@ -8,18 +8,19 @@

namespace Serilog.Sinks.Async
{
-sealed class BackgroundWorkerSink : ILogEventSink, IDisposable
+sealed class BackgroundWorkerSink : ILogEventSink, IQueueState, IDisposable
{
readonly ILogEventSink _pipeline;
readonly bool _blockWhenFull;
readonly BlockingCollection<LogEvent> _queue;
readonly Task _worker;

long _droppedMessages;

public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull)
{
-if (pipeline == null) throw new ArgumentNullException(nameof(pipeline));
 if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity));
-_pipeline = pipeline;
+_pipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline));
_blockWhenFull = blockWhenFull;
_queue = new BlockingCollection<LogEvent>(bufferCapacity);
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
@@ -39,7 +40,10 @@ public void Emit(LogEvent logEvent)
else
{
if (!_queue.TryAdd(logEvent))
{
Interlocked.Increment(ref _droppedMessages);
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _queue.BoundedCapacity);
}
}
}
catch (InvalidOperationException)
@@ -55,8 +59,8 @@ public void Dispose()
_queue.CompleteAdding();

// Allow queued events to be flushed
_worker.Wait();

(_pipeline as IDisposable)?.Dispose();
}

@@ -74,5 +78,11 @@ void Pump()
SelfLog.WriteLine("{0} fatal error in worker thread: {1}", typeof(BackgroundWorkerSink), ex);
}
}

int IQueueState.Count => _queue.Count;

int IQueueState.BufferSize => _queue.BoundedCapacity;

long IQueueState.DroppedMessagesCount => _droppedMessages;
}
}
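The `_droppedMessages` field above is updated with `Interlocked.Increment` because `Emit` may be invoked concurrently from many threads. A standalone sketch of the same pattern, using only the standard library (names here are illustrative, not from the PR):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Counter
{
    static long _dropped;

    static void Main()
    {
        // Many producers race to record drops, as concurrent Emit calls would
        Parallel.For(0, 1000, _ => Interlocked.Increment(ref _dropped));

        // Interlocked.Read gives a torn-read-safe view of the 64-bit value,
        // which matters on 32-bit platforms where long reads are not atomic
        Console.WriteLine(Interlocked.Read(ref _dropped)); // prints 1000
    }
}
```

A plain `_dropped++` here could lose increments under contention, since it is a non-atomic read-modify-write.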
45 changes: 42 additions & 3 deletions test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs
@@ -84,12 +84,12 @@ public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink()
}

[Fact]
-public async Task GivenDefaultConfig_WhenQueueOverCapacity_DoesNotBlock()
+public async Task GivenDefaultConfig_WhenRequestsExceedCapacity_DoesNotBlock()
{
var batchTiming = Stopwatch.StartNew();
using (var sink = new BackgroundWorkerSink(_logger, 1, blockWhenFull: false /*default*/))
{
-// Cause a delay when emmitting to the inner sink, allowing us to easily fill the queue to capacity
+// Cause a delay when emitting to the inner sink, allowing us to easily fill the queue to capacity
// while the first event is being propagated
var acceptInterval = TimeSpan.FromMilliseconds(500);
_innerSink.DelayEmit = acceptInterval;
@@ -106,6 +106,7 @@ public async Task GivenDefaultConfig_WhenQueueOverCapacity_DoesNotBlock()

// Allow at least one to propagate
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
Assert.NotEqual(0, ((IQueueState)sink).DroppedMessagesCount);
}
// Sanity check the overall timing
batchTiming.Stop();
@@ -114,7 +115,7 @@ public async Task GivenDefaultConfig_WhenQueueOverCapacity_DoesNotBlock()
}

[Fact]
-public async Task GivenDefaultConfig_WhenRequestsOverCapacity_ThenDropsEventsAndRecovers()
+public async Task GivenDefaultConfig_WhenRequestsExceedCapacity_ThenDropsEventsAndRecovers()
{
using (var sink = new BackgroundWorkerSink(_logger, 1, blockWhenFull: false /*default*/))
{
@@ -144,6 +145,7 @@ from e in _innerSink.Events
Assert.InRange(2, 2 * 3 / 2 - 1, propagatedExcludingFinal.Count());
// Final event should have made it through
Assert.Contains(_innerSink.Events, x => Object.ReferenceEquals(finalEvent, x));
Assert.NotEqual(0, ((IQueueState)sink).DroppedMessagesCount);
}
}

@@ -182,6 +184,43 @@ public async Task GivenConfiguredToBlock_WhenQueueFilled_ThenBlocks()

// No events should be dropped
Assert.Equal(3, _innerSink.Events.Count);
Assert.Equal(0, ((IQueueState)sink).DroppedMessagesCount);
}
}

[Fact]
public void InspectorOutParameterAffordsHealthMonitoringHook()
{
var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(2) };
// 2 spaces in queue; 1 would make the second log entry eligible for dropping if consumer does not activate instantaneously
var bufferSize = 2;
using (var logger = new LoggerConfiguration()
.WriteTo.Async(w => w.Sink(collector), bufferSize: 2, inspector: out IQueueState inspector)
.CreateLogger())
{
Assert.Equal(bufferSize, inspector.BufferSize);
Assert.Equal(0, inspector.Count);
Assert.Equal(0, inspector.DroppedMessagesCount);
logger.Information("Something to freeze the processing for 2s");
// Can be taken from the queue either instantaneously, or may still be awaiting the consumer to take it
Assert.InRange(inspector.Count, 0, 1);
Assert.Equal(0, inspector.DroppedMessagesCount);
logger.Information("Something that will sit in the queue");
Assert.InRange(inspector.Count, 1, 2);
logger.Information("Something that will probably also sit in the queue (but could get dropped if first message has still not been picked up)");
Assert.InRange(inspector.Count, 1, 2);
logger.Information("Something that will get dropped unless we get preempted for 2s during our execution");
const string droppedMessage = "Something that will definitely get dropped";
logger.Information(droppedMessage);
Assert.InRange(inspector.Count, 1, 2);
// Unless we are put to sleep for a Rip Van Winkle period, either:
// a) the BackgroundWorker will be emitting the item [and incurring the 2s delay we established], leaving a single item in the buffer
// or b) neither will have been picked out of the buffer yet.
Assert.InRange(inspector.Count, 1, 2);
Assert.Equal(bufferSize, inspector.BufferSize);
Assert.DoesNotContain(collector.Events, x => x.MessageTemplate.Text == droppedMessage);
// Because messages wait 2 seconds, the only real way to get one into the buffer is with a debugger breakpoint or a sleep
Assert.InRange(collector.Events.Count, 0, 3);
}
}
