From 9a91dbe41f9dea23900b2ce43cf4439d90cb85a5 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 29 Jun 2019 02:10:18 +0100 Subject: [PATCH] Remove dead code --- CHANGELOG.md | 1 + src/Propulsion.EventStore/EventStoreReader.fs | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cbd1840b..0441be82 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Removed ### Fixed +- Idling logic bug [#13](https://github.com/jet/propulsion/pull/13) - Gorging -> Tailing transition [#10](https://github.com/jet/propulsion/issues/10) [#13](https://github.com/jet/propulsion/pull/13) diff --git a/src/Propulsion.EventStore/EventStoreReader.fs b/src/Propulsion.EventStore/EventStoreReader.fs index f0a40bc9..f3214435 100644 --- a/src/Propulsion.EventStore/EventStoreReader.fs +++ b/src/Propulsion.EventStore/EventStoreReader.fs @@ -133,7 +133,7 @@ let pullStream (conn : IEventStoreConnection, batchSize) (stream,pos,limit : int /// Walks the $all stream, yielding batches together with the associated Position info for the purposes of checkpointing /// Can throw (in which case the caller is in charge of retrying, possibly with a smaller batch size) -type [] PullResult = Exn of exn: exn | Eof of Position | EndOfTranche +type [] PullResult = Exn of exn: exn | Eof | EndOfTranche let pullAll (slicesStats : SliceStatsBuffer, overallStats : OverallStats) (conn : IEventStoreConnection, batchSize) (range:Range, once) (tryMapEvent : ResolvedEvent -> StreamEvent<_> option) categorize (postBatch : Position -> StreamEvent<_>[] -> Async) = let sw = Stopwatch.StartNew() // we'll report the warmup/connect time on the first batch @@ -153,7 +153,7 @@ let pullAll (slicesStats : SliceStatsBuffer, overallStats : OverallStats) (conn range.Current.CommitPosition, range.PositionAsRangePercentage, (let e = sw.Elapsed in e.TotalSeconds), mb batchBytes, batchEvents, cats.Count, streams.Count, events.Length, (let e = postSw.Elapsed in e.TotalSeconds), cur, max) if not (range.TryNext currentSlice.NextPosition && not once && not currentSlice.IsEndOfStream) then - if currentSlice.IsEndOfStream then return Eof currentSlice.NextPosition + if currentSlice.IsEndOfStream then return Eof else return EndOfTranche else sw.Restart() // restart the clock as we hand off back to the Reader @@ -225,7 +225,7 @@ type EventStoreReader(conns : _ [], defaultBatchSize, minBatchSize, categorize, Log.Information("Commencing tranche, batch size {bs}", batchSize) let! t, res = pullAll (slicesStats, overallStats) (conn, batchSize) (range, false) tryMapEvent categorize postBatch |> Stopwatch.Time match res with - | PullResult.Eof _pos -> + | PullResult.Eof -> Log.Warning("completed tranche AND REACHED THE END in {ms:n3}m", let e = t.Elapsed in e.TotalMinutes) let! _ = post (Res.EndOfChunk series) in () work.Enqueue EofDetected