This repository has been archived by the owner on Aug 23, 2023. It is now read-only.
/
write_queue.go
112 lines (100 loc) · 2.26 KB
/
write_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package memory
import (
"sync"
"time"
"github.com/grafana/metrictank/idx"
"github.com/grafana/metrictank/schema"
)
type WriteQueue struct {
shutdown chan struct{}
done chan struct{}
maxBuffered int
maxDelay time.Duration
flushed chan struct{}
archives map[schema.MKey]*idx.Archive
sync.RWMutex
idx *UnpartitionedMemoryIdx
}
// NewWriteQueue creates a new writeQueue that will add archives to the passed UnpartitionedMemoryIdx
// in batches
func NewWriteQueue(index *UnpartitionedMemoryIdx, maxDelay time.Duration, maxBuffered int) *WriteQueue {
wq := &WriteQueue{
archives: make(map[schema.MKey]*idx.Archive),
shutdown: make(chan struct{}),
done: make(chan struct{}),
maxBuffered: maxBuffered,
maxDelay: maxDelay,
flushed: make(chan struct{}, 1),
idx: index,
}
go wq.loop()
return wq
}
func (wq *WriteQueue) Stop() {
close(wq.shutdown)
<-wq.done
}
func (wq *WriteQueue) Queue(archive *idx.Archive) {
wq.Lock()
wq.archives[archive.Id] = archive
if len(wq.archives) >= wq.maxBuffered {
wq.flush()
}
wq.Unlock()
}
func (wq *WriteQueue) Get(id schema.MKey) (*idx.Archive, bool) {
wq.RLock()
a, ok := wq.archives[id]
wq.RUnlock()
return a, ok
}
// flush adds the buffered archives to the memoryIdx.
// callers need to acquire a writeLock before calling this function.
func (wq *WriteQueue) flush() {
if len(wq.archives) == 0 {
// non blocking write to the flushed chan.
// if we cant write to the flushed chan it means there is a previous flush
// signal that hasnt been processed. In that case, we dont need to send another one.
select {
case wq.flushed <- struct{}{}:
default:
}
return
}
wq.idx.Lock()
for _, archive := range wq.archives {
wq.idx.add(archive)
}
wq.idx.Unlock()
wq.archives = make(map[schema.MKey]*idx.Archive)
select {
case wq.flushed <- struct{}{}:
default:
}
}
func (wq *WriteQueue) loop() {
defer close(wq.done)
timer := time.NewTimer(wq.maxDelay)
for {
select {
case <-wq.flushed:
if !timer.Stop() {
<-timer.C
}
timer.Reset(wq.maxDelay)
case <-timer.C:
wq.Lock()
wq.flush()
wq.Unlock()
timer.Reset(wq.maxDelay)
case <-wq.shutdown:
wq.Lock()
wq.flush()
wq.Unlock()
if !timer.Stop() {
<-timer.C
}
return
}
}
}