From 2ee0f264f71289ed822d0e9472f6ef6a09f6422f Mon Sep 17 00:00:00 2001 From: cocowalla Date: Thu, 3 Aug 2017 13:44:44 +0100 Subject: [PATCH 1/3] Add option to block when the queue is full, instead of dropping events Also, use `GetConsumingEnumerable` to enumerate the queue, instead of a `while` loop --- .../LoggerConfigurationAsyncExtensions.cs | 11 ++- .../Sinks/Async/BackgroundWorkerSink.cs | 36 ++++---- .../BackgroundWorkerSinkSpec.cs | 84 +++++++++++++++++-- .../BackgroundWorkerSinkTests.cs | 7 +- .../Support/MemorySink.cs | 5 ++ 5 files changed, 110 insertions(+), 33 deletions(-) diff --git a/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs b/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs index 4b56f29..60f56e7 100644 --- a/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs +++ b/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs @@ -16,17 +16,20 @@ public static class LoggerConfigurationAsyncExtensions /// The being configured. /// An action that configures the wrapped sink. /// The size of the concurrent queue used to feed the background worker thread. If - /// the thread is unable to process events quickly enough and the queue is filled, subsequent events will be - /// dropped until room is made in the queue. + /// the thread is unable to process events quickly enough and the queue is filled, depending on + /// the queue will block or subsequent events will be dropped until + /// room is made in the queue. + /// Block when the queue is full, instead of dropping events. /// A allowing configuration to continue. public static LoggerConfiguration Async( this LoggerSinkConfiguration loggerSinkConfiguration, Action configure, - int bufferSize = 10000) + int bufferSize = 10000, + bool blockWhenFull = false) { return LoggerSinkConfiguration.Wrap( loggerSinkConfiguration, - wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize), + wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull), configure); } } diff --git a/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs b/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs index e3fffe4..ab9100b 100644 --- a/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs +++ b/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs @@ -12,17 +12,18 @@ sealed class BackgroundWorkerSink : ILogEventSink, IDisposable { readonly ILogEventSink _pipeline; readonly int _bufferCapacity; + readonly bool _blockWhenFull; volatile bool _disposed; - readonly CancellationTokenSource _cancel = new CancellationTokenSource(); readonly BlockingCollection _queue; readonly Task _worker; - public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity) + public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull) { if (pipeline == null) throw new ArgumentNullException(nameof(pipeline)); if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity)); _pipeline = pipeline; _bufferCapacity = bufferCapacity; + _blockWhenFull = blockWhenFull; _queue = new BlockingCollection(_bufferCapacity); _worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); } @@ -31,36 +32,35 @@ public void Emit(LogEvent logEvent) { // The disposed check is racy, but only ensures we don't prevent flush from // completing by pushing more events. - if (!_disposed && !_queue.TryAdd(logEvent)) - SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity); + if (_disposed) + return; + + if (!this._blockWhenFull) + { + if (!_queue.TryAdd(logEvent)) + SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity); + } + else + { + this._queue.Add(logEvent); + } } public void Dispose() { _disposed = true; - _cancel.Cancel(); + _queue.CompleteAdding(); _worker.Wait(); (_pipeline as IDisposable)?.Dispose(); - // _cancel not disposed, because it will make _cancel.Cancel() non-idempotent } void Pump() { try { - try - { - while (true) - { - var next = _queue.Take(_cancel.Token); - _pipeline.Emit(next); - } - } - catch (OperationCanceledException) + foreach (var next in _queue.GetConsumingEnumerable()) { - LogEvent next; - while (_queue.TryTake(out next)) - _pipeline.Emit(next); + _pipeline.Emit(next); } } catch (Exception ex) diff --git a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs index 119eccc..6b6ef26 100644 --- a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs +++ b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs @@ -1,11 +1,12 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using Serilog.Core; +using Serilog.Debugging; using Serilog.Events; using Serilog.Parsing; -using Serilog.Sinks.Async.Tests; using Serilog.Sinks.Async.Tests.Support; using Xunit; @@ -13,14 +14,15 @@ namespace Serilog.Sinks.Async.Tests { public class BackgroundWorkerSinkSpec : IDisposable { + readonly Logger _logger; readonly MemorySink _innerSink; - readonly BackgroundWorkerSink _sink; + BackgroundWorkerSink _sink; public BackgroundWorkerSinkSpec() { _innerSink = new MemorySink(); - var logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger(); - _sink = new BackgroundWorkerSink(logger, 10000); + _logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger(); + _sink = new BackgroundWorkerSink(_logger, 10000, false); } public void Dispose() @@ -31,7 +33,7 @@ public void Dispose() [Fact] public void WhenCtorWithNullSink_ThenThrows() { - Assert.Throws(() => new BackgroundWorkerSink(null, 10000)); + Assert.Throws(() => new BackgroundWorkerSink(null, 10000, false)); } [Fact] @@ -39,6 +41,7 @@ public async Task WhenEmitSingle_ThenRelaysToInnerSink() { var logEvent = CreateEvent(); _sink.Emit(logEvent); + _sink.Dispose(); await Task.Delay(TimeSpan.FromSeconds(3)); @@ -80,6 +83,77 @@ public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink() Assert.Equal(3, _innerSink.Events.Count); } + [Fact] + public async Task WhenQueueFull_ThenDropsEvents() + { + _sink = new BackgroundWorkerSink(_logger, 1, false); + + // Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity + // after the first event is popped + _innerSink.DelayEmit = true; + + var events = new List + { + CreateEvent(), + CreateEvent(), + CreateEvent(), + CreateEvent(), + CreateEvent() + }; + events.ForEach(e => + { + var sw = Stopwatch.StartNew(); + _sink.Emit(e); + sw.Stop(); + + Assert.True(sw.ElapsedMilliseconds < 2000, "Should not block the caller when the queue is full"); + }); + + // If we *weren't* dropped events, the delay in the inner sink would mean the 5 events would take + // at least 15 seconds to process + await Task.Delay(TimeSpan.FromSeconds(18)); + + // Events should be dropped + Assert.Equal(2, _innerSink.Events.Count); + } + + [Fact] + public async Task WhenQueueFull_ThenBlocks() + { + _sink = new BackgroundWorkerSink(_logger, 1, true); + + // Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity + // after the first event is popped + _innerSink.DelayEmit = true; + + var events = new List + { + CreateEvent(), + CreateEvent(), + CreateEvent() + }; + + int i = 0; + events.ForEach(e => + { + var sw = Stopwatch.StartNew(); + _sink.Emit(e); + sw.Stop(); + + // Emit should return immediately the first time, since the queue is not yet full. On + // subsequent calls, the queue should be full, so we should be blocked + if (i > 0) + { + Assert.True(sw.ElapsedMilliseconds > 2000, "Should block the caller when the queue is full"); + } + }); + + await Task.Delay(TimeSpan.FromSeconds(12)); + + // No events should be dropped + Assert.Equal(3, _innerSink.Events.Count); + } + private static LogEvent CreateEvent() { return new LogEvent(DateTimeOffset.MaxValue, LogEventLevel.Error, null, diff --git a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkTests.cs b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkTests.cs index a54f92a..aca516f 100644 --- a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkTests.cs +++ b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkTests.cs @@ -1,9 +1,4 @@ -using System; -using System.Threading; -using Serilog.Core; -using Serilog.Events; -using Serilog.Parsing; -using Serilog.Sinks.Async.Tests.Support; +using Serilog.Sinks.Async.Tests.Support; using Xunit; namespace Serilog.Sinks.Async.Tests diff --git a/test/Serilog.Sinks.Async.Tests/Support/MemorySink.cs b/test/Serilog.Sinks.Async.Tests/Support/MemorySink.cs index 4d756f6..44e178f 100644 --- a/test/Serilog.Sinks.Async.Tests/Support/MemorySink.cs +++ b/test/Serilog.Sinks.Async.Tests/Support/MemorySink.cs @@ -2,6 +2,7 @@ using Serilog.Core; using System.Collections.Concurrent; using System; +using System.Threading.Tasks; namespace Serilog.Sinks.Async.Tests.Support { @@ -9,9 +10,13 @@ public class MemorySink : ILogEventSink { public ConcurrentBag Events { get; } = new ConcurrentBag(); public bool ThrowAfterCollecting { get; set; } + public bool DelayEmit { get; set; } public void Emit(LogEvent logEvent) { + if (DelayEmit) + Task.Delay(TimeSpan.FromSeconds(3)).Wait(); + Events.Add(logEvent); if (ThrowAfterCollecting) From 188741e9617670f3653fa692b75d05a2025b892d Mon Sep 17 00:00:00 2001 From: cocowalla Date: Fri, 4 Aug 2017 13:17:53 +0100 Subject: [PATCH 2/3] Response to review of #21 Response to review of #21 --- .../LoggerConfigurationAsyncExtensions.cs | 21 ++++++++++++- .../Sinks/Async/BackgroundWorkerSink.cs | 30 ++++++++++++------- .../BackgroundWorkerSinkSpec.cs | 14 ++++----- .../Support/MemorySink.cs | 6 ++-- 4 files changed, 48 insertions(+), 23 deletions(-) diff --git a/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs b/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs index 60f56e7..451fcc9 100644 --- a/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs +++ b/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs @@ -1,6 +1,6 @@ using System; +using System.ComponentModel; using Serilog.Configuration; - using Serilog.Sinks.Async; namespace Serilog @@ -10,6 +10,24 @@ namespace Serilog /// public static class LoggerConfigurationAsyncExtensions { + /// + /// Configure a sink to be invoked asynchronously, on a background worker thread. + /// + /// The being configured. + /// An action that configures the wrapped sink. + /// The size of the concurrent queue used to feed the background worker thread. If + /// the thread is unable to process events quickly enough and the queue is filled, subsequent events will be + /// dropped until room is made in the queue. + /// A allowing configuration to continue. + [EditorBrowsable(EditorBrowsableState.Never)] + public static LoggerConfiguration Async( + this LoggerSinkConfiguration loggerSinkConfiguration, + Action configure, + int bufferSize) + { + return loggerSinkConfiguration.Async(configure, bufferSize, false); + } + /// /// Configure a sink to be invoked asynchronously, on a background worker thread. /// @@ -32,5 +50,6 @@ public static LoggerConfiguration Async( wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull), configure); } + } } diff --git a/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs b/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs index ab9100b..694cb6e 100644 --- a/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs +++ b/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs @@ -13,7 +13,6 @@ sealed class BackgroundWorkerSink : ILogEventSink, IDisposable readonly ILogEventSink _pipeline; readonly int _bufferCapacity; readonly bool _blockWhenFull; - volatile bool _disposed; readonly BlockingCollection _queue; readonly Task _worker; @@ -30,27 +29,36 @@ public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blo public void Emit(LogEvent logEvent) { - // The disposed check is racy, but only ensures we don't prevent flush from - // completing by pushing more events. - if (_disposed) + if (this._queue.IsAddingCompleted) return; - if (!this._blockWhenFull) + try { - if (!_queue.TryAdd(logEvent)) - SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity); + if (_blockWhenFull) + { + _queue.Add(logEvent); + } + else + { + if (!_queue.TryAdd(logEvent)) + SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity); + } } - else + catch (InvalidOperationException) { - this._queue.Add(logEvent); + // Thrown in the event of a race condition when we try to add another event after + // CompleteAdding has been called } } public void Dispose() { - _disposed = true; + // Prevent any more events from being added _queue.CompleteAdding(); - _worker.Wait(); + + // Allow queued events to be flushed + _worker.Wait(); + (_pipeline as IDisposable)?.Dispose(); } diff --git a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs index 6b6ef26..1198e10 100644 --- a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs +++ b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs @@ -4,7 +4,6 @@ using System.Linq; using System.Threading.Tasks; using Serilog.Core; -using Serilog.Debugging; using Serilog.Events; using Serilog.Parsing; using Serilog.Sinks.Async.Tests.Support; @@ -41,7 +40,6 @@ public async Task WhenEmitSingle_ThenRelaysToInnerSink() { var logEvent = CreateEvent(); _sink.Emit(logEvent); - _sink.Dispose(); await Task.Delay(TimeSpan.FromSeconds(3)); @@ -90,7 +88,7 @@ public async Task WhenQueueFull_ThenDropsEvents() // Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity // after the first event is popped - _innerSink.DelayEmit = true; + _innerSink.DelayEmit = TimeSpan.FromMilliseconds(300); var events = new List { @@ -106,12 +104,12 @@ public async Task WhenQueueFull_ThenDropsEvents() _sink.Emit(e); sw.Stop(); - Assert.True(sw.ElapsedMilliseconds < 2000, "Should not block the caller when the queue is full"); + Assert.True(sw.ElapsedMilliseconds < 200, "Should not block the caller when the queue is full"); }); // If we *weren't* dropped events, the delay in the inner sink would mean the 5 events would take // at least 15 seconds to process - await Task.Delay(TimeSpan.FromSeconds(18)); + await Task.Delay(TimeSpan.FromSeconds(2)); // Events should be dropped Assert.Equal(2, _innerSink.Events.Count); @@ -124,7 +122,7 @@ public async Task WhenQueueFull_ThenBlocks() // Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity // after the first event is popped - _innerSink.DelayEmit = true; + _innerSink.DelayEmit = TimeSpan.FromMilliseconds(300); var events = new List { @@ -144,11 +142,11 @@ public async Task WhenQueueFull_ThenBlocks() // subsequent calls, the queue should be full, so we should be blocked if (i > 0) { - Assert.True(sw.ElapsedMilliseconds > 2000, "Should block the caller when the queue is full"); + Assert.True(sw.ElapsedMilliseconds > 200, "Should block the caller when the queue is full"); } }); - await Task.Delay(TimeSpan.FromSeconds(12)); + await Task.Delay(TimeSpan.FromSeconds(2)); // No events should be dropped Assert.Equal(3, _innerSink.Events.Count); diff --git a/test/Serilog.Sinks.Async.Tests/Support/MemorySink.cs b/test/Serilog.Sinks.Async.Tests/Support/MemorySink.cs index 44e178f..2945863 100644 --- a/test/Serilog.Sinks.Async.Tests/Support/MemorySink.cs +++ b/test/Serilog.Sinks.Async.Tests/Support/MemorySink.cs @@ -10,12 +10,12 @@ public class MemorySink : ILogEventSink { public ConcurrentBag Events { get; } = new ConcurrentBag(); public bool ThrowAfterCollecting { get; set; } - public bool DelayEmit { get; set; } + public TimeSpan? DelayEmit { get; set; } public void Emit(LogEvent logEvent) { - if (DelayEmit) - Task.Delay(TimeSpan.FromSeconds(3)).Wait(); + if (DelayEmit.HasValue) + Task.Delay(DelayEmit.Value).Wait(); Events.Add(logEvent); From 3e5a186188ca5e0c087fb4c66fac7b940f1fc866 Mon Sep 17 00:00:00 2001 From: cocowalla Date: Mon, 7 Aug 2017 19:48:41 +0100 Subject: [PATCH 3/3] Remove _sink field Instantiate sink in in test instead. Response to feedback in #21 --- .../BackgroundWorkerSinkSpec.cs | 175 +++++++++--------- 1 file changed, 92 insertions(+), 83 deletions(-) diff --git a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs index 1198e10..76f124b 100644 --- a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs +++ b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs @@ -11,22 +11,15 @@ namespace Serilog.Sinks.Async.Tests { - public class BackgroundWorkerSinkSpec : IDisposable + public class BackgroundWorkerSinkSpec { readonly Logger _logger; readonly MemorySink _innerSink; - BackgroundWorkerSink _sink; public BackgroundWorkerSinkSpec() { _innerSink = new MemorySink(); _logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger(); - _sink = new BackgroundWorkerSink(_logger, 10000, false); - } - - public void Dispose() - { - _sink.Dispose(); } [Fact] @@ -38,118 +31,134 @@ public void WhenCtorWithNullSink_ThenThrows() [Fact] public async Task WhenEmitSingle_ThenRelaysToInnerSink() { - var logEvent = CreateEvent(); - _sink.Emit(logEvent); + using (var sink = this.CreateSinkWithDefaultOptions()) + { + var logEvent = CreateEvent(); + + sink.Emit(logEvent); - await Task.Delay(TimeSpan.FromSeconds(3)); + await Task.Delay(TimeSpan.FromSeconds(3)); - Assert.Equal(1, _innerSink.Events.Count); + Assert.Equal(1, _innerSink.Events.Count); + } } [Fact] public async Task WhenInnerEmitThrows_ThenContinuesRelaysToInnerSink() { - _innerSink.ThrowAfterCollecting = true; - - var events = new List + using (var sink = this.CreateSinkWithDefaultOptions()) { - CreateEvent(), - CreateEvent(), - CreateEvent() - }; - events.ForEach(e => _sink.Emit(e)); + _innerSink.ThrowAfterCollecting = true; - await Task.Delay(TimeSpan.FromSeconds(3)); + var events = new List + { + CreateEvent(), + CreateEvent(), + CreateEvent() + }; + events.ForEach(e => sink.Emit(e)); + + await Task.Delay(TimeSpan.FromSeconds(3)); - Assert.Equal(3, _innerSink.Events.Count); + Assert.Equal(3, _innerSink.Events.Count); + } } [Fact] public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink() { - var events = new List + using (var sink = this.CreateSinkWithDefaultOptions()) { - CreateEvent(), - CreateEvent(), - CreateEvent() - }; - - events.ForEach(e => { _sink.Emit(e); }); + var events = new List + { + CreateEvent(), + CreateEvent(), + CreateEvent() + }; + events.ForEach(e => { sink.Emit(e); }); - await Task.Delay(TimeSpan.FromSeconds(3)); + await Task.Delay(TimeSpan.FromSeconds(3)); - Assert.Equal(3, _innerSink.Events.Count); + Assert.Equal(3, _innerSink.Events.Count); + } } [Fact] public async Task WhenQueueFull_ThenDropsEvents() { - _sink = new BackgroundWorkerSink(_logger, 1, false); - - // Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity - // after the first event is popped - _innerSink.DelayEmit = TimeSpan.FromMilliseconds(300); - - var events = new List + using (var sink = new BackgroundWorkerSink(_logger, 1, false)) { - CreateEvent(), - CreateEvent(), - CreateEvent(), - CreateEvent(), - CreateEvent() - }; - events.ForEach(e => - { - var sw = Stopwatch.StartNew(); - _sink.Emit(e); - sw.Stop(); + // Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity + // after the first event is popped + _innerSink.DelayEmit = TimeSpan.FromMilliseconds(300); + + var events = new List + { + CreateEvent(), + CreateEvent(), + CreateEvent(), + CreateEvent(), + CreateEvent() + }; + events.ForEach(e => + { + var sw = Stopwatch.StartNew(); + sink.Emit(e); + sw.Stop(); - Assert.True(sw.ElapsedMilliseconds < 200, "Should not block the caller when the queue is full"); - }); + Assert.True(sw.ElapsedMilliseconds < 200, "Should not block the caller when the queue is full"); + }); - // If we *weren't* dropped events, the delay in the inner sink would mean the 5 events would take - // at least 15 seconds to process - await Task.Delay(TimeSpan.FromSeconds(2)); + // If we *weren't* dropped events, the delay in the inner sink would mean the 5 events would take + // at least 15 seconds to process + await Task.Delay(TimeSpan.FromSeconds(2)); - // Events should be dropped - Assert.Equal(2, _innerSink.Events.Count); + // Events should be dropped + Assert.Equal(2, _innerSink.Events.Count); + } } [Fact] public async Task WhenQueueFull_ThenBlocks() { - _sink = new BackgroundWorkerSink(_logger, 1, true); - - // Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity - // after the first event is popped - _innerSink.DelayEmit = TimeSpan.FromMilliseconds(300); - - var events = new List + using (var sink = new BackgroundWorkerSink(_logger, 1, true)) { - CreateEvent(), - CreateEvent(), - CreateEvent() - }; + // Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity + // after the first event is popped + _innerSink.DelayEmit = TimeSpan.FromMilliseconds(300); - int i = 0; - events.ForEach(e => - { - var sw = Stopwatch.StartNew(); - _sink.Emit(e); - sw.Stop(); - - // Emit should return immediately the first time, since the queue is not yet full. On - // subsequent calls, the queue should be full, so we should be blocked - if (i > 0) + var events = new List { - Assert.True(sw.ElapsedMilliseconds > 200, "Should block the caller when the queue is full"); - } - }); + CreateEvent(), + CreateEvent(), + CreateEvent() + }; - await Task.Delay(TimeSpan.FromSeconds(2)); + int i = 0; + events.ForEach(e => + { + var sw = Stopwatch.StartNew(); + sink.Emit(e); + sw.Stop(); + + // Emit should return immediately the first time, since the queue is not yet full. On + // subsequent calls, the queue should be full, so we should be blocked + if (i > 0) + { + Assert.True(sw.ElapsedMilliseconds > 200, "Should block the caller when the queue is full"); + } + }); + + await Task.Delay(TimeSpan.FromSeconds(2)); + + // No events should be dropped + Assert.Equal(3, _innerSink.Events.Count); + } + } - // No events should be dropped - Assert.Equal(3, _innerSink.Events.Count); + private BackgroundWorkerSink CreateSinkWithDefaultOptions() + { + return new BackgroundWorkerSink(_logger, 10000, false); } private static LogEvent CreateEvent()