Skip to content

Commit

Permalink
producer: ensure that the management message (fin) is never "leaked"
Browse files Browse the repository at this point in the history
Since async producer now support multiple inflight messages
thanks to IBM#1686 and
IBM#2094, it now may "leak"
the "fin" internal management message to Kafka (and to the client)
when broker producer is reconnecting to Kafka broker and retries
multiple inflight messages at the same time.
  • Loading branch information
niamster committed Mar 16, 2022
1 parent bad67e5 commit 5dbfdf1
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,15 @@ func (bp *brokerProducer) run() {
continue
}

if msg.flags&fin == fin {
// New broker producer that was caught up by the retry loop
bp.parent.retryMessage(msg, ErrShuttingDown)
delete(bp.currentRetries[msg.Topic], msg.Partition)
Logger.Printf("producer/broker/%d state change to [dying-%d] on %s/%d\n",
bp.broker.ID(), msg.retries, msg.Topic, msg.Partition)
continue
}

if bp.buffer.wouldOverflow(msg) {
Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
if err := bp.waitForSpace(msg, false); err != nil {
Expand Down

0 comments on commit 5dbfdf1

Please sign in to comment.