Merge pull request #30 from bartelink/cleanup
Cover blockWhenFull in README; update xUnit; minor tidying
nblumhardt committed Apr 19, 2018
2 parents 870c52e + fda2f4d commit f22d170
Showing 8 changed files with 90 additions and 51 deletions.
22 changes: 17 additions & 5 deletions README.md
@@ -12,11 +12,11 @@ Install from [NuGet](https://nuget.org/packages/serilog.sinks.async):
Install-Package Serilog.Sinks.Async
```

Assuming you have already installed the target sink, such as the rolling file sink, move the wrapped sink's configuration within a `WriteTo.Async()` statement:
Assuming you have already installed the target sink, such as the file sink, move the wrapped sink's configuration within a `WriteTo.Async()` statement:

```csharp
Log.Logger = new LoggerConfiguration()
.WriteTo.Async(a => a.RollingFile("logs/myapp-{Date}.txt"))
.WriteTo.Async(a => a.File("logs/myapp.log"))
// Other logger configuration
.CreateLogger()

@@ -26,7 +26,7 @@ Log.Information("This will be written to disk on the worker thread");
Log.CloseAndFlush();
```

The wrapped sink (`RollingFile` in this case) will be invoked on a worker thread while your application's thread gets on with more important stuff.
The wrapped sink (`File` in this case) will be invoked on a worker thread while your application's thread gets on with more important stuff.

Because the memory buffer may contain events that have not yet been written to the target sink, it is important to call `Log.CloseAndFlush()` or `Logger.Dispose()` when the application exits.
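A minimal exit pattern, sketched here for illustration (the `try`/`finally` wrapper is not part of this change), keeps the flush on every code path:

```csharp
Log.Logger = new LoggerConfiguration()
    .WriteTo.Async(a => a.File("logs/myapp.log"))
    .CreateLogger();

try
{
    Log.Information("Starting application");
    // ... application work ...
}
finally
{
    // Drain the in-memory buffer and dispose the wrapped sink before the process exits
    Log.CloseAndFlush();
}
```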

@@ -36,7 +36,19 @@ The default memory buffer feeding the worker thread is capped to 10,000 items, a

```csharp
// Reduce the buffer to 500 events
.WriteTo.Async(a => a.RollingFile("logs/myapp-{Date}.txt"), 500)
.WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500)
```

### Blocking

Warning: For the same reason one typically does not want exceptions from logging to leak into the execution path, one typically does not want a logger to have the side-effect of interrupting application processing until log propagation has been unblocked.

When the buffer size limit is reached, the default behavior is to drop any further attempted writes until the queue abates, reporting each such failure to `Serilog.Debugging.SelfLog`. To replace this with a blocking behavior, set `blockWhenFull` to `true`.

```csharp
// Wait for any queued event to be accepted by the `File` sink before allowing the calling thread
// to resume its application work after a logging call, when there are already 10,000 events waiting
.WriteTo.Async(a => a.File("logs/myapp.log"), blockWhenFull: true)
```

### XML `<appSettings>` and JSON configuration
@@ -50,7 +62,7 @@ Using [Serilog.Settings.Configuration](https://github.com/serilog/serilog-settin
"Name": "Async",
"Args": {
"configure": [{
"Name": "LiterateConsole"
"Name": "Console"
}]
}
}]
6 changes: 3 additions & 3 deletions src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs
@@ -16,7 +16,7 @@ public static class LoggerConfigurationAsyncExtensions
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
/// <param name="configure">An action that configures the wrapped sink.</param>
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
/// the thread is unable to process events quickly enough and the queue is filled, subsequent events will be
/// the thread is unable to process events quickly enough and the queue is filled, subsequent events will be
/// dropped until room is made in the queue.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
[EditorBrowsable(EditorBrowsableState.Never)]
@@ -34,8 +34,8 @@ public static LoggerConfiguration Async(
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
/// <param name="configure">An action that configures the wrapped sink.</param>
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
/// the thread is unable to process events quickly enough and the queue is filled, depending on
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
/// the thread is unable to process events quickly enough and the queue is filled, depending on
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
/// room is made in the queue.</param>
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
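For reference, a usage sketch that combines both parameters documented above; the `File` target and the values shown are illustrative, only the parameter names come from this signature:

```csharp
// Sketch only: bound the queue at 500 events and block callers instead of
// dropping events when it fills.
Log.Logger = new LoggerConfiguration()
    .WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500, blockWhenFull: true)
    .CreateLogger();
```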
12 changes: 5 additions & 7 deletions src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs
@@ -1,17 +1,16 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Serilog.Core;
using Serilog.Debugging;
using Serilog.Events;
using System.Threading.Tasks;

namespace Serilog.Sinks.Async
{
sealed class BackgroundWorkerSink : ILogEventSink, IDisposable
{
readonly ILogEventSink _pipeline;
readonly int _bufferCapacity;
readonly bool _blockWhenFull;
readonly BlockingCollection<LogEvent> _queue;
readonly Task _worker;
@@ -21,15 +20,14 @@ public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blo
if (pipeline == null) throw new ArgumentNullException(nameof(pipeline));
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity));
_pipeline = pipeline;
_bufferCapacity = bufferCapacity;
_blockWhenFull = blockWhenFull;
_queue = new BlockingCollection<LogEvent>(_bufferCapacity);
_queue = new BlockingCollection<LogEvent>(bufferCapacity);
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
}
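The `Pump` body is not part of this diff; for context, a typical consumer loop over a `BlockingCollection<LogEvent>` looks roughly like the following sketch (an assumption about the shape of the worker, not the verbatim implementation):

```csharp
// Sketch of a BlockingCollection consumer such as Pump: blocks until events
// arrive and exits once CompleteAdding() is called and the queue drains.
void Pump()
{
    try
    {
        foreach (var next in _queue.GetConsumingEnumerable())
        {
            _pipeline.Emit(next);
        }
    }
    catch (Exception ex)
    {
        SelfLog.WriteLine("{0} fatal error in worker thread: {1}", typeof(BackgroundWorkerSink), ex);
    }
}
```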

public void Emit(LogEvent logEvent)
{
if (this._queue.IsAddingCompleted)
if (_queue.IsAddingCompleted)
return;

try
@@ -41,12 +39,12 @@ public void Emit(LogEvent logEvent)
else
{
if (!_queue.TryAdd(logEvent))
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity);
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _queue.BoundedCapacity);
}
}
catch (InvalidOperationException)
{
// Thrown in the event of a race condition when we try to add another event after
// Thrown in the event of a race condition when we try to add another event after
// CompleteAdding has been called
}
}
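The collapsed hunk above hides the blocking branch; given the `_blockWhenFull` field and the `else` shown, the full `try` block plausibly reads as follows (a reconstruction for readability, not the verbatim source):

```csharp
// Reconstruction of the body hidden behind the collapsed hunk (not verbatim):
try
{
    if (_blockWhenFull)
    {
        _queue.Add(logEvent);          // block the caller until a slot frees up
    }
    else
    {
        if (!_queue.TryAdd(logEvent))  // drop and report the failure via SelfLog
            SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _queue.BoundedCapacity);
    }
}
catch (InvalidOperationException)
{
    // Thrown in the event of a race condition when we try to add another event after
    // CompleteAdding has been called
}
```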
@@ -18,8 +18,8 @@

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
<PackageReference Include="BenchmarkDotNet" Version="0.10.6" />
<PackageReference Include="Serilog.Sinks.File" Version="3.1.0" />
</ItemGroup>
@@ -130,7 +130,7 @@ public void WhenAuditSingle_ThenQueued()

var result = RetrieveEvents(_memorySink, 1);

Assert.Equal(1, result.Count);
Assert.Single(result);
}

[Fact]
89 changes: 59 additions & 30 deletions test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs
@@ -39,7 +39,7 @@ public async Task WhenEmitSingle_ThenRelaysToInnerSink()

await Task.Delay(TimeSpan.FromSeconds(3));

Assert.Equal(1, _innerSink.Events.Count);
Assert.Single(_innerSink.Events);
}
}

@@ -84,46 +84,75 @@ public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink()
}

[Fact]
public async Task WhenQueueFull_ThenDropsEvents()
public async Task GivenDefaultConfig_WhenQueueOverCapacity_DoesNotBlock()
{
using (var sink = new BackgroundWorkerSink(_logger, 1, false))
var batchTiming = Stopwatch.StartNew();
using (var sink = new BackgroundWorkerSink(_logger, 1, blockWhenFull: false /*default*/))
{
// Cause a delay when emitting to the inner sink, allowing us to fill the queue to capacity
// after the first event is popped
_innerSink.DelayEmit = TimeSpan.FromMilliseconds(300);

var events = new List<LogEvent>
{
CreateEvent(),
CreateEvent(),
CreateEvent(),
CreateEvent(),
CreateEvent()
};
events.ForEach(e =>
// Cause a delay when emitting to the inner sink, allowing us to easily fill the queue to capacity
// while the first event is being propagated
var acceptInterval = TimeSpan.FromMilliseconds(500);
_innerSink.DelayEmit = acceptInterval;
var tenSecondsWorth = 10_000 / acceptInterval.TotalMilliseconds + 1;
for (int i = 0; i < tenSecondsWorth; i++)
{
var sw = Stopwatch.StartNew();
sink.Emit(e);
sw.Stop();
var emissionTiming = Stopwatch.StartNew();
sink.Emit(CreateEvent());
emissionTiming.Stop();

Assert.True(sw.ElapsedMilliseconds < 200, "Should not block the caller when the queue is full");
});
// Should not block the caller when the queue is full
Assert.InRange(emissionTiming.ElapsedMilliseconds, 0, 200);
}

// If we *weren't* dropping events, the delay in the inner sink would mean the 5 events would take
// at least 15 seconds to process
await Task.Delay(TimeSpan.FromSeconds(2));
// Allow at least one to propagate
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
}
// Sanity check the overall timing
batchTiming.Stop();
// Need to add a significant fudge factor as AppVeyor build can result in `await` taking quite some time
Assert.InRange(batchTiming.ElapsedMilliseconds, 950, 2050);
}

[Fact]
public async Task GivenDefaultConfig_WhenRequestsOverCapacity_ThenDropsEventsAndRecovers()
{
using (var sink = new BackgroundWorkerSink(_logger, 1, blockWhenFull: false /*default*/))
{
var acceptInterval = TimeSpan.FromMilliseconds(200);
_innerSink.DelayEmit = acceptInterval;

// Events should be dropped
Assert.Equal(2, _innerSink.Events.Count);
for (int i = 0; i < 2; i++)
{
sink.Emit(CreateEvent());
sink.Emit(CreateEvent());
await Task.Delay(acceptInterval);
sink.Emit(CreateEvent());
}
// Wait for the buffer and propagation to complete
await Task.Delay(TimeSpan.FromSeconds(1));
// Now verify things are back to normal; emit an event...
var finalEvent = CreateEvent();
sink.Emit(finalEvent);
// ... give adequate time for it to be guaranteed to have percolated through
await Task.Delay(TimeSpan.FromSeconds(1));

// At least one of the preceding events should not have made it through
var propagatedExcludingFinal =
from e in _innerSink.Events
where !Object.ReferenceEquals(finalEvent, e)
select e;
Assert.InRange(2, 2 * 3 / 2 - 1, propagatedExcludingFinal.Count());
// Final event should have made it through
Assert.Contains(_innerSink.Events, x => Object.ReferenceEquals(finalEvent, x));
}
}

[Fact]
public async Task WhenQueueFull_ThenBlocks()
public async Task GivenConfiguredToBlock_WhenQueueFilled_ThenBlocks()
{
using (var sink = new BackgroundWorkerSink(_logger, 1, true))
using (var sink = new BackgroundWorkerSink(_logger, 1, blockWhenFull: true))
{
// Cause a delay when emitting to the inner sink, allowing us to fill the queue to capacity
// Cause a delay when emitting to the inner sink, allowing us to fill the queue to capacity
// after the first event is popped
_innerSink.DelayEmit = TimeSpan.FromMilliseconds(300);

Expand All @@ -141,7 +170,7 @@ public async Task WhenQueueFull_ThenBlocks()
sink.Emit(e);
sw.Stop();
// Emit should return immediately the first time, since the queue is not yet full. On
// Emit should return immediately the first time, since the queue is not yet full. On
// subsequent calls, the queue should be full, so we should be blocked
if (i > 0)
{
@@ -32,7 +32,7 @@ public void DisposeCompletesWithoutWorkPerformed()
{
}

Assert.Equal(0, collector.Events.Count);
Assert.Empty(collector.Events);
}
}
}
@@ -15,8 +15,8 @@

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
<PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
<PackageReference Include="xunit" Version="2.3.1" />
</ItemGroup>

<ItemGroup>
