From 06b8a92415beca4bbf55feb1efcbebc557a5ce2c Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 20 Apr 2018 15:02:05 +0100 Subject: [PATCH] Surface dropped events count. Resolves #13 --- README.md | 4 ++-- .../LoggerConfigurationAsyncExtensions.cs | 5 +++++ .../Sinks/Async/BackgroundWorkerSink.cs | 7 +++++++ .../BackgroundWorkerSinkSpec.cs | 19 ++++++++++++++++--- 4 files changed, 30 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index f167bcf..0b5440c 100644 --- a/README.md +++ b/README.md @@ -30,9 +30,9 @@ The wrapped sink (`File` in this case) will be invoked on a worker thread while Because the memory buffer may contain events that have not yet been written to the target sink, it is important to call `Log.CloseAndFlush()` or `Logger.Dispose()` when the application exits. -### Buffering +### Buffering & Dropping -The default memory buffer feeding the worker thread is capped to 10,000 items, after which arriving events will be dropped. To increase or decrease this limit, specify it when configuring the async sink. +The default memory buffer feeding the worker thread is capped to 10,000 items, after which arriving events will be dropped. To increase or decrease this limit, specify it when configuring the async sink. One can determine whether events have been dropped via `IQueueState.DroppedMessageCount` (see Buffer Inspection interface below). ```csharp // Reduce the buffer to 500 events diff --git a/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs b/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs index 77d630b..2adeee4 100644 --- a/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs +++ b/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs @@ -98,6 +98,11 @@ public interface IQueueState /// The Sink has been disposed. int Count { get; } + /// + /// Accumulated number of messages dropped due to attempted submission having breached limit. + /// + long DroppedMessagesCount { get; } + /// /// Maximum number of items permitted to be held in the buffer awaiting ingestion. /// diff --git a/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs b/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs index 8097571..b7a741b 100644 --- a/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs +++ b/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs @@ -15,6 +15,8 @@ sealed class BackgroundWorkerSink : ILogEventSink, IQueueState, IDisposable readonly BlockingCollection _queue; readonly Task _worker; + long _droppedMessages; + public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull) { if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity)); @@ -38,7 +40,10 @@ public void Emit(LogEvent logEvent) else { if (!_queue.TryAdd(logEvent)) + { + Interlocked.Increment(ref _droppedMessages); SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _queue.BoundedCapacity); + } } } catch (InvalidOperationException) @@ -77,5 +82,7 @@ void Pump() int IQueueState.Count => _queue.Count; int IQueueState.BufferSize => _queue.BoundedCapacity; + + long IQueueState.DroppedMessagesCount => _droppedMessages; } } diff --git a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs index b43706e..7071e79 100644 --- a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs +++ b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs @@ -106,6 +106,7 @@ public async Task GivenDefaultConfig_WhenRequestsExceedCapacity_DoesNotBlock() // Allow at least one to propagate await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); + Assert.NotEqual(0, ((IQueueState)sink).DroppedMessagesCount); } // Sanity check the overall timing batchTiming.Stop(); @@ -144,6 +145,7 @@ from e in _innerSink.Events Assert.InRange(2, 2 * 3 / 2 - 1, propagatedExcludingFinal.Count()); // Final event should have made it through Assert.Contains(_innerSink.Events, x => Object.ReferenceEquals(finalEvent, x)); + Assert.NotEqual(0, ((IQueueState)sink).DroppedMessagesCount); } } @@ -182,11 +184,12 @@ public async Task GivenConfiguredToBlock_WhenQueueFilled_ThenBlocks() // No events should be dropped Assert.Equal(3, _innerSink.Events.Count); + Assert.Equal(0, ((IQueueState)sink).DroppedMessagesCount); } } [Fact] - public async Task InspectorOutParameterAffordsHealthMonitoringHook() + public void InspectorOutParameterAffordsHealthMonitoringHook() { var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(2) }; // 2 spaces in queue; 1 would make the second log entry eligible for dropping if consumer does not activate instantaneously @@ -197,17 +200,27 @@ public async Task InspectorOutParameterAffordsHealthMonitoringHook() { Assert.Equal(bufferSize, inspector.BufferSize); Assert.Equal(0, inspector.Count); + Assert.Equal(0, inspector.DroppedMessagesCount); logger.Information("Something to freeze the processing for 2s"); - await Task.Delay(TimeSpan.FromMilliseconds(200)); // Can be taken from queue either instantanously or be awaiting consumer to take Assert.InRange(inspector.Count, 0, 1); + Assert.Equal(0, inspector.DroppedMessagesCount); logger.Information("Something that will sit in the queue"); + Assert.InRange(inspector.Count, 1, 2); + logger.Information("Something that will probably also sit in the queue (but could get dropped if first message has still not been picked up)"); + Assert.InRange(inspector.Count, 1, 2); + logger.Information("Something that will get dropped unless we get preempted for 2s during our execution"); + const string droppedMessage = "Something that will definitely get dropped"; + logger.Information(droppedMessage); + Assert.InRange(inspector.Count, 1, 2); // Unless we are put to sleep for a Rip Van Winkle period, either: // a) the BackgroundWorker will be emitting the item [and incurring the 2s delay we established], leaving a single item in the buffer // or b) neither will have been picked out of the buffer yet. - await Task.Delay(TimeSpan.FromMilliseconds(200)); Assert.InRange(inspector.Count, 1, 2); Assert.Equal(bufferSize, inspector.BufferSize); + Assert.DoesNotContain(collector.Events, x => x.MessageTemplate.Text == droppedMessage); + // Because messages wait 2 seconds, the only real way to get one into the buffer is with a debugger breakpoint or a sleep + Assert.InRange(collector.Events.Count, 0, 3); } }