Skip to content

Commit

Permalink
Complete IAsyncLogEventSinkMonitor impl
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 25, 2018
1 parent 6546958 commit bcbf490
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 52 deletions.
19 changes: 14 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,26 @@ In practice, this configuration (assuming one provisions an adequate `bufferSize

```csharp
// Example check: log message to an out of band alarm channel if logging is showing signs of getting overwhelmed
void PeriodicMonitorCheck(IQueueState inspector)
void PeriodicMonitorCheck(IAsyncLogEventSinkState inspector)
{
var usagePct = inspector.Count * 100 / inspector.BoundedCapacity;
if (usagePct > 50) SelfLog.WriteLine("Log buffer exceeded {0:p0} usage (limit: {1})", usagePct, inspector.BoundedCapacity);
}

// Allow a backlog of up to 10,000 items to be maintained (dropping extras if full)
.WriteTo.Async(a => a.File("logs/myapp.log"), inspector: out Async.IAsyncLogEventSinkState inspector) ...
class MonitorConfiguration : IAsyncLogEventSinkMonitor
{
// Wire the inspector through to application health monitoring and/or metrics
// in order to periodically emit a metric, raise an alarm, etc.
public void MonitorState(IAsyncLogEventSinkState state) =>
HealthMonitor.RegisterAsyncLogSink(() => PeriodicMonitorCheck(state));

public void StopMonitoring(IAsyncLogEventSinkState state)
{ /* reverse of MonitorState */ }
}

// Wire the inspector through to application health monitoring and/or metrics in order to periodically emit a metric, raise an alarm, etc.
... HealthMonitor.RegisterAsyncLogSink(inspector);
// Configure to drop events if > 10,000 backlog; request supplying of inspector to Monitor
var monitor = new MonitorConfiguration();
.WriteTo.Async(a => a.File("logs/myapp.log"), monitor: monitor) ...
```

### Blocking
Expand Down
46 changes: 3 additions & 43 deletions src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,59 +59,19 @@ public static LoggerConfiguration Async(
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
/// room is made in the queue.</param>
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
/// <param name="monitor">Monitor to supply buffer information to. If the monitor implements <see cref="IDisposable"/>, <c>Dispose()</c> will be called to advise of the Sink being <c>Dispose()</c>d.</param>
/// <param name="monitor">Monitor to supply buffer information to.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
IAsyncLogEventSinkMonitor monitor,
int bufferSize,
bool blockWhenFull)
{
return LoggerSinkConfiguration.Wrap(
loggerSinkConfiguration,
wrappedSink =>
{
var sink = new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, monitor);
monitor?.MonitorState(sink);
return sink;
},
configure);
}

/// <summary>
/// Configure a sink to be invoked asynchronously, on a background worker thread.
/// Provides an <paramref name="inspector"/> that can be used to check the live state of the buffer for health monitoring purposes.
/// </summary>
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
/// <param name="configure">An action that configures the wrapped sink.</param>
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
/// the thread is unable to process events quickly enough and the queue is filled, depending on
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
/// room is made in the queue.</param>
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
/// <param name="inspector">Provides a way to inspect the state of the queue for health monitoring purposes.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
out IAsyncLogEventSinkState inspector,
int bufferSize = 10000,
bool blockWhenFull = false)
{
// Cannot assign directly to the out param from within the lambda, so we need a temp
IAsyncLogEventSinkState stateLens = null;
var result = LoggerSinkConfiguration.Wrap(
return LoggerSinkConfiguration.Wrap(
loggerSinkConfiguration,
wrappedSink =>
{
var sink = new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, null);
stateLens = sink;
return sink;
},
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, monitor),
configure);
inspector = stateLens;
return result;
}
}
}
5 changes: 3 additions & 2 deletions src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blo
_queue = new BlockingCollection<LogEvent>(bufferCapacity);
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
_monitor = monitor;
monitor?.MonitorState(this);
}

public void Emit(LogEvent logEvent)
Expand Down Expand Up @@ -65,7 +66,7 @@ public void Dispose()

(_pipeline as IDisposable)?.Dispose();

(_monitor as IDisposable)?.Dispose();
_monitor?.StopMonitoring(this);
}

void Pump()
Expand All @@ -89,4 +90,4 @@ void Pump()

long IAsyncLogEventSinkState.DroppedMessagesCount => _droppedMessages;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
/// <summary>
/// Defines a mechanism for the Async Sink to provide buffer metadata to facilitate integration into system health checking.
/// </summary>
/// <remarks>If the instance implements <see cref="System.IDisposable"/>, it will be <c>Dispose()</c>d at then time the Sink is.</remarks>
public interface IAsyncLogEventSinkMonitor
{
/// <summary>
/// Invoked by Sink to supply the buffer state hook to the monitor.
/// </summary>
/// <param name="state">The Async Sink's state information interface.</param>
void MonitorState(IAsyncLogEventSinkState state);

/// <summary>
/// Invoked by Sink to indicate that the supplied buffer is being Disposed.
/// </summary>
/// <param name="state">The Async Sink's state information interface.</param>
void StopMonitoring(IAsyncLogEventSinkState state);
}
}
19 changes: 18 additions & 1 deletion test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Serilog.Core;
using Serilog.Events;
Expand Down Expand Up @@ -188,16 +189,30 @@ public async Task GivenConfiguredToBlock_WhenQueueFilled_ThenBlocks()
}
}

class DummyMonitor : IAsyncLogEventSinkMonitor
{
IAsyncLogEventSinkState inspector;
public IAsyncLogEventSinkState Inspector => inspector;

void IAsyncLogEventSinkMonitor.MonitorState(IAsyncLogEventSinkState state) =>
inspector = state;

void IAsyncLogEventSinkMonitor.StopMonitoring(IAsyncLogEventSinkState state) =>
Interlocked.CompareExchange(ref inspector, null, state);
}

[Fact]
public void InspectorOutParameterAffordsHealthMonitoringHook()
{
var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(2) };
// 2 spaces in queue; 1 would make the second log entry eligible for dropping if consumer does not activate instantaneously
var bufferSize = 2;
var monitor = new DummyMonitor();
using (var logger = new LoggerConfiguration()
.WriteTo.Async(w => w.Sink(collector), bufferSize: 2, inspector: out IAsyncLogEventSinkState inspector)
.WriteTo.Async(w => w.Sink(collector), bufferSize: 2, monitor: monitor)
.CreateLogger())
{
var inspector = monitor.Inspector;
Assert.Equal(bufferSize, inspector.BufferSize);
Assert.Equal(0, inspector.Count);
Assert.Equal(0, inspector.DroppedMessagesCount);
Expand All @@ -222,6 +237,8 @@ public void InspectorOutParameterAffordsHealthMonitoringHook()
// Because messages wait 2 seconds, the only real way to get one into the buffer is with a debugger breakpoint or a sleep
Assert.InRange(collector.Events.Count, 0, 3);
}
// Dispose should trigger a StopMonitoring call
Assert.Null(monitor.Inspector);
}

private BackgroundWorkerSink CreateSinkWithDefaultOptions()
Expand Down

0 comments on commit bcbf490

Please sign in to comment.