Add gRPC support to query-tee #2683

Merged on Aug 18, 2022 (21 commits)
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -23,6 +23,12 @@

* [BUGFIX] Version checking no longer prompts for updating when already on latest version. #2723

### Query-tee

* [CHANGE] Renamed CLI flag `-server.service-port` to `-server.http-service-port`. #2683
* [CHANGE] Renamed metric `cortex_querytee_request_duration_seconds` to `cortex_querytee_backend_request_duration_seconds`. Metric `cortex_querytee_request_duration_seconds` is now reported without label `backend`. #2683
* [ENHANCEMENT] Added HTTP over gRPC support to `query-tee` to allow testing gRPC requests to Mimir instances. #2683

### Mimir Continuous Test

### Documentation
2 changes: 1 addition & 1 deletion cmd/query-tee/main.go
@@ -31,7 +31,7 @@ func main() {
// Parse CLI flags.
cfg := Config{}
flag.IntVar(&cfg.ServerMetricsPort, "server.metrics-port", 9900, "The port where metrics are exposed.")
flag.StringVar(&cfg.PathPrefix, "server.path-prefix", "", "Prefix for API paths (query-tee will accept Prometheus API calls at <prefix>/api/v1/...)")
flag.StringVar(&cfg.PathPrefix, "server.path-prefix", "", "Path prefix for API paths (query-tee will accept Prometheus API calls at <prefix>/api/v1/...). Example: -server.path-prefix=/prometheus")
cfg.LogLevel.RegisterFlags(flag.CommandLine)
cfg.ProxyConfig.RegisterFlags(flag.CommandLine)
flag.Parse()
40 changes: 33 additions & 7 deletions docs/sources/operators-guide/tools/query-tee.md
@@ -35,17 +35,26 @@ chmod +x query-tee
## Configure the query-tee

The query-tee requires the endpoints of the backend Grafana Mimir clusters.
You can configure the backend endpoints by setting the `-backend.endpoints` flag to a comma-separated list of HTTP or HTTPS URLs.
You can configure the backend endpoints by setting the `-backend.endpoints` flag to a comma-separated list of endpoints:

- HTTP endpoints: via HTTP or HTTPS URLs. Example: `http://query-frontend:80`.
- gRPC endpoints: via the gRPC URI scheme. Example: `dns:///query-frontend:9095`.

For each incoming request, the query-tee clones the request and sends it to each configured backend.

> **Note:** You can configure the query-tee proxy listening port via the `-server.service-port` flag.
> **Note:** You can configure the query-tee proxy listening ports via the `-server.http-service-port` flag for the HTTP port and the `-server.grpc-service-port` flag for the gRPC port.

## How the query-tee works

This section describes how the query-tee tool works.

### API endpoints

Query-tee accepts two types of requests:

1. HTTP requests on the port configured via the `-server.http-service-port` flag (default: 80)
1. [HTTP over gRPC](https://github.com/weaveworks/common/tree/master/httpgrpc) requests on the port configured via the `-server.grpc-service-port` flag (default: 9095)

The following Prometheus API endpoints are supported by `query-tee`:

- `GET <prefix>/api/v1/query`
@@ -113,11 +122,11 @@ When the query results comparison is enabled, the query-tee compares the respons
The query-tee exposes the following Prometheus metrics at the `/metrics` endpoint listening on the port configured via the flag `-server.metrics-port`:

```bash
# HELP cortex_querytee_request_duration_seconds Time (in seconds) spent serving HTTP requests.
# TYPE cortex_querytee_request_duration_seconds histogram
cortex_querytee_request_duration_seconds_bucket{backend="<hostname>",method="<method>",route="<route>",status_code="<status>",le="<bucket>"}
cortex_querytee_request_duration_seconds_sum{backend="<hostname>",method="<method>",route="<route>",status_code="<status>"}
cortex_querytee_request_duration_seconds_count{backend="<hostname>",method="<method>",route="<route>",status_code="<status>"}
# HELP cortex_querytee_backend_request_duration_seconds Time (in seconds) spent serving requests.
# TYPE cortex_querytee_backend_request_duration_seconds histogram
cortex_querytee_backend_request_duration_seconds_bucket{backend="<hostname>",method="<method>",route="<route>",status_code="<status>",le="<bucket>"}
cortex_querytee_backend_request_duration_seconds_sum{backend="<hostname>",method="<method>",route="<route>",status_code="<status>"}
cortex_querytee_backend_request_duration_seconds_count{backend="<hostname>",method="<method>",route="<route>",status_code="<status>"}

# HELP cortex_querytee_responses_total Total number of responses sent back to the client by the selected backend.
# TYPE cortex_querytee_responses_total counter
@@ -127,3 +136,20 @@ cortex_querytee_responses_total{backend="<hostname>",method="<method>",route="<r
# TYPE cortex_querytee_responses_compared_total counter
cortex_querytee_responses_compared_total{route="<route>",result="<success|fail>"}
```

### Ruler remote operational mode test

When the ruler is configured with the [remote evaluation mode]({{< relref "../architecture/components/ruler/index.md" >}}), you can also use the query-tee to compare rule evaluations.
To test rule evaluations with the query-tee, set the ruler's `-ruler.query-frontend.address` CLI flag (or its corresponding YAML configuration parameter) to the query-tee's gRPC address:

```yaml
ruler:
  query_frontend:
    address: "dns:///query-tee:9095"
```

When the ruler evaluates a rule, the test flow is the following:

1. The ruler sends a gRPC request to the query-tee.
1. The query-tee forwards the request to the query-frontend backends configured via the `-backend.endpoints` CLI flag.
1. The query-tee receives the responses from the query-frontends and forwards the result (based on the preferred backend) to the ruler.
89 changes: 49 additions & 40 deletions tools/querytee/proxy.go
@@ -6,10 +6,8 @@
package querytee

import (
"context"
"flag"
"fmt"
"net"
"net/http"
"net/http/httputil"
"net/url"
@@ -20,15 +18,16 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/server"
)

var errMinBackends = errors.New("at least 1 backend is required")

type ProxyConfig struct {
ServerServicePort int
ServerHTTPServicePort int
ServerGRPCServicePort int
BackendEndpoints string
PreferredBackend string
BackendReadTimeout time.Duration
@@ -40,7 +39,8 @@ type ProxyConfig struct {
}

func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.ServerServicePort, "server.service-port", 80, "The port where the query-tee service listens to.")
f.IntVar(&cfg.ServerHTTPServicePort, "server.http-service-port", 80, "The HTTP port where the query-tee service listens to HTTP requests.")
f.IntVar(&cfg.ServerGRPCServicePort, "server.grpc-service-port", 9095, "The GRPC port where the query-tee service listens to HTTP over gRPC messages.")
f.StringVar(&cfg.BackendEndpoints, "backend.endpoints", "", "Comma separated list of backend endpoints to query.")
f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client. If no preferred backend is configured then the query-tee will send back to the client the first successful response received without waiting for other backends.")
f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 90*time.Second, "The timeout when reading the response from a backend.")
@@ -59,15 +59,15 @@ type Route struct {
}

type Proxy struct {
cfg ProxyConfig
backends []*ProxyBackend
logger log.Logger
metrics *ProxyMetrics
routes []Route
cfg ProxyConfig
backends []*ProxyBackend
logger log.Logger
registerer prometheus.Registerer
metrics *ProxyMetrics
routes []Route

// The HTTP server used to run the proxy service.
srv *http.Server
srvListener net.Listener
// The HTTP and gRPC servers used to run the proxy service.
server *server.Server

// Wait group used to wait until the server has done.
done sync.WaitGroup
@@ -83,10 +83,11 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
}

p := &Proxy{
cfg: cfg,
logger: logger,
metrics: NewProxyMetrics(registerer),
routes: routes,
cfg: cfg,
logger: logger,
registerer: registerer,
metrics: NewProxyMetrics(registerer),
routes: routes,
}

// Parse the backend endpoints (comma separated).
@@ -108,7 +109,7 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
name := u.Hostname()
preferred := name == cfg.PreferredBackend

// In tests we have the same hostname for all backends, so we also
// In tests, we have the same hostname for all backends, so we also
// support a numeric preferred backend which is the index in the list
// of backends.
if preferredIdx, err := strconv.Atoi(cfg.PreferredBackend); err == nil {
@@ -123,7 +124,7 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
return nil, errMinBackends
}

// If the preferred backend is configured, then it must exists among the actual backends.
// If the preferred backend is configured, then it must exist among the actual backends.
if cfg.PreferredBackend != "" {
exists := false
for _, b := range p.backends {
@@ -151,13 +152,33 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
}

func (p *Proxy) Start() error {
// Setup listener first, so we can fail early if the port is in use.
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", p.cfg.ServerServicePort))
// Setup server first, so we can fail early if the ports are in use.
serv, err := server.New(server.Config{
// HTTP configs
HTTPListenPort: p.cfg.ServerHTTPServicePort,
HTTPServerReadTimeout: 1 * time.Minute,
HTTPServerWriteTimeout: 2 * time.Minute,
ServerGracefulShutdownTimeout: 0,

// gRPC configs
GRPCListenPort: p.cfg.ServerGRPCServicePort,
// Same size configurations as in Mimir default gRPC configuration values
GPRCServerMaxRecvMsgSize: 100 * 1024 * 1024,
GRPCServerMaxSendMsgSize: 100 * 1024 * 1024,
GPRCServerMaxConcurrentStreams: 10000,
GRPCServerMinTimeBetweenPings: 10 * time.Second,
GRPCServerPingWithoutStreamAllowed: true,

// Use Proxy's prometheus registry
MetricsNamespace: queryTeeMetricsNamespace,
Registerer: p.registerer,
RegisterInstrumentation: false,
})
if err != nil {
return err
}

router := mux.NewRouter()
router := serv.HTTP

// Health check endpoint.
router.Path("/").Methods("GET").Handler(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
@@ -182,44 +203,32 @@ func (p *Proxy) Start() error {
}
}

p.srvListener = listener
p.srv = &http.Server{
ReadTimeout: 1 * time.Minute,
WriteTimeout: 2 * time.Minute,
Handler: router,
}
p.server = serv

// Run in a dedicated goroutine.
p.done.Add(1)
go func() {
defer p.done.Done()

if err := p.srv.Serve(p.srvListener); err != nil {
if err := p.server.Run(); err != nil {
level.Error(p.logger).Log("msg", "Proxy server failed", "err", err)
}
}()

level.Info(p.logger).Log("msg", "The proxy is up and running.")
level.Info(p.logger).Log("msg", "The proxy is up and running.", "httpPort", p.cfg.ServerHTTPServicePort, "grpcPort", p.cfg.ServerGRPCServicePort)
return nil
}

func (p *Proxy) Stop() error {
if p.srv == nil {
if p.server == nil {
return nil
}

return p.srv.Shutdown(context.Background())
p.server.Shutdown()
return nil
}

func (p *Proxy) Await() {
// Wait until terminated.
p.done.Wait()
}

func (p *Proxy) Endpoint() string {
if p.srvListener == nil {
return ""
}

return p.srvListener.Addr().String()
}
2 changes: 1 addition & 1 deletion tools/querytee/proxy_endpoint.go
@@ -189,7 +189,7 @@ func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResp
return res
}

// If we received a non successful response from the preferred backend, then we can
// If we received a non-successful response from the preferred backend, then we can
// return the first successful response received so far (if any).
if res.backend.preferred && !res.succeeded() {
preferredResponseReceived = true
13 changes: 7 additions & 6 deletions tools/querytee/proxy_metrics.go
@@ -11,8 +11,9 @@ import (
)

const (
comparisonSuccess = "success"
comparisonFailed = "fail"
queryTeeMetricsNamespace = "cortex_querytee"
comparisonSuccess = "success"
comparisonFailed = "fail"
)

type ProxyMetrics struct {
@@ -24,18 +25,18 @@ type ProxyMetrics struct {
func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics {
m := &ProxyMetrics{
requestDuration: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex_querytee",
Name: "request_duration_seconds",
Namespace: queryTeeMetricsNamespace,
Name: "backend_request_duration_seconds",
Help: "Time (in seconds) spent serving HTTP requests.",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 0.75, 1, 1.5, 2, 3, 4, 5, 10, 25, 50, 100},
}, []string{"backend", "method", "route", "status_code"}),
responsesTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex_querytee",
Namespace: queryTeeMetricsNamespace,
Name: "responses_total",
Help: "Total number of responses sent back to the client by the selected backend.",
}, []string{"backend", "method", "route"}),
responsesComparedTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex_querytee",
Namespace: queryTeeMetricsNamespace,
Name: "responses_compared_total",
Help: "Total number of responses compared per route name by result.",
}, []string{"route", "result"}),