diff --git a/async_producer.go b/async_producer.go index 27398bcda..e03d2daa3 100644 --- a/async_producer.go +++ b/async_producer.go @@ -840,6 +840,15 @@ func (bp *brokerProducer) run() { continue } + if msg.flags&fin == fin { + // New broker producer that was caught up by the retry loop + bp.parent.retryMessage(msg, ErrShuttingDown) + delete(bp.currentRetries[msg.Topic], msg.Partition) + Logger.Printf("producer/broker/%d state change to [dying-%d] on %s/%d\n", + bp.broker.ID(), msg.retries, msg.Topic, msg.Partition) + continue + } + if bp.buffer.wouldOverflow(msg) { Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID()) if err := bp.waitForSpace(msg, false); err != nil {