Skip to content

Commit

Permalink
ingester: Support for disallowing Push API for ingest storage (#7503)
Browse files Browse the repository at this point in the history
* ingest: add flag to disable push API

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
Co-authored-by: Marco Pracucci <marco@pracucci.com>

* make: build config-inspector with same version of Go as mimir

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* re-build generated files

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* address review comments

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* re-build generated files

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* ingester: revert non-relevant changes

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* ingester: rename config to PushGrpcMethodEnabled

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* Apply suggestions from code review

Co-authored-by: Peter Štibraný <pstibrany@gmail.com>
Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

---------

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
Co-authored-by: Marco Pracucci <marco@pracucci.com>
Co-authored-by: Peter Štibraný <pstibrany@gmail.com>
  • Loading branch information
3 people committed Mar 13, 2024
1 parent 4c65387 commit 08a2bd2
Show file tree
Hide file tree
Showing 15 changed files with 254 additions and 182 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -615,10 +615,10 @@ check-doc-validator: ## Check documentation using doc-validator tool

.PHONY: reference-help
reference-help: ## Generates the reference help documentation.
reference-help: cmd/mimir/mimir
reference-help: cmd/mimir/mimir tools/config-inspector/config-inspector
@(./cmd/mimir/mimir -h || true) > cmd/mimir/help.txt.tmpl
@(./cmd/mimir/mimir -help-all || true) > cmd/mimir/help-all.txt.tmpl
@(go run ./tools/config-inspector || true) > cmd/mimir/config-descriptor.json
@(./tools/config-inspector/config-inspector || true) > cmd/mimir/config-descriptor.json

clean-white-noise: ## Clean the white noise in the markdown files.
@find . -path ./.pkg -prune -o -path ./.cache -prune -o -path "*/vendor/*" -prune -or -type f -name "*.md" -print | \
Expand Down
3 changes: 0 additions & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package api

import (
"context"
"flag"
"net/http"
"path"
Expand All @@ -32,7 +31,6 @@ import (
frontendv2 "github.com/grafana/mimir/pkg/frontend/v2"
"github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier"
querierapi "github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/querier/tenantfederation"
Expand Down Expand Up @@ -281,7 +279,6 @@ type Ingester interface {
ShutdownHandler(http.ResponseWriter, *http.Request)
PrepareShutdownHandler(http.ResponseWriter, *http.Request)
PreparePartitionDownscaleHandler(http.ResponseWriter, *http.Request)
PushWithCleanup(context.Context, *mimirpb.WriteRequest, func()) error
UserRegistryHandler(http.ResponseWriter, *http.Request)
TenantsHandler(http.ResponseWriter, *http.Request)
TenantTSDBHandler(http.ResponseWriter, *http.Request)
Expand Down
7 changes: 4 additions & 3 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6293,9 +6293,10 @@ func newMockIngesterPusherAdapter(ingester *mockIngester) *mockIngesterPusherAda
}
}

// Push implements ingest.Pusher.
func (c *mockIngesterPusherAdapter) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
return c.ingester.Push(ctx, req)
// PushToStorage implements ingest.Pusher.
func (c *mockIngesterPusherAdapter) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error {
_, err := c.ingester.Push(ctx, req)
return err
}

// noopIngester is a mocked ingester which does nothing.
Expand Down
21 changes: 18 additions & 3 deletions pkg/ingester/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ import (

const (
integerUnavailableMsgFormat = "ingester is unavailable (current state: %s)"
tooBusyErrorMsg = "the ingester is currently too busy to process queries, try again later"
ingesterTooBusyMsg = "ingester is currently too busy to process queries, try again later"
ingesterPushGrpcDisabledMsg = "ingester is configured with Push gRPC method disabled"
)

var (
tooBusyError = ingesterTooBusyError{}
errTooBusy = ingesterTooBusyError{}
errPushGrpcDisabled = newErrorWithStatus(ingesterPushGrpcDisabledError{}, codes.Unimplemented)
)

// errorWithStatus is used for wrapping errors returned by ingester.
Expand Down Expand Up @@ -515,7 +517,7 @@ var _ ingesterError = tsdbUnavailableError{}
type ingesterTooBusyError struct{}

func (e ingesterTooBusyError) Error() string {
return tooBusyErrorMsg
return ingesterTooBusyMsg
}

func (e ingesterTooBusyError) errorCause() mimirpb.ErrorCause {
Expand All @@ -525,6 +527,19 @@ func (e ingesterTooBusyError) errorCause() mimirpb.ErrorCause {
// Ensure that ingesterTooBusyError is an ingesterError.
var _ ingesterError = ingesterTooBusyError{}

// ingesterPushGrpcDisabledError is the error type used when the ingester is
// configured with its Push gRPC method disabled. It carries no state; the
// message and cause are fixed.
type ingesterPushGrpcDisabledError struct{}

// Error returns the constant ingesterPushGrpcDisabledMsg message.
func (e ingesterPushGrpcDisabledError) Error() string {
return ingesterPushGrpcDisabledMsg
}

// errorCause reports mimirpb.METHOD_NOT_ALLOWED as the cause of this error.
func (e ingesterPushGrpcDisabledError) errorCause() mimirpb.ErrorCause {
return mimirpb.METHOD_NOT_ALLOWED
}

// Ensure that ingesterPushGrpcDisabledError is an ingesterError.
var _ ingesterError = ingesterPushGrpcDisabledError{}

type ingesterErrSamplers struct {
sampleTimestampTooOld *log.Sampler
sampleTimestampTooOldOOOEnabled *log.Sampler
Expand Down
34 changes: 17 additions & 17 deletions pkg/ingester/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,12 @@ func TestNewTSDBIngestExemplarErr(t *testing.T) {
}

func TestTooBusyError(t *testing.T) {
require.Error(t, tooBusyError)
require.Equal(t, "the ingester is currently too busy to process queries, try again later", tooBusyError.Error())
checkIngesterError(t, tooBusyError, mimirpb.TOO_BUSY, false)
require.Error(t, errTooBusy)
require.Equal(t, "ingester is currently too busy to process queries, try again later", errTooBusy.Error())
checkIngesterError(t, errTooBusy, mimirpb.TOO_BUSY, false)

wrappedErr := wrapOrAnnotateWithUser(tooBusyError, userID)
require.ErrorIs(t, wrappedErr, tooBusyError)
wrappedErr := wrapOrAnnotateWithUser(errTooBusy, userID)
require.ErrorIs(t, wrappedErr, errTooBusy)
var anotherIngesterTooBusyErr ingesterTooBusyError
require.ErrorAs(t, wrappedErr, &anotherIngesterTooBusyErr)
checkIngesterError(t, wrappedErr, mimirpb.TOO_BUSY, false)
Expand Down Expand Up @@ -731,16 +731,16 @@ func TestMapReadErrorToErrorWithStatus(t *testing.T) {
expectedMessage: fmt.Sprintf("wrapped: %s", newUnavailableError(services.Stopping).Error()),
expectedDetails: &mimirpb.ErrorDetails{Cause: mimirpb.SERVICE_UNAVAILABLE},
},
"tooBusyError gets translated into an errorWithStatus ResourceExhausted error with details": {
err: tooBusyError,
"errTooBusy gets translated into an errorWithStatus ResourceExhausted error with details": {
err: errTooBusy,
expectedCode: codes.ResourceExhausted,
expectedMessage: tooBusyErrorMsg,
expectedMessage: ingesterTooBusyMsg,
expectedDetails: &mimirpb.ErrorDetails{Cause: mimirpb.TOO_BUSY},
},
"a wrapped tooBusyError gets translated into an errorWithStatus ResourceExhausted error with details": {
err: fmt.Errorf("wrapped: %w", tooBusyError),
"a wrapped errTooBusy gets translated into an errorWithStatus ResourceExhausted error with details": {
err: fmt.Errorf("wrapped: %w", errTooBusy),
expectedCode: codes.ResourceExhausted,
expectedMessage: fmt.Sprintf("wrapped: %s", tooBusyErrorMsg),
expectedMessage: fmt.Sprintf("wrapped: %s", ingesterTooBusyMsg),
expectedDetails: &mimirpb.ErrorDetails{Cause: mimirpb.TOO_BUSY},
},
}
Expand Down Expand Up @@ -776,13 +776,13 @@ func TestMapReadErrorToErrorWithHTTPOrGRPCStatus(t *testing.T) {
err: fmt.Errorf("wrapped: %w", newUnavailableError(services.Stopping)),
expectedTranslation: newErrorWithStatus(fmt.Errorf("wrapped: %w", newUnavailableError(services.Stopping)), codes.Unavailable),
},
"tooBusyError gets translated into an errorWithHTTPStatus with status code 503": {
err: tooBusyError,
expectedTranslation: newErrorWithHTTPStatus(tooBusyError, http.StatusServiceUnavailable),
"errTooBusy gets translated into an errorWithHTTPStatus with status code 503": {
err: errTooBusy,
expectedTranslation: newErrorWithHTTPStatus(errTooBusy, http.StatusServiceUnavailable),
},
"a wrapped tooBusyError gets translated into an errorWithStatus with status code 503": {
err: fmt.Errorf("wrapped: %w", tooBusyError),
expectedTranslation: newErrorWithHTTPStatus(fmt.Errorf("wrapped: %w", tooBusyError), http.StatusServiceUnavailable),
"a wrapped errTooBusy gets translated into an errorWithStatus with status code 503": {
err: fmt.Errorf("wrapped: %w", errTooBusy),
expectedTranslation: newErrorWithHTTPStatus(fmt.Errorf("wrapped: %w", errTooBusy), http.StatusServiceUnavailable),
},
}
for name, tc := range testCases {
Expand Down
29 changes: 22 additions & 7 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ type Config struct {
UpdateIngesterOwnedSeries bool `yaml:"track_ingester_owned_series" category:"experimental"`
OwnedSeriesUpdateInterval time.Duration `yaml:"owned_series_update_interval" category:"experimental"`

PushGrpcMethodEnabled bool `yaml:"push_grpc_method_enabled" category:"experimental" doc:"hidden"`

// This config is dynamically injected because defined outside the ingester config.
IngestStorageConfig ingest.Config `yaml:"-"`
}
Expand All @@ -230,6 +232,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.BoolVar(&cfg.UseIngesterOwnedSeriesForLimits, "ingester.use-ingester-owned-series-for-limits", false, "When enabled, only series currently owned by ingester according to the ring are used when checking user per-tenant series limit.")
f.BoolVar(&cfg.UpdateIngesterOwnedSeries, "ingester.track-ingester-owned-series", false, "This option enables tracking of ingester-owned series based on ring state, even if -ingester.use-ingester-owned-series-for-limits is disabled.")
f.DurationVar(&cfg.OwnedSeriesUpdateInterval, "ingester.owned-series-update-interval", 15*time.Second, "How often to check for ring changes and possibly recompute owned series as a result of detected change.")
f.BoolVar(&cfg.PushGrpcMethodEnabled, "ingester.push-grpc-method-enabled", true, "Enables Push gRPC method on ingester. Can be only disabled when using ingest-storage to make sure ingesters only receive data from Kafka.")

// The ingester.return-only-grpc-errors flag has been deprecated.
// According to the migration plan (https://github.com/grafana/mimir/issues/6008#issuecomment-1854320098)
Expand Down Expand Up @@ -3487,14 +3490,26 @@ func (i *Ingester) checkAvailable() error {
return newUnavailableError(s)
}

// Push implements client.IngesterServer
func (i *Ingester) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
// PushToStorage implements ingest.Pusher interface for ingestion via ingest-storage.
func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error {
err := i.PushWithCleanup(ctx, req, func() { mimirpb.ReuseSlice(req.Timeseries) })
if err == nil {
return &mimirpb.WriteResponse{}, nil
if err != nil {
return i.mapPushErrorToErrorWithStatus(err)
}
return nil
}

// Push implements client.IngesterServer, which is registered into gRPC server.
func (i *Ingester) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
if !i.cfg.PushGrpcMethodEnabled {
return nil, errPushGrpcDisabled
}

err := i.PushToStorage(ctx, req)
if err != nil {
return nil, err
}
handledErr := i.mapPushErrorToErrorWithStatus(err)
return nil, handledErr
return &mimirpb.WriteResponse{}, err
}

func (i *Ingester) mapPushErrorToErrorWithStatus(err error) error {
Expand Down Expand Up @@ -3724,7 +3739,7 @@ func (i *Ingester) checkReadOverloaded() error {
}

i.metrics.utilizationLimitedRequests.WithLabelValues(reason).Inc()
return tooBusyError
return errTooBusy
}

type utilizationBasedLimiter interface {
Expand Down
5 changes: 0 additions & 5 deletions pkg/ingester/ingester_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ func (i *ActivityTrackerWrapper) Push(ctx context.Context, request *mimirpb.Writ
return i.ing.Push(ctx, request)
}

func (i *ActivityTrackerWrapper) PushWithCleanup(ctx context.Context, r *mimirpb.WriteRequest, cleanUp func()) error {
// No tracking in PushWithCleanup
return i.ing.PushWithCleanup(ctx, r, cleanUp)
}

func (i *ActivityTrackerWrapper) QueryStream(request *client.QueryRequest, server client.Ingester_QueryStreamServer) error {
ix := i.tracker.Insert(func() string {
return requestActivity(server.Context(), "Ingester/QueryStream", request)
Expand Down
3 changes: 3 additions & 0 deletions pkg/ingester/ingester_ingest_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ func TestIngester_ShouldNotCreatePartitionIfThereIsShutdownMarker(t *testing.T)
func createTestIngesterWithIngestStorage(t testing.TB, ingesterCfg *Config, overrides *validation.Overrides, reg prometheus.Registerer) (*Ingester, *kfake.Cluster, *ring.PartitionRingWatcher) {
defaultIngesterConfig := defaultIngesterTestConfig(t)

// Always disable gRPC Push API when testing ingest store.
ingesterCfg.PushGrpcMethodEnabled = false

ingesterCfg.IngestStorageConfig.Enabled = true
ingesterCfg.IngestStorageConfig.KafkaConfig.Topic = "mimir"
ingesterCfg.IngestStorageConfig.KafkaConfig.LastProducedOffsetPollInterval = 100 * time.Millisecond
Expand Down
41 changes: 33 additions & 8 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2537,7 +2537,7 @@ func Test_Ingester_LabelNames(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -2601,7 +2601,7 @@ func Test_Ingester_LabelValues(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -2802,7 +2802,7 @@ func TestIngester_LabelNamesAndValues(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -2922,7 +2922,7 @@ func TestIngester_LabelValuesCardinality(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -3414,7 +3414,7 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -3812,7 +3812,7 @@ func TestIngester_QueryStream(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -4271,7 +4271,7 @@ func TestIngester_QueryExemplars(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -5606,7 +5606,7 @@ func Test_Ingester_UserStats(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -6748,6 +6748,31 @@ func TestIngester_PushInstanceLimitsWithCircuitBreaker_LimitInflightRequestsUsin
}
}

// TestIngester_PushGrpcMethod_Disabled checks that calling Push on an ingester
// whose config has PushGrpcMethodEnabled set to false fails with
// errPushGrpcDisabled.
func TestIngester_PushGrpcMethod_Disabled(t *testing.T) {
// Start from the default test config and turn the Push gRPC method off.
cfg := defaultIngesterTestConfig(t)
cfg.PushGrpcMethodEnabled = false

registry := prometheus.NewRegistry()

i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
// Stop the ingester when the test returns; the stop error is deliberately ignored.
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until the ingester is healthy
test.Poll(t, 100*time.Millisecond, 1, func() any {
return i.lifecycler.HealthyInstancesCount()
})

// Build a single-series, single-sample write request under org ID "test".
ctx := user.InjectOrgID(context.Background(), "test")
req := writeRequestSingleSeries(
labels.FromStrings(labels.MetricName, "foo", "l", "1"),
[]mimirpb.Sample{{TimestampMs: 1_000, Value: 1}},
)
// The request must be rejected with the "Push gRPC disabled" error.
_, err = i.Push(ctx, req)
require.ErrorIs(t, err, errPushGrpcDisabled)
}

func TestIngester_instanceLimitsMetrics(t *testing.T) {
reg := prometheus.NewRegistry()

Expand Down
9 changes: 7 additions & 2 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,13 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.IngestStorage.Validate(); err != nil {
return errors.Wrap(err, "invalid ingest storage config")
}
if c.isAnyModuleEnabled(Ingester, Write, All) && c.IngestStorage.Enabled && !c.Ingester.DeprecatedReturnOnlyGRPCErrors {
return errors.New("to use ingest storage (-ingest-storage.enabled) also enable -ingester.return-only-grpc-errors")
if c.isAnyModuleEnabled(Ingester, Write, All) {
if c.IngestStorage.Enabled && !c.Ingester.DeprecatedReturnOnlyGRPCErrors {
return errors.New("to use ingest storage (-ingest-storage.enabled) also enable -ingester.return-only-grpc-errors")
}
if !c.IngestStorage.Enabled && !c.Ingester.PushGrpcMethodEnabled {
return errors.New("cannot disable Push gRPC method in ingester, while ingest storage (-ingest-storage.enabled) is not enabled")
}
}
if err := c.BlocksStorage.Validate(c.Ingester.ActiveSeriesMetrics); err != nil {
return errors.Wrap(err, "invalid TSDB config")
Expand Down
12 changes: 12 additions & 0 deletions pkg/mimir/mimir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,18 @@ func TestConfigValidation(t *testing.T) {
},
expectAnyError: true,
},
{
name: "should fails if push api disabled in ingester, and the ingester isn't running with ingest storage",
getTestConfig: func() *Config {
cfg := newDefaultConfig()
_ = cfg.Target.Set("ingester")
cfg.Ingester.PushGrpcMethodEnabled = false
cfg.IngestStorage.Enabled = false

return cfg
},
expectAnyError: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
err := tc.getTestConfig().Validate(log.NewNopLogger())
Expand Down
Loading

0 comments on commit 08a2bd2

Please sign in to comment.