This repository has been archived by the owner on Aug 23, 2023. It is now read-only.
/
reorder_buffer.go
110 lines (95 loc) · 3.09 KB
/
reorder_buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package mdata
import (
"github.com/grafana/metrictank/mdata/errors"
"github.com/grafana/metrictank/schema"
)
// ReorderBuffer keeps a window of data during which it is ok to send data out of order.
// The reorder buffer itself is not thread safe because it is only used by AggMetric,
// which is thread safe, so there is no locking in the buffer.
//
// newest=0 may mean no points added yet, or newest point is at position 0.
// we use the Ts of points in the buffer to check for valid points. Ts == 0 means no point
// in particular newest.Ts == 0 means the buffer is empty
// the buffer is evenly spaced (points are `interval` apart) and may be sparsely populated
type ReorderBuffer struct {
newest uint32 // index of newest buffer entry
interval uint32 // metric interval
buf []schema.Point // the actual buffer holding the data
allowUpdate bool // whether or not to allow overwriting data points with same timestamps
}
func NewReorderBuffer(reorderWindow, interval uint32, allowUpdate bool) *ReorderBuffer {
return &ReorderBuffer{
interval: interval,
buf: make([]schema.Point, reorderWindow),
allowUpdate: allowUpdate,
}
}
// Add adds the point if it falls within the window.
// it returns points that have been purged out of the buffer, as well as whether the add succeeded.
func (rob *ReorderBuffer) Add(ts uint32, val float64) ([]schema.Point, error) {
ts = AggBoundary(ts, rob.interval)
// out of order and too old
if rob.buf[rob.newest].Ts != 0 && ts <= rob.buf[rob.newest].Ts-(uint32(cap(rob.buf))*rob.interval) {
return nil, errors.ErrMetricTooOld
}
var res []schema.Point
oldest := (rob.newest + 1) % uint32(cap(rob.buf))
index := (ts / rob.interval) % uint32(cap(rob.buf))
if ts == rob.buf[index].Ts {
if rob.allowUpdate {
rob.buf[index].Ts = ts
rob.buf[index].Val = val
} else {
return nil, errors.ErrMetricNewValueForTimestamp
}
} else if ts > rob.buf[rob.newest].Ts {
flushCount := (ts - rob.buf[rob.newest].Ts) / rob.interval
if flushCount > uint32(cap(rob.buf)) {
flushCount = uint32(cap(rob.buf))
}
for i := uint32(0); i < flushCount; i++ {
if rob.buf[oldest].Ts != 0 {
res = append(res, rob.buf[oldest])
rob.buf[oldest].Ts = 0
}
oldest = (oldest + 1) % uint32(cap(rob.buf))
}
rob.buf[index].Ts = ts
rob.buf[index].Val = val
rob.newest = index
} else {
metricsReordered.Inc()
rob.buf[index].Ts = ts
rob.buf[index].Val = val
}
return res, nil
}
// Get returns the points in the buffer
func (rob *ReorderBuffer) Get() []schema.Point {
res := make([]schema.Point, 0, cap(rob.buf))
oldest := (rob.newest + 1) % uint32(cap(rob.buf))
for {
if rob.buf[oldest].Ts != 0 {
res = append(res, rob.buf[oldest])
}
if oldest == rob.newest {
break
}
oldest = (oldest + 1) % uint32(cap(rob.buf))
}
return res
}
func (rob *ReorderBuffer) Reset() {
for i := range rob.buf {
rob.buf[i].Ts = 0
}
rob.newest = 0
}
func (rob *ReorderBuffer) Flush() []schema.Point {
res := rob.Get()
rob.Reset()
return res
}
func (rob *ReorderBuffer) IsEmpty() bool {
return rob.buf[rob.newest].Ts == 0
}