This repository has been archived by the owner on Aug 23, 2023. It is now read-only.
/
test_runner.go
231 lines (208 loc) · 6.23 KB
/
test_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
package runner
import (
"context"
"fmt"
"hash/fnv"
"sync"
"time"
"github.com/grafana/metrictank/conf"
"github.com/grafana/metrictank/idx"
"github.com/grafana/metrictank/idx/memory"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/schema"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)
// TestRun holds the complete state of a single benchmark run: the index under
// test, the input streams, the configured load parameters and the bookkeeping
// used to coordinate the worker goroutines.
type TestRun struct {
	// index is the metric index that adds and queries are executed against.
	index idx.MetricIndex

	// Input streams consumed by the run.
	metricsChan chan *schema.MetricData // metrics to add to (or update in) the index
	queriesChan chan string             // query patterns to execute against the index

	// Add load parameters.
	initialIndexSize uint32 // number of entries added to the index before the actual test run starts
	addsPerSec       uint32 // target number of adds per second. 0 means unlimited, as many as possible
	addThreads       uint32 // number of concurrent add threads

	// Query load parameters.
	queriesPerSec uint32 // target number of queries per second. 0 means unlimited, as many as possible
	concQueries   int    // number of queries we want to execute concurrently

	// Lifecycle bookkeeping.
	startTime time.Time       // set by Init once the index has been pre-populated
	ctx       context.Context // context derived from the errgroup created in Init
	workers   *errgroup.Group // group that owns all worker goroutines
	done      chan struct{}   // closed by Run when the benchmark has finished; Wait blocks on it
}
// Latency statistics recorded while the benchmark runs; PrintStats reports all three.
var (
// metricAdd receives the duration of AddOrUpdate calls that did not report an update.
metricAdd = NewStat("metric-add")
// metricUpdate receives the duration of AddOrUpdate calls that reported an update.
metricUpdate = NewStat("metric-update")
// queryExec receives the duration recorded by runQuery for each query.
queryExec = NewStat("query")
)
// orgID is the org id passed to index.Find for every query.
const orgID = 1
// NewTestRun creates a TestRun around a freshly created and initialized
// in-memory index. The channels supply the metrics and query patterns, the
// remaining arguments configure the load (see the TestRun fields of the same
// names).
//
// As a side effect the package-level mdata.Schemas is replaced.
func NewTestRun(metricsChan chan *schema.MetricData, queriesChan chan string, addsPerSec, addThreads, initialIndexSize, queriesPerSec uint32, concQueries int) *TestRun {
	memIdx := memory.New()
	memIdx.Init()

	// The schemas are built from `nil`, which is a bit risky but good enough
	// for the moment.
	mdata.Schemas = conf.NewSchemas(nil)

	return &TestRun{
		index:            memIdx,
		metricsChan:      metricsChan,
		queriesChan:      queriesChan,
		addsPerSec:       addsPerSec,
		addThreads:       addThreads,
		initialIndexSize: initialIndexSize,
		queriesPerSec:    queriesPerSec,
		concQueries:      concQueries,
		done:             make(chan struct{}),
	}
}
// Wait blocks until the done channel is closed, which Run does after the
// workers have finished and the statistics have been printed.
func (t *TestRun) Wait() {
<-t.done
}
// Init gets the run ready: it creates the worker group (and its derived
// context) from ctx, pre-populates the index, records the start time and
// launches the query routine. It must be called before Run.
func (t *TestRun) Init(ctx context.Context) {
	group, groupCtx := errgroup.WithContext(ctx)
	t.workers = group
	t.ctx = groupCtx

	log.Printf("TestRun.Init: index pre-population starting")
	t.prepopulateIndex()
	log.Printf("TestRun.Init: pre-populated the index with %d entries", t.initialIndexSize)

	t.startTime = time.Now()

	log.Printf("TestRun.Init: launching query routine")
	queries := t.queryRoutine(groupCtx)
	group.Go(queries)

	log.Printf("TestRun.Init: done")
}
// Run executes the benchmark. It starts one add routine per add thread, each
// fed by its own buffered channel, plus the routine that routes incoming
// metrics to those channels. It then blocks until every worker in the group
// (including the query routine started by Init) has returned, prints the
// statistics and closes the done channel so that Wait returns.
//
// Init must have been called first, as it creates the worker group.
func (t *TestRun) Run() {
	log.Printf("TestRun.Run: starting index add routines")
	mdChans := make([]chan *schema.MetricData, t.addThreads)
	for i := range mdChans {
		mdChans[i] = make(chan *schema.MetricData, 1000)
		// the channel index doubles as the partition id of the add routine
		t.workers.Go(t.addRoutine(t.ctx, mdChans[i], int32(i)))
	}

	log.Printf("TestRun.Run: starting index routing routine")
	t.workers.Go(t.routeMetrics(t.ctx, mdChans))

	log.Printf("TestRun.Run: starting benchmark")
	// Report a worker failure instead of silently dropping it.
	if err := t.workers.Wait(); err != nil {
		log.Printf("TestRun.Run: benchmark ended with error: %s", err)
	}
	log.Printf("TestRun.Run: benchmark complete")

	t.PrintStats()
	close(t.done)
}
// PrintStats prints the collected statistics to stdout: first the report of
// each stat (add, update, query), each followed by a blank line, then one
// more blank line and finally the ReportComputer output of the same three
// stats.
func (t *TestRun) PrintStats() {
	metricAdd.Report()
	fmt.Println()

	metricUpdate.Report()
	fmt.Println()

	queryExec.Report()
	// one newline closing the query report plus one separating the two sections
	fmt.Print("\n\n")

	metricAdd.ReportComputer()
	metricUpdate.ReportComputer()
	queryExec.ReportComputer()
}
// getPartitionFromName maps a metric name onto a partition id by taking the
// 32-bit FNV-1a hash of the name modulo partitionCount.
//
// A partitionCount of 0 is treated as "everything goes to partition 0"
// rather than triggering an integer divide-by-zero panic.
func getPartitionFromName(name string, partitionCount uint32) int32 {
	if partitionCount == 0 {
		return 0
	}

	h := fnv.New32a()
	h.Write([]byte(name)) // hash.Hash documents that Write never returns an error

	p := int32(h.Sum32() % partitionCount)
	// The conversion can only yield a negative value when partitionCount
	// exceeds math.MaxInt32; flip the sign in that case.
	if p < 0 {
		p = -p
	}
	return p
}
// prepopulateIndex reads up to initialIndexSize metrics from metricsChan and
// adds them to the index before the actual test run starts.
//
// It stops early when the channel is closed or yields a nil metric (the same
// end-of-input signal routeMetrics uses) instead of dereferencing nil.
// Metrics whose id cannot be parsed into a key are skipped with a warning;
// they still count towards initialIndexSize so the loop stays bounded.
func (t *TestRun) prepopulateIndex() {
	for i := uint32(0); i < t.initialIndexSize; i++ {
		md, ok := <-t.metricsChan
		if !ok || md == nil {
			log.Printf("prepopulateIndex: metrics channel exhausted after %d of %d entries", i, t.initialIndexSize)
			return
		}

		key, err := schema.MKeyFromString(md.Id)
		if err != nil {
			log.Printf("Warning: prepopulateIndex skipping metric with invalid id %q: %s", md.Id, err)
			continue
		}

		partitionID := getPartitionFromName(md.Name, t.addThreads)
		t.index.AddOrUpdate(key, md, partitionID)
	}
}
// routeMetrics returns the worker function that distributes incoming metrics
// over the per-partition channels in mdChans, throttled to addsPerSec adds
// per second (0 means unlimited).
//
// The worker stops when ctx is cancelled, when metricsChan yields a nil
// metric (which includes the channel being closed), or when waiting on the
// rate limiter fails. On the way out it closes every channel in mdChans,
// which is what terminates the add routines ranging over them. It always
// returns nil.
func (t *TestRun) routeMetrics(ctx context.Context, mdChans []chan *schema.MetricData) func() error {
	return func() error {
		log.Printf("routeMetrics thread started")
		defer log.Printf("routeMetrics thread ended")

		// Single shutdown path: runs before the "ended" log line above.
		defer func() {
			log.Printf("routeMetrics thread shutting down")
			for _, ch := range mdChans {
				close(ch)
			}
		}()

		// A zero rate with a zero burst would make every Wait call fail, so
		// "unlimited" has to be requested explicitly via rate.Inf.
		limit := rate.Inf
		if t.addsPerSec > 0 {
			limit = rate.Limit(t.addsPerSec)
		}
		limiter := rate.NewLimiter(limit, int(t.addsPerSec))

		for {
			select {
			case <-ctx.Done():
				return nil
			case md := <-t.metricsChan:
				if md == nil {
					return nil
				}
				if err := limiter.Wait(ctx); err != nil {
					// ctx was cancelled (or its deadline cannot be met)
					// while waiting for a token: stop routing.
					return nil
				}
				partitionID := getPartitionFromName(md.Name, t.addThreads)
				mdChans[partitionID] <- md
			}
		}
	}
}
// addRoutine returns the worker function of one add thread. The worker
// consumes metrics from in until the channel is closed, adds each one to the
// index under partitionID and records how long the AddOrUpdate call took in
// either the metricAdd or the metricUpdate stat, depending on whether the
// index reported an update.
//
// Metrics whose id cannot be parsed into a key are skipped with a warning
// rather than being indexed, and timed, under a zero key. The worker always
// returns nil; ctx is currently unused because termination is driven by in
// being closed.
func (t *TestRun) addRoutine(ctx context.Context, in chan *schema.MetricData, partitionID int32) func() error {
	return func() error {
		log.Printf("addRoutine(%d) thread started", partitionID)
		defer log.Printf("addRoutine(%d) thread ended", partitionID)

		for md := range in {
			key, err := schema.MKeyFromString(md.Id)
			if err != nil {
				log.Printf("Warning: addRoutine(%d) skipping metric with invalid id %q: %s", partitionID, md.Id, err)
				continue
			}

			pre := time.Now()
			_, _, update := t.index.AddOrUpdate(key, md, partitionID)
			elapsed := time.Since(pre)

			if update {
				metricUpdate.Add(elapsed)
			} else {
				metricAdd.Add(elapsed)
			}
		}
		return nil
	}
}
// queryRoutine returns the worker function that executes the query load. It
// reads patterns from queriesChan, throttles them to queriesPerSec queries
// per second (0 means unlimited) and launches one runQuery goroutine per
// pattern; concurrency of the actual finds is bounded by the capacity of the
// active channel handed to runQuery. Every 5 seconds it logs how many finds
// are active and how many have been launched.
//
// The worker stops launching queries when ctx is cancelled, when queriesChan
// is closed, or when waiting on the rate limiter fails. It then waits for all
// launched queries to finish and returns nil.
func (t *TestRun) queryRoutine(ctx context.Context) func() error {
	return func() error {
		log.Printf("queryRoutine thread started")
		defer log.Printf("queryRoutine thread ended")

		// A zero rate with a zero burst would make every Wait call fail, so
		// "unlimited" has to be requested explicitly via rate.Inf.
		limit := rate.Inf
		if t.queriesPerSec > 0 {
			limit = rate.Limit(t.queriesPerSec)
		}
		limiter := rate.NewLimiter(limit, int(t.queriesPerSec))

		// runQuery acquires a slot by sending into active before it releases
		// it again, so a capacity of 0 would block every query forever and a
		// negative capacity would panic in make. Always allow at least one.
		slots := t.concQueries
		if slots < 1 {
			slots = 1
		}
		active := make(chan struct{}, slots)

		var wg sync.WaitGroup
		count := 0

		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop() // release the ticker once the routine returns

	LOOP:
		for {
			select {
			case <-ctx.Done():
				break LOOP
			case <-ticker.C:
				log.Printf("%d find queries active. %d launched", len(active), count)
			case pattern, ok := <-t.queriesChan:
				if !ok {
					break LOOP
				}
				if err := limiter.Wait(ctx); err != nil {
					// ctx was cancelled (or its deadline cannot be met)
					// while waiting for a token: stop launching queries.
					break LOOP
				}
				wg.Add(1)
				count++
				go t.runQuery(pattern, &wg, active)
			}
		}

		log.Printf("queryRoutine shutting down. Waiting for %d running finds to complete", len(active))
		wg.Wait()
		return nil
	}
}
// runQuery executes a single find for pattern against the index. It occupies
// one slot of active for the duration of the find and signals wg when it is
// finished. The duration recorded in queryExec is measured from before the
// slot is acquired, so it includes any time spent waiting for a free slot.
// A failing find is only logged.
func (t *TestRun) runQuery(pattern string, wg *sync.WaitGroup, active chan struct{}) {
	// Deferred calls run last-in-first-out: the slot is released first, then
	// the wait group is signalled.
	defer wg.Done()
	defer func() { <-active }()

	started := time.Now()
	active <- struct{}{}

	if _, err := t.index.Find(orgID, pattern, 0); err != nil {
		log.Printf("Warning: Query failed with error: %s", err)
	}

	queryExec.Add(time.Since(started))
}