-
Notifications
You must be signed in to change notification settings - Fork 512
/
client.go
307 lines (251 loc) · 10.6 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
// SPDX-License-Identifier: AGPL-3.0-only
package continuoustest
import (
"context"
"flag"
"fmt"
"net/http"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
querierapi "github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/util/chunkinfologger"
"github.com/grafana/mimir/pkg/util/instrumentation"
util_math "github.com/grafana/mimir/pkg/util/math"
)
const (
// maxErrMsgLen is not referenced in this file; presumably it caps the length
// of error messages built elsewhere in the package — TODO confirm.
maxErrMsgLen = 256
// defaultTenant is the default value of the tests.tenant-id flag. The client
// treats a tenant ID equal to this value as "no explicit tenant configured".
defaultTenant = "anonymous"
// defaultUserAgent is the default value of the tests.client.user-agent flag.
defaultUserAgent = "mimir-continuous-test"
)
// MimirClient is the interface implemented by a client used to interact with Mimir.
type MimirClient interface {
// WriteSeries writes input series to Mimir. Returns the response status code and optionally
// an error. The error is always returned if request was not successful (eg. received a 4xx or 5xx error).
WriteSeries(ctx context.Context, series []prompb.TimeSeries) (statusCode int, err error)
// QueryRange performs a range query for the given expression between start and end,
// evaluated at the given step, and returns the result as a matrix. Optional
// RequestOption values customise the individual request.
QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration, options ...RequestOption) (model.Matrix, error)
// Query performs an instant query for the given expression evaluated at ts and
// returns the result as a vector. Optional RequestOption values customise the
// individual request.
Query(ctx context.Context, query string, ts time.Time, options ...RequestOption) (model.Vector, error)
}
// ClientConfig holds the settings used by NewClient to build a Client.
// RegisterFlags binds each field to a command-line flag.
type ClientConfig struct {
// TenantID is the tenant used to write and read metrics (flag: tests.tenant-id).
TenantID string
// BasicAuthUser is the username for HTTP basic authentication (flag: tests.basic-auth-user).
BasicAuthUser string
// BasicAuthPassword is the password for HTTP basic authentication (flag: tests.basic-auth-password).
BasicAuthPassword string
// BearerToken is the token for HTTP bearer authentication (flag: tests.bearer-token).
BearerToken string
// WriteBaseEndpoint is the base URL of the write path, without the API path
// and without a trailing slash (flag: tests.write-endpoint).
WriteBaseEndpoint flagext.URLValue
// WriteBatchSize is the maximum number of series written in a single request
// (flag: tests.write-batch-size).
WriteBatchSize int
// WriteTimeout is the timeout for a single write request (flag: tests.write-timeout).
WriteTimeout time.Duration
// WriteProtocol selects how series are written; NewClient accepts "prometheus"
// and "otlp-http" (flag: tests.write-protocol).
WriteProtocol string
// ReadBaseEndpoint is the base URL of the read path, without the API path
// and without a trailing slash (flag: tests.read-endpoint).
ReadBaseEndpoint flagext.URLValue
// ReadTimeout is the timeout for a single read request (flag: tests.read-timeout).
ReadTimeout time.Duration
// RequestDebug asks the server for chunk debugging via a request header
// (flag: tests.send-chunks-debugging-header).
RequestDebug bool
// UserAgent is the value sent in the User-Agent header (flag: tests.client.user-agent).
UserAgent string
}
// RegisterFlags registers the "tests.*" command-line flags on f and binds each
// one to the corresponding ClientConfig field, together with its default value.
//
// The write and read endpoint flags have no default; NewClient rejects a
// configuration where either of them has not been set.
func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.TenantID, "tests.tenant-id", defaultTenant, "The tenant ID to use to write and read metrics in tests.")

	// Authentication. The basic-auth descriptions previously said "bearer", which
	// was a copy-paste mistake: these two flags configure HTTP basic authentication.
	f.StringVar(&cfg.BasicAuthUser, "tests.basic-auth-user", "", "The username to use for HTTP basic authentication. (mutually exclusive with bearer-token flag)")
	f.StringVar(&cfg.BasicAuthPassword, "tests.basic-auth-password", "", "The password to use for HTTP basic authentication. (mutually exclusive with bearer-token flag)")
	f.StringVar(&cfg.BearerToken, "tests.bearer-token", "", "The bearer token to use for HTTP bearer authentication. (mutually exclusive with basic-auth flags)")

	// Write path.
	f.Var(&cfg.WriteBaseEndpoint, "tests.write-endpoint", "The base endpoint on the write path. The URL should have no trailing slash. The specific API path is appended by the tool to the URL, for example /api/v1/push for the remote write API endpoint, so the configured URL must not include it.")
	f.IntVar(&cfg.WriteBatchSize, "tests.write-batch-size", 1000, "The maximum number of series to write in a single request.")
	f.DurationVar(&cfg.WriteTimeout, "tests.write-timeout", 5*time.Second, "The timeout for a single write request.")
	f.StringVar(&cfg.WriteProtocol, "tests.write-protocol", "prometheus", "The protocol to use to write series data. Supported values are: prometheus, otlp-http")

	// Read path.
	f.Var(&cfg.ReadBaseEndpoint, "tests.read-endpoint", "The base endpoint on the read path. The URL should have no trailing slash. The specific API path is appended by the tool to the URL, for example /api/v1/query_range for range query API, so the configured URL must not include it.")
	f.DurationVar(&cfg.ReadTimeout, "tests.read-timeout", 60*time.Second, "The timeout for a single read request.")

	// Request decoration.
	f.BoolVar(&cfg.RequestDebug, "tests.send-chunks-debugging-header", false, "Request debugging on the server side via header.")
	f.StringVar(&cfg.UserAgent, "tests.client.user-agent", defaultUserAgent, "The value the Mimir client should send in the User-Agent header.")
}
// Client is the MimirClient implementation built by NewClient.
type Client struct {
// writeClient sends write requests using the configured write protocol.
writeClient clientWriter
// readClient is the Prometheus HTTP API client used for queries.
readClient v1.API
// cfg is the configuration the client was built with; the read timeout and
// write batch size are read from it on every call.
cfg ClientConfig
// logger is the logger passed to NewClient; it is not used in this file.
logger log.Logger
}
// clientWriter is the write-path backend used by Client.WriteSeries. NewClient
// picks the implementation according to the configured write protocol.
type clientWriter interface {
// sendWriteRequest sends a single write request. Client.WriteSeries treats
// the returned int as the response status code.
sendWriteRequest(ctx context.Context, req *prompb.WriteRequest) (int, error)
}
// NewClient validates cfg and builds a Client from it.
//
// An error is returned when:
//   - the write or the read base endpoint has not been set;
//   - the write protocol is neither "prometheus" nor "otlp-http";
//   - basic auth (both user and password) and a bearer token are configured
//     at the same time;
//   - the Prometheus API client for the read path cannot be created.
//
// The read and the write clients share a single round tripper, which adds the
// tenant, authentication and debugging headers to every request.
func NewClient(cfg ClientConfig, logger log.Logger) (*Client, error) {
	// Ensure the required config has been set.
	if cfg.WriteBaseEndpoint.URL == nil {
		return nil, errors.New("the write endpoint has not been set")
	}
	if cfg.ReadBaseEndpoint.URL == nil {
		return nil, errors.New("the read endpoint has not been set")
	}
	if cfg.WriteProtocol != "prometheus" && cfg.WriteProtocol != "otlp-http" {
		return nil, errors.New("the only supported write protocols are \"prometheus\" or \"otlp-http\"")
	}

	// Ensure not multiple auth methods set at the same time.
	// The tenant ID may be combined with either auth method (for tenant testing),
	// so it plays no part in this check: the only conflict is complete basic auth
	// credentials together with a bearer token.
	if cfg.BasicAuthUser != "" && cfg.BasicAuthPassword != "" && cfg.BearerToken != "" {
		return nil, errors.New("either set tests.tenant-id or tests.basic-auth-user/tests.basic-auth-password or tests.bearer-token")
	}

	// Built only once the configuration is known to be valid.
	rt := &clientRoundTripper{
		tenantID:          cfg.TenantID,
		basicAuthUser:     cfg.BasicAuthUser,
		basicAuthPassword: cfg.BasicAuthPassword,
		bearerToken:       cfg.BearerToken,
		rt:                instrumentation.TracerTransport{},
		requestDebug:      cfg.RequestDebug,
		userAgent:         cfg.UserAgent,
	}

	readClient, err := api.NewClient(api.Config{
		Address:      cfg.ReadBaseEndpoint.String(),
		RoundTripper: rt,
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to create read client")
	}

	var writeClient clientWriter
	switch cfg.WriteProtocol {
	case "prometheus":
		writeClient = &prometheusWriter{
			httpClient:        &http.Client{Transport: rt},
			writeBaseEndpoint: cfg.WriteBaseEndpoint,
			writeBatchSize:    cfg.WriteBatchSize,
			writeTimeout:      cfg.WriteTimeout,
		}
	case "otlp-http":
		writeClient = &otlpHTTPWriter{
			httpClient:        &http.Client{Transport: rt},
			writeBaseEndpoint: cfg.WriteBaseEndpoint,
			writeBatchSize:    cfg.WriteBatchSize,
			writeTimeout:      cfg.WriteTimeout,
		}
	default:
		// Not reachable while the validation above and this switch agree; kept so
		// that a protocol added to one but not the other fails loudly instead of
		// leaving the client with a nil writer.
		return nil, fmt.Errorf("unsupported write protocol %q", cfg.WriteProtocol)
	}

	return &Client{
		writeClient: writeClient,
		readClient:  v1.NewAPI(readClient),
		cfg:         cfg,
		logger:      logger,
	}, nil
}
// QueryRange implements MimirClient.
//
// The request options are attached to the context, the call is bounded by the
// configured read timeout, and strong read consistency is requested through the
// context. Warnings returned by the API are discarded. An error is returned if
// the query fails or if the result is not a matrix.
func (c *Client) QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration, options ...RequestOption) (model.Matrix, error) {
	reqCtx, cancel := context.WithTimeout(contextWithRequestOptions(ctx, options...), c.cfg.ReadTimeout)
	defer cancel()
	reqCtx = querierapi.ContextWithReadConsistencyLevel(reqCtx, querierapi.ReadConsistencyStrong)

	window := v1.Range{Start: start, End: end, Step: step}
	result, _, err := c.readClient.QueryRange(reqCtx, query, window)
	if err != nil {
		return nil, err
	}

	// Check the declared value type first so the error names what was received.
	if kind := result.Type(); kind != model.ValMatrix {
		return nil, fmt.Errorf("was expecting to get a Matrix, but got %s", kind.String())
	}
	if matrix, ok := result.(model.Matrix); ok {
		return matrix, nil
	}
	return nil, fmt.Errorf("failed to cast type to Matrix, type was %T", result)
}
// Query implements MimirClient.
//
// The request options are attached to the context, the call is bounded by the
// configured read timeout, and strong read consistency is requested through the
// context. Warnings returned by the API are discarded. An error is returned if
// the query fails or if the result is not a vector.
func (c *Client) Query(ctx context.Context, query string, ts time.Time, options ...RequestOption) (model.Vector, error) {
	reqCtx, cancel := context.WithTimeout(contextWithRequestOptions(ctx, options...), c.cfg.ReadTimeout)
	defer cancel()
	reqCtx = querierapi.ContextWithReadConsistencyLevel(reqCtx, querierapi.ReadConsistencyStrong)

	result, _, err := c.readClient.Query(reqCtx, query, ts)
	if err != nil {
		return nil, err
	}

	// Check the declared value type first so the error names what was received.
	if kind := result.Type(); kind != model.ValVector {
		return nil, fmt.Errorf("was expecting to get a Vector, but got %s", kind.String())
	}
	if vector, ok := result.(model.Vector); ok {
		return vector, nil
	}
	return nil, fmt.Errorf("failed to cast type to Vector, type was %T", result)
}
// WriteSeries implements MimirClient.
//
// The series are sent in consecutive batches of at most cfg.WriteBatchSize
// entries. Writing stops at the first failed batch, whose status code and error
// are returned; otherwise the status code of the last batch is returned. With no
// input series no request is sent and the returned status code is 0.
//
// A non-positive WriteBatchSize is treated as "no limit": all series are sent in
// a single request.
func (c *Client) WriteSeries(ctx context.Context, series []prompb.TimeSeries) (int, error) {
	// Guard against a misconfigured batch size: with 0 the loop below would never
	// consume any series (spinning forever on empty requests), and with a negative
	// value the slice expression would panic.
	batchSize := c.cfg.WriteBatchSize
	if batchSize <= 0 {
		batchSize = len(series)
	}

	lastStatusCode := 0

	// Honor the batch size.
	for len(series) > 0 {
		end := util_math.Min(len(series), batchSize)
		batch := series[:end]
		series = series[end:]

		var err error
		lastStatusCode, err = c.writeClient.sendWriteRequest(ctx, &prompb.WriteRequest{Timeseries: batch})
		if err != nil {
			return lastStatusCode, err
		}
	}

	return lastStatusCode, nil
}
// RequestOption defines a functional-style request option. Each option mutates
// the per-request options that Query and QueryRange attach to the request context.
type RequestOption func(options *requestOptions)
// WithResultsCacheEnabled returns a RequestOption that turns the query-frontend
// results cache on or off for a single request.
// The option only records whether the cache is disabled, so the results cache
// is assumed to be enabled by default.
func WithResultsCacheEnabled(enabled bool) RequestOption {
	disabled := !enabled
	return func(opts *requestOptions) {
		opts.resultsCacheDisabled = disabled
	}
}
// contextWithRequestOptions applies every option, in order, to a fresh
// requestOptions value and returns a child of ctx carrying a pointer to it
// under requestOptionsKey.
func contextWithRequestOptions(ctx context.Context, options ...RequestOption) context.Context {
	var applied requestOptions
	for i := range options {
		options[i](&applied)
	}
	return context.WithValue(ctx, requestOptionsKey, &applied)
}
// requestOptions holds the per-request settings built from RequestOption values
// and carried in the request context.
type requestOptions struct {
// resultsCacheDisabled, when true, makes the round tripper send
// "Cache-Control: no-store" with the request.
resultsCacheDisabled bool
}
// key is an unexported type for the context keys defined in this file, so they
// cannot collide with context keys defined by other packages.
type key int
// requestOptionsKey is the context key under which the *requestOptions of a
// request is stored.
var requestOptionsKey key
// clientRoundTripper is an http.RoundTripper that adds the tenant,
// authentication, user agent and debugging headers to a request before
// delegating to the wrapped round tripper.
type clientRoundTripper struct {
// tenantID is the value sent in the X-Scope-OrgID header.
tenantID string
// basicAuthUser and basicAuthPassword are used for basic auth only when both
// are non-empty and no bearer token is set.
basicAuthUser string
basicAuthPassword string
// bearerToken, when non-empty, is sent as a bearer Authorization header and
// takes precedence over basic auth.
bearerToken string
// rt is the wrapped round tripper that actually performs the request.
rt http.RoundTripper
// requestDebug enables sending the chunk info logging header.
requestDebug bool
// userAgent, when non-empty, is sent in the User-Agent header.
userAgent string
}
// RoundTrip implements http.RoundTripper. It decorates a copy of req with the
// headers listed below and then delegates to the wrapped round tripper:
//
//   - "Cache-Control: no-store" when the request options stored in the request
//     context disable the results cache;
//   - credentials: the bearer token when one is configured, otherwise basic auth
//     when both the user and the password are configured;
//   - "X-Scope-OrgID" with the tenant ID: always when no credentials are sent,
//     and only for a non-default tenant when credentials are sent;
//   - "User-Agent" when a user agent is configured;
//   - the read consistency header when a level is present in the request context;
//   - the chunk info logging header when request debugging is enabled.
//
// The request is cloned before any header is touched: the http.RoundTripper
// contract forbids modifying the caller's request, and working on a copy also
// keeps the Header.Add calls below from piling up duplicate values if the same
// request is sent more than once.
func (rt *clientRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	ctx := req.Context()
	req = req.Clone(ctx)
	if req.Header == nil {
		// Clone keeps a nil header map nil; writing to it would panic.
		req.Header = make(http.Header)
	}

	if options, _ := ctx.Value(requestOptionsKey).(*requestOptions); options != nil && options.resultsCacheDisabled {
		// Despite the name, the "no-store" directive also disables results cache lookup in Mimir.
		req.Header.Set("Cache-Control", "no-store")
	}

	authenticated := true
	switch {
	case rt.bearerToken != "":
		req.Header.Set("Authorization", "Bearer "+rt.bearerToken)
	case rt.basicAuthUser != "" && rt.basicAuthPassword != "":
		req.SetBasicAuth(rt.basicAuthUser, rt.basicAuthPassword)
	default:
		authenticated = false
	}

	// Without credentials the tenant header is always sent. With credentials it
	// is sent only when a tenant other than the default one was configured.
	if !authenticated || rt.tenantID != defaultTenant {
		req.Header.Set("X-Scope-OrgID", rt.tenantID)
	}

	if rt.userAgent != "" {
		req.Header.Set("User-Agent", rt.userAgent)
	}

	if lvl, ok := querierapi.ReadConsistencyLevelFromContext(ctx); ok {
		req.Header.Add(querierapi.ReadConsistencyHeader, lvl)
	}

	if rt.requestDebug {
		req.Header.Add(chunkinfologger.ChunkInfoLoggingHeader, "series_id")
	}

	return rt.rt.RoundTrip(req)
}