Skip to content

Commit

Permalink
distributor: Wrap errors from pushing to ingesters (grafana#3307)
Browse files Browse the repository at this point in the history
* distributor: Wrap errors from pushing to ingesters
* distributor: Return httpgrpc error on timeout

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 authored and mason committed Nov 4, 2022
1 parent 6bae854 commit a7204aa
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* [CHANGE] Flag `-azure.msi-resource` is now ignored, and will be removed in Mimir 2.7. This setting is now made automatically by Azure. #2682
* [CHANGE] Experimental flag `-blocks-storage.tsdb.out-of-order-capacity-min` has been removed. #3261
* [CHANGE] Distributor: Wrap errors from pushing to ingesters with useful context, for example clarifying timeouts. #3307
* [FEATURE] Alertmanager: added Discord support. #3309
* [ENHANCEMENT] Added `<prefix>.tls-min-version` and `<prefix>.tls-cipher-suites` flags to configure cipher suites and min TLS version supported by servers. #2898
* [ENHANCEMENT] Distributor: Add age filter to forwarding functionality, to not forward samples which are older than defined duration. If such samples are not ingested, `cortex_discarded_samples_total{reason="forwarded-sample-too-old"}` is increased. #3049 #3133
Expand Down
13 changes: 10 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,7 +1155,11 @@ func (d *Distributor) push(ctx context.Context, pushReq *push.Request) (*mimirpb
}
}

return d.send(localCtx, ingester, timeseries, metadata, req.Source)
err := d.send(localCtx, ingester, timeseries, metadata, req.Source)
if errors.Is(err, context.DeadlineExceeded) {
return httpgrpc.Errorf(500, "exceeded configured distributor remote timeout: %s", err.Error())
}
return err
}, func() { pushReq.CleanUp(); cancel() })

if err != nil {
Expand Down Expand Up @@ -1203,8 +1207,11 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
Source: source,
}
_, err = c.Push(ctx, &req)

return err
if resp, ok := httpgrpc.HTTPResponseFromError(err); ok {
// Wrap HTTP gRPC error with more explanatory message.
return httpgrpc.Errorf(int(resp.Code), "failed pushing to ingester: %s", resp.Body)
}
return errors.Wrap(err, "failed pushing to ingester")
}

// forReplicationSet runs f, in parallel, for all ingesters in the input replication set.
Expand Down
29 changes: 27 additions & 2 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ func TestDistributor_Push(t *testing.T) {
mtime.NowReset()
})

expErrFail := httpgrpc.Errorf(http.StatusInternalServerError, "failed pushing to ingester: Fail")

type samplesIn struct {
num int
startTimestampMs int64
Expand All @@ -122,6 +124,7 @@ func TestDistributor_Push(t *testing.T) {
metadata int
expectedError error
expectedMetrics string
timeOut bool
}{
"A push of no samples shouldn't block or return error, even if ingesters are sad": {
numIngesters: 3,
Expand Down Expand Up @@ -155,7 +158,7 @@ func TestDistributor_Push(t *testing.T) {
numIngesters: 3,
happyIngesters: 1,
samples: samplesIn{num: 10, startTimestampMs: 123456789000},
expectedError: errFail,
expectedError: expErrFail,
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
Expand All @@ -167,7 +170,7 @@ func TestDistributor_Push(t *testing.T) {
numIngesters: 3,
happyIngesters: 0,
samples: samplesIn{num: 10, startTimestampMs: 123456789000},
expectedError: errFail,
expectedError: expErrFail,
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
Expand Down Expand Up @@ -266,6 +269,20 @@ func TestDistributor_Push(t *testing.T) {
cortex_distributor_sample_delay_seconds_count 0
`,
},
"A timed out push should fail": {
numIngesters: 3,
happyIngesters: 3,
samples: samplesIn{num: 10, startTimestampMs: 123456789000},
timeOut: true,
expectedError: httpgrpc.Errorf(http.StatusInternalServerError,
"exceeded configured distributor remote timeout: failed pushing to ingester: context deadline exceeded"),
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
`,
},
} {
t.Run(name, func(t *testing.T) {
limits := &validation.Limits{}
Expand All @@ -278,6 +295,7 @@ func TestDistributor_Push(t *testing.T) {
happyIngesters: tc.happyIngesters,
numDistributors: 1,
limits: limits,
timeOut: tc.timeOut,
})

request := makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata, false)
Expand Down Expand Up @@ -3220,6 +3238,7 @@ type prepConfig struct {
labelNamesStreamZonesResponseDelay map[string]time.Duration
forwarding bool
getForwarder func() forwarding.Forwarder
timeOut bool
}

func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry) {
Expand All @@ -3240,6 +3259,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
seriesCountTotal: cfg.ingestersSeriesCountTotal,
zone: zone,
labelNamesStreamResponseDelay: labelNamesStreamResponseDelay,
timeOut: cfg.timeOut,
})
}
for i := cfg.happyIngesters; i < cfg.numIngesters; i++ {
Expand Down Expand Up @@ -3621,6 +3641,7 @@ type mockIngester struct {
seriesCountTotal uint64
zone string
labelNamesStreamResponseDelay time.Duration
timeOut bool
}

func (i *mockIngester) series() map[uint32]*mimirpb.PreallocTimeseries {
Expand Down Expand Up @@ -3659,6 +3680,10 @@ func (i *mockIngester) Push(ctx context.Context, req *mimirpb.WriteRequest, opts
return nil, errFail
}

if i.timeOut {
return nil, context.DeadlineExceeded
}

if i.timeseries == nil {
i.timeseries = map[uint32]*mimirpb.PreallocTimeseries{}
}
Expand Down

0 comments on commit a7204aa

Please sign in to comment.