Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester/Submitter: Remove sleeps #154

Merged
merged 9 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Propulsion.DynamoStore`: `Equinox.CosmosStore`-equivalent functionality for `Equinox.DynamoStore`. Combines elements of `CosmosStore`, `SqlStreamStore`, `Feed` [#140](https://github.com/jet/propulsion/pull/140)
- `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141)
- `Propulsion.Tool`: Add support for [autoscaling throughput](https://docs.microsoft.com/en-us/azure/cosmos-db/provision-throughput-autoscale) of Cosmos containers and databases [#142](https://github.com/jet/propulsion/pull/142) :pray: [@brihadish](https://github.com/brihadish)
- `Ingester`: Expose optional `ingesterStatsInterval` control [#154](https://github.com/jet/propulsion/pull/154)

### Changed

Expand All @@ -25,6 +26,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Propulsion.EventStore`: Pinned to target `Equinox.EventStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.EventStoreDb`** [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.EventStoreDb.EventStoreSource`: Changed API to match`Propulsion.SqlStreamStore` API rather than`Propulsion.EventStore` [#139](https://github.com/jet/propulsion/pull/139)
- `Kafka`: Target `FsCodec.NewtonsoftJson` v `3.0.0` [#139](https://github.com/jet/propulsion/pull/139)
- `Ingester`,`Submitter`: Replaced `Async.Sleep` with `Task.WhenAny`; Condensed logging [#154](https://github.com/jet/propulsion/pull/154)

### Removed

Expand All @@ -34,6 +36,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

- `Propulsion.SqlStreamStore`: Replaced incorrect/meaningless stream name for `SqlStreamStoreSource` [#139](https://github.com/jet/propulsion/pull/139)
- Synced [`AwaitTaskCorrect`](http://www.fssnip.net/7Rc/title/AsyncAwaitTaskCorrect) with official version [3c11142](https://github.com/jet/propulsion/commit/3c11142b75bf3b0ef2181fd106a4b17c0b2313ef)
- `Projector`,`Ingester`,`Submitter`, `Scheduler`: Deterministic shutdown via Cancellation and/or unhandled exceptions [#154](https://github.com/jet/propulsion/pull/154)

<a name="2.12.2"></a>
## [2.12.2] - 2022-03-10
Expand Down
10 changes: 7 additions & 3 deletions src/Propulsion.Cosmos/CosmosPruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,15 @@ type CosmosPruner =
// Default 5m
?statsInterval,
// Default 5m
?stateInterval, ?ingesterStatsInterval, ?maxSubmissionsPerPartition, ?pumpInterval,
?stateInterval,
?maxSubmissionsPerPartition,
// Delay when no items available. Default 10ms.
?idleDelay,
// Frequency with which to jettison Write Position information for inactive streams in order to limit memory consumption
// NOTE: Can impair performance and/or increase costs of writes as it inhibits the ability of the ingester to discard redundant inputs
?purgeInterval)
?purgeInterval,
// Defaults to statsInterval
?ingesterStatsInterval)
: Propulsion.ProjectorPipeline<_> =
let idleDelay = defaultArg idleDelay (TimeSpan.FromMilliseconds 10.)
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
Expand All @@ -133,4 +136,5 @@ type CosmosPruner =
let streamScheduler = Pruner.StreamSchedulingEngine.Create(pruneUntil, dispatcher, stats, dumpStreams, idleDelay=idleDelay, ?purgeInterval=purgeInterval)
Propulsion.Streams.Projector.StreamsProjectorPipeline.Start(
log, dispatcher.Pump(), streamScheduler.Pump, maxReadAhead, streamScheduler.Submit, statsInterval,
?ingesterStatsInterval=ingesterStatsInterval, ?maxSubmissionsPerPartition=maxSubmissionsPerPartition, ?pumpInterval=pumpInterval)
?maxSubmissionsPerPartition = maxSubmissionsPerPartition,
?ingesterStatsInterval = ingesterStatsInterval)
8 changes: 5 additions & 3 deletions src/Propulsion.Cosmos/CosmosSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ type CosmosSink =
?statsInterval,
// Default 5m
?stateInterval,
?ingesterStatsInterval, ?maxSubmissionsPerPartition, ?pumpInterval,
?maxSubmissionsPerPartition,
// Tune the sleep time when there are no items to schedule or responses to process. Default 1ms.
?idleDelay,
// Frequency with which to jettison Write Position information for inactive streams in order to limit memory consumption
Expand All @@ -178,7 +178,8 @@ type CosmosSink =
// Default: 16384
?maxEvents,
// Default: 1MB (limited by maximum size of a CosmosDB stored procedure invocation)
?maxBytes)
?maxBytes,
?ingesterStatsInterval)
: Propulsion.ProjectorPipeline<_> =
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
let stats = Internal.Stats(log.ForContext<Internal.Stats>(), statsInterval, stateInterval)
Expand All @@ -187,4 +188,5 @@ type CosmosSink =
let streamScheduler = Internal.StreamSchedulingEngine.Create(log, cosmosContexts, dispatcher, stats, dumpStreams, ?idleDelay=idleDelay, ?purgeInterval=purgeInterval, ?maxEvents=maxEvents, ?maxBytes=maxBytes)
Propulsion.Streams.Projector.StreamsProjectorPipeline.Start(
log, dispatcher.Pump(), streamScheduler.Pump, maxReadAhead, streamScheduler.Submit, statsInterval,
?ingesterStatsInterval=ingesterStatsInterval, ?maxSubmissionsPerPartition=maxSubmissionsPerPartition, ?pumpInterval=pumpInterval)
?maxSubmissionsPerPartition = maxSubmissionsPerPartition,
?ingesterStatsInterval = ingesterStatsInterval)
10 changes: 7 additions & 3 deletions src/Propulsion.CosmosStore/CosmosStorePruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,12 @@ type CosmosStorePruner =
/// Default 5m
?statsInterval,
/// Default 5m
?stateInterval, ?ingesterStatsInterval, ?maxSubmissionsPerPartition, ?pumpInterval,
?stateInterval,
?maxSubmissionsPerPartition,
/// Delay when no items available. Default 10ms.
?idleDelay)
?idleDelay,
// Defaults to statsInterval
?ingesterStatsInterval)
: Propulsion.ProjectorPipeline<_> =
let idleDelay = defaultArg idleDelay (TimeSpan.FromMilliseconds 10.)
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
Expand All @@ -128,4 +131,5 @@ type CosmosStorePruner =
let streamScheduler = Pruner.StreamSchedulingEngine.Create(pruneUntil, dispatcher, stats, dumpStreams, idleDelay=idleDelay)
Propulsion.Streams.Projector.StreamsProjectorPipeline.Start(
log, dispatcher.Pump(), streamScheduler.Pump, maxReadAhead, streamScheduler.Submit, statsInterval,
?ingesterStatsInterval=ingesterStatsInterval, ?maxSubmissionsPerPartition=maxSubmissionsPerPartition, ?pumpInterval=pumpInterval)
?maxSubmissionsPerPartition = maxSubmissionsPerPartition,
?ingesterStatsInterval = ingesterStatsInterval)
8 changes: 5 additions & 3 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,14 @@ type CosmosStoreSink =
?statsInterval,
/// Default 5m
?stateInterval,
?ingesterStatsInterval, ?maxSubmissionsPerPartition, ?pumpInterval,
?maxSubmissionsPerPartition,
/// Tune the sleep time when there are no items to schedule or responses to process. Default 1ms.
?idleDelay,
/// Default: 16384
?maxEvents,
/// Default: 1MB (limited by maximum size of a CosmosDB stored procedure invocation)
?maxBytes)
?maxBytes,
?ingesterStatsInterval)
: Propulsion.ProjectorPipeline<_> =
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
let stats = Internal.Stats(log.ForContext<Internal.Stats>(), statsInterval, stateInterval)
Expand All @@ -189,4 +190,5 @@ type CosmosStoreSink =
let streamScheduler = Internal.StreamSchedulingEngine.Create(log, eventsContext, dispatcher, stats, dumpStreams, ?idleDelay=idleDelay, ?maxEvents=maxEvents, ?maxBytes=maxBytes)
Propulsion.Streams.Projector.StreamsProjectorPipeline.Start(
log, dispatcher.Pump(), streamScheduler.Pump, maxReadAhead, streamScheduler.Submit, statsInterval,
?ingesterStatsInterval=ingesterStatsInterval, ?maxSubmissionsPerPartition=maxSubmissionsPerPartition, ?pumpInterval=pumpInterval)
?maxSubmissionsPerPartition = maxSubmissionsPerPartition,
?ingesterStatsInterval = ingesterStatsInterval)
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

<ItemGroup>
<PackageReference Include="MinVer" Version="4.0.0" PrivateAssets="All" />

<!-- NOTE Code here leans on the fact that https://github.com/dotnet/fsharp/issues/13165 is resolved in 6.0.5 -->
<!-- <PackageReference Include="FSharp.Core" Version="6.0.5" />-->

Expand Down
9 changes: 6 additions & 3 deletions src/Propulsion.EventStore/EventStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,14 @@ type EventStoreSink =
// Default 5m
?statsInterval,
// Default 5m
?stateInterval, ?ingesterStatsInterval, ?maxSubmissionsPerPartition, ?pumpInterval,
?stateInterval,
?maxSubmissionsPerPartition,
// Tune the sleep time when there are no items to schedule or responses to process. Default 1ms.
?idleDelay,
// Frequency with which to jettison Write Position information for inactive streams in order to limit memory consumption
// NOTE: Can impair performance and/or increase costs of writes as it inhibits the ability of the ingester to discard redundant inputs
?purgeInterval)
?purgeInterval,
?ingesterStatsInterval)
: Propulsion.ProjectorPipeline<_> =
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
let stats = Internal.Stats(log.ForContext<Internal.Stats>(), statsInterval, stateInterval)
Expand All @@ -186,4 +188,5 @@ type EventStoreSink =
let streamScheduler = Internal.EventStoreSchedulingEngine.Create(log, storeLog, connections, dispatcher, stats, dumpStats, ?idleDelay=idleDelay, ?purgeInterval=purgeInterval)
Propulsion.Streams.Projector.StreamsProjectorPipeline.Start(
log, dispatcher.Pump(), streamScheduler.Pump, maxReadAhead, streamScheduler.Submit, statsInterval,
?ingesterStatsInterval=ingesterStatsInterval, ?maxSubmissionsPerPartition=maxSubmissionsPerPartition, ?pumpInterval=pumpInterval)
?maxSubmissionsPerPartition = maxSubmissionsPerPartition,
?ingesterStatsInterval = ingesterStatsInterval)
Loading