Skip to content

Commit

Permalink
Added range query support to write-read-series-test in mimir-continuo…
Browse files Browse the repository at this point in the history
…us-test tool (#1603)

* Added query range support to write-read-series-test in mimir-continuous-test tool

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Log query start/end time with milliseconds precision

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed Apr 1, 2022
1 parent 4050d03 commit 006cad8
Show file tree
Hide file tree
Showing 7 changed files with 668 additions and 22 deletions.
67 changes: 60 additions & 7 deletions pkg/continuoustest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"github.com/golang/snappy"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"

util_math "github.com/grafana/mimir/pkg/util/math"
Expand All @@ -30,6 +33,9 @@ type MimirClient interface {
// WriteSeries writes input series to Mimir. Returns the response status code and optionally
// an error. The error is always returned if request was not successful (eg. received a 4xx or 5xx error).
WriteSeries(ctx context.Context, series []prompb.TimeSeries) (statusCode int, err error)

// QueryRange performs a query for the given range.
QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) (model.Matrix, error)
}

type ClientConfig struct {
Expand All @@ -38,6 +44,9 @@ type ClientConfig struct {
WriteBaseEndpoint flagext.URLValue
WriteBatchSize int
WriteTimeout time.Duration

ReadBaseEndpoint flagext.URLValue
ReadTimeout time.Duration
}

func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -46,12 +55,16 @@ func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.WriteBaseEndpoint, "tests.write-endpoint", "The base endpoint on the write path. The URL should have no trailing slash. The specific API path is appended by the tool to the URL, for example /api/v1/push for the remote write API endpoint, so the configured URL must not include it.")
f.IntVar(&cfg.WriteBatchSize, "tests.write-batch-size", 1000, "The maximum number of series to write in a single request.")
f.DurationVar(&cfg.WriteTimeout, "tests.write-timeout", 5*time.Second, "The timeout for a single write request.")

f.Var(&cfg.ReadBaseEndpoint, "tests.read-endpoint", "The base endpoint on the read path. The URL should have no trailing slash. The specific API path is appended by the tool to the URL, for example /api/v1/query_range for range query API, so the configured URL must not include it.")
f.DurationVar(&cfg.ReadTimeout, "tests.read-timeout", 30*time.Second, "The timeout for a single read request.")
}

type Client struct {
client *http.Client
cfg ClientConfig
logger log.Logger
writeClient *http.Client
readClient v1.API
cfg ClientConfig
logger log.Logger
}

func NewClient(cfg ClientConfig, logger log.Logger) (*Client, error) {
Expand All @@ -62,14 +75,54 @@ func NewClient(cfg ClientConfig, logger log.Logger) (*Client, error) {
if cfg.WriteBaseEndpoint.URL == nil {
return nil, errors.New("the write endpoint has not been set")
}
if cfg.ReadBaseEndpoint.URL == nil {
return nil, errors.New("the read endpoint has not been set")
}

apiCfg := api.Config{
Address: cfg.ReadBaseEndpoint.String(),
RoundTripper: rt,
}

readClient, err := api.NewClient(apiCfg)
if err != nil {
return nil, errors.Wrap(err, "failed to create read client")
}

return &Client{
client: &http.Client{Transport: rt},
cfg: cfg,
logger: logger,
writeClient: &http.Client{Transport: rt},
readClient: v1.NewAPI(readClient),
cfg: cfg,
logger: logger,
}, nil
}

// QueryRange implements MimirClient.
func (c *Client) QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) (model.Matrix, error) {
ctx, cancel := context.WithTimeout(ctx, c.cfg.ReadTimeout)
defer cancel()

value, _, err := c.readClient.QueryRange(ctx, query, v1.Range{
Start: start,
End: end,
Step: step,
})
if err != nil {
return nil, err
}

if value.Type() != model.ValMatrix {
return nil, errors.New("was expecting to get a Matrix")
}

matrix, ok := value.(model.Matrix)
if !ok {
return nil, errors.New("failed to cast type to Matrix")
}

return matrix, nil
}

// WriteSeries implements MimirClient.
func (c *Client) WriteSeries(ctx context.Context, series []prompb.TimeSeries) (int, error) {
lastStatusCode := 0
Expand Down Expand Up @@ -111,7 +164,7 @@ func (c *Client) sendWriteRequest(ctx context.Context, req *prompb.WriteRequest)
httpReq.Header.Set("User-Agent", "mimir-continuous-test")
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

httpResp, err := c.client.Do(httpReq)
httpResp, err := c.writeClient.Do(httpReq)
if err != nil {
return 0, err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/continuoustest/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/grafana/dskit/flagext"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -48,6 +49,7 @@ func TestClient_WriteSeries(t *testing.T) {
flagext.DefaultValues(&cfg)
cfg.WriteBatchSize = 10
require.NoError(t, cfg.WriteBaseEndpoint.Set(server.URL))
require.NoError(t, cfg.ReadBaseEndpoint.Set(server.URL))

c, err := NewClient(cfg, log.NewNopLogger())
require.NoError(t, err)
Expand Down Expand Up @@ -113,3 +115,8 @@ func (m *ClientMock) WriteSeries(ctx context.Context, series []prompb.TimeSeries
args := m.Called(ctx, series)
return args.Int(0), args.Error(1)
}

func (m *ClientMock) QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) (model.Matrix, error) {
args := m.Called(ctx, query, start, end, step)
return args.Get(0).(model.Matrix), args.Error(1)
}
28 changes: 26 additions & 2 deletions pkg/continuoustest/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ import (
// TestMetrics holds generic metrics tracked by tests. The common metrics are used to enforce the same
// metric names and labels to track the same information across different tests.
type TestMetrics struct {
writesTotal prometheus.Counter
writesFailedTotal *prometheus.CounterVec
writesTotal prometheus.Counter
writesFailedTotal *prometheus.CounterVec
queriesTotal prometheus.Counter
queriesFailedTotal prometheus.Counter
queryResultChecksTotal prometheus.Counter
queryResultChecksFailedTotal prometheus.Counter
}

func NewTestMetrics(testName string, reg prometheus.Registerer) *TestMetrics {
Expand All @@ -26,5 +30,25 @@ func NewTestMetrics(testName string, reg prometheus.Registerer) *TestMetrics {
Help: "Total number of failed write requests.",
ConstLabels: map[string]string{"test": testName},
}, []string{"status_code"}),
queriesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "mimir_continuous_test_queries_total",
Help: "Total number of attempted query requests.",
ConstLabels: map[string]string{"test": testName},
}),
queriesFailedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "mimir_continuous_test_queries_failed_total",
Help: "Total number of failed query requests.",
ConstLabels: map[string]string{"test": testName},
}),
queryResultChecksTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "mimir_continuous_test_query_result_checks_total",
Help: "Total number of query results checked for correctness.",
ConstLabels: map[string]string{"test": testName},
}),
queryResultChecksFailedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "mimir_continuous_test_query_result_checks_failed_total",
Help: "Total number of query results failed when checking for correctness.",
ConstLabels: map[string]string{"test": testName},
}),
}
}
89 changes: 89 additions & 0 deletions pkg/continuoustest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,43 @@
package continuoustest

import (
"fmt"
"math"
"math/rand"
"strconv"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)

const (
maxComparisonDelta = 0.001
)

func alignTimestampToInterval(ts time.Time, interval time.Duration) time.Time {
return ts.Truncate(interval)
}

// getQueryStep returns the query step to use to run a test query. The returned step
// is a guaranteed to be a multiple of alignInterval.
func getQueryStep(start, end time.Time, alignInterval time.Duration) time.Duration {
const maxSamples = 1000

// Compute the number of samples that we would have if we query every single sample.
actualSamples := end.Sub(start) / alignInterval
if actualSamples <= maxSamples {
return alignInterval
}

// Adjust the query step based on the max steps spread over the query time range,
// rounding it to alignInterval.
step := end.Sub(start) / time.Duration(maxSamples)
step = ((step / alignInterval) + 1) * alignInterval

return step
}

func generateSineWaveSeries(name string, t time.Time, numSeries int) []prompb.TimeSeries {
out := make([]prompb.TimeSeries, 0, numSeries)
value := generateSineWaveValue(t)
Expand Down Expand Up @@ -42,3 +68,66 @@ func generateSineWaveValue(t time.Time) float64 {
radians := 2 * math.Pi * float64(t.UnixNano()) / float64(period.Nanoseconds())
return math.Sin(radians)
}

// verifySineWaveSamplesSum assumes the input matrix is the result of a range query summing the values
// of expectedSeries sine wave series and checks whether the actual values match the expected ones.
// Returns error if values don't match.
func verifySineWaveSamplesSum(matrix model.Matrix, expectedSeries int, expectedStep time.Duration) error {
if len(matrix) != 1 {
return fmt.Errorf("expected 1 series in the result but got %d", len(matrix))
}

samples := matrix[0].Values

for idx, sample := range samples {
ts := time.UnixMilli(int64(sample.Timestamp)).UTC()

// Assert on value.
expectedValue := generateSineWaveValue(ts)
if !compareSampleValues(float64(sample.Value), expectedValue*float64(expectedSeries)) {
return fmt.Errorf("sample at timestamp %d (%s) has value %f while was expecting %f", sample.Timestamp, ts.String(), sample.Value, expectedValue)
}

// Assert on sample timestamp. We expect no gaps.
if idx > 0 {
prevTs := time.UnixMilli(int64(samples[idx-1].Timestamp)).UTC()
expectedTs := prevTs.Add(expectedStep)

if ts.UnixMilli() != expectedTs.UnixMilli() {
return fmt.Errorf("sample at timestamp %d (%s) was expected to have timestamp %d (%s) because previous sample had timestamp %d (%s)",
sample.Timestamp, ts.String(), expectedTs.UnixMilli(), expectedTs.String(), prevTs.UnixMilli(), prevTs.String())
}
}
}

return nil
}

func compareSampleValues(actual, expected float64) bool {
delta := math.Abs((actual - expected) / maxComparisonDelta)
return delta < maxComparisonDelta
}

func minTime(first, second time.Time) time.Time {
if first.After(second) {
return second
}
return first
}

func maxTime(first, second time.Time) time.Time {
if first.After(second) {
return first
}
return second
}

func randTime(min, max time.Time) time.Time {
delta := max.Unix() - min.Unix()
if delta <= 0 {
return min
}

sec := rand.Int63n(delta) + min.Unix()
return time.Unix(sec, 0)
}
Loading

0 comments on commit 006cad8

Please sign in to comment.