This repository has been archived by the owner on Aug 23, 2023. It is now read-only.
/
tag_query_id_selector.go
302 lines (257 loc) · 8.43 KB
/
tag_query_id_selector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
package memory
import (
"sync"
"github.com/grafana/metrictank/expr/tagquery"
"github.com/grafana/metrictank/schema"
)
// idSelector resolves a single tagquery expression into metric ids, reading
// from the metric tag index and, if the query context says so, from the meta
// tag index as well.
// the ids it produces form the initial result set of a tag query, other
// expressions of the query may filter that set down afterwards.
type idSelector struct {
	// query context providing the indexes the lookups run against
	ctx *TagQueryContext
	// the expression the ids are looked up by
	expr tagquery.Expression

	// channel the lookup routines push their ids into, it gets set by getIds
	rawResCh chan schema.MKey
	// tracks the id producing routines which are still running
	workerWg sync.WaitGroup
	// when this channel gets closed the lookup routines abort
	stopCh chan struct{}
	// buffered channel limiting how many sub queries run at the same time
	concGate chan struct{}
}
// newIdSelector builds an id selector which evaluates expr within ctx.
// an instance is meant to serve exactly one lookup, it must not be reused
func newIdSelector(expr tagquery.Expression, ctx *TagQueryContext) *idSelector {
	selector := new(idSelector)
	selector.ctx = ctx
	selector.expr = expr
	selector.stopCh = make(chan struct{})
	// the capacity of the gate is the max number of concurrently running sub queries
	selector.concGate = make(chan struct{}, TagQueryWorkers)
	return selector
}
// getIds looks up all ids of the initial result set and pushes them into resCh.
// the lookups run in separate go routines, but this call blocks until all of
// them (and the deduplication routine, if one is used) have finished.
// resCh is not closed by this method.
// if stopCh is not nil it replaces the stop channel of the selector, closing
// it causes the lookup jobs to be aborted.
// this is the only method of idSelector which shall ever be called by users,
// all other methods of this type are only helpers of getIds
func (i *idSelector) getIds(resCh chan schema.MKey, stopCh chan struct{}) {
	if stopCh != nil {
		i.stopCh = stopCh
	}

	// the wait group tracks all id producing go routines. it starts at 2 because
	// one routine looks up ids from the metric index and one slot is reserved for
	// the lookup from the meta tag index. while the meta tag index lookup is
	// running the counter may temporarily grow further, because every sub query
	// adds itself to it
	i.workerWg.Add(2)

	var dedupWg sync.WaitGroup
	if !i.ctx.useMetaTagIndex() {
		// without meta tags there can't be duplicates, so the workers
		// write straight into the result chan
		i.rawResCh = resCh
	} else {
		// sub queries built from meta records may yield the same id more than
		// once, so the raw results pass through a deduplication routine first
		i.rawResCh = make(chan schema.MKey)
		dedupWg.Add(1)
		go i.deduplicateRawResults(&dedupWg, resCh)
	}

	startLookups := i.byTagValue
	if i.expr.OperatesOnTag() {
		startLookups = i.byTag
	}
	startLookups()

	i.workerWg.Wait()
	dedupWg.Wait()
}
// deduplicateRawResults reads the ids from i.rawResCh, drops those which it
// has already seen and forwards the unique ones into resCh. This is only
// necessary for queries involving meta tag lookups, without meta tag lookups
// it's not possible that duplicate ids end up in i.rawResCh.
// it closes i.rawResCh once all workers tracked by i.workerWg are done and
// calls dedupWg.Done() when it exits.
// if i.stopCh gets closed while an id is waiting to be forwarded, the remaining
// raw results are discarded instead of being pushed into resCh
func (i *idSelector) deduplicateRawResults(dedupWg *sync.WaitGroup, resCh chan schema.MKey) {
	defer dedupWg.Done()

	// once all workers are finished we want to close the raw result chan
	// to make this routine exit
	go func() {
		i.workerWg.Wait()
		close(i.rawResCh)
	}()

	seen := make(map[schema.MKey]struct{})
	for id := range i.rawResCh {
		if _, ok := seen[id]; ok {
			continue
		}

		select {
		case resCh <- id:
			seen[id] = struct{}{}
		case <-i.stopCh:
			// the lookup has been aborted, so nobody may be reading from resCh
			// anymore and a plain send could block forever. we keep consuming
			// the raw results until the chan gets closed, so that producers
			// which are blocked on sending into it can still finish
			for range i.rawResCh {
			}
			return
		}
	}
}
// byTagValue starts the routines which look up all ids matching the
// expression i.expr and push them into the id chan.
// it assumes that the expression i.expr operates on tag values
func (i *idSelector) byTagValue() {
	go i.byTagValueFromMetricTagIndex()

	if i.ctx.useMetaTagIndex() {
		go i.byTagValueFromMetaTagIndex()
		return
	}

	// there won't be a meta tag index lookup, so we release the
	// wait group slot which getIds has reserved for it
	i.workerWg.Done()
}
// byTagValueFromMetricTagIndex pushes all ids from the metric index which
// match the expression i.expr into the id chan.
// if the query has a from timestamp > 0, ids for which newerThanFrom reports
// false are skipped. the lookup aborts when the stop chan gets closed.
// this method assumes that the expression i.expr operates on tag values
func (i *idSelector) byTagValueFromMetricTagIndex() {
	defer i.workerWg.Done()

	if i.expr.MatchesExactly() {
		// on an exact match the expression value can directly be used as lookup
		// key, that's faster than calling expr.Matches on every value of the key
		exactIds := i.ctx.index[i.expr.GetKey()][i.expr.GetValue()]
		for id := range exactIds {
			tooOld := i.ctx.query.From > 0 && !i.ctx.newerThanFrom(id)
			if tooOld {
				continue
			}

			select {
			case <-i.stopCh:
				return
			case i.rawResCh <- id:
			}
		}
		return
	}

	// no exact match, so every value of the key needs to be checked against
	// the expression. the ids of all matching values go into the id chan
	valuesOfKey := i.ctx.index[i.expr.GetKey()]
	for value, ids := range valuesOfKey {
		if !i.expr.Matches(value) {
			continue
		}

		for id := range ids {
			tooOld := i.ctx.query.From > 0 && !i.ctx.newerThanFrom(id)
			if tooOld {
				continue
			}

			select {
			case <-i.stopCh:
				return
			case i.rawResCh <- id:
			}
		}
	}
}
// byTagValueFromMetaTagIndex asks the meta tag index for the meta records
// matching the expression i.expr and evaluates each of them, this results in
// one sub query per record of which the results end up in the id chan.
// once the stop chan is closed no further records get evaluated.
// this method assumes that the expression i.expr operates on tag values
func (i *idSelector) byTagValueFromMetaTagIndex() {
	defer i.workerWg.Done()

	matchingRecords := i.ctx.metaTagIndex.getByTagValue(i.expr, false)
	for _, id := range matchingRecords {
		select {
		case <-i.stopCh:
			return
		default:
			i.evaluateMetaRecord(id)
		}
	}
}
// byTag starts the routines which look up all ids matching the expression
// i.expr and push them into the id chan.
// it assumes that the expression i.expr operates on tag keys
func (i *idSelector) byTag() {
	go i.byTagFromMetricTagIndex()

	if i.ctx.useMetaTagIndex() {
		go i.byTagFromMetaTagIndex()
		return
	}

	// there won't be a meta tag index lookup, so we release the
	// wait group slot which getIds has reserved for it
	i.workerWg.Done()
}
// byTagFromMetricTagIndex looks up all ids matching the expression i.expr
// from the metric index, it then pushes all of them into the id chan.
// if the query has a from timestamp > 0, ids for which newerThanFrom reports
// false are skipped. the lookup aborts as soon as the stop chan gets closed.
// this method assumes that the expression i.expr operates on tag keys
func (i *idSelector) byTagFromMetricTagIndex() {
	defer i.workerWg.Done()

	// if the expression matches a tag exactly we can directly look up its
	// values by using the expression key, every id of every value matches
	if i.expr.MatchesExactly() {
		for _, ids := range i.ctx.index[i.expr.GetKey()] {
			for id := range ids {
				if i.ctx.query.From > 0 && !i.ctx.newerThanFrom(id) {
					continue
				}

				select {
				case <-i.stopCh:
					// this needs to be a return, a break would only leave the
					// select statement and the loops would keep on running
					return
				case i.rawResCh <- id:
				}
			}
		}
		return
	}

	// otherwise each tag of the index needs to be checked against the
	// expression, the ids of all values of the matching tags are pushed
	for tag := range i.ctx.index {
		if !i.expr.Matches(tag) {
			continue
		}

		for _, ids := range i.ctx.index[tag] {
			for id := range ids {
				if i.ctx.query.From > 0 && !i.ctx.newerThanFrom(id) {
					continue
				}

				select {
				case <-i.stopCh:
					return
				case i.rawResCh <- id:
				}
			}
		}
	}
}
// byTagFromMetaTagIndex asks the meta tag index for the meta records matching
// the expression i.expr and evaluates each of them, this results in one sub
// query per record of which the results end up in the id chan.
// once the stop chan is closed no further records get evaluated.
// this method assumes that the expression i.expr operates on tag keys
func (i *idSelector) byTagFromMetaTagIndex() {
	defer i.workerWg.Done()

	matchingRecords := i.ctx.metaTagIndex.getByTag(i.expr, false)
	for _, id := range matchingRecords {
		select {
		case <-i.stopCh:
			return
		default:
			i.evaluateMetaRecord(id)
		}
	}
}
// evaluateMetaRecord takes a meta record id, it then looks up the corresponding
// meta record, builds a sub query from its expressions and executes the sub
// query in a new go routine.
// if the record can't be found or no sub query can be built from it, the
// record is skipped.
// this call blocks until the concurrency gate has a free slot
func (i *idSelector) evaluateMetaRecord(id recordId) {
	record, ok := i.ctx.metaTagRecords.getMetaRecordById(id)
	if !ok {
		return
	}

	query, err := i.subQueryFromExpressions(record.Expressions)
	if err != nil {
		// subQueryFromExpressions has already incremented corruptIndex for
		// this error, incrementing it again would count the record twice
		return
	}

	// take a slot of the concurrency gate, runSubQuery releases it again
	i.concGate <- struct{}{}

	// the sub query is an additional id producer, it must be registered
	// before the routine is started
	i.workerWg.Add(1)
	go i.runSubQuery(query)
}
// subQueryFromExpressions builds a query context for a sub query out of the
// given expressions, the sub query uses the same from timestamp as the query
// of this selector.
// it is used as a helper to look up the ids matching the expressions which are
// associated with a meta tag.
// if the expressions don't form a valid query it increments corruptIndex and
// returns the error together with a zero value context
func (i *idSelector) subQueryFromExpressions(expressions tagquery.Expressions) (TagQueryContext, error) {
	query, err := tagquery.NewQuery(expressions, i.ctx.query.From)
	if err != nil {
		// this means we've stored a meta record containing invalid queries
		corruptIndex.Inc()
		return TagQueryContext{}, err
	}

	subCtx := NewTagQueryContext(query)
	subCtx.subQuery = true
	return subCtx, nil
}
// runSubQuery executes the given sub query against the indexes of the
// selector's query context.
// the sub query gets i.rawResCh as its result chan, so its results are
// directly pushed into it. once the sub query returns, the slot of the
// concurrency gate is released and the wait group is notified
func (i *idSelector) runSubQuery(query TagQueryContext) {
	defer i.workerWg.Done()

	parent := i.ctx
	query.Run(parent.index, parent.byId, parent.metaTagIndex, parent.metaTagRecords, i.rawResCh)

	// give back the gate slot which evaluateMetaRecord has taken for this sub query
	<-i.concGate
}