Fix quorum calculation for Ketama hashring
The quorum calculation is currently broken when using the Ketama
hashring. The reasons are explained in detail in issue
#5784.

This commit fixes quorum calculation by tracking successful writes
for each individual time series inside a remote-write request.

The commit also removes the replicate() method inside the Handler
and moves the entire logic of fanning out and calculating success
into the fanoutForward() method.
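
The idea behind the fix, as a minimal standalone sketch (simplified names and types, not the actual handler code): under the Ketama hashring the series in one request can hash to different endpoint sets, so counting successful outgoing requests says nothing reliable about any single series. Tracking successes per series and requiring each one to reach write quorum does.

// Minimal sketch of per-series quorum accounting (simplified; the real
// handler works on prompb.TimeSeries and forwards over gRPC).
package main

import "fmt"

func writeQuorum(replicationFactor uint64) int {
	return int(replicationFactor/2 + 1)
}

func quorumReached(successes []int, quorum int) bool {
	for _, s := range successes {
		if s < quorum {
			return false
		}
	}
	return true
}

func main() {
	const replicationFactor = 3
	quorum := writeQuorum(replicationFactor) // 2

	// Successful replica writes per series in one request: series 0 reached
	// two replicas, series 1 all three, series 2 only one.
	successes := []int{2, 3, 1}

	// Per-request counting would have seen "enough" successful outgoing
	// requests; per-series accounting correctly reports failure.
	fmt.Println(quorumReached(successes, quorum)) // false
}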

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
fpetkovski committed Nov 24, 2022
1 parent ce2385e commit 2ec06ca
Showing 2 changed files with 312 additions and 548 deletions.
241 changes: 112 additions & 129 deletions pkg/receive/handler.go
@@ -31,13 +31,14 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/logging"

"github.com/thanos-io/thanos/pkg/errutil"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/runutil"
@@ -350,7 +351,25 @@ type replica struct {
// endpointReplica is a pair of a receive endpoint and a write request replica.
type endpointReplica struct {
endpoint string
replica replica
replica uint64
}

// trackedSeries pairs the time series routed to one endpoint/replica with
// their positions (series IDs) in the incoming write request.
type trackedSeries struct {
seriesIDs []int
timeSeries []prompb.TimeSeries
}

// writeResponse reports the outcome of a single forwarded write for the
// series identified by seriesIDs.
type writeResponse struct {
seriesIDs []int
replica uint64
err error
}

func newWriteResponse(seriesIDs []int, err error) writeResponse {
return writeResponse{
seriesIDs: seriesIDs,
err: err,
}
}

func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error {
@@ -552,47 +571,66 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
return errors.New("hashring is not ready")
}

// Batch all of the time series in the write request
// into several smaller write requests that are
// grouped by target endpoint. This ensures that
// for any incoming write request to a node,
// at most one outgoing write request will be made
// to every other node in the hashring, rather than
// one request per time series.
wreqs := make(map[endpointReplica]*prompb.WriteRequest)
for i := range wreq.Timeseries {
endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n)
if err != nil {
h.mtx.RUnlock()
return err
var replicas []uint64
if r.replicated {
replicas = []uint64{r.n}
} else {
for rn := uint64(0); rn < h.options.ReplicationFactor; rn++ {
replicas = append(replicas, rn)
}
key := endpointReplica{endpoint: endpoint, replica: r}
if _, ok := wreqs[key]; !ok {
wreqs[key] = &prompb.WriteRequest{}
}

wreqs := make(map[endpointReplica]trackedSeries)
for tsID, ts := range wreq.Timeseries {
for _, rn := range replicas {
endpoint, err := h.hashring.GetN(tenant, &ts, rn)
if err != nil {
h.mtx.RUnlock()
return err
}
key := endpointReplica{endpoint: endpoint, replica: rn}
er, ok := wreqs[key]
if !ok {
er = trackedSeries{
seriesIDs: make([]int, 0),
timeSeries: make([]prompb.TimeSeries, 0),
}
}
er.timeSeries = append(wreqs[key].timeSeries, ts)
er.seriesIDs = append(wreqs[key].seriesIDs, tsID)
wreqs[key] = er
}
wr := wreqs[key]
wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i])
}
h.mtx.RUnlock()

return h.fanoutForward(ctx, tenant, wreqs, len(wreqs))
return h.fanoutForward(ctx, tenant, wreqs, len(wreq.Timeseries), r.replicated)
}
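
A small self-contained sketch of the grouping above (hypothetical endpoints and ring assignment, not the real hashring API): every series is routed once per replica number, and its ID is carried along with each batch so that responses can later be attributed back to it.

package main

import "fmt"

type endpointReplica struct {
	endpoint string
	replica  uint64
}

func main() {
	// Hypothetical assignment of two series across replica numbers 0 and 1.
	assignments := [][]string{
		{"node-a", "node-b"}, // series 0: replica 0 -> node-a, replica 1 -> node-b
		{"node-b", "node-c"}, // series 1: replica 0 -> node-b, replica 1 -> node-c
	}

	groups := make(map[endpointReplica][]int)
	for tsID, endpoints := range assignments {
		for rn, endpoint := range endpoints {
			key := endpointReplica{endpoint: endpoint, replica: uint64(rn)}
			groups[key] = append(groups[key], tsID)
		}
	}

	// Four outgoing batches, each carrying the IDs of the series it contains.
	for key, ids := range groups {
		fmt.Printf("%s (replica %d): series %v\n", key.endpoint, key.replica, ids)
	}
}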

// writeQuorum returns the minimum number of replicas that have to confirm a write before replication is considered successful.
func (h *Handler) writeQuorum() int {
return int((h.options.ReplicationFactor / 2) + 1)
}

func quorumReached(successes []int, successThreshold int) bool {
for _, success := range successes {
if success < successThreshold {
return false
}
}

return true
}
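
// For example, with a replication factor of 3 the write quorum is 2:
// per-series successes of [2, 3, 2] reach quorum, while [2, 3, 1] do not,
// because the last series was confirmed by only one replica.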

// fanoutForward concurrently fans out the given set of write requests. It returns as soon as the quorum of
// requests succeeds or fails, or the context is canceled.
func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*prompb.WriteRequest, successThreshold int) error {
func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]trackedSeries, numSeries int, seriesReplicated bool) error {
var errs errutil.MultiError

fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout)
defer func() {
if errs.Err() != nil {
// NOTICE: The cancel function is not used on all paths intentionally,
// if there is no error when quorum successThreshold is reached,
// if there is no error when quorum is reached,
// let forward requests optimistically run until timeout.
cancel()
}
@@ -607,68 +645,43 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
tLogger = log.With(h.logger, logTags)
}

ec := make(chan error)
ec := make(chan writeResponse)

var wg sync.WaitGroup
for er := range wreqs {
er := er
r := er.replica
endpoint := er.endpoint

wg.Add(1)
// If the request is not yet replicated, let's replicate it.
// If the replication factor isn't greater than 1, let's
// just forward the requests.
if !r.replicated && h.options.ReplicationFactor > 1 {
go func(endpoint string) {
defer wg.Done()

var err error
tracing.DoInSpan(fctx, "receive_replicate", func(ctx context.Context) {
err = h.replicate(ctx, tenant, wreqs[er])
})
if err != nil {
h.replications.WithLabelValues(labelError).Inc()
ec <- errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)
return
}

h.replications.WithLabelValues(labelSuccess).Inc()
ec <- nil
}(endpoint)

continue
}

// If the endpoint for the write request is the
// local node, then don't make a request but store locally.
// By handing replication to the local node in the same
// function as replication to other nodes, we can treat
// a failure to write locally as just another error that
// can be ignored if the replication factor is met.
if endpoint == h.options.Endpoint {
go func(endpoint string) {
if er.endpoint == h.options.Endpoint {
go func(er endpointReplica) {
defer wg.Done()

var err error
tracing.DoInSpan(fctx, "receive_tsdb_write", func(_ context.Context) {
err = h.writer.Write(fctx, tenant, wreqs[er])
err = h.writer.Write(fctx, tenant, &prompb.WriteRequest{
Timeseries: wreqs[er].timeSeries,
})
})
if err != nil {
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error())
ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)
ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", er.endpoint))
return
}
ec <- nil
}(endpoint)
ec <- newWriteResponse(wreqs[er].seriesIDs, nil)
}(er)

continue
}

// Make a request to the specified endpoint.
go func(endpoint string) {
go func(er endpointReplica) {
defer wg.Done()

var (
Expand All @@ -684,18 +697,18 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
h.forwardRequests.WithLabelValues(labelSuccess).Inc()
}()

cl, err = h.peers.get(fctx, endpoint)
cl, err = h.peers.get(fctx, er.endpoint)
if err != nil {
ec <- errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)
ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(err, "get peer connection for endpoint %v", er.endpoint))
return
}

h.mtx.RLock()
b, ok := h.peerStates[endpoint]
b, ok := h.peerStates[er.endpoint]
if ok {
if time.Now().Before(b.nextAllowed) {
h.mtx.RUnlock()
ec <- errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)
ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", er.endpoint))
return
}
}
@@ -705,37 +718,38 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
tracing.DoInSpan(fctx, "receive_forward", func(ctx context.Context) {
// Actually make the request against the endpoint we determined should handle these time series.
_, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{
Timeseries: wreqs[er].Timeseries,
Timeseries: wreqs[er].timeSeries,
Tenant: tenant,
// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
Replica: int64(r.n + 1),
Replica: int64(er.replica + 1),
})
})
if err != nil {
// Check if peer connection is unavailable, don't attempt to send requests constantly.
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unavailable {
h.mtx.Lock()
if b, ok := h.peerStates[endpoint]; ok {
if b, ok := h.peerStates[er.endpoint]; ok {
b.attempt++
dur := h.expBackoff.ForAttempt(b.attempt)
b.nextAllowed = time.Now().Add(dur)
level.Debug(tLogger).Log("msg", "target unavailable backing off", "for", dur)
} else {
h.peerStates[endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
h.peerStates[er.endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
}
h.mtx.Unlock()
}
}
ec <- errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)
werr := errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint)
ec <- newWriteResponse(wreqs[er].seriesIDs, werr)
return
}
h.mtx.Lock()
delete(h.peerStates, endpoint)
delete(h.peerStates, er.endpoint)
h.mtx.Unlock()

ec <- nil
}(endpoint)
ec <- newWriteResponse(wreqs[er].seriesIDs, nil)
}(er)
}

go func() {
Expand All @@ -747,82 +761,49 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
// This is needed if context is canceled or if we reached the success or failure quorum faster.
defer func() {
go func() {
for err := range ec {
if err != nil {
level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
for wresp := range ec {
if wresp.err != nil {
level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", wresp.err)
}
}
}()
}()

var success int
quorum := h.writeQuorum()
failureThreshold := h.options.ReplicationFactor - uint64(quorum)
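// e.g. with ReplicationFactor 3: quorum is 2 and failureThreshold is 1, so a
// series needs errors from at least one replica before it can be reported
// as a replication failure once all responses have arrived.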
successes := make([]int, numSeries)
seriesErrs := make([]errutil.MultiError, numSeries)
for {
select {
case <-fctx.Done():
return fctx.Err()
case err, more := <-ec:
case wresp, more := <-ec:
if !more {
for _, rerr := range seriesErrs {
if seriesReplicated {
errs.Add(rerr.Err())
} else if uint64(len(rerr)) >= failureThreshold {
cause := determineWriteErrorCause(rerr.Err(), quorum)
errs.Add(errors.Wrapf(cause, "failed to replicate series"))
}
}
return errs.Err()
}
if err == nil {
success++
if success >= successThreshold {
// In case the success threshold is lower than the total
// number of requests, then we can finish early here. This
// is the case for quorum writes for example.
return nil
if wresp.err != nil {
for _, tsID := range wresp.seriesIDs {
seriesErrs[tsID].Add(wresp.err)
}
continue
}
errs.Add(err)
}
}
}

// replicate replicates a write request to (replication-factor) nodes
// selected by the tenant and time series.
// The function only returns when all replication requests have finished
// or the context is canceled.
func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.WriteRequest) error {
// It is possible that hashring is ready in testReady() but unready now,
// so need to lock here.
h.mtx.RLock()
if h.hashring == nil {
h.mtx.RUnlock()
return errors.New("hashring is not ready")
}

replicatedRequests := make(map[endpointReplica]*prompb.WriteRequest)
for i := uint64(0); i < h.options.ReplicationFactor; i++ {
for _, ts := range wreq.Timeseries {
endpoint, err := h.hashring.GetN(tenant, &ts, i)
if err != nil {
h.mtx.RUnlock()
return err
}

er := endpointReplica{
endpoint: endpoint,
replica: replica{n: i, replicated: true},
for _, tsID := range wresp.seriesIDs {
successes[tsID]++
}
replicatedRequest, ok := replicatedRequests[er]
if !ok {
replicatedRequest = &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, 0),
}
replicatedRequests[er] = replicatedRequest
if quorumReached(successes, quorum) {
return nil
}
replicatedRequest.Timeseries = append(replicatedRequest.Timeseries, ts)
}
}
h.mtx.RUnlock()

quorum := h.writeQuorum()
// fanoutForward only returns an error if successThreshold (quorum) is not reached.
if err := h.fanoutForward(ctx, tenant, replicatedRequests, quorum); err != nil {
return errors.Wrap(determineWriteErrorCause(err, quorum), "quorum not reached")
}
return nil
}

// RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
@@ -923,7 +904,9 @@ type retryState struct {
nextAllowed time.Time
}

type expectedErrors []*struct {
type expectedErrors []*expectedError

type expectedError struct {
err error
cause func(error) bool
count int