diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 12afb752b8e..5dcfe53b40b 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -31,13 +31,14 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "github.com/thanos-io/thanos/pkg/api" - statusapi "github.com/thanos-io/thanos/pkg/api/status" - "github.com/thanos-io/thanos/pkg/logging" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/thanos-io/thanos/pkg/api" + statusapi "github.com/thanos-io/thanos/pkg/api/status" + "github.com/thanos-io/thanos/pkg/logging" + "github.com/thanos-io/thanos/pkg/errutil" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/runutil" @@ -350,7 +351,25 @@ type replica struct { // endpointReplica is a pair of a receive endpoint and a write request replica. type endpointReplica struct { endpoint string - replica replica + replica uint64 +} + +type trackedSeries struct { + seriesIDs []int + timeSeries []prompb.TimeSeries +} + +type writeResponse struct { + seriesIDs []int + replica uint64 + err error +} + +func newWriteResponse(seriesIDs []int, err error) writeResponse { + return writeResponse{ + seriesIDs: seriesIDs, + err: err, + } } func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error { @@ -552,30 +571,39 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p return errors.New("hashring is not ready") } - // Batch all of the time series in the write request - // into several smaller write requests that are - // grouped by target endpoint. This ensures that - // for any incoming write request to a node, - // at most one outgoing write request will be made - // to every other node in the hashring, rather than - // one request per time series. - wreqs := make(map[endpointReplica]*prompb.WriteRequest) - for i := range wreq.Timeseries { - endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n) - if err != nil { - h.mtx.RUnlock() - return err + var replicas []uint64 + if r.replicated { + replicas = []uint64{r.n} + } else { + for rn := uint64(0); rn < h.options.ReplicationFactor; rn++ { + replicas = append(replicas, rn) } - key := endpointReplica{endpoint: endpoint, replica: r} - if _, ok := wreqs[key]; !ok { - wreqs[key] = &prompb.WriteRequest{} + } + + wreqs := make(map[endpointReplica]trackedSeries) + for tsID, ts := range wreq.Timeseries { + for _, rn := range replicas { + endpoint, err := h.hashring.GetN(tenant, &ts, rn) + if err != nil { + h.mtx.RUnlock() + return err + } + key := endpointReplica{endpoint: endpoint, replica: rn} + er, ok := wreqs[key] + if !ok { + er = trackedSeries{ + seriesIDs: make([]int, 0), + timeSeries: make([]prompb.TimeSeries, 0), + } + } + er.timeSeries = append(wreqs[key].timeSeries, ts) + er.seriesIDs = append(wreqs[key].seriesIDs, tsID) + wreqs[key] = er } - wr := wreqs[key] - wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i]) } h.mtx.RUnlock() - return h.fanoutForward(ctx, tenant, wreqs, len(wreqs)) + return h.fanoutForward(ctx, tenant, wreqs, len(wreq.Timeseries), r.replicated) } // writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success. 
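The hunk above replaces the old "batch by endpoint" logic with batching by (endpoint, replica) pairs, where every series remembers its original index. The following is a minimal, self-contained sketch of that grouping idea, assuming a toy modulo hashring in place of the real Thanos hashring and plain strings in place of `prompb.TimeSeries`; it is an illustration of the technique, not the PR's code.

```go
// Sketch: expand every series across all replica numbers up front and group
// them into per-(endpoint, replica) batches that keep the original series
// indices, so quorum can later be counted per series.
package main

import "fmt"

type endpointReplica struct {
	endpoint string
	replica  uint64
}

type trackedSeries struct {
	seriesIDs  []int
	timeSeries []string // stands in for []prompb.TimeSeries
}

func groupSeries(series []string, endpoints []string, replicationFactor uint64) map[endpointReplica]trackedSeries {
	wreqs := make(map[endpointReplica]trackedSeries)
	for tsID, ts := range series {
		for rn := uint64(0); rn < replicationFactor; rn++ {
			// Stand-in for hashring.GetN(tenant, &ts, rn): a simple modulo placement.
			endpoint := endpoints[(uint64(tsID)+rn)%uint64(len(endpoints))]
			key := endpointReplica{endpoint: endpoint, replica: rn}
			er := wreqs[key]
			er.timeSeries = append(er.timeSeries, ts)
			er.seriesIDs = append(er.seriesIDs, tsID)
			wreqs[key] = er
		}
	}
	return wreqs
}

func main() {
	got := groupSeries([]string{"s0", "s1", "s2"}, []string{"a", "b", "c"}, 3)
	for key, batch := range got {
		fmt.Printf("%s/replica-%d -> series %v\n", key.endpoint, key.replica, batch.seriesIDs)
	}
}
```

The practical effect is that at most one outgoing request is made per (endpoint, replica) pair, while responses can still be attributed back to individual series via `seriesIDs`.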
@@ -583,16 +611,26 @@ func (h *Handler) writeQuorum() int { return int((h.options.ReplicationFactor / 2) + 1) } +func quorumReached(successes []int, successThreshold int) bool { + for _, success := range successes { + if success < successThreshold { + return false + } + } + + return true +} + // fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of // requests succeeds or fails or if context is canceled. -func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*prompb.WriteRequest, successThreshold int) error { +func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]trackedSeries, numSeries int, seriesReplicated bool) error { var errs errutil.MultiError fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout) defer func() { if errs.Err() != nil { // NOTICE: The cancel function is not used on all paths intentionally, - // if there is no error when quorum successThreshold is reached, + // if there is no error when quorum is reached, // let forward requests to optimistically run until timeout. cancel() } @@ -607,38 +645,11 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e tLogger = log.With(h.logger, logTags) } - ec := make(chan error) + ec := make(chan writeResponse) var wg sync.WaitGroup for er := range wreqs { - er := er - r := er.replica - endpoint := er.endpoint - wg.Add(1) - // If the request is not yet replicated, let's replicate it. - // If the replication factor isn't greater than 1, let's - // just forward the requests. - if !r.replicated && h.options.ReplicationFactor > 1 { - go func(endpoint string) { - defer wg.Done() - - var err error - tracing.DoInSpan(fctx, "receive_replicate", func(ctx context.Context) { - err = h.replicate(ctx, tenant, wreqs[er]) - }) - if err != nil { - h.replications.WithLabelValues(labelError).Inc() - ec <- errors.Wrapf(err, "replicate write request for endpoint %v", endpoint) - return - } - - h.replications.WithLabelValues(labelSuccess).Inc() - ec <- nil - }(endpoint) - - continue - } // If the endpoint for the write request is the // local node, then don't make a request but store locally. @@ -646,29 +657,31 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // function as replication to other nodes, we can treat // a failure to write locally as just another error that // can be ignored if the replication factor is met. - if endpoint == h.options.Endpoint { - go func(endpoint string) { + if er.endpoint == h.options.Endpoint { + go func(er endpointReplica) { defer wg.Done() var err error tracing.DoInSpan(fctx, "receive_tsdb_write", func(_ context.Context) { - err = h.writer.Write(fctx, tenant, wreqs[er]) + err = h.writer.Write(fctx, tenant, &prompb.WriteRequest{ + Timeseries: wreqs[er].timeSeries, + }) }) if err != nil { // When a MultiError is added to another MultiError, the error slices are concatenated, not nested. // To avoid breaking the counting logic, we need to flatten the error. 
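To make the thresholds in the hunk above concrete, here is a small worked example using the `writeQuorum` and `quorumReached` definitions shown in the diff; the numbers and the `main` wrapper are illustrative only.

```go
// Illustration of the quorum arithmetic: quorum = RF/2 + 1, and
// failureThreshold = RF - quorum is the per-series error count at which a
// series' errors are escalated once the fan-out finishes without quorum.
package main

import "fmt"

func writeQuorum(replicationFactor uint64) int {
	return int((replicationFactor / 2) + 1)
}

func quorumReached(successes []int, successThreshold int) bool {
	for _, success := range successes {
		if success < successThreshold {
			return false
		}
	}
	return true
}

func main() {
	const rf = 3
	quorum := writeQuorum(rf)               // 2
	failureThreshold := rf - uint64(quorum) // 1
	fmt.Println(quorum, failureThreshold)

	// Three series: the first two have 2 acks each, the last only 1,
	// so the request as a whole cannot be declared successful yet.
	fmt.Println(quorumReached([]int{2, 2, 1}, quorum)) // false
	fmt.Println(quorumReached([]int{2, 2, 2}, quorum)) // true
}
```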
level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error()) - ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint) + ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", er.endpoint)) return } - ec <- nil - }(endpoint) + ec <- newWriteResponse(wreqs[er].seriesIDs, nil) + }(er) continue } // Make a request to the specified endpoint. - go func(endpoint string) { + go func(er endpointReplica) { defer wg.Done() var ( @@ -684,18 +697,18 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e h.forwardRequests.WithLabelValues(labelSuccess).Inc() }() - cl, err = h.peers.get(fctx, endpoint) + cl, err = h.peers.get(fctx, er.endpoint) if err != nil { - ec <- errors.Wrapf(err, "get peer connection for endpoint %v", endpoint) + ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(err, "get peer connection for endpoint %v", er.endpoint)) return } h.mtx.RLock() - b, ok := h.peerStates[endpoint] + b, ok := h.peerStates[er.endpoint] if ok { if time.Now().Before(b.nextAllowed) { h.mtx.RUnlock() - ec <- errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint) + ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", er.endpoint)) return } } @@ -705,10 +718,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e tracing.DoInSpan(fctx, "receive_forward", func(ctx context.Context) { // Actually make the request against the endpoint we determined should handle these time series. _, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{ - Timeseries: wreqs[er].Timeseries, + Timeseries: wreqs[er].timeSeries, Tenant: tenant, // Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated. - Replica: int64(r.n + 1), + Replica: int64(er.replica + 1), }) }) if err != nil { @@ -716,26 +729,27 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e if st, ok := status.FromError(err); ok { if st.Code() == codes.Unavailable { h.mtx.Lock() - if b, ok := h.peerStates[endpoint]; ok { + if b, ok := h.peerStates[er.endpoint]; ok { b.attempt++ dur := h.expBackoff.ForAttempt(b.attempt) b.nextAllowed = time.Now().Add(dur) level.Debug(tLogger).Log("msg", "target unavailable backing off", "for", dur) } else { - h.peerStates[endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))} + h.peerStates[er.endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))} } h.mtx.Unlock() } } - ec <- errors.Wrapf(err, "forwarding request to endpoint %v", endpoint) + werr := errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint) + ec <- newWriteResponse(wreqs[er].seriesIDs, werr) return } h.mtx.Lock() - delete(h.peerStates, endpoint) + delete(h.peerStates, er.endpoint) h.mtx.Unlock() - ec <- nil - }(endpoint) + ec <- newWriteResponse(wreqs[er].seriesIDs, nil) + }(er) } go func() { @@ -747,82 +761,49 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // This is needed if context is canceled or if we reached success of fail quorum faster. 
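The forwarding path above keeps the existing convention that the replica number is 1-indexed on the wire, with 0 meaning "not yet replicated" (`Replica: int64(er.replica + 1)`). A tiny sketch of that convention follows; the helper names are hypothetical and not part of the Thanos codebase.

```go
// Sketch of the replica numbering convention: 0-indexed in memory,
// 1-indexed on the wire so that 0 can signal an un-replicated request.
package main

import "fmt"

// encodeReplica turns an in-memory replica number into the on-the-wire value.
func encodeReplica(replica uint64) int64 { return int64(replica + 1) }

// decodeReplica inverts encodeReplica; replicated is false for the 0 marker.
func decodeReplica(wire int64) (replica uint64, replicated bool) {
	if wire == 0 {
		return 0, false
	}
	return uint64(wire - 1), true
}

func main() {
	for _, wire := range []int64{0, encodeReplica(0), encodeReplica(2)} {
		r, ok := decodeReplica(wire)
		fmt.Printf("wire=%d -> replica=%d replicated=%v\n", wire, r, ok)
	}
}
```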
defer func() { go func() { - for err := range ec { - if err != nil { - level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", err) + for wresp := range ec { + if wresp.err != nil { + level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", wresp.err) } } }() }() - var success int + quorum := h.writeQuorum() + failureThreshold := h.options.ReplicationFactor - uint64(quorum) + successes := make([]int, numSeries) + seriesErrs := make([]errutil.MultiError, numSeries) for { select { case <-fctx.Done(): return fctx.Err() - case err, more := <-ec: + case wresp, more := <-ec: if !more { + for _, rerr := range seriesErrs { + if seriesReplicated { + errs.Add(rerr.Err()) + } else if uint64(len(rerr)) >= failureThreshold { + cause := determineWriteErrorCause(rerr.Err(), quorum) + errs.Add(errors.Wrapf(cause, "failed to replicate series")) + } + } return errs.Err() } - if err == nil { - success++ - if success >= successThreshold { - // In case the success threshold is lower than the total - // number of requests, then we can finish early here. This - // is the case for quorum writes for example. - return nil + if wresp.err != nil { + for _, tsID := range wresp.seriesIDs { + seriesErrs[tsID].Add(wresp.err) } continue } - errs.Add(err) - } - } -} - -// replicate replicates a write request to (replication-factor) nodes -// selected by the tenant and time series. -// The function only returns when all replication requests have finished -// or the context is canceled. -func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.WriteRequest) error { - // It is possible that hashring is ready in testReady() but unready now, - // so need to lock here. - h.mtx.RLock() - if h.hashring == nil { - h.mtx.RUnlock() - return errors.New("hashring is not ready") - } - - replicatedRequests := make(map[endpointReplica]*prompb.WriteRequest) - for i := uint64(0); i < h.options.ReplicationFactor; i++ { - for _, ts := range wreq.Timeseries { - endpoint, err := h.hashring.GetN(tenant, &ts, i) - if err != nil { - h.mtx.RUnlock() - return err - } - er := endpointReplica{ - endpoint: endpoint, - replica: replica{n: i, replicated: true}, + for _, tsID := range wresp.seriesIDs { + successes[tsID]++ } - replicatedRequest, ok := replicatedRequests[er] - if !ok { - replicatedRequest = &prompb.WriteRequest{ - Timeseries: make([]prompb.TimeSeries, 0), - } - replicatedRequests[er] = replicatedRequest + if quorumReached(successes, quorum) { + return nil } - replicatedRequest.Timeseries = append(replicatedRequest.Timeseries, ts) } } - h.mtx.RUnlock() - - quorum := h.writeQuorum() - // fanoutForward only returns an error if successThreshold (quorum) is not reached. - if err := h.fanoutForward(ctx, tenant, replicatedRequests, quorum); err != nil { - return errors.Wrap(determineWriteErrorCause(err, quorum), "quorum not reached") - } - return nil } // RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore. 
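The hunk above is the core of the change: the separate `replicate()` pass is removed, and `fanoutForward` now counts successes and errors per series, returning early once every series has quorum. Below is a condensed, self-contained sketch of that accounting under simplifying assumptions: responses arrive as a slice instead of a channel, and `errutil.MultiError` plus `determineWriteErrorCause` are reduced to plain error slices.

```go
// Sketch of per-series quorum accounting: each response carries the series
// IDs it covered; successes are tallied per series, and errors are only
// escalated for series that collected at least failureThreshold of them.
package main

import (
	"errors"
	"fmt"
)

type writeResponse struct {
	seriesIDs []int
	err       error
}

func quorumReached(successes []int, successThreshold int) bool {
	for _, s := range successes {
		if s < successThreshold {
			return false
		}
	}
	return true
}

func collect(responses []writeResponse, numSeries, quorum, failureThreshold int) error {
	successes := make([]int, numSeries)
	seriesErrs := make([][]error, numSeries)

	for _, wresp := range responses {
		if wresp.err != nil {
			for _, tsID := range wresp.seriesIDs {
				seriesErrs[tsID] = append(seriesErrs[tsID], wresp.err)
			}
			continue
		}
		for _, tsID := range wresp.seriesIDs {
			successes[tsID]++
		}
		if quorumReached(successes, quorum) {
			return nil // every series has enough acks; finish early
		}
	}

	// Not every series reached quorum: report the first series with too many errors.
	for tsID, errsForSeries := range seriesErrs {
		if len(errsForSeries) >= failureThreshold {
			return fmt.Errorf("failed to replicate series %d: %w", tsID, errsForSeries[0])
		}
	}
	return nil
}

func main() {
	// RF=3, quorum=2, failureThreshold=1; series 1 fails on two replicas.
	responses := []writeResponse{
		{seriesIDs: []int{0, 1}, err: nil},
		{seriesIDs: []int{1}, err: errors.New("conflict")},
		{seriesIDs: []int{0}, err: nil},
		{seriesIDs: []int{1}, err: errors.New("conflict")},
	}
	fmt.Println(collect(responses, 2, 2, 1))
}
```

The design choice this mirrors is that a partial failure no longer fails the whole request: only series whose own error count rules out quorum contribute to the returned error.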
@@ -923,7 +904,9 @@ type retryState struct { nextAllowed time.Time } -type expectedErrors []*struct { +type expectedErrors []*expectedError + +type expectedError struct { err error cause func(error) bool count int diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 4766036fa54..eeb4cd6cbaf 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "math" - "math/rand" "net/http" "net/http/httptest" "os" @@ -53,10 +52,11 @@ import ( func TestDetermineWriteErrorCause(t *testing.T) { for _, tc := range []struct { - name string - err error - threshold int - exp error + name string + err error + threshold int + forReplication bool + exp error }{ { name: "nil", @@ -231,13 +231,15 @@ func TestDetermineWriteErrorCause(t *testing.T) { exp: errors.New("baz: 3 errors: 3 errors: qux; rpc error: code = AlreadyExists desc = conflict; rpc error: code = AlreadyExists desc = conflict; foo; bar"), }, } { - err := determineWriteErrorCause(tc.err, tc.threshold) - if tc.exp != nil { - testutil.NotOk(t, err) - testutil.Equals(t, tc.exp.Error(), err.Error()) - continue - } - testutil.Ok(t, err) + t.Run(tc.name, func(t *testing.T) { + err := determineWriteErrorCause(tc.err, tc.threshold) + if tc.exp != nil { + testutil.NotOk(t, err) + testutil.Equals(t, tc.exp.Error(), err.Error()) + return + } + testutil.Ok(t, err) + }) } } @@ -347,7 +349,7 @@ func (f *fakeAppender) Rollback() error { return f.rollbackErr() } -func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { +func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64, hashringAlgo HashringAlgorithm) ([]*Handler, Hashring) { var ( cfg = []HashringConfig{{Hashring: "test"}} handlers []*Handler @@ -366,60 +368,44 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin }, } + ag := addrGen{} limiter, _ := NewLimiter(NewNopConfig(), nil, RouterIngestor, log.NewNopLogger()) for i := range appendables { h := NewHandler(nil, &Options{ TenantHeader: DefaultTenantHeader, ReplicaHeader: DefaultReplicaHeader, ReplicationFactor: replicationFactor, - ForwardTimeout: 5 * time.Minute, + ForwardTimeout: 5 * time.Second, Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), Limiter: limiter, }) handlers = append(handlers, h) h.peers = peers - addr := randomAddr() + addr := ag.newAddr() h.options.Endpoint = addr cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint) peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h} } - hashring := newMultiHashring(AlgorithmHashmod, replicationFactor, cfg) + // Use hashmod as default. 
+ if hashringAlgo == "" { + hashringAlgo = AlgorithmHashmod + } + + hashring := newMultiHashring(hashringAlgo, replicationFactor, cfg) for _, h := range handlers { h.Hashring(hashring) } return handlers, hashring } -func TestReceiveQuorum(t *testing.T) { +func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsistencyDelay bool) { appenderErrFn := func() error { return errors.New("failed to get appender") } conflictErrFn := func() error { return storage.ErrOutOfBounds } commitErrFn := func() error { return errors.New("failed to commit") } - wreq1 := &prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ - { - Labels: []labelpb.ZLabel{ - { - Name: "foo", - Value: "bar", - }, - }, - Samples: []prompb.Sample{ - { - Value: 1, - Timestamp: 1, - }, - { - Value: 2, - Timestamp: 2, - }, - { - Value: 3, - Timestamp: 3, - }, - }, - }, - }, + wreq := &prompb.WriteRequest{ + Timeseries: makeSeriesWithValues(50), } + for _, tc := range []struct { name string status int @@ -431,7 +417,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 1 success", status: http.StatusOK, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -442,7 +428,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 1 commit error", status: http.StatusInternalServerError, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -453,7 +439,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 1 conflict", status: http.StatusConflict, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(conflictErrFn, nil, nil), @@ -464,7 +450,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 2 success", status: http.StatusOK, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -478,7 +464,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 success", status: http.StatusOK, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -495,7 +481,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 success with replication", status: http.StatusOK, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -512,7 +498,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 commit error", status: http.StatusInternalServerError, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -529,7 +515,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 commit error with replication", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -546,7 +532,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 appender error with replication", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -566,7 +552,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 conflict with replication", status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(conflictErrFn, nil, nil), @@ -583,7 
+569,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 conflict and commit error with replication", status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(conflictErrFn, commitErrFn, nil), @@ -600,7 +586,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication and one faulty", status: http.StatusOK, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), @@ -617,7 +603,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication and one commit error", status: http.StatusOK, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -634,7 +620,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication and two conflicts", status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), @@ -651,11 +637,28 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication one conflict and one commit error", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, { appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), }, + }, + }, + { + name: "size 3 with replication two commit errors", + status: http.StatusInternalServerError, + replicationFactor: 3, + wreq: wreq, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, commitErrFn, nil), + }, { appender: newFakeAppender(nil, commitErrFn, nil), }, @@ -665,10 +668,62 @@ func TestReceiveQuorum(t *testing.T) { }, }, { - name: "size 3 with replication two commit errors", - status: http.StatusInternalServerError, + name: "size 6 with replication 3", + status: http.StatusOK, + replicationFactor: 3, + wreq: wreq, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + }, + }, + { + name: "size 6 with replication 3 one commit and two conflict error", + status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, conflictErrFn, nil), + }, + { + appender: newFakeAppender(nil, conflictErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + }, + }, + { + name: "size 6 with replication 5 two commit errors", + status: http.StatusOK, + replicationFactor: 5, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -679,11 +734,20 @@ func TestReceiveQuorum(t *testing.T) { 
{ appender: newFakeAppender(nil, nil, nil), }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, }, }, } { t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) + handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor, hashringAlgo) tenant := "test" // Test from the point of view of every node // so that we know status code does not depend @@ -692,12 +756,17 @@ func TestReceiveQuorum(t *testing.T) { // Test that the correct status is returned. rec, err := makeRequest(handler, tenant, tc.wreq) if err != nil { - t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err) + t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", i+1, err) } if rec.Code != tc.status { - t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String()) + t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i+1, tc.status, rec.Code, rec.Body.String()) } } + + if withConsistencyDelay { + time.Sleep(50 * time.Millisecond) + } + // Test that each time series is stored // the correct amount of times in each fake DB. for _, ts := range tc.wreq.Timeseries { @@ -709,16 +778,30 @@ func TestReceiveQuorum(t *testing.T) { } } for j, a := range tc.appendables { - var expectedMin int - n := a.appender.(*fakeAppender).Get(lset) - got := uint64(len(n)) - if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { - // We have len(handlers) copies of each sample because the test case - // is run once for each handler and they all use the same appender. - expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples) - } - if uint64(expectedMin) > got { - t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got) + if withConsistencyDelay { + var expected int + n := a.appender.(*fakeAppender).Get(lset) + got := uint64(len(n)) + if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { + // We have len(handlers) copies of each sample because the test case + // is run once for each handler and they all use the same appender. + expected = len(handlers) * len(ts.Samples) + } + if uint64(expected) != got { + t.Errorf("handler: %d, labels %q: expected %d samples, got %d", j, lset.String(), expected, got) + } + } else { + var expectedMin int + n := a.appender.(*fakeAppender).Get(lset) + got := uint64(len(n)) + if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { + // We have len(handlers) copies of each sample because the test case + // is run once for each handler and they all use the same appender. 
+ expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples) + } + if uint64(expectedMin) > got { + t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got) + } } } } @@ -726,6 +809,22 @@ func TestReceiveQuorum(t *testing.T) { } } +func TestReceiveQuorumHashmod(t *testing.T) { + testReceiveQuorum(t, AlgorithmHashmod, false) +} + +func TestReceiveQuorumKetama(t *testing.T) { + testReceiveQuorum(t, AlgorithmKetama, false) +} + +func TestReceiveWithConsistencyDelayHashmod(t *testing.T) { + testReceiveQuorum(t, AlgorithmHashmod, true) +} + +func TestReceiveWithConsistencyDelayKetama(t *testing.T) { + testReceiveQuorum(t, AlgorithmKetama, true) +} + func TestReceiveWriteRequestLimits(t *testing.T) { for _, tc := range []struct { name string @@ -778,7 +877,7 @@ func TestReceiveWriteRequestLimits(t *testing.T) { appender: newFakeAppender(nil, nil, nil), }, } - handlers, _ := newTestHandlerHashring(appendables, 3) + handlers, _ := newTestHandlerHashring(appendables, 3, AlgorithmHashmod) handler := handlers[0] tenant := "test" @@ -832,348 +931,6 @@ func TestReceiveWriteRequestLimits(t *testing.T) { } } -func TestReceiveWithConsistencyDelay(t *testing.T) { - appenderErrFn := func() error { return errors.New("failed to get appender") } - conflictErrFn := func() error { return storage.ErrOutOfBounds } - commitErrFn := func() error { return errors.New("failed to commit") } - wreq1 := &prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ - { - Labels: []labelpb.ZLabel{ - { - Name: "foo", - Value: "bar", - }, - }, - Samples: []prompb.Sample{ - { - Value: 1, - Timestamp: 1, - }, - { - Value: 2, - Timestamp: 2, - }, - { - Value: 3, - Timestamp: 3, - }, - }, - }, - }, - } - for _, tc := range []struct { - name string - status int - replicationFactor uint64 - wreq *prompb.WriteRequest - appendables []*fakeAppendable - }{ - { - name: "size 1 success", - status: http.StatusOK, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 1 commit error", - status: http.StatusInternalServerError, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - }, - }, - { - name: "size 1 conflict", - status: http.StatusConflict, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - }, - }, - { - name: "size 2 success", - status: http.StatusOK, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 success", - status: http.StatusOK, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 success with replication", - status: http.StatusOK, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 commit error", - status: http.StatusInternalServerError, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: 
newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - }, - }, - { - name: "size 3 commit error with replication", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - }, - }, - { - name: "size 3 appender error with replication", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - appenderErr: appenderErrFn, - }, - { - appender: newFakeAppender(nil, nil, nil), - appenderErr: appenderErrFn, - }, - { - appender: newFakeAppender(nil, nil, nil), - appenderErr: appenderErrFn, - }, - }, - }, - { - name: "size 3 conflict with replication", - status: http.StatusConflict, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - }, - }, - { - name: "size 3 conflict and commit error with replication", - status: http.StatusConflict, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(conflictErrFn, commitErrFn, nil), - }, - { - appender: newFakeAppender(conflictErrFn, commitErrFn, nil), - }, - { - appender: newFakeAppender(conflictErrFn, commitErrFn, nil), - }, - }, - }, - { - name: "size 3 with replication and one faulty", - status: http.StatusOK, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication and one commit error", - status: http.StatusOK, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication and two conflicts", - status: http.StatusConflict, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), - }, - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication one conflict and one commit error", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication two commit errors", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: 
[]*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - } { - // Run the quorum tests with consistency delay, which should allow us - // to see all requests completing all the time, since we're using local - // network we are not expecting anything to go wrong with these. - t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) - tenant := "test" - // Test from the point of view of every node - // so that we know status code does not depend - // on which node is erroring and which node is receiving. - for i, handler := range handlers { - // Test that the correct status is returned. - rec, err := makeRequest(handler, tenant, tc.wreq) - if err != nil { - t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err) - } - if rec.Code != tc.status { - t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String()) - } - } - - time.Sleep(50 * time.Millisecond) - - // Test that each time series is stored - // the correct amount of times in each fake DB. - for _, ts := range tc.wreq.Timeseries { - lset := make(labels.Labels, len(ts.Labels)) - for j := range ts.Labels { - lset[j] = labels.Label{ - Name: ts.Labels[j].Name, - Value: ts.Labels[j].Value, - } - } - for j, a := range tc.appendables { - var expected int - n := a.appender.(*fakeAppender).Get(lset) - got := uint64(len(n)) - if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { - // We have len(handlers) copies of each sample because the test case - // is run once for each handler and they all use the same appender. - expected = len(handlers) * len(ts.Samples) - } - if uint64(expected) != got { - t.Errorf("handler: %d, labels %q: expected %d samples, got %d", j, lset.String(), expected, got) - } - } - } - }) - } -} - // endpointHit is a helper to determine if a given endpoint in a hashring would be selected // for a given time series, tenant, and replication factor. 
func endpointHit(t *testing.T, h Hashring, rf uint64, endpoint, tenant string, timeSeries *prompb.TimeSeries) bool { @@ -1224,8 +981,11 @@ func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (*httptes return rec, nil } -func randomAddr() string { - return fmt.Sprintf("http://%d.%d.%d.%d:%d", rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(35000)+30000) +type addrGen struct{ n int } + +func (a *addrGen) newAddr() string { + a.n++ + return fmt.Sprintf("http://node-%d:%d", a.n, 12345+a.n) } type fakeRemoteWriteGRPCServer struct { @@ -1302,10 +1062,31 @@ func serializeSeriesWithOneSample(t testing.TB, series [][]labelpb.ZLabel) []byt return snappy.Encode(nil, body) } +func makeSeriesWithValues(numSeries int) []prompb.TimeSeries { + series := make([]prompb.TimeSeries, numSeries) + for i := 0; i < numSeries; i++ { + series[i] = prompb.TimeSeries{ + Labels: []labelpb.ZLabel{ + { + Name: fmt.Sprintf("pod-%d", i), + Value: fmt.Sprintf("nginx-%d", i), + }, + }, + Samples: []prompb.Sample{ + { + Value: float64(i), + Timestamp: 10, + }, + }, + } + } + return series +} + func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { dir := b.TempDir() - handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1) + handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1, AlgorithmHashmod) handler := handlers[0] reg := prometheus.NewRegistry()
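Since the PR introduces `quorumReached` as a standalone helper, a small table-driven test could exercise it directly. The sketch below is hypothetical and not part of this PR (the test name is made up); it assumes it lives in the same `receive` package so it can reach the unexported function.

```go
// Hypothetical test sketch for quorumReached: every series must have at
// least `quorum` successes, and an empty series list trivially passes.
package receive

import "testing"

func TestQuorumReachedSketch(t *testing.T) {
	for _, tc := range []struct {
		name      string
		successes []int
		quorum    int
		want      bool
	}{
		{name: "all series at quorum", successes: []int{2, 2, 3}, quorum: 2, want: true},
		{name: "one series below quorum", successes: []int{2, 1, 3}, quorum: 2, want: false},
		{name: "no series", successes: nil, quorum: 2, want: true},
	} {
		t.Run(tc.name, func(t *testing.T) {
			if got := quorumReached(tc.successes, tc.quorum); got != tc.want {
				t.Errorf("quorumReached(%v, %d) = %v, want %v", tc.successes, tc.quorum, got, tc.want)
			}
		})
	}
}
```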