Skip to content

Commit

Permalink
Separate write from replication errors
Browse files Browse the repository at this point in the history
Signed-off-by: fpetkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Nov 27, 2022
1 parent d83a624 commit a19fa93
Showing 1 changed file with 74 additions and 58 deletions.
132 changes: 74 additions & 58 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,20 +767,19 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
}()

quorum := h.writeQuorum()
if seriesReplicated {
quorum = 1
}
successes := make([]int, numSeries)
seriesErrs := make([]writeErrors, numSeries)
seriesErrs := newSeriesErrors(quorum, numSeries)
for {
select {
case <-fctx.Done():
return fctx.Err()
case wresp, more := <-ec:
if !more {
for _, rerr := range seriesErrs {
if seriesReplicated {
errs.Add(rerr.ErrOrNil())
} else {
errs.Add(rerr.replicationErr(quorum).ErrOrNil())
}
errs.Add(rerr)
}
return errs.ErrOrNil()
}
Expand Down Expand Up @@ -911,18 +910,46 @@ func (a expectedErrors) Len() int { return len(a) }
// Swap exchanges the elements at indexes i and j.
// It is part of the sort.Interface implementation for expectedErrors.
func (a expectedErrors) Swap(i, j int) {
	tmp := a[i]
	a[i] = a[j]
	a[j] = tmp
}
// Less reports whether the element at index i has a strictly smaller
// count than the element at index j.
// It is part of the sort.Interface implementation for expectedErrors.
func (a expectedErrors) Less(i, j int) bool {
	left, right := a[i].count, a[j].count
	return left < right
}

// writeErrors contains all errors that have
// occurred during a remote-write request.
type writeErrors struct {
// errorSet is a set of errors.
// It tracks the errors recorded through Add together with a
// deduplicated set of their reason strings, which Error renders.
type errorSet struct {
// reasonSet holds the distinct reason strings of the recorded errors.
reasonSet map[string]struct{}
// errs holds the errors recorded through Add.
errs []error
}

// Add adds an error to the writeErrors.
func (es *writeErrors) Add(err error) {
// Error implements the error interface.
// It renders the distinct reasons in sorted order, separated by "; ".
// When more than one reason is present, the result is prefixed with
// "<n> errors: ". An empty set yields the empty string.
func (es errorSet) Error() string {
	total := len(es.reasonSet)
	if total == 0 {
		return ""
	}

	// Map iteration order is not deterministic, so collect and sort
	// the reasons to produce a stable message.
	sorted := make([]string, 0, total)
	for r := range es.reasonSet {
		sorted = append(sorted, r)
	}
	sort.Strings(sorted)

	var out bytes.Buffer
	if total > 1 {
		fmt.Fprintf(&out, "%d errors: ", total)
	}
	for idx, r := range sorted {
		if idx > 0 {
			out.WriteString("; ")
		}
		out.WriteString(r)
	}

	return out.String()
}

// Add adds an error to the errorSet.
func (es *errorSet) Add(err error) {
if err == nil {
return
}

if len(es.errs) == 0 {
es.errs = []error{err}
} else {
Expand All @@ -931,18 +958,30 @@ func (es *writeErrors) Add(err error) {
if es.reasonSet == nil {
es.reasonSet = make(map[string]struct{})
}
if werr, ok := err.(*writeErrors); ok {

switch werr := err.(type) {
case *replicationErrors:
for reason := range werr.reasonSet {
es.reasonSet[reason] = struct{}{}
}
} else {
case *writeErrors:
for reason := range werr.reasonSet {
es.reasonSet[reason] = struct{}{}
}
default:
es.reasonSet[err.Error()] = struct{}{}
}
}

// writeErrors contains all errors that have
// occurred during a remote-write request.
type writeErrors struct {
// errorSet is embedded so that its Add and Error methods are
// available on writeErrors.
errorSet
}

// ErrOrNil returns the writeErrors instance if any
// errors are contained in it.
// Otherwise it returns nil.
// Otherwise, it returns nil.
func (es *writeErrors) ErrOrNil() error {
if len(es.errs) == 0 {
return nil
Expand Down Expand Up @@ -981,18 +1020,21 @@ func (es *writeErrors) Cause() error {
return nonRecoverableErr
}

// replicationErr extracts a sentinel error with the highest occurrence
// that has happened more than the given threshold.
// replicationErrors contains errors that have happened while
// replicating a time series within a remote-write request.
type replicationErrors struct {
// errorSet is embedded so that its Add and Error methods are
// available on replicationErrors.
errorSet
// threshold is the error count that Cause compares against, both
// for the most frequent expected error and for the total number
// of recorded errors.
threshold int
}

// Cause extracts the sentinel error with the highest occurrence,
// provided it has happened at least threshold times.
// If no single error has reached the threshold, but the
// total number of errors meets the threshold,
// Cause will return errInternal.
func (es *writeErrors) replicationErr(threshold int) *writeErrors {
if es == nil {
return &writeErrors{}
}

func (es *replicationErrors) Cause() error {
if len(es.errs) == 0 {
return &writeErrors{}
return errorSet{}
}

expErrs := expectedErrors{
Expand All @@ -1011,49 +1053,23 @@ func (es *writeErrors) replicationErr(threshold int) *writeErrors {

// Determine which error occurred most.
sort.Sort(sort.Reverse(expErrs))
if exp := expErrs[0]; exp.count >= threshold {
return &writeErrors{
errs: []error{exp.err},
reasonSet: es.reasonSet,
}
if exp := expErrs[0]; exp.count >= es.threshold {
return exp.err
}

if len(es.errs) >= threshold {
return &writeErrors{
errs: []error{errInternal},
reasonSet: es.reasonSet,
}
if len(es.errs) >= es.threshold {
return errInternal
}

return &writeErrors{}
return nil
}

// Error returns a string containing a deduplicated set of reasons.
func (es *writeErrors) Error() string {
if es == nil || len(es.reasonSet) == 0 {
return ""
}
reasons := make([]string, 0, len(es.reasonSet))
for reason := range es.reasonSet {
reasons = append(reasons, reason)
}
sort.Strings(reasons)

var buf bytes.Buffer
if len(reasons) > 1 {
fmt.Fprintf(&buf, "%d errors: ", len(es.reasonSet))
}

var more bool
for _, reason := range reasons {
if more {
buf.WriteString("; ")
}
buf.WriteString(reason)
more = true
func newSeriesErrors(threshold, numErrors int) []*replicationErrors {
errs := make([]*replicationErrors, numErrors)
for i := range errs {
errs[i] = &replicationErrors{threshold: threshold}
}

return buf.String()
return errs
}

func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup {
Expand Down

0 comments on commit a19fa93

Please sign in to comment.