Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added range query support to write-read-series-test in mimir-continuous-test tool #1603

Merged
merged 2 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 60 additions & 7 deletions pkg/continuoustest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"github.com/golang/snappy"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"

util_math "github.com/grafana/mimir/pkg/util/math"
Expand All @@ -30,6 +33,9 @@ type MimirClient interface {
// WriteSeries writes input series to Mimir. Returns the response status code and optionally
// an error. The error is always returned if request was not successful (eg. received a 4xx or 5xx error).
WriteSeries(ctx context.Context, series []prompb.TimeSeries) (statusCode int, err error)

// QueryRange performs a query for the given range.
QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) (model.Matrix, error)
}

type ClientConfig struct {
Expand All @@ -38,6 +44,9 @@ type ClientConfig struct {
WriteBaseEndpoint flagext.URLValue
WriteBatchSize int
WriteTimeout time.Duration

ReadBaseEndpoint flagext.URLValue
ReadTimeout time.Duration
}

func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -46,12 +55,16 @@ func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.WriteBaseEndpoint, "tests.write-endpoint", "The base endpoint on the write path. The URL should have no trailing slash. The specific API path is appended by the tool to the URL, for example /api/v1/push for the remote write API endpoint, so the configured URL must not include it.")
f.IntVar(&cfg.WriteBatchSize, "tests.write-batch-size", 1000, "The maximum number of series to write in a single request.")
f.DurationVar(&cfg.WriteTimeout, "tests.write-timeout", 5*time.Second, "The timeout for a single write request.")

f.Var(&cfg.ReadBaseEndpoint, "tests.read-endpoint", "The base endpoint on the read path. The URL should have no trailing slash. The specific API path is appended by the tool to the URL, for example /api/v1/query_range for range query API, so the configured URL must not include it.")
f.DurationVar(&cfg.ReadTimeout, "tests.read-timeout", 30*time.Second, "The timeout for a single read request.")
}

type Client struct {
client *http.Client
cfg ClientConfig
logger log.Logger
writeClient *http.Client
readClient v1.API
cfg ClientConfig
logger log.Logger
}

func NewClient(cfg ClientConfig, logger log.Logger) (*Client, error) {
Expand All @@ -62,14 +75,54 @@ func NewClient(cfg ClientConfig, logger log.Logger) (*Client, error) {
if cfg.WriteBaseEndpoint.URL == nil {
return nil, errors.New("the write endpoint has not been set")
}
if cfg.ReadBaseEndpoint.URL == nil {
return nil, errors.New("the read endpoint has not been set")
}

apiCfg := api.Config{
Address: cfg.ReadBaseEndpoint.String(),
RoundTripper: rt,
}

readClient, err := api.NewClient(apiCfg)
if err != nil {
return nil, errors.Wrap(err, "failed to create read client")
}

return &Client{
client: &http.Client{Transport: rt},
cfg: cfg,
logger: logger,
writeClient: &http.Client{Transport: rt},
readClient: v1.NewAPI(readClient),
cfg: cfg,
logger: logger,
}, nil
}

// QueryRange implements MimirClient.
func (c *Client) QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) (model.Matrix, error) {
ctx, cancel := context.WithTimeout(ctx, c.cfg.ReadTimeout)
defer cancel()

value, _, err := c.readClient.QueryRange(ctx, query, v1.Range{
Start: start,
End: end,
Step: step,
})
if err != nil {
return nil, err
}

if value.Type() != model.ValMatrix {
return nil, errors.New("was expecting to get a Matrix")
}

matrix, ok := value.(model.Matrix)
if !ok {
return nil, errors.New("failed to cast type to Matrix")
}

return matrix, nil
}

// WriteSeries implements MimirClient.
func (c *Client) WriteSeries(ctx context.Context, series []prompb.TimeSeries) (int, error) {
lastStatusCode := 0
Expand Down Expand Up @@ -111,7 +164,7 @@ func (c *Client) sendWriteRequest(ctx context.Context, req *prompb.WriteRequest)
httpReq.Header.Set("User-Agent", "mimir-continuous-test")
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

httpResp, err := c.client.Do(httpReq)
httpResp, err := c.writeClient.Do(httpReq)
if err != nil {
return 0, err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/continuoustest/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/grafana/dskit/flagext"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -48,6 +49,7 @@ func TestClient_WriteSeries(t *testing.T) {
flagext.DefaultValues(&cfg)
cfg.WriteBatchSize = 10
require.NoError(t, cfg.WriteBaseEndpoint.Set(server.URL))
require.NoError(t, cfg.ReadBaseEndpoint.Set(server.URL))

c, err := NewClient(cfg, log.NewNopLogger())
require.NoError(t, err)
Expand Down Expand Up @@ -113,3 +115,8 @@ func (m *ClientMock) WriteSeries(ctx context.Context, series []prompb.TimeSeries
args := m.Called(ctx, series)
return args.Int(0), args.Error(1)
}

func (m *ClientMock) QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) (model.Matrix, error) {
args := m.Called(ctx, query, start, end, step)
return args.Get(0).(model.Matrix), args.Error(1)
}
28 changes: 26 additions & 2 deletions pkg/continuoustest/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ import (
// TestMetrics holds generic metrics tracked by tests. The common metrics are used to enforce the same
// metric names and labels to track the same information across different tests.
type TestMetrics struct {
writesTotal prometheus.Counter
writesFailedTotal *prometheus.CounterVec
writesTotal prometheus.Counter
writesFailedTotal *prometheus.CounterVec
queriesTotal prometheus.Counter
queriesFailedTotal prometheus.Counter
queryResultChecksTotal prometheus.Counter
queryResultChecksFailedTotal prometheus.Counter
}

func NewTestMetrics(testName string, reg prometheus.Registerer) *TestMetrics {
Expand All @@ -26,5 +30,25 @@ func NewTestMetrics(testName string, reg prometheus.Registerer) *TestMetrics {
Help: "Total number of failed write requests.",
ConstLabels: map[string]string{"test": testName},
}, []string{"status_code"}),
queriesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "mimir_continuous_test_queries_total",
Help: "Total number of attempted query requests.",
ConstLabels: map[string]string{"test": testName},
}),
queriesFailedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "mimir_continuous_test_queries_failed_total",
Help: "Total number of failed query requests.",
ConstLabels: map[string]string{"test": testName},
}),
queryResultChecksTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "mimir_continuous_test_query_result_checks_total",
Help: "Total number of query results checked for correctness.",
ConstLabels: map[string]string{"test": testName},
}),
queryResultChecksFailedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "mimir_continuous_test_query_result_checks_failed_total",
Help: "Total number of query results failed when checking for correctness.",
ConstLabels: map[string]string{"test": testName},
}),
}
}
89 changes: 89 additions & 0 deletions pkg/continuoustest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,43 @@
package continuoustest

import (
"fmt"
"math"
"math/rand"
"strconv"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)

const (
maxComparisonDelta = 0.001
)

func alignTimestampToInterval(ts time.Time, interval time.Duration) time.Time {
return ts.Truncate(interval)
}

// getQueryStep returns the query step to use to run a test query. The returned step
// is a guaranteed to be a multiple of alignInterval.
func getQueryStep(start, end time.Time, alignInterval time.Duration) time.Duration {
const maxSamples = 1000

// Compute the number of samples that we would have if we query every single sample.
actualSamples := end.Sub(start) / alignInterval
if actualSamples <= maxSamples {
return alignInterval
}

// Adjust the query step based on the max steps spread over the query time range,
// rounding it to alignInterval.
step := end.Sub(start) / time.Duration(maxSamples)
step = ((step / alignInterval) + 1) * alignInterval

return step
}

func generateSineWaveSeries(name string, t time.Time, numSeries int) []prompb.TimeSeries {
out := make([]prompb.TimeSeries, 0, numSeries)
value := generateSineWaveValue(t)
Expand Down Expand Up @@ -42,3 +68,66 @@ func generateSineWaveValue(t time.Time) float64 {
radians := 2 * math.Pi * float64(t.UnixNano()) / float64(period.Nanoseconds())
return math.Sin(radians)
}

// verifySineWaveSamplesSum assumes the input matrix is the result of a range query summing the values
// of expectedSeries sine wave series and checks whether the actual values match the expected ones.
// Returns error if values don't match.
func verifySineWaveSamplesSum(matrix model.Matrix, expectedSeries int, expectedStep time.Duration) error {
if len(matrix) != 1 {
return fmt.Errorf("expected 1 series in the result but got %d", len(matrix))
}

samples := matrix[0].Values

for idx, sample := range samples {
ts := time.UnixMilli(int64(sample.Timestamp)).UTC()

// Assert on value.
expectedValue := generateSineWaveValue(ts)
if !compareSampleValues(float64(sample.Value), expectedValue*float64(expectedSeries)) {
return fmt.Errorf("sample at timestamp %d (%s) has value %f while was expecting %f", sample.Timestamp, ts.String(), sample.Value, expectedValue)
}

// Assert on sample timestamp. We expect no gaps.
if idx > 0 {
prevTs := time.UnixMilli(int64(samples[idx-1].Timestamp)).UTC()
expectedTs := prevTs.Add(expectedStep)

if ts.UnixMilli() != expectedTs.UnixMilli() {
return fmt.Errorf("sample at timestamp %d (%s) was expected to have timestamp %d (%s) because previous sample had timestamp %d (%s)",
sample.Timestamp, ts.String(), expectedTs.UnixMilli(), expectedTs.String(), prevTs.UnixMilli(), prevTs.String())
}
}
}

return nil
}

func compareSampleValues(actual, expected float64) bool {
delta := math.Abs((actual - expected) / maxComparisonDelta)
return delta < maxComparisonDelta
}

func minTime(first, second time.Time) time.Time {
if first.After(second) {
return second
}
return first
}

func maxTime(first, second time.Time) time.Time {
if first.After(second) {
return first
}
return second
}

func randTime(min, max time.Time) time.Time {
delta := max.Unix() - min.Unix()
if delta <= 0 {
return min
}

sec := rand.Int63n(delta) + min.Unix()
return time.Unix(sec, 0)
}
Loading