Target Jckfs 1.1.0
bartelink committed Jun 28, 2019
1 parent e2548b1 commit 653c1b0
Showing 10 changed files with 42 additions and 352 deletions.
CHANGELOG.md (3 additions & 0 deletions)
@@ -10,6 +10,9 @@ The `Unreleased` section name is replaced by the expected version of next release
 
 ### Added
 ### Changed
+
+- Targeted `Jet.ConfluentKafka.FSharp` v `1.1.0` [#10](https://github.com/jet/propulsion/pull/10)
+
 ### Removed
 ### Fixed
 
README.md (4 additions & 4 deletions)
@@ -11,7 +11,7 @@ The components within this repository are delivered as a multi-targeted Nuget package
 - `Propulsion` [![NuGet](https://img.shields.io/nuget/vpre/Propulsion.svg)](https://www.nuget.org/packages/Propulsion/) Implements core functionality in a channel-independent fashion including `ParallelProjector`, `StreamsProjector`. [Depends](https://www.fuget.org/packages/Propulsion) on `MathNet.Numerics`, `Serilog`
 - `Propulsion.Cosmos` [![NuGet](https://img.shields.io/nuget/vpre/Propulsion.Cosmos.svg)](https://www.nuget.org/packages/Propulsion.Cosmos/) Provides bindings to Azure CosmosDb a) writing to `Equinox.Cosmos` :- `CosmosSink` b) reading from CosmosDb's changefeed by wrapping the [`dotnet-changefeedprocessor` library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet) :- `CosmosSource`. [Depends](https://www.fuget.org/packages/Propulsion.Cosmos) on `Equinox.Cosmos`, `Microsoft.Azure.DocumentDB.ChangeFeedProcessor`, `Serilog`
 - `Propulsion.EventStore` [![NuGet](https://img.shields.io/nuget/vpre/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/). Provides bindings to [EventStore](eventstore.org), writing via `Propulsion.EventStore.EventStoreSink` [Depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore`, `Serilog`
-- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/vpre/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `Jet.ConfluentKafka.FSharp` v ` >= 1.0.1`, `Serilog`
+- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/vpre/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.RenderedSpan`. Implements a `KafkaMonitor` that can log status information based on [Burrow](https://github.com/linkedin/Burrow). [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `Jet.ConfluentKafka.FSharp` v ` >= 1.1.0`, `Serilog`
 - `Propulsion.Kafka0` [![NuGet](https://img.shields.io/nuget/vpre/Propulsion.Kafka0.svg)](https://www.nuget.org/packages/Propulsion.Kafka0/). Same functionality/purpose as `Propulsion.Kafka` but targets an older `Confluent.Kafka`/`librdkafka` version for interoperability with systems that have a hard dependency on that. [Depends](https://www.fuget.org/packages/Propulsion.Kafka0) on `Confluent.Kafka [0.11.3]`, `librdkafka.redist [0.11.4]`, `Serilog`
 
 The ubiquitous `Serilog` dependency is solely on the core module, not any sinks, i.e. you configure to emit to `NLog` etc.
@@ -20,9 +20,9 @@ The ubiquitous `Serilog` dependency is solely on the core module, not any sinks,
 
 - See [the Jet `dotnet new` templates repo](https://github.com/jet/dotnet-templates) for examples using the packages herein:
 
-  - `eqxprojector` template for example `CosmosSource` logic consuming from a CosmosDb `ChangeFeedProcessor`.
-  - `eqxprojector` template (in `-k` mode) for example producer logic using `StreamsProducer`, `StreamsProjector` and `ParallelProducer`.
-  - `eqxprojector` template (in `-k` mode) for example consumer logic using `ParallelConsumer` and `StreamsConsumer`.
+  - `proProjector` template for example `CosmosSource` logic consuming from a CosmosDb `ChangeFeedProcessor`.
+  - `proProjector` template (in `-k` mode) for example producer logic using `StreamsProducer`, `StreamsProjector` and `ParallelProducer`.
+  - `proConsumer` template for example consumer logic using `ParallelConsumer` and `StreamsConsumer`.
   - `eqxsync` template for examples of binding a `CosmosSource` or `EventStoreSource` to a `CosmosSink` or `EventStoreSink`.
 
 - See [the `Jet.ConfluentKafka.FSharp` repo](https://github.com/jet/Jet.ConfluentKafka.FSharp) for `BatchedProducer` and `BatchedConsumer` implementations (together with the `KafkaConsumerConfig` and `KafkaProducerConfig` used in the Parallel and Streams wrappers in `Propulsion.Kafka`)
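
The README's note above about `Serilog` means log routing stays entirely with the host application. A minimal sketch of wiring a logger for these components, assuming the `Serilog.Sinks.Console` package is referenced (any other sink, or a bridge to NLog etc., slots in the same way):

    open Serilog

    // Build one ILogger at startup and pass it to the Propulsion components;
    // none of the core packages choose a sink for you.
    let log : ILogger =
        let cfg = LoggerConfiguration().MinimumLevel.Information().WriteTo.Console()
        cfg.CreateLogger() :> ILogger
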
src/Propulsion.Kafka/Bindings.fs (1 addition & 0 deletions)
@@ -9,6 +9,7 @@ open System.Collections.Generic
 module Bindings =
     let mapConsumeResult (x : ConsumeResult<string,string>) = KeyValuePair(x.Key,x.Value)
     let inline partitionId (x : ConsumeResult<_,_>) = let p = x.Partition in p.Value
+    let partitionValue (partition : Partition) = let p = partition in p.Value
     let createConsumer log config : IConsumer<string,string> * (unit -> unit) =
         let consumer = ConsumerBuilder.WithLogging(log, config)
         consumer, consumer.Close
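
A hedged usage sketch of the helper added above (values are illustrative, not part of the commit): under `Confluent.Kafka` 1.x, `Partition` is a struct wrapper, and `partitionValue` unwraps it, so shared code, such as the integration-test change at the bottom of this commit, need not care which binding is in play:

    open Confluent.Kafka

    let partition = Partition 3                          // arbitrary example value
    let value : int = Bindings.partitionValue partition  // yields 3
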
src/Propulsion.Kafka/Consumers.fs (3 additions & 2 deletions)
@@ -123,7 +123,8 @@ type ConsumerPipeline private (inner : IConsumer<string, string>, task : Task<unit>
             float config.Buffering.maxInFlightBytes / 1024. / 1024. / 1024., (let t = config.Buffering.maxBatchDelay in t.TotalSeconds))
         let limiterLog = log.ForContext(Serilog.Core.Constants.SourceContextPropertyName, Core.Constants.messageCounterSourceContext)
         let limiter = new Core.InFlightMessageCounter(limiterLog, config.Buffering.minInFlightBytes, config.Buffering.maxInFlightBytes)
-        let consumer, closeConsumer = Bindings.createConsumer log config // teardown is managed by ingester.Pump()
+        let consumer, closeConsumer = Bindings.createConsumer log config.Inner // teardown is managed by ingester.Pump()
+        consumer.Subscribe config.Topics
         let ingester = KafkaIngestionEngine<'M>(log, limiter, consumer, closeConsumer, mapResult, submit, emitInterval = config.Buffering.maxBatchDelay, statsInterval = statsInterval)
         let cts = new CancellationTokenSource()
         let ct = cts.Token
@@ -137,7 +138,7 @@ type ConsumerPipeline private (inner : IConsumer<string, string>, task : Task<unit>
                     log.Information("Exiting pipeline component {name}", name)
                 with e ->
                     log.Fatal(e, "Abend from pipeline component {name}", name)
-                    triggerStop() }
+                    triggerStop () }
             Async.Start(wrap name f, ct)
         // if scheduler encounters a faulted handler, we propagate that as the consumer's Result
         let abend (exns : AggregateException) =
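
Read together with the shims change further down (where `WithLogging` drops its `topics` parameter and no longer calls `Subscribe`), the revised start-up order is: build the consumer from the raw Confluent `config.Inner`, then subscribe explicitly. A sketch of the resulting wiring, reusing the names from the hunk above:

    // The binding layer no longer needs to know about KafkaConsumerConfig;
    // it only wires logging and returns the close handle for ingester.Pump()
    let consumer, closeConsumer = Bindings.createConsumer log config.Inner
    // subscription is now the pipeline's responsibility, not the binding's
    consumer.Subscribe config.Topics
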
src/Propulsion.Kafka/Propulsion.Kafka.fsproj (1 addition & 1 deletion)
@@ -25,7 +25,7 @@
     <PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' == 'net461' " />
     <PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' != 'net461' " />
 
-    <PackageReference Include="Jet.ConfluentKafka.FSharp" Version="1.0.1" />
+    <PackageReference Include="Jet.ConfluentKafka.FSharp" Version="1.1.0" />
   </ItemGroup>
 
   <ItemGroup>
src/Propulsion.Kafka0/Bindings.fs (3 additions & 3 deletions)
@@ -10,11 +10,11 @@ type IConsumer<'K,'V> = Consumer<'K,'V>
 type ConsumeResult<'K,'V> = Message<'K,'V>
 
 module Bindings =
-    let mapConsumeResult (x : ConsumeResult<string,string>) =
-        KeyValuePair(x.Key,x.Value)
+    let mapConsumeResult (x : ConsumeResult<string,string>) = KeyValuePair(x.Key,x.Value)
+    let inline partitionId (x : ConsumeResult<_,_>) = x.Partition
+    let partitionValue = id
     let createConsumer log config : IConsumer<string,string> * (unit -> unit) =
         ConsumerBuilder.WithLogging(log, config)
-    let inline partitionId (x : ConsumeResult<_,_>) = x.Partition
     let inline storeOffset (log : ILogger) (consumer : IConsumer<_,_>) (highWaterMark : ConsumeResult<string,string>) =
         try let e = consumer.StoreOffset(highWaterMark)
             if e.Error.HasError then log.Error("Consuming... storing offsets failed {@e}", e.Error)
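
Both `Bindings` modules now present the same surface: `partitionValue` is the identity here because the 0.11.x `Message<_,_>.Partition` is already a plain `int`, while the 1.x variant unwraps the `Partition` struct. A sketch of a handler that compiles against either (assuming the surrounding namespace and a Serilog logger are in scope):

    open Serilog

    // m.Partition: int under Confluent.Kafka 0.11.x, Partition struct under 1.x;
    // partitionValue normalizes both to int
    let logPartition (log : ILogger) (m : ConsumeResult<string,string>) =
        log.Information("Handling partition {partition}", Bindings.partitionValue m.Partition)
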
src/Propulsion.Kafka0/JetConfluentKafkaFSharpShims.fs (24 additions & 13 deletions)
@@ -89,10 +89,10 @@ type KafkaProducerConfig private (inner, broker : Uri) =
             MessageSendMaxRetries = Nullable (defaultArg retries 60), // default 2
             Acks = Nullable acks,
             SocketKeepaliveEnable = Nullable (defaultArg socketKeepAlive true), // default: false
-            LogConnectionClose = Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
-        maxInFlight |> Option.iter (fun x -> c.MaxInFlight <- Nullable x) // default 1_000_000
+            LogConnectionClose = Nullable false, // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
+            MaxInFlight = Nullable (defaultArg maxInFlight 1_000_000)) // default 1_000_000
         linger |> Option.iter<TimeSpan> (fun x -> c.LingerMs <- Nullable (int x.TotalMilliseconds)) // default 0
-        partitioner |> Option.iter (fun x -> c.Partitioner <- x)
+        partitioner |> Option.iter (fun x -> c.Partitioner <- Nullable x)
         compression |> Option.iter (fun x -> c.CompressionType <- Nullable x)
         statisticsInterval |> Option.iter<TimeSpan> (fun x -> c.StatisticsIntervalMs <- Nullable (int x.TotalMilliseconds))
         custom |> Option.iter (fun xs -> for KeyValue (k,v) in xs do c.Set(k,v))
@@ -271,8 +271,21 @@ type KafkaPartitionMetrics =
         [<JsonProperty("consumer_lag")>]
         consumerLag: int64 }
 
+type OffsetValue =
+    | Unset
+    | Valid of value: int64
+    override this.ToString() =
+        match this with
+        | Unset -> "Unset"
+        | Valid value -> value.ToString()
+module OffsetValue =
+    let ofOffset (offset : Offset) =
+        match offset.Value with
+        | _ when offset = Offset.Invalid -> Unset
+        | valid -> Valid valid
+
 type ConsumerBuilder =
-    static member private WithLogging(log: ILogger, c : Consumer<_,_>, topics, ?onRevoke) =
+    static member private WithLogging(log: ILogger, c : Consumer<_,_>, ?onRevoke) =
         let d1 = c.OnLog.Subscribe(fun m ->
             log.Information("Consuming... {message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
         let d2 = c.OnError.Subscribe(fun e ->
@@ -290,14 +303,14 @@ type ConsumerBuilder =
             log.Verbose("Consuming... EOF {topic} partition={partition} offset={offset}", tpo.Topic, tpo.Partition, let o = tpo.Offset in o.Value))
         let d6 = c.OnOffsetsCommitted.Subscribe(fun cos ->
             for t,ps in cos.Offsets |> Seq.groupBy (fun p -> p.Topic) do
-                let o = [for p in ps -> p.Partition, let o = p.Offset in if o.IsSpecial then box (string o) else box o.Value(*, fmtError p.Error*)]
+                let o = seq { for p in ps -> p.Partition, OffsetValue.ofOffset p.Offset(*, fmtError p.Error*) }
                 let e = cos.Error
-                if not e.HasError then log.Information("Consuming... Committed {topic} {@offsets}", t, o)
-                else log.Warning("Consuming... Committed {topic} {@offsets} reason={error} code={code} isBrokerError={isBrokerError}", t, o, e.Reason, e.Code, e.IsBrokerError))
+                if not e.HasError then log.Information("Consuming... Committed {topic} {offsets}", t, o)
+                else log.Warning("Consuming... Committed {topic} {offsets} reason={error} code={code} isBrokerError={isBrokerError}", t, o, e.Reason, e.Code, e.IsBrokerError))
         let d7 = c.OnStatistics.Subscribe(fun json ->
             let stats = JToken.Parse json
             for t in stats.Item("topics").Children() do
-                if t.HasValues && topics |> Seq.exists (fun ct -> ct = t.First.Item("topic").ToString()) then
+                if t.HasValues && c.Subscription |> Seq.exists (fun ct -> ct = t.First.Item("topic").ToString()) then
                     let topic, partitions = let tm = t.First in tm.Item("topic").ToString(), tm.Item("partitions").Children()
                     let metrics = [|
                         for tm in partitions do
@@ -308,9 +321,7 @@
                     let totalLag = metrics |> Array.sumBy (fun x -> x.consumerLag)
                     log.Information("Consuming... Stats {topic:l} totalLag {totalLag} {@stats}", topic, totalLag, metrics))
         fun () -> for d in [d1;d2;d3;d4;d5;d6;d7] do d.Dispose()
-    static member WithLogging(log : ILogger, config, ?onRevoke) =
-        if List.isEmpty config.topics then invalidArg "config" "must specify at least one topic"
-        let consumer = new Consumer<_,_>(config.inner.Render(), mkDeserializer(), mkDeserializer())
-        consumer.Subscribe config.topics
-        let unsubLog = ConsumerBuilder.WithLogging(log, consumer, config.topics, ?onRevoke = onRevoke)
+    static member WithLogging(log : ILogger, config : ConsumerConfig, ?onRevoke) =
+        let consumer = new Consumer<_,_>(config.Render(), mkDeserializer(), mkDeserializer())
+        let unsubLog = ConsumerBuilder.WithLogging(log, consumer, ?onRevoke = onRevoke)
         consumer, unsubLog
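
Note the accompanying template change from `{@offsets}` to `{offsets}`: Serilog now renders each pair via `ToString` instead of destructuring, so committed offsets log as plain numbers, with special offsets reading as `Unset`. A small sketch of the rendering (hypothetical values; `Offset`'s long-accepting constructor is assumed):

    string (OffsetValue.ofOffset (Offset 42L))   // "42"
    string (OffsetValue.ofOffset Offset.Invalid) // "Unset"
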
tests/Propulsion.Kafka.Integration/ParallelIntegration.fs (1 addition & 1 deletion)
@@ -303,7 +303,7 @@ type T3(testOutputHelper) =
 
         do! runConsumers log config 1 None
             (fun c m -> async {
-                let partition = let p = m.raw.Partition in p.Value
+                let partition = Bindings.partitionValue m.raw.Partition
 
                 // check per-partition handlers are serialized
                 let concurrentBatchCell = getBatchPartitionCount partition
(Diffs for the remaining 2 of the 10 changed files are not shown.)
