From 57f7fab8b1b137e2acacafae4554c28861aa16f4 Mon Sep 17 00:00:00 2001
From: Filip Petkovski
Date: Mon, 21 Nov 2022 13:15:27 +0100
Subject: [PATCH] Add repro test case

Signed-off-by: Filip Petkovski
---
 pkg/receive/handler.go      |  72 +++++++++++++----------
 pkg/receive/handler_test.go | 111 ++++++++++++++++++++++++------------
 2 files changed, 116 insertions(+), 67 deletions(-)

diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index d07e0c4b73c..0a1bba381c1 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -361,8 +361,17 @@ type writeRequest struct {
 }
 
 type writeResponse struct {
-	batch uint64
-	err   error
+	batch   uint64
+	replica uint64
+	err     error
+}
+
+func newWriteResponse(batch, replica uint64, err error) writeResponse {
+	return writeResponse{
+		batch:   batch,
+		replica: replica,
+		err:     err,
+	}
 }
 
 func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error {
@@ -592,7 +601,7 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
 	}
 	h.mtx.RUnlock()
 
-	return h.fanoutForward(ctx, tenant, wreqs, uint64(len(wreqs)), len(wreqs))
+	return h.fanoutForward(ctx, tenant, wreqs, uint64(len(wreqs)), 1, 0)
 }
 
 // writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success.
@@ -600,9 +609,15 @@ func (h *Handler) writeQuorum() int {
 	return int((h.options.ReplicationFactor / 2) + 1)
 }
 
-func quorumReached(replicationSuccess []int, successThreshold int) bool {
-	for _, successCount := range replicationSuccess {
-		if successCount < successThreshold {
+func quorumReached(failures map[uint64]map[uint64]bool, successThreshold int) bool {
+	for _, replicas := range failures {
+		successCounts := 0
+		for _, failure := range replicas {
+			if !failure {
+				successCounts++
+			}
+		}
+		if successCounts < successThreshold {
 			return false
 		}
 	}
@@ -612,14 +627,14 @@
 
 // fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of
 // requests succeeds or fails or if context is canceled.
-func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*writeRequest, numBatches uint64, successThreshold int) error {
+func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*writeRequest, numBatches uint64, numReplicas uint64, failureThreshold int) error {
 	var errs errutil.MultiError
 
 	fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout)
 	defer func() {
 		if errs.Err() != nil {
 			// NOTICE: The cancel function is not used on all paths intentionally,
-			// if there is no error when quorum successThreshold is reached,
+			// if there is no error when quorum failureThreshold is reached,
 			// let forward requests to optimistically run until timeout.
 			cancel()
 		}
 	}()
 
@@ -656,12 +671,12 @@
 				})
 				if err != nil {
 					h.replications.WithLabelValues(labelError).Inc()
-					ec <- writeResponse{batch: wreqs[er].batch, err: errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)}
+					ec <- newWriteResponse(wreqs[er].batch, er.replica.n, errors.Wrapf(err, "replicate write request for endpoint %v", endpoint))
 					return
 				}
 
 				h.replications.WithLabelValues(labelSuccess).Inc()
-				ec <- writeResponse{batch: wreqs[er].batch, err: nil}
+				ec <- newWriteResponse(wreqs[er].batch, er.replica.n, nil)
 			}(endpoint)
 
 			continue
@@ -685,10 +700,10 @@
 					// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
 					// To avoid breaking the counting logic, we need to flatten the error.
 					level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error())
-					ec <- writeResponse{batch: r.batch, err: errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)}
+					ec <- newWriteResponse(wreqs[er].batch, er.replica.n, errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint))
 					return
 				}
-				ec <- writeResponse{batch: r.batch, err: nil}
+				ec <- newWriteResponse(wreqs[er].batch, er.replica.n, nil)
 			}(endpoint)
 
 			continue
@@ -713,7 +728,7 @@
 
 			cl, err = h.peers.get(fctx, endpoint)
 			if err != nil {
-				ec <- writeResponse{batch: r.batch, err: errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)}
+				ec <- newWriteResponse(wreqs[er].batch, er.replica.n, errors.Wrapf(err, "get peer connection for endpoint %v", endpoint))
 				return
 			}
@@ -722,7 +737,7 @@
 			if ok {
 				if time.Now().Before(b.nextAllowed) {
 					h.mtx.RUnlock()
-					ec <- writeResponse{batch: r.batch, err: errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)}
+					ec <- newWriteResponse(wreqs[er].batch, er.replica.n, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint))
 					return
 				}
 			}
@@ -754,14 +769,14 @@
 						h.mtx.Unlock()
 					}
 				}
-				ec <- writeResponse{batch: r.batch, err: errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)}
+				ec <- newWriteResponse(wreqs[er].batch, er.replica.n, errors.Wrapf(err, "forwarding request to endpoint %v", endpoint))
 				return
 			}
 
 			h.mtx.Lock()
 			delete(h.peerStates, endpoint)
 			h.mtx.Unlock()
-			ec <- writeResponse{batch: r.batch, err: nil}
+			ec <- newWriteResponse(wreqs[er].batch, er.replica.n, nil)
 		}(endpoint)
 	}
@@ -782,26 +797,25 @@
 		}()
 	}()
 
-	successCounts := make([]int, numBatches)
+	failures := make(map[uint64]map[uint64]bool, numBatches)
 	for {
 		select {
 		case <-fctx.Done():
 			return fctx.Err()
 		case wr, more := <-ec:
 			if !more {
+				if quorumReached(failures, failureThreshold) {
+					return nil
+				}
 				return errs.Err()
 			}
-			if wr.err == nil {
-				successCounts[wr.batch]++
-				// In case the success threshold is lower than the total
-				// number of requests, then we can finish early here. This
-				// is the case for quorum writes for example.
-				if quorumReached(successCounts, successThreshold) {
-					return nil
+			if wr.err != nil {
+				if _, ok := failures[wr.batch]; !ok {
+					failures[wr.batch] = map[uint64]bool{}
 				}
-				continue
+				failures[wr.batch][wr.replica] = true
+				errs.Add(wr.err)
 			}
-			errs.Add(wr.err)
 		}
 	}
 }
@@ -847,10 +861,10 @@
 	}
 	h.mtx.RUnlock()
 
-	quorum := h.writeQuorum()
+	failureThreshold := int(h.options.ReplicationFactor) - h.writeQuorum()
 	// fanoutForward only returns an error if successThreshold (quorum) is not reached.
-	if err := h.fanoutForward(ctx, tenant, replicatedRequests, 1, quorum); err != nil {
-		return errors.Wrap(determineWriteErrorCause(err, quorum), "quorum not reached")
+	if err := h.fanoutForward(ctx, tenant, replicatedRequests, 1, h.options.ReplicationFactor, failureThreshold); err != nil {
+		return errors.Wrap(determineWriteErrorCause(err, h.writeQuorum()), "quorum not reached")
 	}
 	return nil
 }
diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go
index 4a2a5360385..fce05194b26 100644
--- a/pkg/receive/handler_test.go
+++ b/pkg/receive/handler_test.go
@@ -347,7 +347,7 @@ func (f *fakeAppender) Rollback() error {
 	return f.rollbackErr()
 }
 
-func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) {
+func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64, hashringAlgorithm HashringAlgorithm) ([]*Handler, Hashring) {
 	var (
 		cfg      = []HashringConfig{{Hashring: "test"}}
 		handlers []*Handler
@@ -383,7 +383,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
 		cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint)
 		peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h}
 	}
-	hashring := newMultiHashring(AlgorithmHashmod, replicationFactor, cfg)
+	hashring := newMultiHashring(hashringAlgorithm, replicationFactor, cfg)
 	for _, h := range handlers {
 		h.Hashring(hashring)
 	}
@@ -420,6 +420,7 @@ func TestReceiveQuorum(t *testing.T) {
 			},
 		},
 	}
+
 	for _, tc := range []struct {
 		name              string
 		status            int
@@ -681,47 +682,81 @@
 				},
 			},
 		},
+		{
+			name:              "size 4 with replication two commit errors",
+			status:            http.StatusOK,
+			replicationFactor: 3,
+			wreq: &prompb.WriteRequest{
+				Timeseries: []prompb.TimeSeries{
+					{
+						Labels:  []labelpb.ZLabel{{Name: "foo", Value: "bar"}},
+						Samples: []prompb.Sample{{Value: 1, Timestamp: 1}, {Value: 2, Timestamp: 2}, {Value: 3, Timestamp: 3}},
+					},
+					{
+						Labels:  []labelpb.ZLabel{{Name: "foo", Value: "xyz3"}},
+						Samples: []prompb.Sample{{Value: 1, Timestamp: 1}, {Value: 2, Timestamp: 2}, {Value: 3, Timestamp: 3}},
+					},
+				},
+			},
+			appendables: []*fakeAppendable{
+				{
+					appender: newFakeAppender(nil, commitErrFn, nil),
+				},
+				{
+					appender: newFakeAppender(nil, commitErrFn, nil),
+				},
+				{
+					appender: newFakeAppender(nil, nil, nil),
+				},
+				{
+					appender: newFakeAppender(nil, nil, nil),
+				},
+			},
+		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
-			handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor)
-			tenant := "test"
-			// Test from the point of view of every node
-			// so that we know status code does not depend
-			// on which node is erroring and which node is receiving.
-			for i, handler := range handlers {
-				// Test that the correct status is returned.
-				rec, err := makeRequest(handler, tenant, tc.wreq)
-				if err != nil {
-					t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err)
-				}
-				if rec.Code != tc.status {
-					t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String())
-				}
-			}
-			// Test that each time series is stored
-			// the correct amount of times in each fake DB.
-			for _, ts := range tc.wreq.Timeseries {
-				lset := make(labels.Labels, len(ts.Labels))
-				for j := range ts.Labels {
-					lset[j] = labels.Label{
-						Name:  ts.Labels[j].Name,
-						Value: ts.Labels[j].Value,
+			for _, hashringAlgo := range []HashringAlgorithm{AlgorithmKetama} {
+				handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor, hashringAlgo)
+				tenant := "test"
+				// Test from the point of view of every node
+				// so that we know status code does not depend
+				// on which node is erroring and which node is receiving.
+				for i, handler := range handlers {
+					// Test that the correct status is returned.
+					rec, err := makeRequest(handler, tenant, tc.wreq)
+					if err != nil {
+						t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err)
+					}
+					if rec.Code != tc.status {
+						t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String())
 					}
 				}
-				for j, a := range tc.appendables {
-					var expectedMin int
-					n := a.appender.(*fakeAppender).Get(lset)
-					got := uint64(len(n))
-					if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) {
-						// We have len(handlers) copies of each sample because the test case
-						// is run once for each handler and they all use the same appender.
-						expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples)
+				// Test that each time series is stored
+				// the correct amount of times in each fake DB.
+				for _, ts := range tc.wreq.Timeseries {
+					lset := make(labels.Labels, len(ts.Labels))
+					for j := range ts.Labels {
+						lset[j] = labels.Label{
+							Name:  ts.Labels[j].Name,
+							Value: ts.Labels[j].Value,
+						}
 					}
-					if uint64(expectedMin) > got {
-						t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got)
+					for j, a := range tc.appendables {
+						var expectedMin int
+						n := a.appender.(*fakeAppender).Get(lset)
+						got := uint64(len(n))
+						if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) {
+							// We have len(handlers) copies of each sample because the test case
+							// is run once for each handler and they all use the same appender.
+							expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples)
+						}
+						if uint64(expectedMin) > got {
+							t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got)
+						}
 					}
 				}
 			}
+		})
 	}
 }
@@ -778,7 +813,7 @@ func TestReceiveWriteRequestLimits(t *testing.T) {
 			appender: newFakeAppender(nil, nil, nil),
 		},
 	}
-	handlers, _ := newTestHandlerHashring(appendables, 3)
+	handlers, _ := newTestHandlerHashring(appendables, 3, "")
 	handler := handlers[0]
 	tenant := "test"
 
@@ -1128,7 +1163,7 @@ func TestReceiveWithConsistencyDelay(t *testing.T) {
 		// to see all requests completing all the time, since we're using local
 		// network we are not expecting anything to go wrong with these.
 		t.Run(tc.name, func(t *testing.T) {
-			handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor)
+			handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor, "")
 			tenant := "test"
 			// Test from the point of view of every node
 			// so that we know status code does not depend
@@ -1305,7 +1340,7 @@ func serializeSeriesWithOneSample(t testing.TB, series [][]labelpb.ZLabel) []byt
 
 func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
 	dir := b.TempDir()
 
-	handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1)
+	handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1, "")
 	handler := handlers[0]
 	reg := prometheus.NewRegistry()