Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gRPC support to query-tee #2683

Merged
merged 21 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/query-tee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func main() {
// Parse CLI flags.
cfg := Config{}
flag.IntVar(&cfg.ServerMetricsPort, "server.metrics-port", 9900, "The port where metrics are exposed.")
flag.StringVar(&cfg.PathPrefix, "server.path-prefix", "", "Prefix for API paths (query-tee will accept Prometheus API calls at <prefix>/api/v1/...)")
flag.StringVar(&cfg.PathPrefix, "server.path-prefix", "", "Path prefix for API paths (query-tee will accept Prometheus API calls at <prefix>/api/v1/...). Example: -server.path-prefix=/prometheus")
cfg.LogLevel.RegisterFlags(flag.CommandLine)
cfg.ProxyConfig.RegisterFlags(flag.CommandLine)
flag.Parse()
Expand Down
81 changes: 42 additions & 39 deletions tools/querytee/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
package querytee

import (
"context"
"flag"
"fmt"
"net"
"net/http"
"net/http/httputil"
"net/url"
Expand All @@ -20,15 +18,16 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/server"
)

var errMinBackends = errors.New("at least 1 backend is required")

type ProxyConfig struct {
ServerServicePort int
ServerGRPCServicePort int
BackendEndpoints string
PreferredBackend string
BackendReadTimeout time.Duration
Expand All @@ -40,7 +39,8 @@ type ProxyConfig struct {
}

func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.ServerServicePort, "server.service-port", 80, "The port where the query-tee service listens to.")
f.IntVar(&cfg.ServerServicePort, "server.service-port", 80, "The HTTP port where the query-tee service listens to.")
ssncferreira marked this conversation as resolved.
Show resolved Hide resolved
f.IntVar(&cfg.ServerGRPCServicePort, "server.grpc-service-port", 90, "The GRPC port where the query-tee service listens to HTTP over gRPC messages.")
ssncferreira marked this conversation as resolved.
Show resolved Hide resolved
f.StringVar(&cfg.BackendEndpoints, "backend.endpoints", "", "Comma separated list of backend endpoints to query.")
f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client. If no preferred backend is configured then the query-tee will send back to the client the first successful response received without waiting for other backends.")
f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 90*time.Second, "The timeout when reading the response from a backend.")
Expand All @@ -59,15 +59,15 @@ type Route struct {
}

type Proxy struct {
cfg ProxyConfig
backends []*ProxyBackend
logger log.Logger
metrics *ProxyMetrics
routes []Route
cfg ProxyConfig
backends []*ProxyBackend
logger log.Logger
registerer prometheus.Registerer
metrics *ProxyMetrics
routes []Route

// The HTTP server used to run the proxy service.
srv *http.Server
srvListener net.Listener
// The HTTP and gRPC servers used to run the proxy service.
server *server.Server

// Wait group used to wait until the server has done.
done sync.WaitGroup
Expand All @@ -83,10 +83,11 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
}

p := &Proxy{
cfg: cfg,
logger: logger,
metrics: NewProxyMetrics(registerer),
routes: routes,
cfg: cfg,
logger: logger,
registerer: registerer,
metrics: NewProxyMetrics(registerer),
routes: routes,
}

// Parse the backend endpoints (comma separated).
Expand All @@ -108,7 +109,7 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
name := u.Hostname()
preferred := name == cfg.PreferredBackend

// In tests we have the same hostname for all backends, so we also
// In tests, we have the same hostname for all backends, so we also
// support a numeric preferred backend which is the index in the list
// of backends.
if preferredIdx, err := strconv.Atoi(cfg.PreferredBackend); err == nil {
Expand All @@ -123,7 +124,7 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
return nil, errMinBackends
}

// If the preferred backend is configured, then it must exists among the actual backends.
// If the preferred backend is configured, then it must exist among the actual backends.
if cfg.PreferredBackend != "" {
exists := false
for _, b := range p.backends {
Expand Down Expand Up @@ -151,13 +152,27 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
}

func (p *Proxy) Start() error {
// Setup listener first, so we can fail early if the port is in use.
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", p.cfg.ServerServicePort))
// Setup server first, so we can fail early if the ports are in use.
serv, err := server.New(server.Config{
ssncferreira marked this conversation as resolved.
Show resolved Hide resolved
// HTTP configs
HTTPListenPort: p.cfg.ServerServicePort,
HTTPServerReadTimeout: 1 * time.Minute,
HTTPServerWriteTimeout: 2 * time.Minute,

// gRPC configs
GRPCListenPort: p.cfg.ServerGRPCServicePort,
// Same size configurations as in Mimir default gRPC configuration values
GPRCServerMaxRecvMsgSize: 100 * 1024 * 1024,
GRPCServerMaxSendMsgSize: 100 * 1024 * 1024,

// Use Proxy's prometheus registry
Registerer: p.registerer,
})
if err != nil {
return err
}

router := mux.NewRouter()
router := serv.HTTP

// Health check endpoint.
router.Path("/").Methods("GET").Handler(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
Expand All @@ -182,44 +197,32 @@ func (p *Proxy) Start() error {
}
}

p.srvListener = listener
p.srv = &http.Server{
ReadTimeout: 1 * time.Minute,
WriteTimeout: 2 * time.Minute,
Handler: router,
}
p.server = serv

// Run in a dedicated goroutine.
p.done.Add(1)
go func() {
defer p.done.Done()

if err := p.srv.Serve(p.srvListener); err != nil {
if err := p.server.Run(); err != nil {
level.Error(p.logger).Log("msg", "Proxy server failed", "err", err)
}
}()

level.Info(p.logger).Log("msg", "The proxy is up and running.")
level.Info(p.logger).Log("msg", "The proxy is up and running.", "httpPort", p.cfg.ServerServicePort, "grpcPort", p.cfg.ServerGRPCServicePort)
return nil
}

func (p *Proxy) Stop() error {
if p.srv == nil {
if p.server == nil {
return nil
}

return p.srv.Shutdown(context.Background())
p.server.Shutdown()
return nil
}

func (p *Proxy) Await() {
// Wait until terminated.
p.done.Wait()
}

func (p *Proxy) Endpoint() string {
if p.srvListener == nil {
return ""
}

return p.srvListener.Addr().String()
}
2 changes: 1 addition & 1 deletion tools/querytee/proxy_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResp
return res
}

// If we received a non successful response from the preferred backend, then we can
// If we received a non-successful response from the preferred backend, then we can
// return the first successful response received so far (if any).
if res.backend.preferred && !res.succeeded() {
preferredResponseReceived = true
Expand Down
21 changes: 12 additions & 9 deletions tools/querytee/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/go-kit/log"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -159,25 +160,26 @@ func Test_Proxy_RequestsForwarding(t *testing.T) {

// Start the proxy.
cfg := ProxyConfig{
BackendEndpoints: strings.Join(backendURLs, ","),
PreferredBackend: strconv.Itoa(testData.preferredBackendIdx),
ServerServicePort: 0,
BackendReadTimeout: time.Second,
BackendEndpoints: strings.Join(backendURLs, ","),
PreferredBackend: strconv.Itoa(testData.preferredBackendIdx),
ServerServicePort: 80,
ServerGRPCServicePort: 90,
ssncferreira marked this conversation as resolved.
Show resolved Hide resolved
BackendReadTimeout: time.Second,
}

if len(backendURLs) == 2 {
cfg.CompareResponses = true
}

p, err := NewProxy(cfg, log.NewNopLogger(), testRoutes, nil)
p, err := NewProxy(cfg, log.NewNopLogger(), testRoutes, prometheus.NewRegistry())
require.NoError(t, err)
require.NotNil(t, p)
defer p.Stop() //nolint:errcheck

require.NoError(t, p.Start())

// Send a query request to the proxy.
res, err := http.Get(fmt.Sprintf("http://%s/api/v1/query", p.Endpoint()))
res, err := http.Get("http://localhost/api/v1/query")
require.NoError(t, err)

defer res.Body.Close()
Expand Down Expand Up @@ -313,12 +315,13 @@ func TestProxy_Passthrough(t *testing.T) {
cfg := ProxyConfig{
BackendEndpoints: strings.Join(backendURLs, ","),
PreferredBackend: strconv.Itoa(testData.preferredBackendIdx),
ServerServicePort: 0,
ServerServicePort: 80,
ServerGRPCServicePort: 90,
BackendReadTimeout: time.Second,
PassThroughNonRegisteredRoutes: true,
}

p, err := NewProxy(cfg, log.NewNopLogger(), testRoutes, nil)
p, err := NewProxy(cfg, log.NewNopLogger(), testRoutes, prometheus.NewRegistry())
require.NoError(t, err)
require.NotNil(t, p)
defer p.Stop() //nolint:errcheck
Expand All @@ -328,7 +331,7 @@ func TestProxy_Passthrough(t *testing.T) {
for _, query := range testData.queries {

// Send a query request to the proxy.
res, err := http.Get(fmt.Sprintf("http://%s%s", p.Endpoint(), query.path))
res, err := http.Get(fmt.Sprintf("http://localhost%s", query.path))
require.NoError(t, err)

defer res.Body.Close()
Expand Down