Skip to content

Commit

Permalink
Add outcome to prepare, make producing optional (#50)
Browse files Browse the repository at this point in the history
* Add outcome to prepare signature
* make actually generating an event optional
  • Loading branch information
bartelink committed Mar 11, 2020
1 parent 94a08ee commit 08c834f
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 22 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- `StreamsProducerSink`: Added overload to facilitate `prepare` make production of an output `option`al and admitting processing of Stats [#50](https://github.com/jet/propulsion/pull/50)

### Changed

- `StreamsSync`: Add `* 'outcome` to `handle` function signature [#50](https://github.com/jet/propulsion/pull/50)
- Update to `3.1.101` SDK
- Retarget `netcoreapp2.1` apps to `netcoreapp3.1` (not least to make tool traverse proxies on Windows)

Expand Down
56 changes: 45 additions & 11 deletions src/Propulsion.Kafka/ProducerSinks.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ type ParallelProducerSink =
Parallel.ParallelProjector.Start(Log.Logger, maxReadAhead, maxDop, handle >> Async.Catch, statsInterval=statsInterval, logExternalStats = producer.DumpStats)

type StreamsProducerSink =

static member Start
( log : ILogger, maxReadAhead, maxConcurrentStreams, prepare, producer : Producer,
?statsInterval, ?stateInterval,
( log : ILogger, maxReadAhead, maxConcurrentStreams,
prepare : StreamName * StreamSpan<_> -> Async<(string*string) option * 'Outcome>,
producer : Producer,
stats : Streams.Sync.StreamsSyncStats<'Outcome>, projectorStatsInterval,
/// Default .5 ms
?idleDelay,
/// Default 1 MiB
Expand All @@ -32,14 +35,45 @@ type StreamsProducerSink =
: ProjectorPipeline<_> =
let maxBytes = (defaultArg maxBytes (1024*1024 - (*fudge*)4096))
let handle (stream : StreamName, span) = async {
let! (key : string, message : string) = prepare (stream, span)
match message.Length with
| x when x > maxBytes -> log.Warning("Message on {stream} had String.Length {length} Queue length {queueLen}", stream, x, span.events.Length)
| _ -> ()
let! _ = producer.ProduceAsync(key,message)
return span.index + span.events.LongLength
let! (maybeMsg, outcome : 'Outcome) = prepare (stream, span)
match maybeMsg with
| Some (key : string, message : string) ->
match message.Length with
| x when x > maxBytes -> log.Warning("Message on {stream} had String.Length {length} Queue length {queueLen}", stream, x, span.events.Length)
| _ -> ()
let! _ = producer.ProduceAsync(key,message) in ()
| None -> ()
return span.index + span.events.LongLength, outcome
}
Sync.StreamsSync.Start
( log, maxReadAhead, maxConcurrentStreams, handle,
maxBytes=maxBytes, ?statsInterval = statsInterval, ?stateInterval = stateInterval, ?idleDelay=idleDelay,
?maxEvents=maxEvents, ?maxBatches=maxBatches, ?maxCycles=maxCycles, dumpExternalStats=producer.DumpStats)
( log, maxReadAhead, maxConcurrentStreams, handle, stats, projectorStatsInterval = projectorStatsInterval,
maxBytes=maxBytes, ?idleDelay=idleDelay,
?maxEvents=maxEvents, ?maxBatches=maxBatches, ?maxCycles=maxCycles, dumpExternalStats=producer.DumpStats)

static member Start
( log : ILogger, maxReadAhead, maxConcurrentStreams,
prepare : StreamName * StreamSpan<_> -> Async<string*string>,
producer : Producer,
?statsInterval, ?stateInterval,
/// Default .5 ms
?idleDelay,
/// Default 1 MiB
?maxBytes,
/// Default 16384
?maxEvents,
/// Max scheduling readahead. Default 128.
?maxBatches,
/// Max inner cycles per loop. Default 128.
?maxCycles)
: ProjectorPipeline<_> =
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
let stats = Streams.Sync.StreamsSyncStats<unit>(log.ForContext<Sync.StreamsSyncStats<unit>>(), statsInterval, stateInterval)
let prepare (stream,span) = async {
let! k,v = prepare (stream,span)
return Some (k,v), ()
}
StreamsProducerSink.Start
( log, maxReadAhead, maxConcurrentStreams,
prepare, producer, stats, projectorStatsInterval = statsInterval,
?idleDelay=idleDelay, ?maxBytes = maxBytes,
?maxEvents=maxEvents, ?maxBatches=maxBatches, ?maxCycles=maxCycles)
24 changes: 13 additions & 11 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -856,8 +856,8 @@ type StreamsProjector =

module Sync =

type StreamsSyncStats(log : ILogger, statsInterval, stateInterval) =
inherit Scheduling.StreamSchedulerStats<Projector.OkResult<TimeSpan>, Projector.FailResult>(log, statsInterval, stateInterval)
type StreamsSyncStats<'Outcome>(log : ILogger, statsInterval, stateInterval) =
inherit Scheduling.StreamSchedulerStats<Projector.OkResult<TimeSpan * 'Outcome>, Projector.FailResult>(log, statsInterval, stateInterval)
let okStreams, failStreams = HashSet(), HashSet()
let prepareStats = Internal.LatencyStats("prepare")
let mutable okEvents, okBytes, exnEvents, exnBytes = 0, 0L, 0, 0L
Expand All @@ -874,7 +874,7 @@ module Sync =
base.Handle message
match message with
| Scheduling.InternalMessage.Added _ -> () // Processed by standard logging already; we have nothing to add
| Scheduling.InternalMessage.Result (_duration, (stream, Choice1Of2 (_, (es, bs), prepareElapsed))) ->
| Scheduling.InternalMessage.Result (_duration, (stream, Choice1Of2 (_, (es, bs), (prepareElapsed, _)))) ->
adds stream okStreams
okEvents <- okEvents + es
okBytes <- okBytes + int64 bs
Expand All @@ -887,8 +887,10 @@ module Sync =
type StreamsSync =

static member Start
( log : ILogger, maxReadAhead, maxConcurrentStreams, handle,
?statsInterval, ?stateInterval,
( log : ILogger, maxReadAhead, maxConcurrentStreams,
handle : StreamName * StreamSpan<_> -> Async<int64 * 'Outcome>,
stats : Scheduling.StreamSchedulerStats<_,_>,
?projectorStatsInterval,
/// Default .5 ms
?idleDelay,
/// Default 1 MiB
Expand All @@ -902,16 +904,16 @@ module Sync =
/// Hook to wire in external stats
?dumpExternalStats)
: ProjectorPipeline<_> =
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
let projectorStatsInterval = defaultArg projectorStatsInterval (TimeSpan.FromMinutes 5.)

let maxBatches, maxEvents, maxBytes = defaultArg maxBatches 128, defaultArg maxEvents 16384, (defaultArg maxBytes (1024 * 1024 - (*fudge*)4096))
let stats = StreamsSyncStats(log.ForContext<StreamsSyncStats>(), statsInterval, stateInterval)

let attemptWrite (item : Scheduling.DispatchItem<_>) = async {
let (eventCount, bytesCount), span = Buffering.StreamSpan.slice (maxEvents, maxBytes) item.span
let sw = System.Diagnostics.Stopwatch.StartNew()
try let! version' = handle (item.stream, span)
try let! (version', outcome) = handle (item.stream, span)
let prepareElapsed = sw.Elapsed
return Choice1Of2 (version', (eventCount, bytesCount), prepareElapsed)
return Choice1Of2 (version', (eventCount, bytesCount), (prepareElapsed, outcome))
with e -> return Choice2Of2 ((eventCount, bytesCount), e) }

let interpretWriteResultProgress _streams (stream : StreamName) = function
Expand All @@ -927,8 +929,8 @@ module Sync =

let dispatcher = Scheduling.MultiDispatcher<_, _>(itemDispatcher, attemptWrite, interpretWriteResultProgress, stats, dumpStreams)
let streamScheduler =
Scheduling.StreamSchedulingEngine<Projector.OkResult<TimeSpan>, Projector.FailResult>
Scheduling.StreamSchedulingEngine<Projector.OkResult<TimeSpan * 'Outcome>, Projector.FailResult>
( dispatcher, maxBatches = maxBatches, maxCycles = defaultArg maxCycles 128, idleDelay = defaultArg idleDelay (TimeSpan.FromMilliseconds 0.5))

Projector.StreamsProjectorPipeline.Start(
log, itemDispatcher.Pump(), streamScheduler.Pump, maxReadAhead, streamScheduler.Submit, statsInterval, maxSubmissionsPerPartition = maxBatches)
log, itemDispatcher.Pump(), streamScheduler.Pump, maxReadAhead, streamScheduler.Submit, projectorStatsInterval, maxSubmissionsPerPartition = maxBatches)

0 comments on commit 08c834f

Please sign in to comment.