kafka: support static group membership #1335

Closed
dotnwat opened this issue May 4, 2021 · 1 comment · Fixed by #4684
Labels: area/kafka, kind/enhance (New feature or request)

Comments

@dotnwat (Member) commented May 4, 2021

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	seeds := []string{"localhost:9092"}
	var clientOpts []kgo.Opt
	//clientOpts = append(clientOpts, kgo.WithLogger(kgo.BasicLogger(os.Stdout, kgo.LogLevelDebug, nil)))
	clientOpts = append(clientOpts, kgo.SeedBrokers(seeds...))

	client, err := kgo.NewClient(clientOpts...)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()

	var groupOpts []kgo.GroupOpt
	groupOpts = append(groupOpts, kgo.GroupTopics("topic1", "topic2"))

	var balancer kgo.GroupBalancer
	balancer = kgo.CooperativeStickyBalancer()
	groupOpts = append(groupOpts, kgo.Balancers(balancer))

	// Static group membership: a fixed instance ID lets the broker hand the
	// same partitions back to a restarted consumer without a full rebalance.
	groupOpts = append(groupOpts, kgo.InstanceID("client-0"))
	groupOpts = append(groupOpts, kgo.DisableAutoCommit())

	// sigs := make(chan os.Signal, 2)
	// signal.Notify(sigs, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)

	// Join the consumer group with the options above.
	client.AssignGroup("my-group", groupOpts...)

	for {
		fetches := client.PollFetches(ctx)
		if errs := fetches.Errors(); len(errs) > 0 {
			// All errors are retried internally when fetching, but non-retriable errors are
			// returned from polls so that users can notice and take action.
			panic(fmt.Sprint(errs))
		}

		// We can iterate through a record iterator...
		iter := fetches.RecordIter()
		for !iter.Done() {
			record := iter.Next()
			fmt.Println(string(record.Value), "from an iterator!")
		}

		// or a callback function.
		fetches.EachPartition(func(p kgo.FetchTopicPartition) {
			for _, record := range p.Partition.Records {
				fmt.Println(string(record.Value), "from range inside a callback!")
			}

			// We can even use a second callback!
			p.EachRecord(func(record *kgo.Record) {
				fmt.Println(string(record.Value), "from a second callback!")
			})
		})
	}
}
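
The kgo.InstanceID option above is what the protocol calls group.instance.id (KIP-345 static membership). The broker only defers a rebalance for as long as the group session timeout after a static member drops off, so the instance ID is usually paired with a session timeout long enough to cover a restart. A minimal sketch of that pairing, assuming kgo.SessionTimeout is available as a group option in this franz-go version (topic names and the timeout value are placeholders):

package main

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// staticMemberGroupOpts sketches pairing a static instance ID with a longer
// session timeout: the broker holds a static member's partitions for up to
// this timeout after the member disappears, instead of rebalancing right away.
func staticMemberGroupOpts(instanceID string) []kgo.GroupOpt {
	return []kgo.GroupOpt{
		kgo.GroupTopics("topic1", "topic2"),
		kgo.Balancers(kgo.CooperativeStickyBalancer()),
		kgo.InstanceID(instanceID),
		// Assumed to exist as a GroupOpt in this franz-go version; it gives a
		// restarted pod this long to rejoin before its partitions are reassigned.
		kgo.SessionTimeout(60 * time.Second),
		kgo.DisableAutoCommit(),
	}
}

The returned slice would then be passed to client.AssignGroup in place of the groupOpts built inline above.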
dotnwat self-assigned this May 4, 2021
@rkruze (Contributor) commented May 18, 2021

A user shared this use case around using static group membership:

Using a K8s statefulset you can easily generate unique, deterministic consumer instance IDs that won't change as you upgrade. But the default way for K8s to deploy a statefulset is to go through it pod by pod, first shutting down and then replacing each pod. So every partition being consumed must wait for the entire shutdown and start-up time. This is probably fine for a lot of workloads, but I have near real-time constraints, and even a 15 or 20 second delay is too much if we can avoid it.

There’s another feature of stateful sets that we’ll take advantage of. You can delete them without cascading the delete to the pods. So we’re able to orphan the running consumers while they continue to do their thing.

The implementation of static group membership also has a nice detail we'll take advantage of. When a new consumer makes a join-group request with an existing instance ID and no member ID (that is, a brand new consumer trying to re-use an existing, in-use ID), Kafka will fence the old consumer instance to prevent it from continuing to consume, while the new instance is allowed to replace it. So we get to bring up new pods, which end up causing the old ones to terminate as they take over processing.
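
To make the statefulset idea concrete, here is a minimal sketch that derives the instance ID from the pod hostname (statefulset pods get stable names like consumer-0, consumer-1, so a replacement pod re-joins with the same instance ID and fences the pod it replaces). It reuses the franz-go API from the snippet above; the group and topic names are placeholders:

package main

import (
	"context"
	"os"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Inside a statefulset, the pod hostname (e.g. "consumer-0") is stable
	// across restarts, so it works as a deterministic instance ID.
	hostname, err := os.Hostname()
	if err != nil {
		panic(err)
	}

	client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// A replacement pod joining with the same instance ID (and no member ID)
	// causes the broker to fence the old instance and hand its partitions to
	// the new one, rather than waiting for a full rebalance.
	client.AssignGroup("my-group",
		kgo.GroupTopics("topic1", "topic2"),
		kgo.InstanceID(hostname),
	)

	for {
		fetches := client.PollFetches(context.Background())
		iter := fetches.RecordIter()
		for !iter.Done() {
			record := iter.Next()
			_ = record // process the record...
		}
	}
}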
