Skip to content

Commit

Permalink
mimir-continuous-test tool: write series to remote endpoint (#1540)
Browse files Browse the repository at this point in the history
* mimir-continuous-test tool: write series to remote endpoint

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Addressed review feedback

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fixed sendWriteRequest() context usage

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed Mar 24, 2022
1 parent bb52a4f commit e8982fb
Show file tree
Hide file tree
Showing 9 changed files with 719 additions and 2 deletions.
25 changes: 23 additions & 2 deletions cmd/mimir-continuous-test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"context"
"flag"
"os"

Expand All @@ -12,18 +13,23 @@ import (
"github.com/weaveworks/common/logging"
"github.com/weaveworks/common/server"

"github.com/grafana/mimir/pkg/continuoustest"
"github.com/grafana/mimir/pkg/util/instrumentation"
util_log "github.com/grafana/mimir/pkg/util/log"
)

type Config struct {
ServerMetricsPort int
LogLevel logging.Level
ServerMetricsPort int
LogLevel logging.Level
Client continuoustest.ClientConfig
WriteReadSeriesTest continuoustest.WriteReadSeriesTestConfig
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.ServerMetricsPort, "server.metrics-port", 9900, "The port where metrics are exposed.")
cfg.LogLevel.RegisterFlags(f)
cfg.Client.RegisterFlags(f)
cfg.WriteReadSeriesTest.RegisterFlags(f)
}

func main() {
Expand All @@ -46,4 +52,19 @@ func main() {
level.Error(logger).Log("msg", "Unable to start instrumentation server", "err", err.Error())
os.Exit(1)
}

// Init the client used to write/read to/from Mimir.
client, err := continuoustest.NewClient(cfg.Client, logger)
if err != nil {
level.Error(logger).Log("msg", "Failed to initialize client", "err", err.Error())
os.Exit(1)
}

// Run continuous testing.
m := continuoustest.NewManager()
m.AddTest(continuoustest.NewWriteReadSeriesTest(cfg.WriteReadSeriesTest, client, logger, registry))
if err := m.Run(context.Background()); err != nil {
level.Error(logger).Log("msg", "Failed to run continuous test", "err", err.Error())
os.Exit(1)
}
}
141 changes: 141 additions & 0 deletions pkg/continuoustest/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// SPDX-License-Identifier: AGPL-3.0-only

package continuoustest

import (
"bytes"
"context"
"flag"
"fmt"
"io"
"net/http"
"time"

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/prompb"

util_math "github.com/grafana/mimir/pkg/util/math"
)

const (
maxErrMsgLen = 256
)

// MimirClient is the interface implemented by a client used to interact with Mimir.
type MimirClient interface {
// WriteSeries writes input series to Mimir. Returns the response status code and optionally
// an error. The error is always returned if request was not successful (eg. received a 4xx or 5xx error).
WriteSeries(ctx context.Context, series []prompb.TimeSeries) (statusCode int, err error)
}

type ClientConfig struct {
TenantID string

WriteBaseEndpoint flagext.URLValue
WriteBatchSize int
WriteTimeout time.Duration
}

func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.TenantID, "tests.tenant-id", "anonymous", "The tenant ID to use to write and read metrics in tests.")

f.Var(&cfg.WriteBaseEndpoint, "tests.write-endpoint", "The base endpoint on the write path. The URL should have no trailing slash. The specific API path is appended by the tool to the URL, for example /api/v1/push for the remote write API endpoint, so the configured URL must not include it.")
f.IntVar(&cfg.WriteBatchSize, "tests.write-batch-size", 1000, "The maximum number of series to write in a single request.")
f.DurationVar(&cfg.WriteTimeout, "tests.write-timeout", 5*time.Second, "The timeout for a single write request.")
}

type Client struct {
client *http.Client
cfg ClientConfig
logger log.Logger
}

func NewClient(cfg ClientConfig, logger log.Logger) (*Client, error) {
rt := http.DefaultTransport
rt = &clientRoundTripper{tenantID: cfg.TenantID, rt: rt}

// Ensure the required config has been set.
if cfg.WriteBaseEndpoint.URL == nil {
return nil, errors.New("the write endpoint has not been set")
}

return &Client{
client: &http.Client{Transport: rt},
cfg: cfg,
logger: logger,
}, nil
}

// WriteSeries implements MimirClient.
func (c *Client) WriteSeries(ctx context.Context, series []prompb.TimeSeries) (int, error) {
lastStatusCode := 0

// Honor the batch size.
for len(series) > 0 {
end := util_math.Min(len(series), c.cfg.WriteBatchSize)
batch := series[0:end]
series = series[end:]

var err error
lastStatusCode, err = c.sendWriteRequest(ctx, &prompb.WriteRequest{Timeseries: batch})
if err != nil {
return lastStatusCode, err
}
}

return lastStatusCode, nil
}

func (c *Client) sendWriteRequest(ctx context.Context, req *prompb.WriteRequest) (int, error) {
data, err := proto.Marshal(req)
if err != nil {
return 0, err
}

ctx, cancel := context.WithTimeout(ctx, c.cfg.WriteTimeout)
defer cancel()

compressed := snappy.Encode(nil, data)
httpReq, err := http.NewRequestWithContext(ctx, "POST", c.cfg.WriteBaseEndpoint.String()+"/api/v1/push", bytes.NewReader(compressed))
if err != nil {
// Errors from NewRequest are from unparseable URLs, so are not
// recoverable.
return 0, err
}
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("User-Agent", "mimir-continuous-test")
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

httpResp, err := c.client.Do(httpReq)
if err != nil {
return 0, err
}
defer httpResp.Body.Close()

if httpResp.StatusCode/100 != 2 {
truncatedBody, err := io.ReadAll(io.LimitReader(httpResp.Body, maxErrMsgLen))
if err != nil {
return httpResp.StatusCode, errors.Wrapf(err, "server returned HTTP status %s and client failed to read response body", httpResp.Status)
}

return httpResp.StatusCode, fmt.Errorf("server returned HTTP status %s and body %q (truncated to %d bytes)", httpResp.Status, string(truncatedBody), maxErrMsgLen)
}

return httpResp.StatusCode, nil
}

type clientRoundTripper struct {
tenantID string
rt http.RoundTripper
}

// RoundTrip add the tenant ID header required by Mimir.
func (rt *clientRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
req.Header.Set("X-Scope-OrgID", rt.tenantID)
return rt.rt.RoundTrip(req)
}
115 changes: 115 additions & 0 deletions pkg/continuoustest/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// SPDX-License-Identifier: AGPL-3.0-only

package continuoustest

import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/grafana/dskit/flagext"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestClient_WriteSeries(t *testing.T) {
var (
nextStatusCode = http.StatusOK
receivedRequests []prompb.WriteRequest
)

server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
// Read the entire body.
body, err := ioutil.ReadAll(request.Body)
require.NoError(t, err)
require.NoError(t, request.Body.Close())

// Decode and unmarshal it.
body, err = snappy.Decode(nil, body)
require.NoError(t, err)

var req prompb.WriteRequest
require.NoError(t, proto.Unmarshal(body, &req))
receivedRequests = append(receivedRequests, req)

writer.WriteHeader(nextStatusCode)
}))
t.Cleanup(server.Close)

cfg := ClientConfig{}
flagext.DefaultValues(&cfg)
cfg.WriteBatchSize = 10
require.NoError(t, cfg.WriteBaseEndpoint.Set(server.URL))

c, err := NewClient(cfg, log.NewNopLogger())
require.NoError(t, err)

ctx := context.Background()
now := time.Now()

t.Run("write series in a single batch", func(t *testing.T) {
receivedRequests = nil
nextStatusCode = http.StatusOK

series := generateSineWaveSeries("test", now, 10)
statusCode, err := c.WriteSeries(ctx, series)
require.NoError(t, err)
assert.Equal(t, 200, statusCode)

require.Len(t, receivedRequests, 1)
assert.Equal(t, series, receivedRequests[0].Timeseries)
})

t.Run("write series in multiple batches", func(t *testing.T) {
receivedRequests = nil
nextStatusCode = http.StatusOK

series := generateSineWaveSeries("test", now, 22)
statusCode, err := c.WriteSeries(ctx, series)
require.NoError(t, err)
assert.Equal(t, 200, statusCode)

require.Len(t, receivedRequests, 3)
assert.Equal(t, series[0:10], receivedRequests[0].Timeseries)
assert.Equal(t, series[10:20], receivedRequests[1].Timeseries)
assert.Equal(t, series[20:22], receivedRequests[2].Timeseries)
})

t.Run("request failed with 4xx error", func(t *testing.T) {
receivedRequests = nil
nextStatusCode = http.StatusBadRequest

series := generateSineWaveSeries("test", now, 1)
statusCode, err := c.WriteSeries(ctx, series)
require.Error(t, err)
assert.Equal(t, 400, statusCode)
})

t.Run("request failed with 5xx error", func(t *testing.T) {
receivedRequests = nil
nextStatusCode = http.StatusInternalServerError

series := generateSineWaveSeries("test", now, 1)
statusCode, err := c.WriteSeries(ctx, series)
require.Error(t, err)
assert.Equal(t, 500, statusCode)
})
}

// ClientMock mocks MimirClient.
type ClientMock struct {
mock.Mock
}

func (m *ClientMock) WriteSeries(ctx context.Context, series []prompb.TimeSeries) (int, error) {
args := m.Called(ctx, series)
return args.Int(0), args.Error(1)
}
69 changes: 69 additions & 0 deletions pkg/continuoustest/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// SPDX-License-Identifier: AGPL-3.0-only

package continuoustest

import (
"context"
"sync"
"time"
)

type Test interface {
// Name returns the test name. The name is used to uniquely identify the test in metrics and logs.
Name() string

// Init initializes the test. If the initialization fails, the testing tool will terminate.
Init() error

// Run runs a single test cycle. This function is called multiple times, at periodic intervals.
Run(ctx context.Context, now time.Time)
}

type Manager struct {
tests []Test
}

func NewManager() *Manager {
return &Manager{}
}

func (m *Manager) AddTest(t Test) {
m.tests = append(m.tests, t)
}

func (m *Manager) Run(ctx context.Context) error {
// Initialize all tests.
for _, t := range m.tests {
if err := t.Init(); err != nil {
return err
}
}

// Continuously run all tests. Each test is executed in a dedicated goroutine.
wg := sync.WaitGroup{}
wg.Add(len(m.tests))

for _, test := range m.tests {
go func(t Test) {
defer wg.Done()

// Run it immediately, and then every configured period.
t.Run(ctx, time.Now())

// TODO We may consider to allow to configure the test interval.
ticker := time.NewTicker(time.Minute)

for {
select {
case <-ticker.C:
t.Run(ctx, time.Now())
case <-ctx.Done():
return
}
}
}(test)
}

wg.Wait()
return nil
}
Loading

0 comments on commit e8982fb

Please sign in to comment.