Issues with using group instance id with librdkafka #3842

Closed
BalazsHoranyi opened this issue Feb 17, 2022 · 7 comments

Comments

@BalazsHoranyi

Version & Environment

Redpanda version: (use rpk version):

Latest Helm release (21.1.6)

Please also give versions of other components:

  • Kubernetes (use kubectl version):
Server Version: version.Info{Major:"1", Minor:"20", GitVersion:"v1.20.7", GitCommit:"132a687512d7fb058d0f5890f07d4121b3f0a2e2", GitTreeState:"clean", BuildDate:"2021-05-12T12:32:49Z", GoVersion:"go1.15.12", Compiler:"gc", Platform:"linux/amd64"}

Bringing this conversation over from Slack: https://redpandacommunity.slack.com/archives/C01AJDUT88N/p1643398053651669

It appears that librdkafka receives the wrong Kafka API version when it joins the group. This causes unexpected behavior when trying to use group instance IDs.

Minimal example to reproduce:

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

	// this works and consumes messages as expected
	//c, err := kafka.NewConsumer(&kafka.ConfigMap{
	//	"bootstrap.servers": "myHost",
	//	"group.id":          "myGroup",
	//	"auto.offset.reset": "earliest",
	//})

	// this does not work and seems to just hang
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "myHost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
		"group.instance.id": "myGroup/0",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(kafka.LibraryVersion())

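	// surface subscription errors instead of silently ignoring them
	if err := c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil); err != nil {
		panic(err)
	}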

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
			break
		} else {
			// The client will automatically try to recover from all errors.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	c.Close()
}
@BalazsHoranyi added the kind/bug (Something isn't working) label Feb 17, 2022
@chris-kimberley

We've seen the same issue using the Python confluent-kafka library (also librdkafka based).

@dotnwat
Member

dotnwat commented Feb 18, 2022

Hi @BalazsHoranyi and @chris-kimberley, Redpanda doesn't currently support static group membership, which is why you are seeing the errors. Disabling the use of group instance ID should help until we have support for this feature.
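
For readers applying the workaround, here is a minimal sketch of making static membership optional in the consumer settings. The property names come from the reproduction in the issue. The helper `consumerConfig` is hypothetical and not part of confluent-kafka-go, and it uses a plain map so the example runs without the client library.

```go
package main

import "fmt"

// consumerConfig builds consumer settings as a plain map. The keys are the
// librdkafka property names used in the reproduction; the helper itself is
// hypothetical and only illustrates the toggle.
func consumerConfig(brokers, group, instanceID string) map[string]interface{} {
	cfg := map[string]interface{}{
		"bootstrap.servers": brokers,
		"group.id":          group,
		"auto.offset.reset": "earliest",
	}
	// Only request static membership when an instance ID is supplied.
	if instanceID != "" {
		cfg["group.instance.id"] = instanceID
	}
	return cfg
}

func main() {
	// Empty instance ID: the consumer joins as a regular dynamic member,
	// which is the workaround suggested in this comment.
	cfg := consumerConfig("myHost", "myGroup", "")
	_, static := cfg["group.instance.id"]
	fmt.Println("static membership requested:", static)
}
```

In a real consumer, the same entries would go into the kafka.ConfigMap passed to kafka.NewConsumer, as in the reproduction. Passing an empty instance ID leaves group.instance.id out entirely, so it can be re-enabled later by changing one argument.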

@dotnwat added the kind/compat label and removed the kind/bug (Something isn't working) label Feb 18, 2022
@emaxerrno
Contributor

ah boom. static assignment. makes sense.

@chris-kimberley

Ah. Do we have an idea for when this would be supported? It's pretty important for our use-case.

@emaxerrno
Contributor

@vsaraswat - we should add this to the compat work

@chris-kimberley if you subscribe to this tix it will notify you when we add

@rkruze
Contributor

rkruze commented Mar 16, 2022

This is a duplicate of #1335

@emaxerrno
Contributor

landed in dev.


6 participants