Skip to content

Commit

Permalink
Remove dead code
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jun 29, 2019
1 parent 4da2ed4 commit 57fd7f2
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Removed
### Fixed

- Idling logic bug [#13](https://github.com/jet/propulsion/pull/13)
- Gorging -> Tailing transition [#10](https://github.com/jet/propulsion/issues/10) [#13](https://github.com/jet/propulsion/pull/13)

<a name="1.0.1-rc3"></a>
Expand Down
6 changes: 3 additions & 3 deletions src/Propulsion.EventStore/EventStoreReader.fs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ let pullStream (conn : IEventStoreConnection, batchSize) (stream,pos,limit : int

/// Walks the $all stream, yielding batches together with the associated Position info for the purposes of checkpointing
/// Can throw (in which case the caller is in charge of retrying, possibly with a smaller batch size)
type [<NoComparison>] PullResult = Exn of exn: exn | Eof of Position | EndOfTranche
type [<NoComparison>] PullResult = Exn of exn: exn | Eof | EndOfTranche
let pullAll (slicesStats : SliceStatsBuffer, overallStats : OverallStats) (conn : IEventStoreConnection, batchSize)
(range:Range, once) (tryMapEvent : ResolvedEvent -> StreamEvent<_> option) categorize (postBatch : Position -> StreamEvent<_>[] -> Async<int*int>) =
let sw = Stopwatch.StartNew() // we'll report the warmup/connect time on the first batch
Expand All @@ -153,7 +153,7 @@ let pullAll (slicesStats : SliceStatsBuffer, overallStats : OverallStats) (conn
range.Current.CommitPosition, range.PositionAsRangePercentage, (let e = sw.Elapsed in e.TotalSeconds), mb batchBytes,
batchEvents, cats.Count, streams.Count, events.Length, (let e = postSw.Elapsed in e.TotalSeconds), cur, max)
if not (range.TryNext currentSlice.NextPosition && not once && not currentSlice.IsEndOfStream) then
if currentSlice.IsEndOfStream then return Eof currentSlice.NextPosition
if currentSlice.IsEndOfStream then return Eof
else return EndOfTranche
else
sw.Restart() // restart the clock as we hand off back to the Reader
Expand Down Expand Up @@ -225,7 +225,7 @@ type EventStoreReader(conns : _ [], defaultBatchSize, minBatchSize, categorize,
Log.Information("Commencing tranche, batch size {bs}", batchSize)
let! t, res = pullAll (slicesStats, overallStats) (conn, batchSize) (range, false) tryMapEvent categorize postBatch |> Stopwatch.Time
match res with
| PullResult.Eof _pos ->
| PullResult.Eof ->
Log.Warning("completed tranche AND REACHED THE END in {ms:n3}m", let e = t.Elapsed in e.TotalMinutes)
let! _ = post (Res.EndOfChunk series) in ()
work.Enqueue EofDetected
Expand Down

0 comments on commit 57fd7f2

Please sign in to comment.