ingester: add experimental support for consuming records from kafka #6929

Merged · Dec 15, 2023 · 21 commits

Changes from 1 commit
108 changes: 74 additions & 34 deletions pkg/storage/ingest/reader.go
@@ -6,7 +6,6 @@ import (
"context"
"math"
"strconv"
"sync"
"time"

"github.com/go-kit/log"
@@ -21,6 +20,7 @@ import (
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/plugin/kprom"
"go.uber.org/atomic"
)

// consumerGroup is only used to store commit offsets, not for actual consuming.
@@ -37,19 +37,18 @@ type recordConsumer interface {

type PartitionReader struct {
services.Service
dependencies *services.Manager

kafkaCfg KafkaConfig
partitionID int32

client *kgo.Client
admClient *kadm.Client
client *kgo.Client

consumer recordConsumer
metrics readerMetrics

committer *partitionCommitter
commitInterval time.Duration
commitLoopWg *sync.WaitGroup
commitFetches chan kgo.Fetches

logger log.Logger
}
@@ -64,11 +63,9 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, consumer record
r := &PartitionReader{
kafkaCfg: kafkaCfg,
partitionID: partitionID,
consumer: consumer, // TODO consume records in parallel
commitInterval: time.Second,
commitLoopWg: &sync.WaitGroup{},
commitFetches: make(chan kgo.Fetches, 1),
consumer: consumer,
metrics: metrics,
commitInterval: time.Second,
logger: log.With(logger, "partition", partitionID),
}

@@ -87,26 +84,35 @@ func (r *PartitionReader) start(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "creating kafka reader client")
}
r.admClient = kadm.NewClient(r.client)
r.committer = newConsumerCommitter(r.kafkaCfg, kadm.NewClient(r.client), r.partitionID, r.commitInterval, r.logger)

r.dependencies, err = services.NewManager(r.committer)
if err != nil {
return errors.Wrap(err, "creating service manager")
}
err = services.StartManagerAndAwaitHealthy(ctx, r.dependencies)
if err != nil {
return errors.Wrap(err, "starting service manager")
}

return nil
}

func (r *PartitionReader) stop(error) error {
level.Info(r.logger).Log("msg", "stopping partition reader")

err := services.StopManagerAndAwaitStopped(context.Background(), r.dependencies)
if err != nil {
return errors.Wrap(err, "stopping service manager")
}
r.client.Close()
// r.admClient needs no closing since it's using r.client
return nil
}

func (r *PartitionReader) run(ctx context.Context) error {
defer r.commitLoopWg.Wait()

consumeCtx, cancel := context.WithCancel(context.Background())
defer cancel()

r.commitLoopWg.Add(1)
go r.commitLoop(consumeCtx)

for ctx.Err() == nil {
fetches := r.client.PollFetches(ctx)
if fetches.Err() != nil {
@@ -137,7 +143,14 @@ func collectFetchErrs(fetches kgo.Fetches) (_ error) {
}

func (r *PartitionReader) enqueueCommit(fetches kgo.Fetches) {
r.commitFetches <- fetches
if fetches.NumRecords() == 0 {
return
}
lastOffset := int64(0)
fetches.EachPartition(func(partition kgo.FetchTopicPartition) {
lastOffset = partition.Records[len(partition.Records)-1].Offset
Review comment (Collaborator): I would add an extra check on the partition, to make sure the partition matches the expected one. It should never happen, but I want to make sure we don't have a bug where we've consumed another partition and then we commit the wrong offset to the partition.

(A sketch of such a check follows enqueueCommit below.)
})
r.committer.enqueueOffset(lastOffset)
}
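
A minimal sketch of the check the reviewer suggests, written against the EachPartition callback in enqueueCommit above. The log message and the skip-on-mismatch behavior are illustrative assumptions, not part of this PR:

fetches.EachPartition(func(partition kgo.FetchTopicPartition) {
	if partition.Partition != r.partitionID {
		// Should never happen: the client is configured to consume exactly
		// one partition. Skip instead of committing an offset that belongs
		// to a different partition.
		level.Error(r.logger).Log(
			"msg", "fetched records from unexpected partition",
			"expected_partition", r.partitionID,
			"got_partition", partition.Partition,
		)
		return
	}
	if len(partition.Records) == 0 {
		return
	}
	lastOffset = partition.Records[len(partition.Records)-1].Offset
})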

func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetches) {
@@ -211,7 +224,7 @@ func (r *PartitionReader) fetchLastCommittedOffsetWithRetries(ctx context.Contex
return offset, nil
}

level.Warn(r.logger).Log("msg", "failed to fetch last committed offset", "partition", r.partitionID, "err", err)
level.Warn(r.logger).Log("msg", "failed to fetch last committed offset", "err", err)
retry.Wait()
}

@@ -243,37 +256,64 @@ func (r *PartitionReader) fetchLastCommittedOffset(ctx context.Context) (kgo.Off
return kgo.NewOffset().At(offset.At), nil
}

func (r *PartitionReader) commitLoop(ctx context.Context) {
defer r.commitLoopWg.Done()
type partitionCommitter struct {
services.Service

kafkaCfg KafkaConfig
commitInterval time.Duration
partitionID int32

toCommit *atomic.Int64
admClient *kadm.Client

logger log.Logger
}

func newConsumerCommitter(kafkaCfg KafkaConfig, admClient *kadm.Client, partitionID int32, commitInterval time.Duration, logger log.Logger) *partitionCommitter {
c := &partitionCommitter{
logger: logger,
kafkaCfg: kafkaCfg,
partitionID: partitionID,
toCommit: atomic.NewInt64(0),
admClient: admClient,
commitInterval: commitInterval,
}
c.Service = services.NewBasicService(nil, c.run, nil)
return c
}

func (r *partitionCommitter) enqueueOffset(o int64) {
r.toCommit.Store(o)
}

func (r *partitionCommitter) run(ctx context.Context) error {
commitTicker := time.NewTicker(r.commitInterval)
defer commitTicker.Stop()

toCommit := kadm.Offsets{}

previousOffset := r.toCommit.Load()
for {
select {
case <-ctx.Done():
return
case fetches := <-r.commitFetches:
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
if len(p.Records) == 0 {
return
}
r := p.Records[len(p.Records)-1]
toCommit.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch)
})
return nil
case <-commitTicker.C:
if len(toCommit) == 0 {
currOffset := r.toCommit.Load()
if currOffset == previousOffset {
continue
}
previousOffset = currOffset

toCommit := kadm.Offsets{}
// Commit the offset after the last record.
// The reason for this is that we resume consumption at this offset.
// Leader epoch is -1 because we don't know it. This lets Kafka figure it out.
toCommit.AddOffset(r.kafkaCfg.Topic, r.partitionID, currOffset+1, -1)

committed, err := r.admClient.CommitOffsets(ctx, consumerGroup, toCommit)
if err != nil || !committed.Ok() {
level.Error(r.logger).Log("msg", "encountered error while committing offsets", "err", err, "commit_err", committed.Error())
level.Error(r.logger).Log("msg", "encountered error while committing offsets", "err", err, "commit_err", committed.Error(), "offset", currOffset)
} else {
committedOffset, _ := committed.Lookup(r.kafkaCfg.Topic, r.partitionID)
level.Debug(r.logger).Log("msg", "committed offset", "offset", committedOffset.Offset.At)
clear(toCommit)
}
}
}
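
For readers following the new partitionCommitter outside the diff, here is a condensed sketch of the pattern it introduces: the consumer stores the offset of the last consumed record in an atomic, and a ticker loop commits only when that offset has advanced, committing offset+1 (the position consumption resumes from) with leader epoch -1 so Kafka resolves the epoch itself. Package, function, and parameter names are illustrative, and error handling is trimmed relative to the PR:

package ingest

import (
	"context"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
	"go.uber.org/atomic"
)

// commitLoop periodically commits the newest offset stored in toCommit
// (written by the consumer via toCommit.Store(lastRecordOffset)).
func commitLoop(ctx context.Context, adm *kadm.Client, toCommit *atomic.Int64,
	group, topic string, partition int32, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	previous := toCommit.Load()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			curr := toCommit.Load()
			if curr == previous {
				continue // nothing consumed since the last commit
			}
			previous = curr

			offsets := kadm.Offsets{}
			// Commit the offset after the last consumed record, because
			// consumption resumes at the committed offset. Leader epoch -1
			// lets Kafka figure the epoch out on its own.
			offsets.AddOffset(topic, partition, curr+1, -1)
			if committed, err := adm.CommitOffsets(ctx, group, offsets); err != nil || !committed.Ok() {
				continue // the PR logs err and committed.Error(); the next tick retries
			}
		}
	}
}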