Skip to content

Commit

Permalink
Move Streams.Projector.Stats up one level
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 6, 2021
1 parent aabef52 commit 41c92d7
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 41 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added
### Changed

- Moved `Propulsion.Streams.Projector.Stats` to `Propulsion.Streams.Stats` [#107](https://github.com/jet/propulsion/pull/107)

### Removed
### Fixed

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.Cosmos/CosmosPruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module Pruner =
| Nop of int

type Stats(log, statsInterval, stateInterval) =
inherit Propulsion.Streams.Projector.Stats<Outcome>(log, statsInterval, stateInterval)
inherit Propulsion.Streams.Stats<Outcome>(log, statsInterval, stateInterval)

let mutable nops, totalRedundant, ops, totalDeletes, totalDeferred = 0, 0, 0, 0, 0

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/CosmosStorePruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module Pruner =
| Nop of int

type Stats(log, statsInterval, stateInterval) =
inherit Propulsion.Streams.Projector.Stats<Outcome>(log, statsInterval, stateInterval)
inherit Propulsion.Streams.Stats<Outcome>(log, statsInterval, stateInterval)

let mutable nops, totalRedundant, ops, totalDeletes, totalDeferred = 0, 0, 0, 0, 0

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStore/EventStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type EventStoreSource =

let ingester = sink.StartIngester(log.ForContext("Tranche", "Ingester"), 0)

let initialSeriesId, conns, dop =
let initialSeriesId, conns, dop =
log.Information("Tailing every {intervalS:n1}s TODO with {streamReaders} stream catchup-readers", spec.tailInterval.TotalSeconds, spec.streamReaders)
match spec.gorge with
| Some factor ->
Expand Down
76 changes: 38 additions & 38 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -867,45 +867,45 @@ module Scheduling =
: StreamSchedulingEngine<int64 * ('Metrics * unit), 'Stats * unit, 'Stats * exn> =
StreamSchedulingEngine<_, _, _>(dispatcher, ?maxBatches=maxBatches, ?idleDelay=idleDelay, ?enableSlipstreaming=enableSlipstreaming)

module Projector =

[<AbstractClass>]
type Stats<'Outcome>(log : ILogger, statsInterval, statesInterval) =
inherit Scheduling.Stats<EventMetrics * 'Outcome, EventMetrics * exn>(log, statsInterval, statesInterval)
let okStreams, failStreams, badCats, resultOk, resultExnOther = HashSet(), HashSet(), CatStats(), ref 0, ref 0
let mutable okEvents, okBytes, exnEvents, exnBytes = 0, 0L, 0, 0L

override __.DumpStats() =
log.Information("Projected {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e ({ok:n0} ok)",
mb okBytes, !resultOk, okStreams.Count, okEvents, !resultOk)
okStreams.Clear(); resultOk := 0; okEvents <- 0; okBytes <- 0L
if !resultExnOther <> 0 then
log.Warning("Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e",
mb exnBytes, !resultExnOther, failStreams.Count, exnEvents)
resultExnOther := 0; failStreams.Clear(); exnBytes <- 0L; exnEvents <- 0
log.Warning("Affected cats {@badCats}", badCats.StatsDescending)
badCats.Clear()
[<AbstractClass>]
type Stats<'Outcome>(log : ILogger, statsInterval, statesInterval) =
inherit Scheduling.Stats<EventMetrics * 'Outcome, EventMetrics * exn>(log, statsInterval, statesInterval)
let okStreams, failStreams, badCats, resultOk, resultExnOther = HashSet(), HashSet(), CatStats(), ref 0, ref 0
let mutable okEvents, okBytes, exnEvents, exnBytes = 0, 0L, 0, 0L

override __.DumpStats() =
log.Information("Projected {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e ({ok:n0} ok)",
mb okBytes, !resultOk, okStreams.Count, okEvents, !resultOk)
okStreams.Clear(); resultOk := 0; okEvents <- 0; okBytes <- 0L
if !resultExnOther <> 0 then
log.Warning("Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e",
mb exnBytes, !resultExnOther, failStreams.Count, exnEvents)
resultExnOther := 0; failStreams.Clear(); exnBytes <- 0L; exnEvents <- 0
log.Warning("Affected cats {@badCats}", badCats.StatsDescending)
badCats.Clear()

override __.Handle message =
let inline adds x (set : HashSet<_>) = set.Add x |> ignore
let inline bads x (set : HashSet<_>) = badCats.Ingest(StreamName.categorize x); adds x set
base.Handle message
match message with
| Scheduling.Added _ -> () // Processed by standard logging already; we have nothing to add
| Scheduling.Result (_duration, (stream, Choice1Of2 ((es, bs), res))) ->
adds stream okStreams
okEvents <- okEvents + es
okBytes <- okBytes + int64 bs
incr resultOk
__.HandleOk res
| Scheduling.Result (_duration, (stream, Choice2Of2 ((es, bs), exn))) ->
bads stream failStreams
exnEvents <- exnEvents + es
exnBytes <- exnBytes + int64 bs
incr resultExnOther
__.HandleExn(log.ForContext("stream", stream).ForContext("events", es), exn)
abstract member HandleOk : outcome : 'Outcome -> unit
abstract member HandleExn : log : ILogger * exn : exn -> unit

override __.Handle message =
let inline adds x (set : HashSet<_>) = set.Add x |> ignore
let inline bads x (set : HashSet<_>) = badCats.Ingest(StreamName.categorize x); adds x set
base.Handle message
match message with
| Scheduling.Added _ -> () // Processed by standard logging already; we have nothing to add
| Scheduling.Result (_duration, (stream, Choice1Of2 ((es, bs), res))) ->
adds stream okStreams
okEvents <- okEvents + es
okBytes <- okBytes + int64 bs
incr resultOk
__.HandleOk res
| Scheduling.Result (_duration, (stream, Choice2Of2 ((es, bs), exn))) ->
bads stream failStreams
exnEvents <- exnEvents + es
exnBytes <- exnBytes + int64 bs
incr resultExnOther
__.HandleExn(log.ForContext("stream", stream).ForContext("events", es), exn)
abstract member HandleOk : outcome : 'Outcome -> unit
abstract member HandleExn : log : ILogger * exn : exn -> unit
module Projector =

type StreamsIngester =

Expand Down

0 comments on commit 41c92d7

Please sign in to comment.