diff --git a/pkg/continuoustest/client.go b/pkg/continuoustest/client.go index 68874148b92..1a196f5e2ed 100644 --- a/pkg/continuoustest/client.go +++ b/pkg/continuoustest/client.go @@ -16,6 +16,9 @@ import ( "github.com/golang/snappy" "github.com/grafana/dskit/flagext" "github.com/pkg/errors" + "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" util_math "github.com/grafana/mimir/pkg/util/math" @@ -30,6 +33,9 @@ type MimirClient interface { // WriteSeries writes input series to Mimir. Returns the response status code and optionally // an error. The error is always returned if request was not successful (eg. received a 4xx or 5xx error). WriteSeries(ctx context.Context, series []prompb.TimeSeries) (statusCode int, err error) + + // QueryRange performs a query for the given range. + QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) (model.Matrix, error) } type ClientConfig struct { @@ -38,6 +44,9 @@ type ClientConfig struct { WriteBaseEndpoint flagext.URLValue WriteBatchSize int WriteTimeout time.Duration + + ReadBaseEndpoint flagext.URLValue + ReadTimeout time.Duration } func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) { @@ -46,12 +55,16 @@ func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.WriteBaseEndpoint, "tests.write-endpoint", "The base endpoint on the write path. The URL should have no trailing slash. 
The specific API path is appended by the tool to the URL, for example /api/v1/push for the remote write API endpoint, so the configured URL must not include it.") f.IntVar(&cfg.WriteBatchSize, "tests.write-batch-size", 1000, "The maximum number of series to write in a single request.") f.DurationVar(&cfg.WriteTimeout, "tests.write-timeout", 5*time.Second, "The timeout for a single write request.") + + f.Var(&cfg.ReadBaseEndpoint, "tests.read-endpoint", "The base endpoint on the read path. The URL should have no trailing slash. The specific API path is appended by the tool to the URL, for example /api/v1/query_range for range query API, so the configured URL must not include it.") + f.DurationVar(&cfg.ReadTimeout, "tests.read-timeout", 30*time.Second, "The timeout for a single read request.") } type Client struct { - client *http.Client - cfg ClientConfig - logger log.Logger + writeClient *http.Client + readClient v1.API + cfg ClientConfig + logger log.Logger } func NewClient(cfg ClientConfig, logger log.Logger) (*Client, error) { @@ -62,14 +75,54 @@ func NewClient(cfg ClientConfig, logger log.Logger) (*Client, error) { if cfg.WriteBaseEndpoint.URL == nil { return nil, errors.New("the write endpoint has not been set") } + if cfg.ReadBaseEndpoint.URL == nil { + return nil, errors.New("the read endpoint has not been set") + } + + apiCfg := api.Config{ + Address: cfg.ReadBaseEndpoint.String(), + RoundTripper: rt, + } + + readClient, err := api.NewClient(apiCfg) + if err != nil { + return nil, errors.Wrap(err, "failed to create read client") + } return &Client{ - client: &http.Client{Transport: rt}, - cfg: cfg, - logger: logger, + writeClient: &http.Client{Transport: rt}, + readClient: v1.NewAPI(readClient), + cfg: cfg, + logger: logger, }, nil } +// QueryRange implements MimirClient. 
+func (c *Client) QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) (model.Matrix, error) { + ctx, cancel := context.WithTimeout(ctx, c.cfg.ReadTimeout) + defer cancel() + + value, _, err := c.readClient.QueryRange(ctx, query, v1.Range{ + Start: start, + End: end, + Step: step, + }) + if err != nil { + return nil, err + } + + if value.Type() != model.ValMatrix { + return nil, errors.New("was expecting to get a Matrix") + } + + matrix, ok := value.(model.Matrix) + if !ok { + return nil, errors.New("failed to cast type to Matrix") + } + + return matrix, nil +} + // WriteSeries implements MimirClient. func (c *Client) WriteSeries(ctx context.Context, series []prompb.TimeSeries) (int, error) { lastStatusCode := 0 @@ -111,7 +164,7 @@ func (c *Client) sendWriteRequest(ctx context.Context, req *prompb.WriteRequest) httpReq.Header.Set("User-Agent", "mimir-continuous-test") httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - httpResp, err := c.client.Do(httpReq) + httpResp, err := c.writeClient.Do(httpReq) if err != nil { return 0, err } diff --git a/pkg/continuoustest/client_test.go b/pkg/continuoustest/client_test.go index 65d96c46c26..8433d19e3db 100644 --- a/pkg/continuoustest/client_test.go +++ b/pkg/continuoustest/client_test.go @@ -14,6 +14,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/grafana/dskit/flagext" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -48,6 +49,7 @@ func TestClient_WriteSeries(t *testing.T) { flagext.DefaultValues(&cfg) cfg.WriteBatchSize = 10 require.NoError(t, cfg.WriteBaseEndpoint.Set(server.URL)) + require.NoError(t, cfg.ReadBaseEndpoint.Set(server.URL)) c, err := NewClient(cfg, log.NewNopLogger()) require.NoError(t, err) @@ -113,3 +115,8 @@ func (m *ClientMock) WriteSeries(ctx context.Context, series []prompb.TimeSeries args := m.Called(ctx, 
series) return args.Int(0), args.Error(1) } + +func (m *ClientMock) QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) (model.Matrix, error) { + args := m.Called(ctx, query, start, end, step) + return args.Get(0).(model.Matrix), args.Error(1) +} diff --git a/pkg/continuoustest/metrics.go b/pkg/continuoustest/metrics.go index 28c124be92e..c636964cb7a 100644 --- a/pkg/continuoustest/metrics.go +++ b/pkg/continuoustest/metrics.go @@ -10,8 +10,12 @@ import ( // TestMetrics holds generic metrics tracked by tests. The common metrics are used to enforce the same // metric names and labels to track the same information across different tests. type TestMetrics struct { - writesTotal prometheus.Counter - writesFailedTotal *prometheus.CounterVec + writesTotal prometheus.Counter + writesFailedTotal *prometheus.CounterVec + queriesTotal prometheus.Counter + queriesFailedTotal prometheus.Counter + queryResultChecksTotal prometheus.Counter + queryResultChecksFailedTotal prometheus.Counter } func NewTestMetrics(testName string, reg prometheus.Registerer) *TestMetrics { @@ -26,5 +30,25 @@ func NewTestMetrics(testName string, reg prometheus.Registerer) *TestMetrics { Help: "Total number of failed write requests.", ConstLabels: map[string]string{"test": testName}, }, []string{"status_code"}), + queriesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "mimir_continuous_test_queries_total", + Help: "Total number of attempted query requests.", + ConstLabels: map[string]string{"test": testName}, + }), + queriesFailedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "mimir_continuous_test_queries_failed_total", + Help: "Total number of failed query requests.", + ConstLabels: map[string]string{"test": testName}, + }), + queryResultChecksTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "mimir_continuous_test_query_result_checks_total", + Help: "Total number of query results checked for 
correctness.", + ConstLabels: map[string]string{"test": testName}, + }), + queryResultChecksFailedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "mimir_continuous_test_query_result_checks_failed_total", + Help: "Total number of query results failed when checking for correctness.", + ConstLabels: map[string]string{"test": testName}, + }), } } diff --git a/pkg/continuoustest/util.go b/pkg/continuoustest/util.go index eb5e14b4413..a6957b87a6a 100644 --- a/pkg/continuoustest/util.go +++ b/pkg/continuoustest/util.go @@ -3,17 +3,43 @@ package continuoustest import ( + "fmt" "math" + "math/rand" "strconv" "time" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" ) +const ( + maxComparisonDelta = 0.001 +) + func alignTimestampToInterval(ts time.Time, interval time.Duration) time.Time { return ts.Truncate(interval) } +// getQueryStep returns the query step to use to run a test query. The returned step +// is guaranteed to be a multiple of alignInterval. +func getQueryStep(start, end time.Time, alignInterval time.Duration) time.Duration { + const maxSamples = 1000 + + // Compute the number of samples that we would have if we query every single sample. + actualSamples := end.Sub(start) / alignInterval + if actualSamples <= maxSamples { + return alignInterval + } + + // Adjust the query step based on the max steps spread over the query time range, + // rounding it to alignInterval. 
+ step := end.Sub(start) / time.Duration(maxSamples) + step = ((step / alignInterval) + 1) * alignInterval + + return step +} + func generateSineWaveSeries(name string, t time.Time, numSeries int) []prompb.TimeSeries { out := make([]prompb.TimeSeries, 0, numSeries) value := generateSineWaveValue(t) @@ -42,3 +68,66 @@ func generateSineWaveValue(t time.Time) float64 { radians := 2 * math.Pi * float64(t.UnixNano()) / float64(period.Nanoseconds()) return math.Sin(radians) } + +// verifySineWaveSamplesSum assumes the input matrix is the result of a range query summing the values +// of expectedSeries sine wave series and checks whether the actual values match the expected ones. +// Returns error if values don't match. +func verifySineWaveSamplesSum(matrix model.Matrix, expectedSeries int, expectedStep time.Duration) error { + if len(matrix) != 1 { + return fmt.Errorf("expected 1 series in the result but got %d", len(matrix)) + } + + samples := matrix[0].Values + + for idx, sample := range samples { + ts := time.UnixMilli(int64(sample.Timestamp)).UTC() + + // Assert on value. + expectedValue := generateSineWaveValue(ts) + if !compareSampleValues(float64(sample.Value), expectedValue*float64(expectedSeries)) { + return fmt.Errorf("sample at timestamp %d (%s) has value %f while was expecting %f", sample.Timestamp, ts.String(), sample.Value, expectedValue) + } + + // Assert on sample timestamp. We expect no gaps. 
+ if idx > 0 { + prevTs := time.UnixMilli(int64(samples[idx-1].Timestamp)).UTC() + expectedTs := prevTs.Add(expectedStep) + + if ts.UnixMilli() != expectedTs.UnixMilli() { + return fmt.Errorf("sample at timestamp %d (%s) was expected to have timestamp %d (%s) because previous sample had timestamp %d (%s)", + sample.Timestamp, ts.String(), expectedTs.UnixMilli(), expectedTs.String(), prevTs.UnixMilli(), prevTs.String()) + } + } + } + + return nil +} + +func compareSampleValues(actual, expected float64) bool { + delta := math.Abs((actual - expected) / maxComparisonDelta) + return delta < maxComparisonDelta +} + +func minTime(first, second time.Time) time.Time { + if first.After(second) { + return second + } + return first +} + +func maxTime(first, second time.Time) time.Time { + if first.After(second) { + return first + } + return second +} + +func randTime(min, max time.Time) time.Time { + delta := max.Unix() - min.Unix() + if delta <= 0 { + return min + } + + sec := rand.Int63n(delta) + min.Unix() + return time.Unix(sec, 0) +} diff --git a/pkg/continuoustest/util_test.go b/pkg/continuoustest/util_test.go index 2240b2014a5..402cea1c692 100644 --- a/pkg/continuoustest/util_test.go +++ b/pkg/continuoustest/util_test.go @@ -6,7 +6,9 @@ import ( "testing" "time" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAlignTimestampToInterval(t *testing.T) { @@ -15,3 +17,131 @@ func TestAlignTimestampToInterval(t *testing.T) { assert.Equal(t, time.Unix(30, 0), alignTimestampToInterval(time.Unix(39, 0), 10*time.Second)) assert.Equal(t, time.Unix(40, 0), alignTimestampToInterval(time.Unix(40, 0), 10*time.Second)) } + +func TestGetQueryStep(t *testing.T) { + tests := map[string]struct { + start time.Time + end time.Time + writeInterval time.Duration + expectedStep time.Duration + }{ + "should return write interval if expected number of samples is < 1000": { + start: time.UnixMilli(0), + end: 
time.UnixMilli(3600 * 1000), + writeInterval: 10 * time.Second, + expectedStep: 10 * time.Second, + }, + "should align step to write interval and guarantee no more than 1000 samples": { + start: time.UnixMilli(0), + end: time.UnixMilli(86400 * 1000), + writeInterval: 10 * time.Second, + expectedStep: 90 * time.Second, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + actualStep := getQueryStep(testData.start, testData.end, testData.writeInterval) + assert.Equal(t, testData.expectedStep, actualStep) + }) + } +} + +func TestVerifySineWaveSamplesSums(t *testing.T) { + // Round to millis since that's the precision of Prometheus timestamps. + now := time.UnixMilli(time.Now().UnixMilli()).UTC() + + tests := map[string]struct { + samples []model.SamplePair + expectedSeries int + expectedStep time.Duration + expectedErr string + }{ + "should return no error if all samples value and timestamp match the expected one (1 series)": { + samples: []model.SamplePair{ + newSamplePair(now.Add(10*time.Second), generateSineWaveValue(now.Add(10*time.Second))), + newSamplePair(now.Add(20*time.Second), generateSineWaveValue(now.Add(20*time.Second))), + newSamplePair(now.Add(30*time.Second), generateSineWaveValue(now.Add(30*time.Second))), + }, + expectedSeries: 1, + expectedStep: 10 * time.Second, + expectedErr: "", + }, + "should return no error if all samples value and timestamp match the expected one (multiple series)": { + samples: []model.SamplePair{ + newSamplePair(now.Add(10*time.Second), 5*generateSineWaveValue(now.Add(10*time.Second))), + newSamplePair(now.Add(20*time.Second), 5*generateSineWaveValue(now.Add(20*time.Second))), + newSamplePair(now.Add(30*time.Second), 5*generateSineWaveValue(now.Add(30*time.Second))), + }, + expectedSeries: 5, + expectedStep: 10 * time.Second, + expectedErr: "", + }, + "should return error if there's a missing series": { + samples: []model.SamplePair{ + newSamplePair(now.Add(10*time.Second), 
4*generateSineWaveValue(now.Add(10*time.Second))), + newSamplePair(now.Add(20*time.Second), 4*generateSineWaveValue(now.Add(20*time.Second))), + newSamplePair(now.Add(30*time.Second), 4*generateSineWaveValue(now.Add(30*time.Second))), + }, + expectedSeries: 5, + expectedStep: 10 * time.Second, + expectedErr: "sample at timestamp .* has value .* while was expecting .*", + }, + "should return error if there's a missing sample": { + samples: []model.SamplePair{ + newSamplePair(now.Add(10*time.Second), 5*generateSineWaveValue(now.Add(10*time.Second))), + newSamplePair(now.Add(30*time.Second), 5*generateSineWaveValue(now.Add(30*time.Second))), + }, + expectedSeries: 5, + expectedStep: 10 * time.Second, + expectedErr: "sample at timestamp .* was expected to have timestamp .*", + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + matrix := model.Matrix{{Values: testData.samples}} + actual := verifySineWaveSamplesSum(matrix, testData.expectedSeries, testData.expectedStep) + if testData.expectedErr == "" { + assert.NoError(t, actual) + } else { + assert.Error(t, actual) + assert.Regexp(t, testData.expectedErr, actual.Error()) + } + }) + } +} + +func TestMinTime(t *testing.T) { + first := time.Now() + second := first.Add(time.Second) + + assert.Equal(t, first, minTime(first, second)) + assert.Equal(t, first, minTime(second, first)) +} + +func TestMaxTime(t *testing.T) { + first := time.Now() + second := first.Add(time.Second) + + assert.Equal(t, second, maxTime(first, second)) + assert.Equal(t, second, maxTime(second, first)) +} + +func TestRandTime(t *testing.T) { + min := time.Unix(1000, 0) + max := time.Unix(10000, 0) + + for i := 0; i < 100; i++ { + actual := randTime(min, max) + require.GreaterOrEqual(t, actual.Unix(), min.Unix()) + require.LessOrEqual(t, actual.Unix(), max.Unix()) + } +} + +func newSamplePair(ts time.Time, value float64) model.SamplePair { + return model.SamplePair{ + Timestamp: model.Time(ts.UnixMilli()), + 
Value: model.SampleValue(value), + } +} diff --git a/pkg/continuoustest/write_read_series.go b/pkg/continuoustest/write_read_series.go index 1f45b8e459a..07201e39164 100644 --- a/pkg/continuoustest/write_read_series.go +++ b/pkg/continuoustest/write_read_series.go @@ -5,6 +5,7 @@ package continuoustest import ( "context" "flag" + "fmt" "strconv" "time" @@ -19,11 +20,13 @@ const ( ) type WriteReadSeriesTestConfig struct { - NumSeries int + NumSeries int + MaxQueryAge time.Duration } func (cfg *WriteReadSeriesTestConfig) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.NumSeries, "tests.write-read-series-test.num-series", 10000, "Number of series used for the test.") + f.DurationVar(&cfg.MaxQueryAge, "tests.write-read-series-test.max-query-age", 7*24*time.Hour, "How back in the past metrics can be queried at most.") } type WriteReadSeriesTest struct { @@ -81,8 +84,6 @@ func (t *WriteReadSeriesTest) Run(ctx context.Context, now time.Time) { // assert on query results due to possible gaps. if statusCode/100 == 4 { t.lastWrittenTimestamp = timestamp - - // TODO The following reset is related to the read path (not implemented yet), but was added to ensure we don't forget about it. t.queryMinTime = time.Time{} t.queryMaxTime = time.Time{} continue @@ -96,9 +97,93 @@ func (t *WriteReadSeriesTest) Run(ctx context.Context, now time.Time) { // The write request succeeded. t.lastWrittenTimestamp = timestamp + t.queryMaxTime = timestamp + if t.queryMinTime.IsZero() { + t.queryMinTime = timestamp + } + } + + for _, timeRange := range t.getRangeQueryTimeRanges(now) { + t.runRangeQueryAndVerifyResult(ctx, timeRange[0], timeRange[1]) + } +} + +// getRangeQueryTimeRanges returns the start/end time ranges to use to run test range queries. +func (t *WriteReadSeriesTest) getRangeQueryTimeRanges(now time.Time) (ranges [][2]time.Time) { + // The min and max allowed query timestamps are zero if there's no successfully written data yet. 
+ if t.queryMinTime.IsZero() || t.queryMaxTime.IsZero() { + level.Info(t.logger).Log("msg", "Skipped range queries because there's no valid time range to query") + return nil + } + + // Honor the configured max age. + adjustedQueryMinTime := maxTime(t.queryMinTime, now.Add(-t.cfg.MaxQueryAge)) + if t.queryMaxTime.Before(adjustedQueryMinTime) { + level.Info(t.logger).Log("msg", "Skipped range queries because there's no valid time range to query after honoring configured max query age", "min_valid_time", t.queryMinTime, "max_valid_time", t.queryMaxTime, "max_query_age", t.cfg.MaxQueryAge) + return + } + + // Last 1h. + if t.queryMaxTime.After(now.Add(-1 * time.Hour)) { + ranges = append(ranges, [2]time.Time{ + maxTime(adjustedQueryMinTime, now.Add(-1*time.Hour)), + minTime(t.queryMaxTime, now), + }) + } + + // Last 24h (only if the actual time range is not already covered by "Last 1h"). + if t.queryMaxTime.After(now.Add(-24*time.Hour)) && adjustedQueryMinTime.Before(now.Add(-1*time.Hour)) { + ranges = append(ranges, [2]time.Time{ + maxTime(adjustedQueryMinTime, now.Add(-24*time.Hour)), + minTime(t.queryMaxTime, now), + }) } - // TODO Here we should query the written data and assert on correctness. + // From last 23h to last 24h. + if adjustedQueryMinTime.Before(now.Add(-23*time.Hour)) && t.queryMaxTime.After(now.Add(-23*time.Hour)) { + ranges = append(ranges, [2]time.Time{ + maxTime(adjustedQueryMinTime, now.Add(-24*time.Hour)), + minTime(t.queryMaxTime, now.Add(-23*time.Hour)), + }) + } + + // A random time range. + randMinTime := randTime(adjustedQueryMinTime, t.queryMaxTime) + ranges = append(ranges, [2]time.Time{randMinTime, randTime(randMinTime, t.queryMaxTime)}) + + return ranges +} + +func (t *WriteReadSeriesTest) runRangeQueryAndVerifyResult(ctx context.Context, start, end time.Time) { + // We align start, end and step to write interval in order to avoid any false positives + // when checking results correctness. The min/max query time is always aligned. 
+ start = maxTime(t.queryMinTime, alignTimestampToInterval(start, writeInterval)) + end = minTime(t.queryMaxTime, alignTimestampToInterval(end, writeInterval)) + if end.Before(start) { + return + } + + step := getQueryStep(start, end, writeInterval) + query := fmt.Sprintf("sum(%s)", metricName) + + logger := log.With(t.logger, "query", query, "start", start.UnixMilli(), "end", end.UnixMilli(), "step", step) + level.Debug(logger).Log("msg", "Running range query") + + t.metrics.queriesTotal.Inc() + matrix, err := t.client.QueryRange(ctx, query, start, end, step) + if err != nil { + t.metrics.queriesFailedTotal.Inc() + level.Warn(logger).Log("msg", "Failed to execute range query", "err", err) + return + } + + t.metrics.queryResultChecksTotal.Inc() + err = verifySineWaveSamplesSum(matrix, t.cfg.NumSeries, step) + if err != nil { + t.metrics.queryResultChecksFailedTotal.Inc() + level.Warn(logger).Log("msg", "Range query result check failed", "err", err) + return + } } func (t *WriteReadSeriesTest) nextWriteTimestamp(now time.Time) time.Time { diff --git a/pkg/continuoustest/write_read_series_test.go b/pkg/continuoustest/write_read_series_test.go index 8851b1df913..50f4bff0edd 100644 --- a/pkg/continuoustest/write_read_series_test.go +++ b/pkg/continuoustest/write_read_series_test.go @@ -10,21 +10,25 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) func TestWriteReadSeriesTest_Run(t *testing.T) { logger := log.NewNopLogger() - cfg := WriteReadSeriesTestConfig{ - NumSeries: 2, - } + cfg := WriteReadSeriesTestConfig{} + flagext.DefaultValues(&cfg) + cfg.NumSeries = 2 t.Run("should write series with current timestamp if it's already aligned to write interval", func(t *testing.T) { client 
:= &ClientMock{} client.On("WriteSeries", mock.Anything, mock.Anything).Return(200, nil) + client.On("QueryRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) reg := prometheus.NewPedanticRegistry() test := NewWriteReadSeriesTest(cfg, client, logger, reg) @@ -36,16 +40,30 @@ func TestWriteReadSeriesTest_Run(t *testing.T) { client.AssertCalled(t, "WriteSeries", mock.Anything, generateSineWaveSeries(metricName, now, 2)) assert.Equal(t, int64(1000), test.lastWrittenTimestamp.Unix()) + client.AssertNumberOfCalls(t, "QueryRange", 2) + client.AssertCalled(t, "QueryRange", mock.Anything, "sum(mimir_continuous_test_sine_wave)", time.Unix(1000, 0), time.Unix(1000, 0), writeInterval) + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` # HELP mimir_continuous_test_writes_total Total number of attempted write requests. # TYPE mimir_continuous_test_writes_total counter mimir_continuous_test_writes_total{test="write-read-series"} 1 - `))) + + # HELP mimir_continuous_test_queries_total Total number of attempted query requests. + # TYPE mimir_continuous_test_queries_total counter + mimir_continuous_test_queries_total{test="write-read-series"} 2 + + # HELP mimir_continuous_test_queries_failed_total Total number of failed query requests. 
+ # TYPE mimir_continuous_test_queries_failed_total counter + mimir_continuous_test_queries_failed_total{test="write-read-series"} 0 + `), + "mimir_continuous_test_writes_total", "mimir_continuous_test_writes_failed_total", + "mimir_continuous_test_queries_total", "mimir_continuous_test_queries_failed_total")) }) t.Run("should write series with timestamp aligned to write interval", func(t *testing.T) { client := &ClientMock{} client.On("WriteSeries", mock.Anything, mock.Anything).Return(200, nil) + client.On("QueryRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) reg := prometheus.NewPedanticRegistry() test := NewWriteReadSeriesTest(cfg, client, logger, reg) @@ -57,16 +75,30 @@ func TestWriteReadSeriesTest_Run(t *testing.T) { client.AssertCalled(t, "WriteSeries", mock.Anything, generateSineWaveSeries(metricName, time.Unix(980, 0), 2)) assert.Equal(t, int64(980), test.lastWrittenTimestamp.Unix()) + client.AssertNumberOfCalls(t, "QueryRange", 2) + client.AssertCalled(t, "QueryRange", mock.Anything, "sum(mimir_continuous_test_sine_wave)", time.Unix(980, 0), time.Unix(980, 0), writeInterval) + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` # HELP mimir_continuous_test_writes_total Total number of attempted write requests. # TYPE mimir_continuous_test_writes_total counter mimir_continuous_test_writes_total{test="write-read-series"} 1 - `))) + + # HELP mimir_continuous_test_queries_total Total number of attempted query requests. + # TYPE mimir_continuous_test_queries_total counter + mimir_continuous_test_queries_total{test="write-read-series"} 2 + + # HELP mimir_continuous_test_queries_failed_total Total number of failed query requests. 
+ # TYPE mimir_continuous_test_queries_failed_total counter + mimir_continuous_test_queries_failed_total{test="write-read-series"} 0 + `), + "mimir_continuous_test_writes_total", "mimir_continuous_test_writes_failed_total", + "mimir_continuous_test_queries_total", "mimir_continuous_test_queries_failed_total")) }) t.Run("should write series from last written timestamp until now", func(t *testing.T) { client := &ClientMock{} client.On("WriteSeries", mock.Anything, mock.Anything).Return(200, nil) + client.On("QueryRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) reg := prometheus.NewPedanticRegistry() test := NewWriteReadSeriesTest(cfg, client, logger, reg) @@ -81,11 +113,24 @@ func TestWriteReadSeriesTest_Run(t *testing.T) { client.AssertCalled(t, "WriteSeries", mock.Anything, generateSineWaveSeries(metricName, time.Unix(1000, 0), 2)) assert.Equal(t, int64(1000), test.lastWrittenTimestamp.Unix()) + client.AssertNumberOfCalls(t, "QueryRange", 2) + client.AssertCalled(t, "QueryRange", mock.Anything, "sum(mimir_continuous_test_sine_wave)", time.Unix(960, 0), time.Unix(1000, 0), writeInterval) + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` # HELP mimir_continuous_test_writes_total Total number of attempted write requests. # TYPE mimir_continuous_test_writes_total counter mimir_continuous_test_writes_total{test="write-read-series"} 3 - `))) + + # HELP mimir_continuous_test_queries_total Total number of attempted query requests. + # TYPE mimir_continuous_test_queries_total counter + mimir_continuous_test_queries_total{test="write-read-series"} 2 + + # HELP mimir_continuous_test_queries_failed_total Total number of failed query requests. 
+ # TYPE mimir_continuous_test_queries_failed_total counter + mimir_continuous_test_queries_failed_total{test="write-read-series"} 0 + `), + "mimir_continuous_test_writes_total", "mimir_continuous_test_writes_failed_total", + "mimir_continuous_test_queries_total", "mimir_continuous_test_queries_failed_total")) }) t.Run("should stop remote writing on network error", func(t *testing.T) { @@ -111,7 +156,11 @@ func TestWriteReadSeriesTest_Run(t *testing.T) { # HELP mimir_continuous_test_writes_failed_total Total number of failed write requests. # TYPE mimir_continuous_test_writes_failed_total counter mimir_continuous_test_writes_failed_total{status_code="0",test="write-read-series"} 1 - `))) + + # HELP mimir_continuous_test_queries_total Total number of attempted query requests. + # TYPE mimir_continuous_test_queries_total counter + mimir_continuous_test_queries_total{test="write-read-series"} 0 + `), "mimir_continuous_test_writes_total", "mimir_continuous_test_writes_failed_total", "mimir_continuous_test_queries_total")) }) t.Run("should stop remote writing on 5xx error", func(t *testing.T) { @@ -137,7 +186,11 @@ func TestWriteReadSeriesTest_Run(t *testing.T) { # HELP mimir_continuous_test_writes_failed_total Total number of failed write requests. # TYPE mimir_continuous_test_writes_failed_total counter mimir_continuous_test_writes_failed_total{status_code="500",test="write-read-series"} 1 - `))) + + # HELP mimir_continuous_test_queries_total Total number of attempted query requests. 
+ # TYPE mimir_continuous_test_queries_total counter + mimir_continuous_test_queries_total{test="write-read-series"} 0 + `), "mimir_continuous_test_writes_total", "mimir_continuous_test_writes_failed_total", "mimir_continuous_test_queries_total")) }) t.Run("should keep remote writing next intervals on 4xx error", func(t *testing.T) { @@ -165,6 +218,211 @@ func TestWriteReadSeriesTest_Run(t *testing.T) { # HELP mimir_continuous_test_writes_failed_total Total number of failed write requests. # TYPE mimir_continuous_test_writes_failed_total counter mimir_continuous_test_writes_failed_total{status_code="400",test="write-read-series"} 3 - `))) + + # HELP mimir_continuous_test_queries_total Total number of attempted query requests. + # TYPE mimir_continuous_test_queries_total counter + mimir_continuous_test_queries_total{test="write-read-series"} 0 + `), "mimir_continuous_test_writes_total", "mimir_continuous_test_writes_failed_total", "mimir_continuous_test_queries_total")) + }) + + t.Run("should query written series, compare results and track no failure if results match", func(t *testing.T) { + now := time.Unix(1000, 0) + + client := &ClientMock{} + client.On("WriteSeries", mock.Anything, mock.Anything).Return(200, nil) + client.On("QueryRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{ + {Values: []model.SamplePair{newSamplePair(now, generateSineWaveValue(now)*float64(cfg.NumSeries))}}, + }, nil) + + reg := prometheus.NewPedanticRegistry() + test := NewWriteReadSeriesTest(cfg, client, logger, reg) + + test.Run(context.Background(), now) + + client.AssertNumberOfCalls(t, "WriteSeries", 1) + client.AssertCalled(t, "WriteSeries", mock.Anything, generateSineWaveSeries(metricName, now, 2)) + assert.Equal(t, int64(1000), test.lastWrittenTimestamp.Unix()) + + client.AssertNumberOfCalls(t, "QueryRange", 2) + client.AssertCalled(t, "QueryRange", mock.Anything, "sum(mimir_continuous_test_sine_wave)", time.Unix(1000, 0), 
time.Unix(1000, 0), writeInterval) + + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP mimir_continuous_test_writes_total Total number of attempted write requests. + # TYPE mimir_continuous_test_writes_total counter + mimir_continuous_test_writes_total{test="write-read-series"} 1 + + # HELP mimir_continuous_test_queries_total Total number of attempted query requests. + # TYPE mimir_continuous_test_queries_total counter + mimir_continuous_test_queries_total{test="write-read-series"} 2 + + # HELP mimir_continuous_test_queries_failed_total Total number of failed query requests. + # TYPE mimir_continuous_test_queries_failed_total counter + mimir_continuous_test_queries_failed_total{test="write-read-series"} 0 + + # HELP mimir_continuous_test_query_result_checks_total Total number of query results checked for correctness. + # TYPE mimir_continuous_test_query_result_checks_total counter + mimir_continuous_test_query_result_checks_total{test="write-read-series"} 2 + + # HELP mimir_continuous_test_query_result_checks_failed_total Total number of query results failed when checking for correctness. 
+ # TYPE mimir_continuous_test_query_result_checks_failed_total counter + mimir_continuous_test_query_result_checks_failed_total{test="write-read-series"} 0 + `), + "mimir_continuous_test_writes_total", "mimir_continuous_test_writes_failed_total", + "mimir_continuous_test_queries_total", "mimir_continuous_test_queries_failed_total", + "mimir_continuous_test_query_result_checks_total", "mimir_continuous_test_query_result_checks_failed_total")) + }) + + t.Run("should query written series, compare results and track failure if results don't match", func(t *testing.T) { + now := time.Unix(1000, 0) + + client := &ClientMock{} + client.On("WriteSeries", mock.Anything, mock.Anything).Return(200, nil) + client.On("QueryRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{ + {Values: []model.SamplePair{{Timestamp: model.Time(now.UnixMilli()), Value: 12345}}}, + }, nil) + + reg := prometheus.NewPedanticRegistry() + test := NewWriteReadSeriesTest(cfg, client, logger, reg) + + test.Run(context.Background(), now) + + client.AssertNumberOfCalls(t, "WriteSeries", 1) + client.AssertCalled(t, "WriteSeries", mock.Anything, generateSineWaveSeries(metricName, now, 2)) + assert.Equal(t, int64(1000), test.lastWrittenTimestamp.Unix()) + + client.AssertNumberOfCalls(t, "QueryRange", 2) + client.AssertCalled(t, "QueryRange", mock.Anything, "sum(mimir_continuous_test_sine_wave)", time.Unix(1000, 0), time.Unix(1000, 0), writeInterval) + + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP mimir_continuous_test_writes_total Total number of attempted write requests. + # TYPE mimir_continuous_test_writes_total counter + mimir_continuous_test_writes_total{test="write-read-series"} 1 + + # HELP mimir_continuous_test_queries_total Total number of attempted query requests. 
+ # TYPE mimir_continuous_test_queries_total counter + mimir_continuous_test_queries_total{test="write-read-series"} 2 + + # HELP mimir_continuous_test_queries_failed_total Total number of failed query requests. + # TYPE mimir_continuous_test_queries_failed_total counter + mimir_continuous_test_queries_failed_total{test="write-read-series"} 0 + + # HELP mimir_continuous_test_query_result_checks_total Total number of query results checked for correctness. + # TYPE mimir_continuous_test_query_result_checks_total counter + mimir_continuous_test_query_result_checks_total{test="write-read-series"} 2 + + # HELP mimir_continuous_test_query_result_checks_failed_total Total number of query results failed when checking for correctness. + # TYPE mimir_continuous_test_query_result_checks_failed_total counter + mimir_continuous_test_query_result_checks_failed_total{test="write-read-series"} 2 + `), + "mimir_continuous_test_writes_total", "mimir_continuous_test_writes_failed_total", + "mimir_continuous_test_queries_total", "mimir_continuous_test_queries_failed_total", + "mimir_continuous_test_query_result_checks_total", "mimir_continuous_test_query_result_checks_failed_total")) + }) +}
+
+// TestWriteReadSeriesTest_getRangeQueryTimeRanges verifies the time ranges
+// returned by getRangeQueryTimeRanges() for several combinations of
+// queryMinTime, queryMaxTime and the configured MaxQueryAge.
+//
+// Every subtest builds its own WriteReadSeriesTest, sets the queryable time
+// window directly on it and asserts on the returned [start, end] pairs. The
+// last returned pair is a randomly picked range, so it can only be checked
+// against the queryMinTime/queryMaxTime bounds (except when min == max, where
+// it is fully determined).
+func TestWriteReadSeriesTest_getRangeQueryTimeRanges(t *testing.T) {
+	cfg := WriteReadSeriesTestConfig{}
+	flagext.DefaultValues(&cfg)
+	cfg.MaxQueryAge = 2 * 24 * time.Hour
+
+	// 10 days and 2 seconds after the Unix epoch. time.Unix() takes seconds while
+	// time.Duration counts nanoseconds, so the duration has to be divided by
+	// time.Second before the conversion.
+	now := time.Unix(int64(((10*24*time.Hour)+(2*time.Second))/time.Second), 0)
+
+	t.Run("min/max query time has not been set yet", func(t *testing.T) {
+		test := NewWriteReadSeriesTest(cfg, &ClientMock{}, log.NewNopLogger(), nil)
+
+		assert.Empty(t, test.getRangeQueryTimeRanges(now))
+	})
+
+	t.Run("min/max query time is older than max age", func(t *testing.T) {
+		test := NewWriteReadSeriesTest(cfg, &ClientMock{}, log.NewNopLogger(), nil)
+		test.queryMinTime = now.Add(-cfg.MaxQueryAge).Add(-time.Minute)
+		test.queryMaxTime = now.Add(-cfg.MaxQueryAge).Add(-time.Minute)
+
+		assert.Empty(t, test.getRangeQueryTimeRanges(now))
+	})
+
+	t.Run("min query time = max query time", func(t *testing.T) {
+		test := NewWriteReadSeriesTest(cfg, &ClientMock{}, log.NewNopLogger(), nil)
+		test.queryMinTime = now.Add(-time.Minute)
+		test.queryMaxTime = now.Add(-time.Minute)
+
+		actual := test.getRangeQueryTimeRanges(now)
+		require.Len(t, actual, 2)
+		require.Equal(t, [2]time.Time{now.Add(-time.Minute), now.Add(-time.Minute)}, actual[0]) // Last 1h.
+		require.Equal(t, [2]time.Time{now.Add(-time.Minute), now.Add(-time.Minute)}, actual[1]) // Random time range.
+	})
+
+	t.Run("min and max query time are within the last 1h", func(t *testing.T) {
+		test := NewWriteReadSeriesTest(cfg, &ClientMock{}, log.NewNopLogger(), nil)
+		test.queryMinTime = now.Add(-30 * time.Minute)
+		test.queryMaxTime = now.Add(-time.Minute)
+
+		actual := test.getRangeQueryTimeRanges(now)
+		require.Len(t, actual, 2)
+		require.Equal(t, [2]time.Time{now.Add(-30 * time.Minute), now.Add(-time.Minute)}, actual[0]) // Last 1h.
+
+		// Random time range.
+		require.GreaterOrEqual(t, actual[len(actual)-1][0].Unix(), test.queryMinTime.Unix())
+		require.LessOrEqual(t, actual[len(actual)-1][1].Unix(), test.queryMaxTime.Unix())
+	})
+
+	t.Run("min and max query time are within the last 2h", func(t *testing.T) {
+		test := NewWriteReadSeriesTest(cfg, &ClientMock{}, log.NewNopLogger(), nil)
+		test.queryMinTime = now.Add(-90 * time.Minute)
+		test.queryMaxTime = now.Add(-80 * time.Minute)
+
+		actual := test.getRangeQueryTimeRanges(now)
+		require.Len(t, actual, 2)
+		require.Equal(t, [2]time.Time{now.Add(-90 * time.Minute), now.Add(-80 * time.Minute)}, actual[0]) // Last 24h.
+
+		// Random time range.
+		require.GreaterOrEqual(t, actual[len(actual)-1][0].Unix(), test.queryMinTime.Unix())
+		require.LessOrEqual(t, actual[len(actual)-1][1].Unix(), test.queryMaxTime.Unix())
+	})
+
+	t.Run("min query time is older than 24h", func(t *testing.T) {
+		test := NewWriteReadSeriesTest(cfg, &ClientMock{}, log.NewNopLogger(), nil)
+		test.queryMinTime = now.Add(-30 * time.Hour)
+		test.queryMaxTime = now.Add(-time.Minute)
+
+		actual := test.getRangeQueryTimeRanges(now)
+		require.Len(t, actual, 4)
+		require.Equal(t, [2]time.Time{now.Add(-time.Hour), now.Add(-time.Minute)}, actual[0])         // Last 1h.
+		require.Equal(t, [2]time.Time{now.Add(-24 * time.Hour), now.Add(-time.Minute)}, actual[1])    // Last 24h.
+		require.Equal(t, [2]time.Time{now.Add(-24 * time.Hour), now.Add(-23 * time.Hour)}, actual[2]) // From last 23h to last 24h.
+
+		// Random time range.
+		require.GreaterOrEqual(t, actual[len(actual)-1][0].Unix(), test.queryMinTime.Unix())
+		require.LessOrEqual(t, actual[len(actual)-1][1].Unix(), test.queryMaxTime.Unix())
+	})
+
+	t.Run("max query time is older than 24h but more recent than max query age", func(t *testing.T) {
+		test := NewWriteReadSeriesTest(cfg, &ClientMock{}, log.NewNopLogger(), nil)
+		test.queryMinTime = now.Add(-30 * time.Hour)
+		test.queryMaxTime = now.Add(-25 * time.Hour)
+
+		actual := test.getRangeQueryTimeRanges(now)
+		require.Len(t, actual, 1)
+
+		// Random time range.
+		require.GreaterOrEqual(t, actual[len(actual)-1][0].Unix(), test.queryMinTime.Unix())
+		require.LessOrEqual(t, actual[len(actual)-1][1].Unix(), test.queryMaxTime.Unix())
+	})
+
+	t.Run("min query time is older than 24h but max query age is only 10m", func(t *testing.T) {
+		// Work on a copy so the shorter max query age doesn't leak into the shared config.
+		cfg := cfg
+		cfg.MaxQueryAge = 10 * time.Minute
+
+		test := NewWriteReadSeriesTest(cfg, &ClientMock{}, log.NewNopLogger(), nil)
+		test.queryMinTime = now.Add(-30 * time.Hour)
+		test.queryMaxTime = now.Add(-time.Minute)
+
+		actual := test.getRangeQueryTimeRanges(now)
+		require.Len(t, actual, 2)
+		require.Equal(t, [2]time.Time{now.Add(-10 * time.Minute), now.Add(-time.Minute)}, actual[0]) // Last 1h.
+
+		// Random time range.
+		require.GreaterOrEqual(t, actual[len(actual)-1][0].Unix(), test.queryMinTime.Unix())
+		require.LessOrEqual(t, actual[len(actual)-1][1].Unix(), test.queryMaxTime.Unix())
 	})
 }