-
Notifications
You must be signed in to change notification settings - Fork 32
/
query.go
270 lines (225 loc) · 6.74 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
package grafana
import (
"context"
"fmt"
"net/http"
"path"
"time"
"github.com/segmentio/objconv"
)
// QueryHandler is implemented by types able to serve the /query endpoint of
// the simple-json-datasource API.
type QueryHandler interface {
	// ServeQuery answers the query described by req by writing its results to
	// res: data points for the requested targets and time range, or a set of
	// rows for table requests.
	//
	// Note: a "target" seems to be an identifier describing some data set in
	// the source (a SQL query, for example); Grafana itself appears to treat
	// it as an opaque blob of data.
	ServeQuery(ctx context.Context, res QueryResponse, req *QueryRequest) error
}
// QueryHandlerFunc is an adapter allowing an ordinary function with the
// matching signature to be used as a QueryHandler.
type QueryHandlerFunc func(ctx context.Context, res QueryResponse, req *QueryRequest) error
// ServeQuery implements the QueryHandler interface by invoking f with the
// arguments it received and returning f's result unchanged.
func (f QueryHandlerFunc) ServeQuery(ctx context.Context, res QueryResponse, req *QueryRequest) error {
	return f(ctx, res, req)
}
// QueryResponse is the interface a QueryHandler uses to respond to a query
// request.
//
// The implementation handed out by NewQueryHandler flushes the writer it
// returned previously each time a new writer is requested, and a flushed
// writer panics when written to; finish writing to one writer before asking
// for the next.
type QueryResponse interface {
	// Timeserie returns a TimeserieWriter used to output the data points of
	// the series named target in response to a timeserie request.
	Timeserie(target string) TimeserieWriter

	// Table returns a TableWriter used to output the rows of a table made of
	// the given columns in response to a table request.
	Table(columns ...Column) TableWriter
}
// QueryRequest carries the parameters of a request received on the /query
// endpoint.
type QueryRequest struct {
	// From and To are the bounds of the requested time range.
	From, To time.Time

	// Interval is the interval value carried by the request.
	Interval time.Duration

	// Targets lists the targets the request asks data for.
	Targets []Target

	// MaxDataPoints is the maxDataPoints value carried by the request.
	MaxDataPoints int
}
// Target describes a single target of a query, as found in the "targets" list
// of a /query request.
type Target struct {
	// Query is the value of the "target" field.
	Query string `json:"target"`

	// RefID is the value of the "refId" field.
	RefID string `json:"refId"`

	// Type is the value of the "type" field; see the TargetType constants.
	Type TargetType `json:"type"`
}
// TargetType enumerates the kinds of targets supported by Grafana.
type TargetType string

const (
	// Timeserie identifies targets whose type is "timeserie".
	Timeserie TargetType = "timeserie"

	// Table identifies targets whose type is "table".
	Table TargetType = "table"
)
// TimeserieWriter is the interface used to write the data points of a
// timeserie in response to a query.
type TimeserieWriter interface {
	// WriteDatapoint adds the data point made of value and time to the
	// series.
	WriteDatapoint(value float64, time time.Time)
}
// TableWriter is the interface used to write the rows of a table in response
// to a query.
type TableWriter interface {
	// WriteRow adds a row made of the given values to the table.
	WriteRow(values ...interface{})
}
// Column describes one column of a table response.
type Column struct {
	// Text is the value of the column's "text" field.
	Text string `json:"text"`

	// Type is the column type; it is omitted from the output when empty.
	Type ColumnType `json:"type,omitempty"`

	// Sort marks the column as a sorted column.
	Sort bool `json:"sort,omitempty"`

	// Desc selects descending order for a sorted column.
	Desc bool `json:"desc,omitempty"`
}
// Col returns a Column with the given text and column type, with no sorting
// configured.
func Col(text string, colType ColumnType) Column {
	return Column{
		Text: text,
		Type: colType,
	}
}
// AscCol returns a Column with the given text and column type, configured as
// a sorted column in ascending order.
func AscCol(text string, colType ColumnType) Column {
	c := Col(text, colType)
	c.Sort = true
	return c
}
// DescCol returns a Column with the given text and column type, configured as
// a sorted column in descending order.
func DescCol(text string, colType ColumnType) Column {
	c := AscCol(text, colType)
	c.Desc = true
	return c
}
// ColumnType enumerates the column types supported by Grafana.
type ColumnType string

const (
	// Untyped is the empty column type.
	Untyped ColumnType = ""

	// String is the "string" column type.
	String ColumnType = "string"

	// Time is the "time" column type.
	Time ColumnType = "time"

	// Number is the "number" column type.
	Number ColumnType = "number"
)
// NewQueryHandler returns an http.Handler which serves /query API calls by
// delegating them to handler.
//
// For each call the request body is decoded, handed to handler.ServeQuery as
// a QueryRequest, and the response is closed once ServeQuery returns without
// error. A decoding error or an error returned by ServeQuery is returned as
// is, in which case the response is not closed here.
func NewQueryHandler(handler QueryHandler) http.Handler {
	serve := func(ctx context.Context, enc *objconv.StreamEncoder, dec *objconv.Decoder) error {
		var body queryRequest
		if err := dec.Decode(&body); err != nil {
			return err
		}

		// Translate the wire representation into the public request type.
		in := &QueryRequest{
			From:          body.Range.From,
			To:            body.Range.To,
			Interval:      body.Interval,
			Targets:       body.Targets,
			MaxDataPoints: body.MaxDataPoints,
		}
		out := &queryResponse{enc: enc}

		if err := handler.ServeQuery(ctx, out, in); err != nil {
			return err
		}
		return out.close()
	}
	return handlerFunc(serve)
}
// HandleQuery registers on mux a handler serving the "query" endpoint located
// under prefix, delegating the calls to handler.
//
// The pattern is built with path.Join, so it is always rooted at "/" and
// cleaned of redundant slashes.
func HandleQuery(mux *http.ServeMux, prefix string, handler QueryHandler) {
	pattern := path.Join("/", prefix, "query")
	mux.Handle(pattern, NewQueryHandler(handler))
}
// queryRange is the "range" object of a /query request body.
type queryRange struct {
	// From is the value of the "from" field.
	From time.Time `json:"from"`

	// To is the value of the "to" field.
	To time.Time `json:"to"`
}
// queryRequest is the representation of a /query request body that
// NewQueryHandler decodes before building the public QueryRequest.
type queryRequest struct {
	// Range is the "range" object holding the time bounds of the query.
	Range queryRange `json:"range"`

	// Interval is the value of the "interval" field.
	Interval time.Duration `json:"interval"`

	// Targets is the value of the "targets" field.
	Targets []Target `json:"targets"`

	// MaxDataPoints is the value of the "maxDataPoints" field.
	MaxDataPoints int `json:"maxDataPoints"`
}
// queryResponse is the QueryResponse implementation passed to query handlers
// by NewQueryHandler.
type queryResponse struct {
	// enc is the stream encoder the response values are written to.
	enc *objconv.StreamEncoder

	// timeserie is the timeserie handed out by Timeserie that has not been
	// flushed yet, or nil.
	timeserie *timeserie

	// table is the table handed out by Table that has not been flushed yet,
	// or nil.
	table *table
}
func (res *queryResponse) Timeserie(target string) TimeserieWriter {
res.flush()
res.timeserie = ×erie{
Target: target,
Datapoints: make([]datapoint, 0, 100),
}
return res.timeserie
}
// Table implements QueryResponse. It flushes whichever timeserie or table is
// still pending, then starts a new table made of the given columns and
// returns it as the writer for its rows.
//
// The returned writer is only usable until the next call to Timeserie, Table
// or close: it is flushed at that point and panics on later writes.
func (res *queryResponse) Table(columns ...Column) TableWriter {
	res.flush()

	t := &table{
		Columns: columns,
		Rows:    make([]row, 0, 100),
		Type:    "table",
	}
	res.table = t
	return t
}
// close flushes the pending timeserie or table, if any, and then closes the
// underlying stream encoder, returning the error reported by the encoder.
func (res *queryResponse) close() error {
	res.flush()
	err := res.enc.Close()
	return err
}
// flush encodes the pending timeserie and the pending table, if any, marks
// each of them closed so that later writes panic, and forgets them.
//
// The errors returned by Encode are not checked here.
// NOTE(review): presumably the stream encoder retains a failure and reports
// it from Close — confirm against the objconv documentation.
func (res *queryResponse) flush() {
	if ts := res.timeserie; ts != nil {
		res.enc.Encode(ts)
		ts.closed = true
		res.timeserie = nil
	}

	if tbl := res.table; tbl != nil {
		res.enc.Encode(tbl)
		tbl.closed = true
		res.table = nil
	}
}
// datapoint is a single point of a timeserie. It is encoded by EncodeValue as
// a two-element array: the value followed by the time.
type datapoint struct {
	// value is the measured value.
	value float64

	// time is the time of the point, as produced by timestamp.
	time int64
}
// EncodeValue encodes d as an array of two elements, the value first and the
// time second.
//
// The callback given to EncodeArray emits the value on its first invocation
// and the time on every following one.
func (d datapoint) EncodeValue(e objconv.Encoder) error {
	first := true
	return e.EncodeArray(2, func(e objconv.Encoder) error {
		if first {
			first = false
			return e.Encode(d.value)
		}
		return e.Encode(d.time)
	})
}
// timeserie is the TimeserieWriter handed out by queryResponse.Timeserie; it
// accumulates data points until it gets flushed.
type timeserie struct {
	// Target is the name of the series, written as "target".
	Target string `json:"target"`

	// Datapoints holds the points written so far, written as "datapoints".
	Datapoints []datapoint `json:"datapoints"`

	// closed is set once the series has been flushed; writing to a closed
	// series panics.
	closed bool
}
// WriteDatapoint implements TimeserieWriter by appending the point made of
// value and time, the latter converted with timestamp, to the series.
//
// It panics when the series has already been flushed.
func (t *timeserie) WriteDatapoint(value float64, time time.Time) {
	if t.closed {
		panic("writing to a timeserie after it was already flushed")
	}

	point := datapoint{value: value, time: timestamp(time)}
	t.Datapoints = append(t.Datapoints, point)
}
// row is one row of a table: the list of values written for its columns.
type row []interface{}
// table is the TableWriter handed out by queryResponse.Table; it accumulates
// rows until it gets flushed.
type table struct {
	// Columns describes the columns of the table, written as "columns".
	Columns []Column `json:"columns"`

	// Rows holds the rows written so far, written as "rows".
	Rows []row `json:"rows"`

	// Type is written as "type"; queryResponse.Table sets it to "table".
	Type string `json:"type"`

	// closed is set once the table has been flushed; writing to a closed
	// table panics.
	closed bool
}
// WriteRow implements TableWriter by appending a row made of values to the
// table. The values are copied, so the caller's slice is never retained, and
// every time.Time value is replaced by its timestamp.
//
// It panics when the table has already been flushed, or when the number of
// values differs from the number of columns.
func (t *table) WriteRow(values ...interface{}) {
	if t.closed {
		panic("writing to a table after it was already flushed")
	}

	if want, got := len(t.Columns), len(values); got != want {
		panic(fmt.Sprintf("row value count doesn't match the number of columns, expected %d values but got %d", want, got))
	}

	cells := make(row, len(values))
	for i, v := range values {
		if ts, isTime := v.(time.Time); isTime {
			cells[i] = timestamp(ts)
		} else {
			cells[i] = v
		}
	}

	t.Rows = append(t.Rows, cells)
}
// timestamp converts t to a number of milliseconds elapsed since the Unix
// epoch (January 1, 1970 UTC), discarding the sub-millisecond part.
//
// The result is computed from the seconds and nanoseconds components of t
// rather than from t.UnixNano, whose result is undefined when the instant
// cannot be represented as an int64 count of nanoseconds (roughly before year
// 1678 or after year 2262, which includes the zero time.Time). As a
// consequence instants before the epoch are rounded down to the previous
// millisecond instead of toward zero.
func timestamp(t time.Time) int64 {
	const millisPerSecond = int64(time.Second / time.Millisecond)
	return t.Unix()*millisPerSecond + int64(t.Nanosecond())/int64(time.Millisecond)
}