Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid starvation in subscriptionManager #2109

Merged
merged 1 commit into from
Feb 25, 2022
Merged

Conversation

dnwe
Copy link
Collaborator

@dnwe dnwe commented Jan 12, 2022

(from @pavius)

The first few fetches from Kafka may only fetch data from one or two partitions, starving the rest for a very long time (depending on message size / processing time)

Once a member joins the consumer groups and receives its partitions, they are fed into the "subscription manager" from different go routines. The subscription manager then performs batching and executes a fetch for all the partitions. However, it seems like the batching logic in subscriptionManager is faulty, perhaps assuming that case: order prioritizes which case should be handled when all are signaled which is not the case, according to the go docs (https://golang.org/ref/spec#Select_statements):

If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.

For example - if you receive 64 partitions, each will be handled in their own go routine which sends the partition information to the bc.input channel. After an iteration there is a race between case event, ok := <-bc.input which will batch the request and case bc.newSubscriptions <- buffer which will trigger an immediate fetch of the 1 or 2 partitions that made it into the batch.

This issue only really affects slow consumers with short messages. If the condition happens with 1 partition being in the batch (even though 63 extra partitions have been claimed but didn't make it into the batch) the fetch will ask for 1MB (by default) of messages from that single partition. If the messages are only a few bytes long and processing time is minutes, you will not perform another fetch for hours.

Contributes-to: #1608 #1897

@dnwe dnwe requested a review from bai as a code owner January 12, 2022 14:26
@ghost ghost added the cla-needed label Jan 12, 2022
@dnwe
Copy link
Collaborator Author

dnwe commented Jan 12, 2022

@pavius I extracted this change out of your original issue and fork so we can test and review it in isolation — thanks for your work on this, please could you sign the CLA?

consumer.go Outdated
@@ -752,7 +752,7 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
bc := &brokerConsumer{
consumer: c,
broker: broker,
input: make(chan *partitionConsumer),
input: make(chan *partitionConsumer, 4096),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pavius can you explain the switch to unbuffered channel w/ size 4096 here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was quite a while ago, so I can only guess that I wanted to reduce the chance writers would block trying to write to the channel.

@ghost ghost removed the cla-needed label Jan 21, 2022
@dnwe dnwe force-pushed the dnwe/subscription-starvation branch from 5770556 to 3483f0f Compare January 31, 2022 15:36
@dnwe
Copy link
Collaborator Author

dnwe commented Jan 31, 2022

Hmm. I've been trying to debug the failing test on this one, and after fixing up the test to be a bit more event driven I do seem to get to a state of deadlock in the consumer where subscriptionManager is calling bc.wait <- none{} but nothing is receiving from that channel because subscriptionConsumer is stuck trying to recv on the range loop for newSubscriptions := range bc.newSubscriptions

@dnwe dnwe force-pushed the dnwe/subscription-starvation branch 4 times, most recently from 2e30c2c to c59477d Compare February 25, 2022 14:05
The first few fetches from Kafka may only fetch data from one or two
partitions, starving the rest for a very long time (depending on message
size / processing time)

Once a member joins the consumer groups and receives its partitions,
they are fed into the "subscription manager" from different go routines.
The subscription manager then performs batching and executes a fetch for
all the partitions. However, it seems like the batching logic in
`subscriptionManager` is faulty, perhaps assuming that `case:` order
prioritizes which `case` should be handled when all are signaled which
is not the case, according to the go docs
(https://golang.org/ref/spec#Select_statements):
```
If one or more of the communications can proceed, a single one that can
proceed is chosen via a uniform pseudo-random selection. Otherwise, if
there is a default case, that case is chosen. If there is no default
case, the "select" statement blocks until at least one of the
communications can proceed.
```

For example - if you receive 64 partitions, each will be handled in
their own go routine which sends the partition information to the
`bc.input` channel. After an iteration there is a race between `case
event, ok := <-bc.input` which will batch the request and `case
bc.newSubscriptions <- buffer` which will trigger an immediate fetch of
the 1 or 2 partitions that made it into the batch.

This issue only really affects slow consumers with short messages. If
the condition happens with 1 partition being in the batch (even though
63 extra partitions have been claimed but didn't make it into the batch)
the fetch will ask for 1MB (by default) of messages from that single
partition. If the messages are only a few bytes long and processing time
is minutes, you will not perform another fetch for hours.

Contributes-to: #1608 #1897

Co-authored-by: Dominic Evans <dominic.evans@uk.ibm.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants