Add repro test case
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
fpetkovski committed Nov 21, 2022
1 parent c120cfb commit 57f7fab
Showing 2 changed files with 116 additions and 67 deletions.
72 changes: 43 additions & 29 deletions pkg/receive/handler.go
@@ -361,8 +361,17 @@ type writeRequest struct {
 }
 
 type writeResponse struct {
-	batch uint64
-	err   error
+	batch   uint64
+	replica uint64
+	err     error
 }
 
+func newWriteResponse(batch, replica uint64, err error) writeResponse {
+	return writeResponse{
+		batch:   batch,
+		replica: replica,
+		err:     err,
+	}
+}
+
 func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error {
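
The replica index added to writeResponse above is what lets fanoutForward, further down in this diff, attribute a failure to a specific replica of a batch. A minimal illustrative sketch of that bookkeeping (the response value here is hypothetical, not a real forward request):

// Hypothetical response: batch 0, replica 2, carrying a simulated error.
wr := newWriteResponse(0, 2, errors.New("simulated commit error"))

// Mirrors the failure tracking introduced in fanoutForward below.
failures := map[uint64]map[uint64]bool{}
if wr.err != nil {
	if _, ok := failures[wr.batch]; !ok {
		failures[wr.batch] = map[uint64]bool{}
	}
	failures[wr.batch][wr.replica] = true
}
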
@@ -592,17 +601,23 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
 	}
 	h.mtx.RUnlock()
 
-	return h.fanoutForward(ctx, tenant, wreqs, uint64(len(wreqs)), len(wreqs))
+	return h.fanoutForward(ctx, tenant, wreqs, uint64(len(wreqs)), 1, 0)
 }
 
 // writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success.
 func (h *Handler) writeQuorum() int {
 	return int((h.options.ReplicationFactor / 2) + 1)
 }
 
-func quorumReached(replicationSuccess []int, successThreshold int) bool {
-	for _, successCount := range replicationSuccess {
-		if successCount < successThreshold {
+func quorumReached(failures map[uint64]map[uint64]bool, successThreshold int) bool {
+	for _, replicas := range failures {
+		successCounts := 0
+		for _, failure := range replicas {
+			if !failure {
+				successCounts++
+			}
+		}
+		if successCounts < successThreshold {
 			return false
 		}
 	}
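
For orientation (an illustrative sketch, not part of the commit): with the default replication factor of 3 the thresholds work out as below, and quorumReached counts every entry in a batch's map that is not marked as a failure as a success.

replicationFactor := uint64(3)
writeQuorum := int(replicationFactor/2) + 1              // 2 replicas must confirm each batch
failureThreshold := int(replicationFactor) - writeQuorum // at most 1 replica may fail
_ = failureThreshold

// Batch 0: replicas 0 and 2 succeeded, replica 1 failed.
failures := map[uint64]map[uint64]bool{
	0: {0: false, 1: true, 2: false},
}
_ = quorumReached(failures, writeQuorum) // true: 2 recorded successes >= a threshold of 2
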
@@ -612,14 +627,14 @@ func quorumReached(replicationSuccess []int, successThreshold int) bool {
 
 // fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of
 // requests succeeds or fails or if context is canceled.
-func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*writeRequest, numBatches uint64, successThreshold int) error {
+func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*writeRequest, numBatches uint64, numReplicas uint64, failureThreshold int) error {
 	var errs errutil.MultiError
 
 	fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout)
 	defer func() {
 		if errs.Err() != nil {
 			// NOTICE: The cancel function is not used on all paths intentionally,
-			// if there is no error when quorum successThreshold is reached,
+			// if there is no error when quorum failureThreshold is reached,
 			// let forward requests to optimistically run until timeout.
 			cancel()
 		}
@@ -656,12 +671,12 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 				})
 				if err != nil {
 					h.replications.WithLabelValues(labelError).Inc()
-					ec <- writeResponse{batch: wreqs[er].batch, err: errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)}
+					ec <- newWriteResponse(wreqs[er].batch, er.replica.n, errors.Wrapf(err, "replicate write request for endpoint %v", endpoint))
 					return
 				}
 
 				h.replications.WithLabelValues(labelSuccess).Inc()
-				ec <- writeResponse{batch: wreqs[er].batch, err: nil}
+				ec <- newWriteResponse(wreqs[er].batch, er.replica.n, nil)
 			}(endpoint)
 
 			continue
@@ -685,10 +700,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 					// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
 					// To avoid breaking the counting logic, we need to flatten the error.
 					level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error())
-					ec <- writeResponse{batch: r.batch, err: errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)}
+					ec <- newWriteResponse(wreqs[er].batch, er.replica.n, errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint))
 					return
 				}
-				ec <- writeResponse{batch: r.batch, err: nil}
+				ec <- newWriteResponse(wreqs[er].batch, er.replica.n, nil)
 			}(endpoint)
 
 			continue
@@ -713,7 +728,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 
 			cl, err = h.peers.get(fctx, endpoint)
 			if err != nil {
-				ec <- writeResponse{batch: r.batch, err: errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)}
+				ec <- newWriteResponse(wreqs[er].batch, er.replica.n, errors.Wrapf(err, "get peer connection for endpoint %v", endpoint))
 				return
 			}
 
@@ -722,7 +737,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 			if ok {
 				if time.Now().Before(b.nextAllowed) {
 					h.mtx.RUnlock()
-					ec <- writeResponse{batch: r.batch, err: errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)}
+					ec <- newWriteResponse(wreqs[er].batch, er.replica.n, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint))
 					return
 				}
 			}
@@ -754,14 +769,14 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 						h.mtx.Unlock()
 					}
 				}
-				ec <- writeResponse{batch: r.batch, err: errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)}
+				ec <- newWriteResponse(wreqs[er].batch, er.replica.n, errors.Wrapf(err, "forwarding request to endpoint %v", endpoint))
 				return
 			}
 			h.mtx.Lock()
 			delete(h.peerStates, endpoint)
 			h.mtx.Unlock()
 
-			ec <- writeResponse{batch: r.batch, err: nil}
+			ec <- newWriteResponse(wreqs[er].batch, er.replica.n, nil)
 		}(endpoint)
 	}
 
@@ -782,26 +797,25 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 		}()
 	}()
 
-	successCounts := make([]int, numBatches)
+	failures := make(map[uint64]map[uint64]bool, numBatches)
 	for {
 		select {
 		case <-fctx.Done():
 			return fctx.Err()
 		case wr, more := <-ec:
 			if !more {
+				if quorumReached(failures, failureThreshold) {
+					return nil
+				}
 				return errs.Err()
 			}
-			if wr.err == nil {
-				successCounts[wr.batch]++
-				// In case the success threshold is lower than the total
-				// number of requests, then we can finish early here. This
-				// is the case for quorum writes for example.
-				if quorumReached(successCounts, successThreshold) {
-					return nil
+			if wr.err != nil {
+				if _, ok := failures[wr.batch]; !ok {
+					failures[wr.batch] = map[uint64]bool{}
 				}
-				continue
+				failures[wr.batch][wr.replica] = true
+				errs.Add(wr.err)
 			}
-			errs.Add(wr.err)
 		}
 	}
 }
@@ -847,10 +861,10 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *writeReque
 	}
 	h.mtx.RUnlock()
 
-	quorum := h.writeQuorum()
+	failureThreshold := int(h.options.ReplicationFactor) - h.writeQuorum()
 	// fanoutForward only returns an error if successThreshold (quorum) is not reached.
-	if err := h.fanoutForward(ctx, tenant, replicatedRequests, 1, quorum); err != nil {
-		return errors.Wrap(determineWriteErrorCause(err, quorum), "quorum not reached")
+	if err := h.fanoutForward(ctx, tenant, replicatedRequests, 1, h.options.ReplicationFactor, failureThreshold); err != nil {
+		return errors.Wrap(determineWriteErrorCause(err, h.writeQuorum()), "quorum not reached")
 	}
 	return nil
 }
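
To make the new replicate call concrete (a worked example, not part of the commit): with ReplicationFactor set to 3 the handler now tolerates one failed replica per batch.

// Assuming h.options.ReplicationFactor == 3:
//   writeQuorum      = 3/2 + 1 = 2
//   failureThreshold = 3 - 2   = 1
// so the call above becomes:
//   h.fanoutForward(ctx, tenant, replicatedRequests, 1, 3, 1)
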
111 changes: 73 additions & 38 deletions pkg/receive/handler_test.go
@@ -347,7 +347,7 @@ func (f *fakeAppender) Rollback() error {
 	return f.rollbackErr()
 }
 
-func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) {
+func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64, hashringAlgorithm HashringAlgorithm) ([]*Handler, Hashring) {
 	var (
 		cfg      = []HashringConfig{{Hashring: "test"}}
 		handlers []*Handler
@@ -383,7 +383,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
 		cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint)
 		peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h}
 	}
-	hashring := newMultiHashring(AlgorithmHashmod, replicationFactor, cfg)
+	hashring := newMultiHashring(hashringAlgorithm, replicationFactor, cfg)
 	for _, h := range handlers {
 		h.Hashring(hashring)
 	}
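
The remaining hunks in this file update every caller of newTestHandlerHashring for the extra parameter. A rough sketch of the two calling styles used in this commit, with appendables standing in for whatever fixture the test builds (the behaviour of newMultiHashring for an empty algorithm string is an assumption, not shown in this diff):

// Quorum tests pin the hashring algorithm explicitly.
handlers, hashring := newTestHandlerHashring(appendables, 3, AlgorithmKetama)

// Other tests pass "", leaving the choice to newMultiHashring
// (assumed to fall back to the default hashmod algorithm).
handlers, _ = newTestHandlerHashring(appendables, 3, "")
_ = hashring
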
@@ -420,6 +420,7 @@ func TestReceiveQuorum(t *testing.T) {
 			},
 		},
 	}
+
 	for _, tc := range []struct {
 		name              string
 		status            int
@@ -681,47 +682,81 @@ func TestReceiveQuorum(t *testing.T) {
 				},
 			},
 		},
+		{
+			name:              "size 4 with replication two commit errors",
+			status:            http.StatusOK,
+			replicationFactor: 3,
+			wreq: &prompb.WriteRequest{
+				Timeseries: []prompb.TimeSeries{
+					{
+						Labels:  []labelpb.ZLabel{{Name: "foo", Value: "bar"}},
+						Samples: []prompb.Sample{{Value: 1, Timestamp: 1}, {Value: 2, Timestamp: 2}, {Value: 3, Timestamp: 3}},
+					},
+					{
+						Labels:  []labelpb.ZLabel{{Name: "foo", Value: "xyz3"}},
+						Samples: []prompb.Sample{{Value: 1, Timestamp: 1}, {Value: 2, Timestamp: 2}, {Value: 3, Timestamp: 3}},
+					},
+				},
+			},
+			appendables: []*fakeAppendable{
+				{
+					appender: newFakeAppender(nil, commitErrFn, nil),
+				},
+				{
+					appender: newFakeAppender(nil, commitErrFn, nil),
+				},
+				{
+					appender: newFakeAppender(nil, nil, nil),
+				},
+				{
+					appender: newFakeAppender(nil, nil, nil),
+				},
+			},
+		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
-			handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor)
-			tenant := "test"
-			// Test from the point of view of every node
-			// so that we know status code does not depend
-			// on which node is erroring and which node is receiving.
-			for i, handler := range handlers {
-				// Test that the correct status is returned.
-				rec, err := makeRequest(handler, tenant, tc.wreq)
-				if err != nil {
-					t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err)
-				}
-				if rec.Code != tc.status {
-					t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String())
-				}
-			}
-			// Test that each time series is stored
-			// the correct amount of times in each fake DB.
-			for _, ts := range tc.wreq.Timeseries {
-				lset := make(labels.Labels, len(ts.Labels))
-				for j := range ts.Labels {
-					lset[j] = labels.Label{
-						Name:  ts.Labels[j].Name,
-						Value: ts.Labels[j].Value,
-					}
-				}
-				for j, a := range tc.appendables {
-					var expectedMin int
-					n := a.appender.(*fakeAppender).Get(lset)
-					got := uint64(len(n))
-					if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) {
-						// We have len(handlers) copies of each sample because the test case
-						// is run once for each handler and they all use the same appender.
-						expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples)
-					}
-					if uint64(expectedMin) > got {
-						t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got)
-					}
-				}
-			}
+			for _, hashringAlgo := range []HashringAlgorithm{AlgorithmKetama} {
+				handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor, hashringAlgo)
+				tenant := "test"
+				// Test from the point of view of every node
+				// so that we know status code does not depend
+				// on which node is erroring and which node is receiving.
+				for i, handler := range handlers {
+					// Test that the correct status is returned.
+					rec, err := makeRequest(handler, tenant, tc.wreq)
+					if err != nil {
+						t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err)
+					}
+					if rec.Code != tc.status {
+						t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String())
+					}
+				}
+				// Test that each time series is stored
+				// the correct amount of times in each fake DB.
+				for _, ts := range tc.wreq.Timeseries {
+					lset := make(labels.Labels, len(ts.Labels))
+					for j := range ts.Labels {
+						lset[j] = labels.Label{
+							Name:  ts.Labels[j].Name,
+							Value: ts.Labels[j].Value,
+						}
+					}
+					for j, a := range tc.appendables {
+						var expectedMin int
+						n := a.appender.(*fakeAppender).Get(lset)
+						got := uint64(len(n))
+						if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) {
+							// We have len(handlers) copies of each sample because the test case
+							// is run once for each handler and they all use the same appender.
+							expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples)
+						}
+						if uint64(expectedMin) > got {
+							t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got)
+						}
+					}
+				}
+			}
+
 		})
 	}
 }
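
The rewritten body above wraps the original assertions in a loop over hashring algorithms (only AlgorithmKetama in this commit) while keeping the same per-endpoint expectation. As a worked example of that expectation (illustrative arithmetic only): with a replication factor of 3 and three samples per series, every endpoint the hashring selects must hold at least the following number of copies:

replicationFactor := uint64(3)
samplesPerSeries := 3
expectedMin := int((replicationFactor/2)+1) * samplesPerSeries // = 2 * 3 = 6 samples
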
@@ -778,7 +813,7 @@ func TestReceiveWriteRequestLimits(t *testing.T) {
 			appender: newFakeAppender(nil, nil, nil),
 		},
 	}
-	handlers, _ := newTestHandlerHashring(appendables, 3)
+	handlers, _ := newTestHandlerHashring(appendables, 3, "")
 	handler := handlers[0]
 
 	tenant := "test"
@@ -1128,7 +1163,7 @@ func TestReceiveWithConsistencyDelay(t *testing.T) {
 		// to see all requests completing all the time, since we're using local
 		// network we are not expecting anything to go wrong with these.
 		t.Run(tc.name, func(t *testing.T) {
-			handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor)
+			handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor, "")
 			tenant := "test"
 			// Test from the point of view of every node
 			// so that we know status code does not depend
@@ -1305,7 +1340,7 @@ func serializeSeriesWithOneSample(t testing.TB, series [][]labelpb.ZLabel) []byt
 func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
 	dir := b.TempDir()
 
-	handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1)
+	handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1, "")
 	handler := handlers[0]
 
 	reg := prometheus.NewRegistry()