Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fulfills #20 - add option to block when the queue is full, instead of dropping events #21

Merged
merged 3 commits into from
Aug 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;
using System.ComponentModel;
using Serilog.Configuration;

using Serilog.Sinks.Async;

namespace Serilog
Expand All @@ -16,18 +16,40 @@ public static class LoggerConfigurationAsyncExtensions
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
/// <param name="configure">An action that configures the wrapped sink.</param>
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
/// the thread is unable to process events quickly enough and the queue is filled, subsequent events will be
/// the thread is unable to process events quickly enough and the queue is filled, subsequent events will be
/// dropped until room is made in the queue.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
int bufferSize = 10000)
int bufferSize)
{
return loggerSinkConfiguration.Async(configure, bufferSize, false);
}

/// <summary>
/// Configure a sink to be invoked asynchronously, on a background worker thread.
/// </summary>
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
/// <param name="configure">An action that configures the wrapped sink.</param>
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
/// the thread is unable to process events quickly enough and the queue is filled, depending on
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
/// room is made in the queue.</param>
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
int bufferSize = 10000,
bool blockWhenFull = false)
{
return LoggerSinkConfiguration.Wrap(
loggerSinkConfiguration,
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize),
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull),
configure);
}

}
}
54 changes: 31 additions & 23 deletions src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,55 +12,63 @@ sealed class BackgroundWorkerSink : ILogEventSink, IDisposable
{
readonly ILogEventSink _pipeline;
readonly int _bufferCapacity;
volatile bool _disposed;
readonly CancellationTokenSource _cancel = new CancellationTokenSource();
readonly bool _blockWhenFull;
readonly BlockingCollection<LogEvent> _queue;
readonly Task _worker;

public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity)
public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull)
{
if (pipeline == null) throw new ArgumentNullException(nameof(pipeline));
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity));
_pipeline = pipeline;
_bufferCapacity = bufferCapacity;
_blockWhenFull = blockWhenFull;
_queue = new BlockingCollection<LogEvent>(_bufferCapacity);
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
}

public void Emit(LogEvent logEvent)
{
// The disposed check is racy, but only ensures we don't prevent flush from
// completing by pushing more events.
if (!_disposed && !_queue.TryAdd(logEvent))
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity);
if (this._queue.IsAddingCompleted)
return;

try
{
if (_blockWhenFull)
{
_queue.Add(logEvent);
}
else
{
if (!_queue.TryAdd(logEvent))
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity);
}
}
catch (InvalidOperationException)
{
// Thrown in the event of a race condition when we try to add another event after
// CompleteAdding has been called
}
}

public void Dispose()
{
_disposed = true;
_cancel.Cancel();
_worker.Wait();
// Prevent any more events from being added
_queue.CompleteAdding();

// Allow queued events to be flushed
_worker.Wait();

(_pipeline as IDisposable)?.Dispose();
// _cancel not disposed, because it will make _cancel.Cancel() non-idempotent
}

void Pump()
{
try
{
try
{
while (true)
{
var next = _queue.Take(_cancel.Token);
_pipeline.Emit(next);
}
}
catch (OperationCanceledException)
foreach (var next in _queue.GetConsumingEnumerable())
{
LogEvent next;
while (_queue.TryTake(out next))
_pipeline.Emit(next);
_pipeline.Emit(next);
}
}
catch (Exception ex)
Expand Down
147 changes: 114 additions & 33 deletions test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs
Original file line number Diff line number Diff line change
@@ -1,83 +1,164 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Serilog.Core;
using Serilog.Events;
using Serilog.Parsing;
using Serilog.Sinks.Async.Tests;
using Serilog.Sinks.Async.Tests.Support;
using Xunit;

namespace Serilog.Sinks.Async.Tests
{
public class BackgroundWorkerSinkSpec : IDisposable
public class BackgroundWorkerSinkSpec
{
readonly Logger _logger;
readonly MemorySink _innerSink;
readonly BackgroundWorkerSink _sink;

public BackgroundWorkerSinkSpec()
{
_innerSink = new MemorySink();
var logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger();
_sink = new BackgroundWorkerSink(logger, 10000);
}

public void Dispose()
{
_sink.Dispose();
_logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger();
}

[Fact]
public void WhenCtorWithNullSink_ThenThrows()
{
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000));
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000, false));
}

[Fact]
public async Task WhenEmitSingle_ThenRelaysToInnerSink()
{
var logEvent = CreateEvent();
_sink.Emit(logEvent);
using (var sink = this.CreateSinkWithDefaultOptions())
{
var logEvent = CreateEvent();

sink.Emit(logEvent);

await Task.Delay(TimeSpan.FromSeconds(3));
await Task.Delay(TimeSpan.FromSeconds(3));

Assert.Equal(1, _innerSink.Events.Count);
Assert.Equal(1, _innerSink.Events.Count);
}
}

[Fact]
public async Task WhenInnerEmitThrows_ThenContinuesRelaysToInnerSink()
{
_innerSink.ThrowAfterCollecting = true;

var events = new List<LogEvent>
using (var sink = this.CreateSinkWithDefaultOptions())
{
CreateEvent(),
CreateEvent(),
CreateEvent()
};
events.ForEach(e => _sink.Emit(e));
_innerSink.ThrowAfterCollecting = true;

var events = new List<LogEvent>
{
CreateEvent(),
CreateEvent(),
CreateEvent()
};
events.ForEach(e => sink.Emit(e));

await Task.Delay(TimeSpan.FromSeconds(3));
await Task.Delay(TimeSpan.FromSeconds(3));

Assert.Equal(3, _innerSink.Events.Count);
Assert.Equal(3, _innerSink.Events.Count);
}
}

[Fact]
public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink()
{
var events = new List<LogEvent>
using (var sink = this.CreateSinkWithDefaultOptions())
{
CreateEvent(),
CreateEvent(),
CreateEvent()
};
var events = new List<LogEvent>
{
CreateEvent(),
CreateEvent(),
CreateEvent()
};
events.ForEach(e => { sink.Emit(e); });

await Task.Delay(TimeSpan.FromSeconds(3));

Assert.Equal(3, _innerSink.Events.Count);
}
}

events.ForEach(e => { _sink.Emit(e); });
[Fact]
public async Task WhenQueueFull_ThenDropsEvents()
{
using (var sink = new BackgroundWorkerSink(_logger, 1, false))
{
// Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity
// after the first event is popped
_innerSink.DelayEmit = TimeSpan.FromMilliseconds(300);

var events = new List<LogEvent>
{
CreateEvent(),
CreateEvent(),
CreateEvent(),
CreateEvent(),
CreateEvent()
};
events.ForEach(e =>
{
var sw = Stopwatch.StartNew();
sink.Emit(e);
sw.Stop();

Assert.True(sw.ElapsedMilliseconds < 200, "Should not block the caller when the queue is full");
});

// If we *weren't* dropped events, the delay in the inner sink would mean the 5 events would take
// at least 15 seconds to process
await Task.Delay(TimeSpan.FromSeconds(2));

// Events should be dropped
Assert.Equal(2, _innerSink.Events.Count);
}
}

await Task.Delay(TimeSpan.FromSeconds(3));
[Fact]
public async Task WhenQueueFull_ThenBlocks()
{
using (var sink = new BackgroundWorkerSink(_logger, 1, true))
{
// Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity
// after the first event is popped
_innerSink.DelayEmit = TimeSpan.FromMilliseconds(300);

var events = new List<LogEvent>
{
CreateEvent(),
CreateEvent(),
CreateEvent()
};

int i = 0;
events.ForEach(e =>
{
var sw = Stopwatch.StartNew();
sink.Emit(e);
sw.Stop();

// Emit should return immediately the first time, since the queue is not yet full. On
// subsequent calls, the queue should be full, so we should be blocked
if (i > 0)
{
Assert.True(sw.ElapsedMilliseconds > 200, "Should block the caller when the queue is full");
}
});

await Task.Delay(TimeSpan.FromSeconds(2));

// No events should be dropped
Assert.Equal(3, _innerSink.Events.Count);
}
}

Assert.Equal(3, _innerSink.Events.Count);
private BackgroundWorkerSink CreateSinkWithDefaultOptions()
{
return new BackgroundWorkerSink(_logger, 10000, false);
}

private static LogEvent CreateEvent()
Expand Down
7 changes: 1 addition & 6 deletions test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkTests.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
using System;
using System.Threading;
using Serilog.Core;
using Serilog.Events;
using Serilog.Parsing;
using Serilog.Sinks.Async.Tests.Support;
using Serilog.Sinks.Async.Tests.Support;
using Xunit;

namespace Serilog.Sinks.Async.Tests
Expand Down
5 changes: 5 additions & 0 deletions test/Serilog.Sinks.Async.Tests/Support/MemorySink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@
using Serilog.Core;
using System.Collections.Concurrent;
using System;
using System.Threading.Tasks;

namespace Serilog.Sinks.Async.Tests.Support
{
public class MemorySink : ILogEventSink
{
public ConcurrentBag<LogEvent> Events { get; } = new ConcurrentBag<LogEvent>();
public bool ThrowAfterCollecting { get; set; }
public TimeSpan? DelayEmit { get; set; }

public void Emit(LogEvent logEvent)
{
if (DelayEmit.HasValue)
Task.Delay(DelayEmit.Value).Wait();

Events.Add(logEvent);

if (ThrowAfterCollecting)
Expand Down