Skip to content

Commit

Permalink
Surface dropped events count. Resolves #13
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 20, 2018
1 parent dcd24ac commit 06b8a92
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 5 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ The wrapped sink (`File` in this case) will be invoked on a worker thread while

Because the memory buffer may contain events that have not yet been written to the target sink, it is important to call `Log.CloseAndFlush()` or `Logger.Dispose()` when the application exits.

### Buffering
### Buffering & Dropping

The default memory buffer feeding the worker thread is capped to 10,000 items, after which arriving events will be dropped. To increase or decrease this limit, specify it when configuring the async sink.
The default memory buffer feeding the worker thread is capped to 10,000 items, after which arriving events will be dropped. To increase or decrease this limit, specify it when configuring the async sink. One can determine whether events have been dropped via `IQueueState.DroppedMessageCount` (see Buffer Inspection interface below).

```csharp
// Reduce the buffer to 500 events
Expand Down
5 changes: 5 additions & 0 deletions src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public interface IQueueState
/// <exception cref="T:System.ObjectDisposedException">The Sink has been disposed.</exception>
int Count { get; }

/// <summary>
/// Accumulated number of messages dropped due to attempted submission having breached <see cref="BufferSize"/> limit.
/// </summary>
long DroppedMessagesCount { get; }

/// <summary>
/// Maximum number of items permitted to be held in the buffer awaiting ingestion.
/// </summary>
Expand Down
7 changes: 7 additions & 0 deletions src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ sealed class BackgroundWorkerSink : ILogEventSink, IQueueState, IDisposable
readonly BlockingCollection<LogEvent> _queue;
readonly Task _worker;

long _droppedMessages;

public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull)
{
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity));
Expand All @@ -38,7 +40,10 @@ public void Emit(LogEvent logEvent)
else
{
if (!_queue.TryAdd(logEvent))
{
Interlocked.Increment(ref _droppedMessages);
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _queue.BoundedCapacity);
}
}
}
catch (InvalidOperationException)
Expand Down Expand Up @@ -77,5 +82,7 @@ void Pump()
int IQueueState.Count => _queue.Count;

int IQueueState.BufferSize => _queue.BoundedCapacity;

long IQueueState.DroppedMessagesCount => _droppedMessages;
}
}
19 changes: 16 additions & 3 deletions test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public async Task GivenDefaultConfig_WhenRequestsExceedCapacity_DoesNotBlock()

// Allow at least one to propagate
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
Assert.NotEqual(0, ((IQueueState)sink).DroppedMessagesCount);
}
// Sanity check the overall timing
batchTiming.Stop();
Expand Down Expand Up @@ -144,6 +145,7 @@ from e in _innerSink.Events
Assert.InRange(2, 2 * 3 / 2 - 1, propagatedExcludingFinal.Count());
// Final event should have made it through
Assert.Contains(_innerSink.Events, x => Object.ReferenceEquals(finalEvent, x));
Assert.NotEqual(0, ((IQueueState)sink).DroppedMessagesCount);
}
}

Expand Down Expand Up @@ -182,11 +184,12 @@ public async Task GivenConfiguredToBlock_WhenQueueFilled_ThenBlocks()

// No events should be dropped
Assert.Equal(3, _innerSink.Events.Count);
Assert.Equal(0, ((IQueueState)sink).DroppedMessagesCount);
}
}

[Fact]
public async Task InspectorOutParameterAffordsHealthMonitoringHook()
public void InspectorOutParameterAffordsHealthMonitoringHook()
{
var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(2) };
// 2 spaces in queue; 1 would make the second log entry eligible for dropping if consumer does not activate instantaneously
Expand All @@ -197,17 +200,27 @@ public async Task InspectorOutParameterAffordsHealthMonitoringHook()
{
Assert.Equal(bufferSize, inspector.BufferSize);
Assert.Equal(0, inspector.Count);
Assert.Equal(0, inspector.DroppedMessagesCount);
logger.Information("Something to freeze the processing for 2s");
await Task.Delay(TimeSpan.FromMilliseconds(200));
// Can be taken from queue either instantanously or be awaiting consumer to take
Assert.InRange(inspector.Count, 0, 1);
Assert.Equal(0, inspector.DroppedMessagesCount);
logger.Information("Something that will sit in the queue");
Assert.InRange(inspector.Count, 1, 2);
logger.Information("Something that will probably also sit in the queue (but could get dropped if first message has still not been picked up)");
Assert.InRange(inspector.Count, 1, 2);
logger.Information("Something that will get dropped unless we get preempted for 2s during our execution");
const string droppedMessage = "Something that will definitely get dropped";
logger.Information(droppedMessage);
Assert.InRange(inspector.Count, 1, 2);
// Unless we are put to sleep for a Rip Van Winkle period, either:
// a) the BackgroundWorker will be emitting the item [and incurring the 2s delay we established], leaving a single item in the buffer
// or b) neither will have been picked out of the buffer yet.
await Task.Delay(TimeSpan.FromMilliseconds(200));
Assert.InRange(inspector.Count, 1, 2);
Assert.Equal(bufferSize, inspector.BufferSize);
Assert.DoesNotContain(collector.Events, x => x.MessageTemplate.Text == droppedMessage);
// Because messages wait 2 seconds, the only real way to get one into the buffer is with a debugger breakpoint or a sleep
Assert.InRange(collector.Events.Count, 0, 3);
}
}

Expand Down

0 comments on commit 06b8a92

Please sign in to comment.