ingester: add experimental support for consuming records from kafka #6929

Merged (21 commits) on Dec 15, 2023
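
This capture has no PR description body. For context, the ingest reader added here is built on franz-go (see the dependency and code changes below). The following is a minimal, self-contained sketch of the poll-and-process loop such a partition reader relies on; it is not code from this PR, and the broker address, topic name, and partition number are assumptions for illustration.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Assumptions for illustration: broker at localhost:9092, topic "ingest",
	// a single owned partition (0), mirroring one ingester owning one partition.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
			"ingest": {0: kgo.NewOffset().AtStart()},
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	for {
		fetches := cl.PollFetches(context.Background())
		if errs := fetches.Errors(); len(errs) > 0 {
			log.Fatal(errs)
		}
		fetches.EachRecord(func(r *kgo.Record) {
			// In the PR, each record's value is unmarshalled into a WriteRequest
			// and pushed into the ingester; here we only print its size.
			fmt.Printf("partition %d offset %d: %d bytes\n", r.Partition, r.Offset, len(r.Value))
		})
	}
}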
1 change: 1 addition & 0 deletions go.mod
@@ -71,6 +71,7 @@ require (
github.com/prometheus/procfs v0.12.0
github.com/thanos-io/objstore v0.0.0-20231123170144-bffedaa58acb
github.com/twmb/franz-go v1.15.3
github.com/twmb/franz-go/pkg/kadm v1.10.0
github.com/twmb/franz-go/pkg/kfake v0.0.0-20231206062516-c09dc92d2db1
github.com/twmb/franz-go/pkg/kmsg v1.7.0
github.com/twmb/franz-go/plugin/kprom v1.1.0
2 changes: 2 additions & 0 deletions go.sum
@@ -946,6 +946,8 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/twmb/franz-go v1.15.3 h1:96nCgxz4DvGPSCumz6giquYy8GGDNsYCwWcloBdjJ4w=
github.com/twmb/franz-go v1.15.3/go.mod h1:aos+d/UBuigWkOs+6WoqEPto47EvC2jipLAO5qrAu48=
github.com/twmb/franz-go/pkg/kadm v1.10.0 h1:3oYKNP+e3HGo4GYadrDeRxOaAIsOXmX6LBVMz9PxpCU=
github.com/twmb/franz-go/pkg/kadm v1.10.0/go.mod h1:hUMoV4SRho+2ij/S9cL39JaLsr+XINjn0ZkCdBY2DXc=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20231206062516-c09dc92d2db1 h1:xbSGm02av1df+hkaY+2jGfkuj/XwGaDnUpLo0VvOrY0=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20231206062516-c09dc92d2db1/go.mod h1:n45fs28DdNx7PRAiYwBTwOORJGUMGqHzmFlr0pcW+BY=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
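The only module added to go.mod and go.sum in this change is franz-go's admin client, kadm (kfake and kmsg were already present). As orientation only, and not code from this PR, here is a minimal sketch of the kind of call kadm enables, assuming a broker reachable at localhost:9092:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Build a regular franz-go client and wrap it with the admin client.
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	adm := kadm.NewClient(cl)

	// List the topics known to the cluster (the broker address above is an assumption).
	topics, err := adm.ListTopics(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	for _, detail := range topics {
		fmt.Println(detail.Topic)
	}
}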
23 changes: 23 additions & 0 deletions pkg/ingester/ingester.go
@@ -54,6 +54,7 @@ import (
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/ingest"
"github.com/grafana/mimir/pkg/storage/sharding"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
@@ -179,6 +180,9 @@ type Config struct {
UseIngesterOwnedSeriesForLimits bool `yaml:"use_ingester_owned_series_for_limits" category:"experimental"`
UpdateIngesterOwnedSeries bool `yaml:"track_ingester_owned_series" category:"experimental"`
OwnedSeriesUpdateInterval time.Duration `yaml:"owned_series_update_interval" category:"experimental"`

// This config is dynamically injected because it's defined outside the ingester config.
IngestStorageConfig ingest.Config `yaml:"-"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -298,6 +302,8 @@ type Ingester struct {
utilizationBasedLimiter utilizationBasedLimiter

errorSamplers ingesterErrSamplers

ingestReader *ingest.PartitionReader
}

func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
@@ -399,6 +405,19 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing,
i.compactionIdleTimeout = util.DurationWithPositiveJitter(i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout, compactionIdleTimeoutJitter)
level.Info(i.logger).Log("msg", "TSDB idle compaction timeout set", "timeout", i.compactionIdleTimeout)

if ingestCfg := cfg.IngestStorageConfig; ingestCfg.Enabled {
kafkaCfg := ingestCfg.KafkaConfig

partitionID, err := ingest.IngesterPartition(cfg.IngesterRing.InstanceID)
if err != nil {
return nil, errors.Wrap(err, "calculating ingest storage partition ID")
}
i.ingestReader, err = ingest.NewPartitionReaderForPusher(kafkaCfg, partitionID, i, log.With(logger, "component", "ingest_reader"), registerer)
if err != nil {
return nil, errors.Wrap(err, "creating ingest storage reader")
}
}

i.BasicService = services.NewBasicService(i.starting, i.updateLoop, i.stopping)
return i, nil
}
@@ -477,6 +496,10 @@ func (i *Ingester) starting(ctx context.Context) error {
servs = append(servs, i.ownedSeriesService)
}

if i.ingestReader != nil {
servs = append(servs, i.ingestReader)
}

shutdownMarkerPath := shutdownmarker.GetPath(i.cfg.BlocksStorageConfig.TSDB.Dir)
shutdownMarkerFound, err := shutdownmarker.Exists(shutdownMarkerPath)
if err != nil {
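The New() change above derives the ingester's Kafka partition from its instance ID via ingest.IngesterPartition, which is defined elsewhere in this PR series. Purely as an illustration of the idea, here is a hypothetical stand-in that assumes the partition is the numeric suffix of the instance ID (e.g. ingester-zone-a-3 owning partition 3); the real helper may differ.

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// ingesterPartition is a hypothetical stand-in for ingest.IngesterPartition:
// it derives a partition ID from the trailing sequence number of an instance ID.
var seqSuffix = regexp.MustCompile(`-(\d+)$`)

func ingesterPartition(instanceID string) (int32, error) {
	m := seqSuffix.FindStringSubmatch(instanceID)
	if m == nil {
		return 0, fmt.Errorf("instance ID %q has no numeric suffix", instanceID)
	}
	seq, err := strconv.ParseInt(m[1], 10, 32)
	if err != nil {
		return 0, fmt.Errorf("parsing sequence number from %q: %w", instanceID, err)
	}
	return int32(seq), nil
}

func main() {
	partition, err := ingesterPartition("ingester-zone-a-3")
	fmt.Println(partition, err) // 3 <nil>
}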
106 changes: 106 additions & 0 deletions pkg/storage/ingest/pusher.go
@@ -0,0 +1,106 @@
// SPDX-License-Identifier: AGPL-3.0-only

package ingest

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/cancellation"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/mimir/pkg/mimirpb"
)

type Pusher interface {
Push(context.Context, *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error)
}

type pusherConsumer struct {
p Pusher

processingTimeSeconds prometheus.Observer
l log.Logger
}

type parsedRecord struct {
*mimirpb.WriteRequest
tenantID string
err error
}

func newPusherConsumer(p Pusher, reg prometheus.Registerer, l log.Logger) *pusherConsumer {
return &pusherConsumer{
p: p,
l: l,
processingTimeSeconds: promauto.With(reg).NewSummary(prometheus.SummaryOpts{
Name: "cortex_ingest_storage_reader_processing_time_seconds",
Help: "Time taken to process a single record (write request).",
Objectives: latencySummaryObjectives,
MaxAge: time.Minute,
AgeBuckets: 10,
}),
}
}

func (c pusherConsumer) consume(ctx context.Context, records []record) error {
recC := make(chan parsedRecord)
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(cancellation.NewErrorf("done consuming records"))

go c.unmarshalRequests(ctx, records, recC)
err := c.pushRequests(ctx, recC)
if err != nil {
Review comment (Collaborator): Can't return an error here. I would not return any error from pushRequests() for now, and revisit it once we have the actual error handling logic, unless you already have that logic ready for a follow-up PR (in that case keep it as is).

Reply (Contributor Author): Yes, I have the change ready. While I was working on it I realized that the current logic skips records.

return err
}
return nil
}

func (c pusherConsumer) pushRequests(ctx context.Context, reqC <-chan parsedRecord) error {
for wr := range reqC {
if wr.err != nil {
level.Error(c.l).Log("msg", "failed to parse write request; skipping", "err", wr.err)
continue
}
processingStart := time.Now()

ctx := user.InjectOrgID(ctx, wr.tenantID)
_, err := c.p.Push(ctx, wr.WriteRequest)

c.processingTimeSeconds.Observe(time.Since(processingStart).Seconds())
if err != nil {
level.Error(c.l).Log("msg", "failed to push write request; skipping", "err", err)
// TODO move distributor's isClientError to a separate package and use that here to swallow only client errors and abort on others
continue
}
}
return nil
}

func (c pusherConsumer) unmarshalRequests(ctx context.Context, records []record, reqC chan<- parsedRecord) {
defer close(reqC)
done := ctx.Done()

for _, record := range records {
pRecord := parsedRecord{
tenantID: record.tenantID,
WriteRequest: &mimirpb.WriteRequest{},
}
// We don't free the WriteRequest slices because they are being freed by the Pusher.
err := pRecord.WriteRequest.Unmarshal(record.content)
if err != nil {
err = errors.Wrap(err, "parsing ingest consumer write request")
pRecord.err = err
}
select {
case <-done:
return
case reqC <- pRecord:
}
}
}
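
The review thread above and the TODO in pushRequests point at the same follow-up: classify Push errors so that only client errors are skipped while other errors stop consumption. The distributor's isClientError helper mentioned in the TODO is not part of this diff; the sketch below is a hypothetical version based on gRPC status codes, not the actual follow-up change.

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isClientError is illustrative only; the TODO proposes reusing the
// distributor's existing helper from a shared package instead.
func isClientError(err error) bool {
	s, ok := status.FromError(err)
	if !ok {
		return false
	}
	switch s.Code() {
	case codes.InvalidArgument, codes.AlreadyExists, codes.FailedPrecondition:
		// Bad input from the client: retrying the same record cannot succeed.
		return true
	default:
		return false
	}
}

func main() {
	err := status.Error(codes.InvalidArgument, "sample timestamp too old")
	// A future pushRequests could continue on client errors like this one
	// and return (aborting consumption) on anything else.
	fmt.Println(isClientError(err)) // true
}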
157 changes: 157 additions & 0 deletions pkg/storage/ingest/pusher_test.go
@@ -0,0 +1,157 @@
// SPDX-License-Identifier: AGPL-3.0-only

package ingest

import (
"context"
"testing"

"github.com/go-kit/log"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/mimirpb"
)

type pusherFunc func(context.Context, *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error)

func (p pusherFunc) Push(ctx context.Context, request *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
return p(ctx, request)
}

func TestPusherConsumer(t *testing.T) {
const tenantID = "t1"
writeReqs := []*mimirpb.WriteRequest{
{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_1"), mockPreallocTimeseries("series_2")}},
{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3")}},
{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_4")}},
}

wrBytes := make([][]byte, len(writeReqs))
for i, wr := range writeReqs {
var err error
wrBytes[i], err = wr.Marshal()
require.NoError(t, err)
}

type response struct {
*mimirpb.WriteResponse
err error
}

okResponse := response{WriteResponse: &mimirpb.WriteResponse{}}

testCases := map[string]struct {
records []record
responses []response
expectedWRs []*mimirpb.WriteRequest
expErr string
}{
"single record": {
records: []record{
{content: wrBytes[0], tenantID: tenantID},
},
responses: []response{
okResponse,
},
expectedWRs: writeReqs[0:1],
},
"multiple records": {
records: []record{
{content: wrBytes[0], tenantID: tenantID},
{content: wrBytes[1], tenantID: tenantID},
{content: wrBytes[2], tenantID: tenantID},
},
responses: []response{
okResponse,
okResponse,
okResponse,
},
expectedWRs: writeReqs[0:3],
},
"unparsable record": {
records: []record{
{content: wrBytes[0], tenantID: tenantID},
{content: []byte{0}, tenantID: tenantID},
{content: wrBytes[1], tenantID: tenantID},
},
responses: []response{
okResponse,
okResponse,
},
expectedWRs: writeReqs[0:2],
expErr: "",
},
"failed processing of record": {
records: []record{
{content: wrBytes[0], tenantID: tenantID},
{content: wrBytes[1], tenantID: tenantID},
{content: wrBytes[2], tenantID: tenantID},
},
responses: []response{
okResponse,
{err: assert.AnError},
okResponse,
},
expectedWRs: writeReqs[0:3],
expErr: "",
},
"failed processing of last record": {
records: []record{
{content: wrBytes[0], tenantID: tenantID},
{content: wrBytes[1], tenantID: tenantID},
},
responses: []response{
okResponse,
{err: assert.AnError},
},
expectedWRs: writeReqs[0:2],
expErr: "",
},
"failed processing & failed unmarshalling": {
records: []record{
{content: wrBytes[0], tenantID: tenantID},
{content: wrBytes[1], tenantID: tenantID},
{content: []byte{0}, tenantID: tenantID},
},
responses: []response{
okResponse,
{err: assert.AnError},
},
expectedWRs: writeReqs[0:2],
expErr: "",
},
"no records": {},
}

for name, tc := range testCases {
tc := tc
t.Run(name, func(t *testing.T) {
receivedReqs := 0
pusher := pusherFunc(func(ctx context.Context, request *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
defer func() { receivedReqs++ }()
require.GreaterOrEqualf(t, len(tc.expectedWRs), receivedReqs+1, "received more requests (%d) than expected (%d)", receivedReqs+1, len(tc.expectedWRs))

expectedWR := tc.expectedWRs[receivedReqs]
for i, ts := range request.Timeseries {
assert.Truef(t, ts.Equal(expectedWR.Timeseries[i].TimeSeries), "timeseries %d not equal; got %v, expected %v", i, ts, expectedWR.Timeseries[i].TimeSeries)
}

actualTenantID, err := tenant.TenantID(ctx)
assert.NoError(t, err)
assert.Equal(t, tenantID, actualTenantID)

return tc.responses[receivedReqs].WriteResponse, tc.responses[receivedReqs].err
})
c := newPusherConsumer(pusher, prometheus.NewPedanticRegistry(), log.NewNopLogger())
err := c.consume(context.Background(), tc.records)
if tc.expErr == "" {
assert.NoError(t, err)
} else {
assert.ErrorContains(t, err, tc.expErr)
}
})
}
}