Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fulfills #20 - add option to block when the queue is full, instead of dropping events #21

Merged
merged 3 commits into from
Aug 7, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;
using System.ComponentModel;
using Serilog.Configuration;

using Serilog.Sinks.Async;

namespace Serilog
Expand All @@ -16,18 +16,40 @@ public static class LoggerConfigurationAsyncExtensions
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
/// <param name="configure">An action that configures the wrapped sink.</param>
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
/// the thread is unable to process events quickly enough and the queue is filled, subsequent events will be
/// the thread is unable to process events quickly enough and the queue is filled, subsequent events will be
/// dropped until room is made in the queue.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
int bufferSize = 10000)
int bufferSize)
{
return loggerSinkConfiguration.Async(configure, bufferSize, false);
}

/// <summary>
/// Configure a sink to be invoked asynchronously, on a background worker thread.
/// </summary>
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
/// <param name="configure">An action that configures the wrapped sink.</param>
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
/// the thread is unable to process events quickly enough and the queue is filled, depending on
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
/// room is made in the queue.</param>
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
int bufferSize = 10000,
bool blockWhenFull = false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is used by quite a lot of downstream packages, we'll need to follow the old pattern of adding an overload with the old parameter set, marked [EditorBrowsable(EditorBrowsableState.Never)] with no default values applied.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I figured using a default would preserve current behaviour for existing users. Wasn't aware of that attribute

{
return LoggerSinkConfiguration.Wrap(
loggerSinkConfiguration,
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize),
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull),
configure);
}

}
}
54 changes: 31 additions & 23 deletions src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,55 +12,63 @@ sealed class BackgroundWorkerSink : ILogEventSink, IDisposable
{
readonly ILogEventSink _pipeline;
readonly int _bufferCapacity;
volatile bool _disposed;
readonly CancellationTokenSource _cancel = new CancellationTokenSource();
readonly bool _blockWhenFull;
readonly BlockingCollection<LogEvent> _queue;
readonly Task _worker;

public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity)
public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull)
{
if (pipeline == null) throw new ArgumentNullException(nameof(pipeline));
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity));
_pipeline = pipeline;
_bufferCapacity = bufferCapacity;
_blockWhenFull = blockWhenFull;
_queue = new BlockingCollection<LogEvent>(_bufferCapacity);
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
}

public void Emit(LogEvent logEvent)
{
// The disposed check is racy, but only ensures we don't prevent flush from
// completing by pushing more events.
if (!_disposed && !_queue.TryAdd(logEvent))
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity);
if (this._queue.IsAddingCompleted)
return;

try
{
if (_blockWhenFull)
{
_queue.Add(logEvent);
}
else
{
if (!_queue.TryAdd(logEvent))
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity);
}
}
catch (InvalidOperationException)
{
// Thrown in the event of a race condition when we try to add another event after
// CompleteAdding has been called
}
}

public void Dispose()
{
_disposed = true;
_cancel.Cancel();
_worker.Wait();
// Prevent any more events from being added
_queue.CompleteAdding();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why this wasn't used originally - looks like a good change to make. Can we grab some benchmark output from dev vs the PR branch, just to make sure perf wasn't an issue with this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, it's late now so will add something tomorrow

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with this one is that TryAdd can potentially throw with InvalidOperationException on some race condition

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@skomis-mm yes, you're right - it's probably best to just do away with the disposed check in Emit and wrap it in a try...catch for InvalidOperationException


// Allow queued events to be flushed
_worker.Wait();

(_pipeline as IDisposable)?.Dispose();
// _cancel not disposed, because it will make _cancel.Cancel() non-idempotent
}

void Pump()
{
try
{
try
{
while (true)
{
var next = _queue.Take(_cancel.Token);
_pipeline.Emit(next);
}
}
catch (OperationCanceledException)
foreach (var next in _queue.GetConsumingEnumerable())
{
LogEvent next;
while (_queue.TryTake(out next))
_pipeline.Emit(next);
_pipeline.Emit(next);
}
}
catch (Exception ex)
Expand Down
82 changes: 77 additions & 5 deletions test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Serilog.Core;
using Serilog.Events;
using Serilog.Parsing;
using Serilog.Sinks.Async.Tests;
using Serilog.Sinks.Async.Tests.Support;
using Xunit;

namespace Serilog.Sinks.Async.Tests
{
public class BackgroundWorkerSinkSpec : IDisposable
{
readonly Logger _logger;
readonly MemorySink _innerSink;
readonly BackgroundWorkerSink _sink;
BackgroundWorkerSink _sink;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since some tests now use _sink as initialized, and others replace it, perhaps it would be clearer to get rid of this field altogether and use something like:

var sink = CreateWithDefaultOptions();

in each test that needs it, where the method would just be a shortcut to new BackgroundWorkerSink(_logger, 10000, false)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, BackgroundWorkerSinkSpec implements IDisposable so it has a Dispose() method which dispoes of the existing _sink field. If we still want to properly dispose of the sink that would mean wrapping creation in a using block in every test, e.g.:

using (var sink = CreateWithDefaultOptions())
{
  // Test here
}

This adds quite a bit of monkey code to each test - do you still want to do it that way, or stick with the way it is?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think I'd still probably prefer the using block to the mutated field - easier to refactor, and less opportunity for odd behavior (e.g. in the current version of the PR, we don't dispose _sink before replacing it).


public BackgroundWorkerSinkSpec()
{
_innerSink = new MemorySink();
var logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger();
_sink = new BackgroundWorkerSink(logger, 10000);
_logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger();
_sink = new BackgroundWorkerSink(_logger, 10000, false);
}

public void Dispose()
Expand All @@ -31,7 +32,7 @@ public void Dispose()
[Fact]
public void WhenCtorWithNullSink_ThenThrows()
{
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000));
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000, false));
}

[Fact]
Expand Down Expand Up @@ -80,6 +81,77 @@ public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink()
Assert.Equal(3, _innerSink.Events.Count);
}

[Fact]
public async Task WhenQueueFull_ThenDropsEvents()
{
_sink = new BackgroundWorkerSink(_logger, 1, false);

// Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity
// after the first event is popped
_innerSink.DelayEmit = TimeSpan.FromMilliseconds(300);

var events = new List<LogEvent>
{
CreateEvent(),
CreateEvent(),
CreateEvent(),
CreateEvent(),
CreateEvent()
};
events.ForEach(e =>
{
var sw = Stopwatch.StartNew();
_sink.Emit(e);
sw.Stop();

Assert.True(sw.ElapsedMilliseconds < 200, "Should not block the caller when the queue is full");
});

// If we *weren't* dropped events, the delay in the inner sink would mean the 5 events would take
// at least 15 seconds to process
await Task.Delay(TimeSpan.FromSeconds(2));

// Events should be dropped
Assert.Equal(2, _innerSink.Events.Count);
}

[Fact]
public async Task WhenQueueFull_ThenBlocks()
{
_sink = new BackgroundWorkerSink(_logger, 1, true);

// Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity
// after the first event is popped
_innerSink.DelayEmit = TimeSpan.FromMilliseconds(300);

var events = new List<LogEvent>
{
CreateEvent(),
CreateEvent(),
CreateEvent()
};

int i = 0;
events.ForEach(e =>
{
var sw = Stopwatch.StartNew();
_sink.Emit(e);
sw.Stop();

// Emit should return immediately the first time, since the queue is not yet full. On
// subsequent calls, the queue should be full, so we should be blocked
if (i > 0)
{
Assert.True(sw.ElapsedMilliseconds > 200, "Should block the caller when the queue is full");
}
});

await Task.Delay(TimeSpan.FromSeconds(2));

// No events should be dropped
Assert.Equal(3, _innerSink.Events.Count);
}

private static LogEvent CreateEvent()
{
return new LogEvent(DateTimeOffset.MaxValue, LogEventLevel.Error, null,
Expand Down
7 changes: 1 addition & 6 deletions test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkTests.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
using System;
using System.Threading;
using Serilog.Core;
using Serilog.Events;
using Serilog.Parsing;
using Serilog.Sinks.Async.Tests.Support;
using Serilog.Sinks.Async.Tests.Support;
using Xunit;

namespace Serilog.Sinks.Async.Tests
Expand Down
5 changes: 5 additions & 0 deletions test/Serilog.Sinks.Async.Tests/Support/MemorySink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@
using Serilog.Core;
using System.Collections.Concurrent;
using System;
using System.Threading.Tasks;

namespace Serilog.Sinks.Async.Tests.Support
{
public class MemorySink : ILogEventSink
{
public ConcurrentBag<LogEvent> Events { get; } = new ConcurrentBag<LogEvent>();
public bool ThrowAfterCollecting { get; set; }
public TimeSpan? DelayEmit { get; set; }

public void Emit(LogEvent logEvent)
{
if (DelayEmit.HasValue)
Task.Delay(DelayEmit.Value).Wait();

Events.Add(logEvent);

if (ThrowAfterCollecting)
Expand Down