Skip to content

Commit

Permalink
distributor: Wrap errors from pushing to ingesters
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Oct 26, 2022
1 parent 295b084 commit ad98691
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* [CHANGE] Flag `-azure.msi-resource` is now ignored, and will be removed in Mimir 2.7. This setting is now made automatically by Azure. #2682
* [CHANGE] Experimental flag `-blocks-storage.tsdb.out-of-order-capacity-min` has been removed. #3261
* [CHANGE] Distributor: Wrap errors from pushing to ingesters with useful context, for example clarifying timeouts. #3307
* [FEATURE] Alertmanager: added Discord support. #3309
* [ENHANCEMENT] Added `<prefix>.tls-min-version` and `<prefix>.tls-cipher-suites` flags to configure cipher suites and min TLS version supported by servers. #2898
* [ENHANCEMENT] Distributor: Add age filter to forwarding functionality, to not forward samples which are older than defined duration. If such samples are not ingested, `cortex_discarded_samples_total{reason="forwarded-sample-too-old"}` is increased. #3049 #3133
Expand Down
13 changes: 10 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,7 +1155,11 @@ func (d *Distributor) push(ctx context.Context, pushReq *push.Request) (*mimirpb
}
}

return d.send(localCtx, ingester, timeseries, metadata, req.Source)
err := d.send(localCtx, ingester, timeseries, metadata, req.Source)
if errors.Is(err, context.DeadlineExceeded) {
return errors.Wrap(err, "exceeded configured distributor remote timeout")
}
return err
}, func() { pushReq.CleanUp(); cancel() })

if err != nil {
Expand Down Expand Up @@ -1203,8 +1207,11 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
Source: source,
}
_, err = c.Push(ctx, &req)

return err
if resp, ok := httpgrpc.HTTPResponseFromError(err); ok {
// Wrap HTTP gRPC error with more explanatory message.
return httpgrpc.Errorf(int(resp.Code), "failed pushing to ingester: %s", resp.Body)
}
return errors.Wrap(err, "failed pushing to ingester")
}

// forReplicationSet runs f, in parallel, for all ingesters in the input replication set.
Expand Down
6 changes: 4 additions & 2 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ func TestDistributor_Push(t *testing.T) {
mtime.NowReset()
})

expErrFail := httpgrpc.Errorf(http.StatusInternalServerError, "failed pushing to ingester: Fail")

type samplesIn struct {
num int
startTimestampMs int64
Expand Down Expand Up @@ -155,7 +157,7 @@ func TestDistributor_Push(t *testing.T) {
numIngesters: 3,
happyIngesters: 1,
samples: samplesIn{num: 10, startTimestampMs: 123456789000},
expectedError: errFail,
expectedError: expErrFail,
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
Expand All @@ -167,7 +169,7 @@ func TestDistributor_Push(t *testing.T) {
numIngesters: 3,
happyIngesters: 0,
samples: samplesIn{num: 10, startTimestampMs: 123456789000},
expectedError: errFail,
expectedError: expErrFail,
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
Expand Down

0 comments on commit ad98691

Please sign in to comment.