Target FsKafka 1.4.3 (#69)
bartelink committed May 20, 2020
1 parent c6f2ec4 commit 794a485
Showing 8 changed files with 36 additions and 24 deletions.
15 changes: 9 additions & 6 deletions CHANGELOG.md
@@ -10,6 +10,9 @@ The `Unreleased` section name is replaced by the expected version of next release

### Added
### Changed

+- `Kafka`: Targets [`FsKafka`/`FsKafka0` (and `Confluent.Kafka`) v `1.4.3`](https://github.com/jet/FsKafka/blob/master/CHANGELOG.md#1.4.3) [#68](https://github.com/jet/propulsion/pull/68)

### Removed
### Fixed

@@ -18,26 +21,26 @@ The `Unreleased` section name is replaced by the expected version of next release

### Fixed

-- `Kafka`: Targets `FsKafka`/`FsKafka0` v `1.5.0-beta.2` to resolve `TypeLoadException`
+- `Kafka`: Targets `FsKafka`/`FsKafka0` v `1.5.0-beta.2` to resolve `TypeLoadException` when using `1.5.0-beta.1`

<a name="2.5.1"></a>
-## [2.5.1] - 2020-05-14
+## [2.5.1] - 2020-05-14 (Unlisted/broken :point_up:)

### Fixed

- `EventStoreSource`: Fixed `obj` -> `int` type discrepancy re [#63](https://github.com/jet/propulsion/pull/63)

<a name="2.5.0"></a>
-## [2.5.0] - 2020-05-13
+## [2.5.0] - 2020-05-13 (Unlisted/broken :point_up:)

### Changed

-- `Kafka`: Targets [`FsKafka`/`FsKafka0` v `1.5.0-beta.2`](https://github.com/jet/FsKafka/blob/master/CHANGELOG.md#1.4.2) [#64](https://github.com/jet/propulsion/pull/64)
+- `Kafka`: Targets [`FsKafka`/`FsKafka0` v `1.4.2`](https://github.com/jet/FsKafka/blob/master/CHANGELOG.md#1.4.2) [#67](https://github.com/jet/propulsion/pull/67)

### Removed

-- `Propulsion.Kafka0` Some `Propulsion.Kafka0`-namespaced shimming elements are now found in the `FsKafka` namespace in `FsKafka0` [#64](https://github.com/jet/propulsion/pull/64)
-- `Propulsion.Kafka`: `KafkaMonitor` is now found in the `FsKafka` namespace in `FsKafka`/`FsKafka0` (NOTE: integration tests continue to live in this repo) [#64](https://github.com/jet/propulsion/pull/64)
+- `Propulsion.Kafka0` Some `Propulsion.Kafka0`-namespaced shimming elements are now found in the `FsKafka` namespace in `FsKafka0` [#67](https://github.com/jet/propulsion/pull/67)
+- `Propulsion.Kafka`: `KafkaMonitor` is now found in the `FsKafka` namespace in `FsKafka`/`FsKafka0` (NOTE: integration tests continue to live in this repo) [#67](https://github.com/jet/propulsion/pull/67)

### Fixed

4 changes: 2 additions & 2 deletions README.md
@@ -11,8 +11,8 @@ The components within this repository are delivered as a multi-targeted Nuget package
- `Propulsion` [![NuGet](https://img.shields.io/nuget/v/Propulsion.svg)](https://www.nuget.org/packages/Propulsion/) Implements core functionality in a channel-independent fashion including `ParallelProjector`, `StreamsProjector`. [Depends](https://www.fuget.org/packages/Propulsion) on `MathNet.Numerics`, `Serilog`
- `Propulsion.Cosmos` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Cosmos.svg)](https://www.nuget.org/packages/Propulsion.Cosmos/) Provides bindings to Azure CosmosDb a) writing to `Equinox.Cosmos` :- `CosmosSink` b) reading from CosmosDb's changefeed by wrapping the [`dotnet-changefeedprocessor` library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet) :- `CosmosSource`. [Depends](https://www.fuget.org/packages/Propulsion.Cosmos) on `Equinox.Cosmos`, `Microsoft.Azure.DocumentDB.ChangeFeedProcessor`, `Serilog`
- `Propulsion.EventStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/). Provides bindings to [EventStore](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink` [Depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore`, `Serilog`
-- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v `1.5.0-beta.2`, `Serilog`
-- `Propulsion.Kafka0` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka0.svg)](https://www.nuget.org/packages/Propulsion.Kafka0/). Same functionality/purpose as `Propulsion.Kafka` but uses `FsKafka0` instead of `FsKafka` in order to target an older `Confluent.Kafka`/`librdkafka` version pairing for interoperability with systems that have a hard dependency on that. [Depends](https://www.fuget.org/packages/Propulsion.Kafka0) on `FsKafka0` v `1.5.0-beta.2` (which depends on `Confluent.Kafka [0.11.3]`, `librdkafka.redist [0.11.4]`), `Serilog`
+- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v `1.4.3`, `Serilog`
+- `Propulsion.Kafka0` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka0.svg)](https://www.nuget.org/packages/Propulsion.Kafka0/). Same functionality/purpose as `Propulsion.Kafka` but uses `FsKafka0` instead of `FsKafka` in order to target an older `Confluent.Kafka`/`librdkafka` version pairing for interoperability with systems that have a hard dependency on that. [Depends](https://www.fuget.org/packages/Propulsion.Kafka0) on `FsKafka0` v `1.4.3` (which depends on `Confluent.Kafka [0.11.3]`, `librdkafka.redist [0.11.4]`), `Serilog`

The ubiquitous `Serilog` dependency is solely on the core module, not any sinks, i.e. you configure to emit to `NLog` etc.

5 changes: 4 additions & 1 deletion src/Propulsion.Kafka/Binding.fs
@@ -7,7 +7,10 @@ open System
open System.Collections.Generic

module Binding =
-    let mapConsumeResult (x : ConsumeResult<string,string>) = KeyValuePair(x.Message.Key,x.Message.Value)
+
+    let mapConsumeResult (result : ConsumeResult<string,string>) =
+        let m = Binding.message result
+        KeyValuePair(m.Key, m.Value)
let makeTopicPartition (topic : string) (partition : int) = TopicPartition(topic, Partition partition)
let createConsumer log config : IConsumer<string,string> * (unit -> unit) =
let consumer = ConsumerBuilder.WithLogging(log, config)
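The reworked `mapConsumeResult` goes through the `Binding.message` shim rather than dereferencing `x.Message` directly, so one call shape covers both the `Confluent.Kafka` 1.x surface (key/value on `result.Message`) and the 0.11.x surface (key/value on the result itself). A minimal consumption sketch, assuming an already-configured consumer; `pollOne` and its arguments are illustrative, not part of the commit:

```fsharp
open System.Collections.Generic
open System.Threading
open Confluent.Kafka
open Propulsion.Kafka

// Illustrative only: poll a single message and normalize it via the binding,
// never touching result.Message / result.Key directly
let pollOne (consumer : IConsumer<string, string>) (ct : CancellationToken) =
    let result = consumer.Consume ct
    // Binding.mapConsumeResult shields callers from the 0.x/1.x API split
    let kv : KeyValuePair<string, string> = Binding.mapConsumeResult result
    printfn "key=%s value=%s" kv.Key kv.Value
```

The same sketch compiles against `Propulsion.Kafka0` unchanged, which is the point of routing through the binding layer.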
25 changes: 14 additions & 11 deletions src/Propulsion.Kafka/Consumers.fs
@@ -44,9 +44,9 @@ module private Impl =
x

/// guesstimate approximate message size in bytes
-    let approximateMessageBytes (message : Message<string, string>) =
+    let approximateMessageBytes (m : Message<string, string>) =
         let inline len (x : string) = match x with null -> 0 | x -> sizeof<char> * x.Length
-        16 + len message.Key + len message.Value |> int64
+        16 + len m.Key + len m.Value |> int64
let inline mb x = float x / 1024. / 1024.

/// Continuously polls across the assigned partitions, building spans; periodically (at intervals of `emitInterval`), `submit`s accumulated messages as
@@ -73,12 +73,13 @@ type KafkaIngestionEngine<'Info>
Binding.storeOffset log consumer span.highWaterMark
{ source = topicPartition; onCompletion = checkpoint; messages = span.messages.ToArray() }
let ingest result =
-            let message = Binding.message result
-            let sz = approximateMessageBytes message
+            let m = FsKafka.Binding.message result
+            if m = null then invalidOp "Cannot dereference null message"
+            let sz = approximateMessageBytes m
             counter.Delta(+sz) // counterbalanced by Delta(-) in checkpoint(), below
             intervalMsgs <- intervalMsgs + 1L
             let inline stringLen (s : string) = match s with null -> 0 | x -> x.Length
-            intervalChars <- intervalChars + int64 (stringLen message.Key + stringLen message.Value)
+            intervalChars <- intervalChars + int64 (stringLen m.Key + stringLen m.Value)
let tp = result.TopicPartition
let span =
match acc.TryGetValue tp with
@@ -354,13 +355,15 @@ type StreamNameSequenceGenerator()
let e = toTimelineEvent (consumeResult, __.GenerateIndex sn)
Seq.singleton { stream = sn; event = e }

-    /// Enables customizing of mapping from ConsumeResult to the StreamName<br/>
-    /// The body of the message is passed as the <c>ITimelineEvent.Data</c>
+    /// Enables customizing of mapping from ConsumeResult to the StreamName<br/>
+    /// The body of the message is passed as the <c>ITimelineEvent.Data</c><br/>
+    /// Stores the topic, partition and offset as a <c>ConsumeResultContext</c> in the <c>ITimelineEvent.Context</c>
member __.ConsumeResultToStreamEvent(toStreamName : ConsumeResult<_, _> -> StreamName)
: ConsumeResult<string, string> -> Propulsion.Streams.StreamEvent<byte[]> seq =
let toDataAndContext (result : ConsumeResult<string, string>) =
-            let message = Binding.message result
-            System.Text.Encoding.UTF8.GetBytes message.Value, null
+            let m = Binding.message result
+            (   System.Text.Encoding.UTF8.GetBytes m.Value,
+                null)
__.ConsumeResultToStreamEvent(toStreamName, toDataAndContext)

/// Enables customizing of mapping from ConsumeResult to
Expand All @@ -382,8 +385,8 @@ type StreamNameSequenceGenerator() =
member __.ConsumeResultToStreamEvent(toDataAndContext : ConsumeResult<_, _> -> byte[] * obj, ?defaultCategory)
: ConsumeResult<string, string> -> Propulsion.Streams.StreamEvent<byte[]> seq =
let toStreamName (result : ConsumeResult<string, string>) =
-            let message = Binding.message result
-            Core.parseMessageKey (defaultArg defaultCategory "") message.Key
+            let m = Binding.message result
+            Core.parseMessageKey (defaultArg defaultCategory "") m.Key
let toTimelineEvent (result : ConsumeResult<string, string>, index) =
let data, context = toDataAndContext result
FsCodec.Core.TimelineEvent.Create(index, String.Empty, data, context = context)
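The `ingest` path now guards against `Binding.message` yielding `null` rather than failing later with an opaque null dereference. On the consumption side, `StreamNameSequenceGenerator.ConsumeResultToStreamEvent` remains the hook for custom stream naming; a hedged sketch of wiring one up, where the assumed key format `{category}-{id}` is illustrative:

```fsharp
open Confluent.Kafka
open FsCodec
open Propulsion.Kafka

// Illustrative only: derive the stream name from the message key, which is
// assumed to already be in "{category}-{id}" form as FsCodec expects
let gen = StreamNameSequenceGenerator()
let toStreamEvents : ConsumeResult<string, string> -> Propulsion.Streams.StreamEvent<byte[]> seq =
    gen.ConsumeResultToStreamEvent(fun result ->
        let m = Binding.message result
        StreamName.parse m.Key)
```

Each distinct stream name gets a monotonic index from the generator, so downstream stream-wise handlers see correctly ordered synthetic events.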
2 changes: 1 addition & 1 deletion src/Propulsion.Kafka/Propulsion.Kafka.fsproj
@@ -26,7 +26,7 @@
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' != 'net461' " />

<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.0.0" />
-    <PackageReference Include="FsKafka" Version="1.5.0-beta.2" />
+    <PackageReference Include="FsKafka" Version="1.4.3" />
</ItemGroup>

<ItemGroup>
5 changes: 4 additions & 1 deletion src/Propulsion.Kafka0/Binding.fs
@@ -7,7 +7,10 @@ open System
open System.Collections.Generic

module Binding =
-    let mapConsumeResult (x : ConsumeResult<string,string>) = KeyValuePair(x.Key,x.Value)
+
+    let mapConsumeResult (result : ConsumeResult<string,string>) =
+        let m = Binding.message result
+        KeyValuePair(m.Key, m.Value)
let inline makeTopicPartition (topic : string) (partition : int) = TopicPartition(topic, partition)
let createConsumer log config : IConsumer<string,string> * (unit -> unit) =
ConsumerBuilder.WithLogging(log, config)
2 changes: 1 addition & 1 deletion src/Propulsion.Kafka0/Propulsion.Kafka0.fsproj
@@ -28,7 +28,7 @@
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' != 'net461' " />

<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.0.0" />
-    <PackageReference Include="FsKafka0" Version="1.5.0-beta.2" />
+    <PackageReference Include="FsKafka0" Version="1.4.3" />
</ItemGroup>

<ItemGroup>
2 changes: 1 addition & 1 deletion tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs
@@ -64,7 +64,7 @@ module Helpers =
type TestMeta = { key : string; value : string; partition : int; offset : int64 }
let mapParallelConsumeResultToKeyValuePair (x : ConsumeResult<_, _>) : KeyValuePair<string, string> =
let m = Binding.message x
-        KeyValuePair(m.Key, JsonConvert.SerializeObject { key = m.Key; value = m.Value; partition = Binding.partitionValue x.Partition; offset = let o = x.Offset in o.Value })
+        KeyValuePair(m.Key, JsonConvert.SerializeObject { key = m.Key; value = m.Value; partition = Binding.partitionValue x.Partition; offset = Binding.offsetValue x.Offset })
type TestMessage = { producerId : int ; messageId : int }
type ConsumedTestMessage = { consumerId : int ; meta : TestMeta; payload : TestMessage }
type ConsumerCallback = ConsumerPipeline -> ConsumedTestMessage -> Async<unit>
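`Binding.offsetValue` replaces the inline `let o = x.Offset in o.Value` unwrapping, pairing with `Binding.partitionValue` so test code extracts primitive values without committing to whichever `Offset`/`Partition` representation the targeted `Confluent.Kafka` exposes. A small helper in the same spirit (illustrative, not part of the commit):

```fsharp
open Confluent.Kafka
open Propulsion.Kafka

// Illustrative only: render a consume result's position as "topic[partition]@offset"
// using the binding helpers instead of Partition.Value/Offset.Value directly
let describePosition (x : ConsumeResult<string, string>) =
    sprintf "%s[%d]@%d" x.Topic (Binding.partitionValue x.Partition) (Binding.offsetValue x.Offset)
```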
