diff --git a/consumer.go b/consumer.go index dd1d3658a..b2039e9c0 100644 --- a/consumer.go +++ b/consumer.go @@ -861,7 +861,7 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer { broker: broker, input: make(chan *partitionConsumer), newSubscriptions: make(chan []*partitionConsumer), - wait: make(chan none), + wait: make(chan none, 1), subscriptions: make(map[*partitionConsumer]none), refs: 0, } @@ -878,36 +878,54 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer { // it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available, // so the main goroutine can block waiting for work if it has none. func (bc *brokerConsumer) subscriptionManager() { - var buffer []*partitionConsumer + var partitionConsumers []*partitionConsumer for { - if len(buffer) > 0 { - select { - case event, ok := <-bc.input: - if !ok { - goto done - } - buffer = append(buffer, event) - case bc.newSubscriptions <- buffer: - buffer = nil - case bc.wait <- none{}: + // check for any partition consumer asking to subscribe if there aren't + // any, trigger the network request by sending "nil" to the + // newSubscriptions channel + select { + case pc, ok := <-bc.input: + if !ok { + goto done } - } else { - select { - case event, ok := <-bc.input: - if !ok { - goto done + + // add to list of subscribing consumers + partitionConsumers = append(partitionConsumers, pc) + + // wait up to 250ms to drain input of any further incoming + // subscriptions + for batchComplete := false; !batchComplete; { + select { + case pc, ok := <-bc.input: + if !ok { + goto done + } + + partitionConsumers = append(partitionConsumers, pc) + case <-time.After(250 * time.Millisecond): + batchComplete = true } - buffer = append(buffer, event) - case bc.newSubscriptions <- nil: } + + Logger.Printf( + "consumer/broker/%d accumulated %d new subscriptions\n", + bc.broker.ID(), len(partitionConsumers)) + + bc.wait <- none{} + bc.newSubscriptions <- partitionConsumers + + // clear out the batch + partitionConsumers = nil + + case bc.newSubscriptions <- nil: } } done: close(bc.wait) - if len(buffer) > 0 { - bc.newSubscriptions <- buffer + if len(partitionConsumers) > 0 { + bc.newSubscriptions <- partitionConsumers } close(bc.newSubscriptions) }