/
batchprocessor.go
143 lines (126 loc) · 3.92 KB
/
batchprocessor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Copyright 2018-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and limitations under the License.
package processor
import (
"math"
"math/rand"
"time"
"github.com/aws/aws-xray-daemon/daemon/conn"
"github.com/aws/aws-xray-daemon/daemon/telemetry"
"github.com/aws/aws-xray-daemon/daemon/util/timer"
"github.com/aws/aws-sdk-go/service/xray"
log "github.com/cihub/seelog"
)
// Tuning constants for the retry back-off computed by segmentsBatch.backOff.
const (
// Upper bound, in seconds, on the delay returned by backOff.
backoffCapSeconds = 30
// Number of consecutive failed attempts for which backOff returns 0
// (no delay); only attempts beyond this count are delayed.
backoffMinAttempts = 10
// Base value, in seconds, passed to getValidJitterBase and doubled for
// every attempt past backoffMinAttempts.
backoffBaseSeconds = 1
)
// segmentsBatch queues batches of trace segments on a channel and sends them
// to X-Ray from poll, backing off after repeated send failures.
type segmentsBatch struct {
// Boolean channel; poll sends true on it once batches has been closed and
// the polling loop is about to exit.
done chan bool
// Channel of segment batches. Each batch is a slice of pointers to
// segment document strings; send enqueues, poll dequeues.
batches chan []*string
// Instance of XRay, used by poll to call PutTraceSegments.
xRay conn.XRay
// Random generator, used by backOff to pick the delay after failed sends.
randGen *rand.Rand
// Instance of timer; poll waits on timer.After for the back-off delay.
timer timer.Timer
}
// send places batch on s.batches without ever blocking the caller.
//
// When the channel cannot accept the batch, one already-queued batch is
// removed to make room (its size is reported through
// telemetry.T.SegmentSpillover and a warning is logged) and the enqueue is
// attempted again. This repeats until the batch has been accepted.
func (s *segmentsBatch) send(batch []*string) {
	for {
		// Fast path: the channel has room for the batch.
		select {
		case s.batches <- batch:
			return
		default:
		}

		// Channel could not take the batch: evict one queued batch, if any
		// is still there, then try again.
		select {
		case evicted := <-s.batches:
			telemetry.T.SegmentSpillover(int64(len(evicted)))
			log.Warnf("Spilling over %v segments", len(evicted))
		default:
			log.Debug("Segment batch: channel is de-queued")
		}
		log.Debug("Segment batch: retrying batch")
	}
}
// poll receives batches from s.batches and sends each one to X-Ray with
// PutTraceSegments until the channel is closed, at which point it sends true
// on s.done and returns.
//
// A failed send is logged and counted; the batch that failed is not sent
// again by this loop. The count of consecutive failures is passed to backOff
// and, when that yields a positive number of seconds, the loop waits that
// long before receiving the next batch. A successful send resets the count.
// Segments reported as unprocessed in the response are logged together with
// the content of the batch they were sent in.
func (s *segmentsBatch) poll() {
	consecutiveFailures := 0
	for {
		segments, open := <-s.batches
		if !open {
			log.Debug("Segment batch: done!")
			s.done <- true
			return
		}

		input := &xray.PutTraceSegmentsInput{
			TraceSegmentDocuments: segments,
		}
		sentAt := time.Now()
		// send segment to X-Ray service.
		resp, err := s.xRay.PutTraceSegments(input)
		if err != nil {
			telemetry.EvaluateConnectionError(err)
			consecutiveFailures++
			delay := s.backOff(consecutiveFailures)
			log.Errorf("Sending segment batch failed with: %v", err)
			log.Warnf("Delaying sending of additional batches by %v seconds", delay)
			if delay > 0 {
				<-s.timer.After(time.Duration(delay) * time.Second)
			}
			continue
		}

		consecutiveFailures = 0
		telemetry.T.SegmentSent(int64(len(segments)))
		took := time.Since(sentAt)
		log.Infof("Successfully sent batch of %d segments (%1.3f seconds)", len(segments), took.Seconds())

		for _, rejected := range resp.UnprocessedTraceSegments {
			telemetry.T.SegmentRejected(1)
			log.Errorf("Unprocessed segment: %v", rejected)
			log.Warn("Batch that contains unprocessed segments")
			// Dump every document of the batch alongside the rejection.
			for _, doc := range segments {
				log.Warn(*doc)
			}
		}
	}
}
// close closes the batches channel. poll observes the closed channel once the
// remaining queued batches have been received, then signals on s.done.
func (s *segmentsBatch) close() {
	close(s.batches)
}
// min returns the smaller of the two int32 values x and y.
func min(x, y int32) int32 {
	if y <= x {
		return y
	}
	return x
}
// getValidJitterBase returns the Full Jitter base backoffBase * 2^attempt as
// an int32 clamped to the range [1, math.MaxInt32/2].
//
// The upper clamp keeps the float64 result representable as an int32. The
// lower clamp guarantees a strictly positive value, so the result can always
// be handed to rand.Int31n, which panics when its argument is <= 0. Without
// it, a negative attempt, a non-positive backoffBase, or a NaN product
// (0 * +Inf) would yield 0 or an implementation-defined conversion result.
func getValidJitterBase(backoffBase, attempt int) int32 {
	const maxBase = math.MaxInt32 / 2

	base := float64(backoffBase) * math.Pow(2, float64(attempt))
	switch {
	case base >= maxBase:
		return maxBase
	case base >= 1:
		return int32(base)
	default:
		// Covers values below 1, negative values and NaN (every comparison
		// with NaN is false, so it falls through to here).
		return 1
	}
}
// backOff returns the number of seconds to wait before sending further
// batches, given the count of consecutive failed attempts.
//
// The first backoffMinAttempts failures incur no delay. After that the delay
// follows Full Jitter as described in
// https://www.awsarchitectureblog.com/2015/03/backoff.html: a uniformly random
// value in [0, min(backoffCapSeconds, backoffBaseSeconds * 2^n)), where n is
// the number of failures beyond backoffMinAttempts.
func (s *segmentsBatch) backOff(attempt int) int32 {
	if attempt <= backoffMinAttempts {
		return 0
	}
	// Attempts to be considered for Jitter Backoff
	backoffAttempts := attempt - backoffMinAttempts
	base := getValidJitterBase(backoffBaseSeconds, backoffAttempts)
	// Cap the window before drawing from it. Capping the drawn value instead
	// clips almost every draw to backoffCapSeconds once base is much larger
	// than the cap, so the delay stops being random.
	window := min(backoffCapSeconds, base)
	return s.randGen.Int31n(window)
}