-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Confluent.Kafka 0.11.3 / librdakfa.redist 0.11.4 support #4
Conversation
1e0a5ee
to
79facf3
Compare
| null -> () | ||
| message -> ingest message | ||
with| :? System.OperationCanceledException -> log.Warning("Consuming... cancelled") | ||
| :? ConsumeException as e -> log.Warning(e, "Consuming... exception") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these always the only exceptions that can be thrown by the consumer.Consume function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes for V1; for V0 its a different set. The consumer thread is guarded by a top-level catch which abend
s the pipeline (and, resultantly, typically the process) if something outside the set is encountered. For V1, I inspected the source, for V0 it makes sense to base this on what equivalent wrappers do - open to ideas...
|
||
let runProducers log (broker : Uri) (topic : string) (numProducers : int) (messagesPerProducer : int) = async { | ||
let runProducer (producerId : int) = async { | ||
let cfg = KafkaProducerConfig.Create("panther", broker, Acks.Leader) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be a variable and not "panther" or is the test topic panther?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this becomes the client.id
and is also used in some ephemeral topic names in the test context (normally with a guid or id to guearantee uniqueness) - it could also be "leopard"
or "foo"
;)
This provides a source-code compatible version of
Propulsion.Kafka
that, instead of depending onJet.ConfluentKafka.FSharp
v>= 1.0.1
andConfluent.Kafka 1.0.1
:Confluent.Kafka 0.11.3
+librdkafka.redist 0.11.4
Jet.ConfluentKafka.FSharp
0.x