Skip to content

Commit

Permalink
Fix error propagation
Browse files Browse the repository at this point in the history
Signed-off-by: fpetkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Nov 26, 2022
1 parent 572bddf commit b002700
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 234 deletions.
145 changes: 115 additions & 30 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/logging"

"github.com/thanos-io/thanos/pkg/errutil"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/server/http/middleware"
Expand Down Expand Up @@ -79,6 +78,7 @@ var (
errBadReplica = errors.New("request replica exceeds receiver replication factor")
errNotReady = errors.New("target not ready")
errUnavailable = errors.New("target not available")
errInternal = errors.New("internal error")
)

// Options for the web Handler.
Expand Down Expand Up @@ -361,7 +361,6 @@ type trackedSeries struct {

type writeResponse struct {
seriesIDs []int
replica uint64
err error
}

Expand Down Expand Up @@ -532,7 +531,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
responseStatusCode := http.StatusOK
if err = h.handleRequest(ctx, rep, tenant, &wreq); err != nil {
level.Debug(tLogger).Log("msg", "failed to handle request", "err", err)
switch determineWriteErrorCause(err, 1) {
switch errors.Cause(err) {
case errNotReady:
responseStatusCode = http.StatusServiceUnavailable
case errUnavailable:
Expand Down Expand Up @@ -624,11 +623,11 @@ func quorumReached(successes []int, successThreshold int) bool {
// fanoutForward concurrently fans out the given set of write requests. It returns as soon as a quorum of
// requests has succeeded or failed, or when the context is canceled.
func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]trackedSeries, numSeries int, seriesReplicated bool) error {
var errs errutil.MultiError
var errs writeErrors

fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout)
defer func() {
if errs.Err() != nil {
if errs.ErrOrNil() != nil {
// NOTICE: The cancel function is not used on all paths intentionally,
// if there is no error when quorum is reached,
// let forward requests to optimistically run until timeout.
Expand Down Expand Up @@ -668,10 +667,8 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
})
})
if err != nil {
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error())
ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", er.endpoint))
ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(err, "store locally for endpoint %v", er.endpoint))
return
}
ec <- newWriteResponse(wreqs[er].seriesIDs, nil)
Expand Down Expand Up @@ -770,9 +767,8 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
}()

quorum := h.writeQuorum()
failureThreshold := h.options.ReplicationFactor - uint64(quorum)
successes := make([]int, numSeries)
seriesErrs := make([]errutil.MultiError, numSeries)
seriesErrs := make([]writeErrors, numSeries)
for {
select {
case <-fctx.Done():
Expand All @@ -781,21 +777,21 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
if !more {
for _, rerr := range seriesErrs {
if seriesReplicated {
errs.Add(rerr.Err())
} else if uint64(len(rerr)) >= failureThreshold {
cause := determineWriteErrorCause(rerr.Err(), quorum)
errs.Add(errors.Wrapf(cause, "failed to replicate series"))
errs.Add(rerr.Cause())
} else {
cause := rerr.replicationErr(quorum)
errs.Add(errors.Wrapf(cause, rerr.Error()))
}
}
return errs.Err()
return errs.ErrOrNil()
}

if wresp.err != nil {
for _, tsID := range wresp.seriesIDs {
seriesErrs[tsID].Add(wresp.err)
}
continue
}

for _, tsID := range wresp.seriesIDs {
successes[tsID]++
}
Expand All @@ -815,7 +811,7 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
if err != nil {
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
}
switch determineWriteErrorCause(err, 1) {
switch errors.Cause(err) {
case nil:
return &storepb.WriteResponse{}, nil
case errNotReady:
Expand Down Expand Up @@ -916,25 +912,82 @@ func (a expectedErrors) Len() int { return len(a) }
// Swap exchanges the elements at positions i and j.
func (a expectedErrors) Swap(i, j int) {
	held := a[i]
	a[i] = a[j]
	a[j] = held
}
// Less reports whether the element at i has a smaller occurrence count
// than the element at j.
func (a expectedErrors) Less(i, j int) bool {
	left, right := a[i], a[j]
	return left.count < right.count
}

// determineWriteErrorCause extracts a sentinel error that has occurred more than the given threshold from a given fan-out error.
// It will inspect the error's cause if the error is a MultiError,
// It will return cause of each contained error but will not traverse any deeper.
func determineWriteErrorCause(err error, threshold int) error {
// writeErrors contains all errors that have
// occurred during a remote-write request.
// The zero value is usable: Add allocates reasonSet on first use.
type writeErrors struct {
	// reasonSet holds the distinct Error() strings of the added errors;
	// it is what Error() renders.
	reasonSet map[string]struct{}
	// errs holds every non-nil error passed to Add, in insertion order.
	errs []error
}

// Add records err in the collection and remembers its message in the
// deduplicated reason set. Nil errors are ignored.
func (es *writeErrors) Add(err error) {
	if err == nil {
		return
	}

	// append allocates the slice on first use, so no empty-slice branch is needed.
	es.errs = append(es.errs, err)

	// The reason set is created lazily so the zero value stays usable.
	if es.reasonSet == nil {
		es.reasonSet = map[string]struct{}{}
	}
	reason := err.Error()
	es.reasonSet[reason] = struct{}{}
}

// ErrOrNil returns the writeErrors instance as an error if any
// errors are contained in it.
// Otherwise it returns nil. A nil receiver is treated as an empty
// collection, matching the nil handling of Error.
func (es *writeErrors) ErrOrNil() error {
	// Return the literal nil (not es) so callers never observe a non-nil
	// error interface wrapping a nil or empty *writeErrors.
	if es == nil || len(es.errs) == 0 {
		return nil
	}
	return es
}

// Cause returns the primary cause for a write failure.
// If multiple errors have occurred, Cause will prefer
// recoverable over non-recoverable errors.
func (es *writeErrors) Cause() error {
if len(es.errs) == 0 {
return nil
}

unwrappedErr := errors.Cause(err)
errs, ok := unwrappedErr.(errutil.NonNilMultiError)
if !ok {
errs = []error{unwrappedErr}
var recoverableErr error
var nonRecoverableErr error
for _, werr := range es.errs {
cause := errors.Cause(werr)
if isUnavailable(cause) {
return errUnavailable
}
if isNotReady(cause) {
return errNotReady
}
if isConflict(cause) {
nonRecoverableErr = errConflict
continue
}
recoverableErr = errInternal
}
if len(errs) == 0 {

if recoverableErr != nil {
return recoverableErr
}
return nonRecoverableErr
}

// replicationErr extracts a sentinel error with the highest occurrence
// that has happened more than the given threshold.
// If no single error has occurred more than the threshold, but the
// total number of errors meets the threshold,
// replicationErr will return errInternal.
func (es *writeErrors) replicationErr(threshold int) error {
if es == nil {
return nil
}

if threshold < 1 {
return err
if len(es.errs) == 0 {
return nil
}

expErrs := expectedErrors{
Expand All @@ -944,7 +997,7 @@ func determineWriteErrorCause(err error, threshold int) error {
}
for _, exp := range expErrs {
exp.count = 0
for _, err := range errs {
for _, err := range es.errs {
if exp.cause(errors.Cause(err)) {
exp.count++
}
Expand All @@ -956,7 +1009,39 @@ func determineWriteErrorCause(err error, threshold int) error {
return exp.err
}

return err
if len(es.errs) >= threshold {
return errInternal
}

return nil
}

// Error renders the distinct failure reasons, sorted alphabetically and
// joined with "; ". When there is more than one reason the message is
// prefixed with their count. A nil or empty collection yields "".
func (es *writeErrors) Error() string {
	if es == nil || len(es.reasonSet) == 0 {
		return ""
	}

	// Map iteration order is random; sort for a deterministic message.
	unique := make([]string, 0, len(es.reasonSet))
	for msg := range es.reasonSet {
		unique = append(unique, msg)
	}
	sort.Strings(unique)

	var out bytes.Buffer
	if len(unique) > 1 {
		fmt.Fprintf(&out, "%d errors: ", len(unique))
	}
	for idx, msg := range unique {
		if idx > 0 {
			out.WriteString("; ")
		}
		out.WriteString(msg)
	}

	return out.String()
}

func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup {
Expand Down
Loading

0 comments on commit b002700

Please sign in to comment.