From 5dbfdf133e90a94e481eafb7a7d3a66667a1fd59 Mon Sep 17 00:00:00 2001 From: Dmytro Milinevskyi Date: Wed, 16 Mar 2022 10:27:31 +0100 Subject: [PATCH] producer: ensure that the management message (fin) is never "leaked" Since async producer now support multiple inflight messages thanks to https://github.com/Shopify/sarama/pull/1686 and https://github.com/Shopify/sarama/pull/2094, it now may "leak" the "fin" internal management message to Kafka (and to the client) when broker producer is reconnecting to Kafka broker and retries multiple inflight messages at the same time. --- async_producer.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/async_producer.go b/async_producer.go index 27398bcda..e03d2daa3 100644 --- a/async_producer.go +++ b/async_producer.go @@ -840,6 +840,15 @@ func (bp *brokerProducer) run() { continue } + if msg.flags&fin == fin { + // New broker producer that was caught up by the retry loop + bp.parent.retryMessage(msg, ErrShuttingDown) + delete(bp.currentRetries[msg.Topic], msg.Partition) + Logger.Printf("producer/broker/%d state change to [dying-%d] on %s/%d\n", + bp.broker.ID(), msg.retries, msg.Topic, msg.Partition) + continue + } + if bp.buffer.wouldOverflow(msg) { Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID()) if err := bp.waitForSpace(msg, false); err != nil {