Skip to content

Commit

Permalink
add thanos query frontend sub command
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed Aug 4, 2020
1 parent 040b69b commit 884a415
Show file tree
Hide file tree
Showing 14 changed files with 487 additions and 97 deletions.
4 changes: 1 addition & 3 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ type compactConfig struct {
label string
}

func (cc *compactConfig) registerFlag(cmd *kingpin.CmdClause) *compactConfig {
func (cc *compactConfig) registerFlag(cmd *kingpin.CmdClause) {
cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected.").
Hidden().Default("true").BoolVar(&cc.haltOnError)
cmd.Flag("debug.accept-malformed-index",
Expand Down Expand Up @@ -525,6 +525,4 @@ func (cc *compactConfig) registerFlag(cmd *kingpin.CmdClause) *compactConfig {
cc.webConf.registerFlag(cmd)

cmd.Flag("bucket-web-label", "Prometheus label to use as timeline title in the bucket web UI").StringVar(&cc.label)

return cc
}
1 change: 1 addition & 0 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func main() {
registerCompact(cmds, app)
registerTools(cmds, app)
registerReceive(cmds, app)
registerQueryFrontend(cmds, app)

cmd, err := app.Parse(os.Args[1:])
if err != nil {
Expand Down
174 changes: 174 additions & 0 deletions cmd/thanos/query-frontend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package main

import (
"time"

"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/go-kit/kit/log"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"gopkg.in/alecthomas/kingpin.v2"

v1 "github.com/thanos-io/thanos/pkg/api/queryfrontend"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/queryfrontend"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
)

type queryFrontendConfig struct {
http httpConfig
queryRangeConfig queryRangeConfig

// TODO(yeya24): use this after promql engine is added
defaultEvaluationInterval time.Duration
downstreamURL string
compressResponses bool
}

type queryRangeConfig struct {
respCacheConfig responseCacheConfig
cacheResults bool
splitInterval time.Duration
disableStepAlign bool
maxRetries int
maxQueryParallelism int
maxQueryLength time.Duration
}

type responseCacheConfig struct {
cacheMaxFreshness time.Duration
}

func (c *responseCacheConfig) registerFlag(cmd *kingpin.CmdClause) {
cmd.Flag("query-range.response-cache-max-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.").
Default("1m").DurationVar(&c.cacheMaxFreshness)
}

func (c *queryRangeConfig) registerFlag(cmd *kingpin.CmdClause) {
c.respCacheConfig.registerFlag(cmd)

cmd.Flag("query-range.cache-results", "Cache query range results.").Default("false").
BoolVar(&c.cacheResults)

cmd.Flag("query-range.split-interval", "Split queries by an interval and execute in parallel, 0 disables it.").
Default("24h").DurationVar(&c.splitInterval)

cmd.Flag("query-range.max-retries-per-request", "Maximum number of retries for a single request; beyond this, the downstream error is returned.").
Default("5").IntVar(&c.maxRetries)

cmd.Flag("query-range.disable-step-align", "Disable aligning incoming queries' start and end with their step.").
Default("true").BoolVar(&c.disableStepAlign)
}

func (c *queryFrontendConfig) registerFlag(cmd *kingpin.CmdClause) {
c.queryRangeConfig.registerFlag(cmd)
c.http.registerFlag(cmd)

cmd.Flag("query.default-evaluation-interval", "Set default evaluation interval for sub queries.").
Default("1m").DurationVar(&c.defaultEvaluationInterval)

cmd.Flag("query-frontend.downstream-url", "URL of downstream Prometheus Query compatible API.").
Default("http://localhost:9090").StringVar(&c.downstreamURL)

cmd.Flag("query-frontend.compress-responses", "Compress HTTP responses.").
Default("false").BoolVar(&c.compressResponses)
}

func registerQueryFrontend(m map[string]setupFunc, app *kingpin.Application) {
comp := component.QueryFrontend
cmd := app.Command(comp.String(), "query frontend")
conf := &queryFrontendConfig{}
conf.registerFlag(cmd)

m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {

return runQueryFrontend(
g,
logger,
reg,
conf,
comp,
)
}
}

func runQueryFrontend(
g *run.Group,
logger log.Logger,
reg *prometheus.Registry,
conf *queryFrontendConfig,
comp component.Component,
) error {

fe, err := frontend.New(frontend.Config{
DownstreamURL: conf.downstreamURL,
CompressResponses: conf.compressResponses,
}, logger, reg)
if err != nil {
return err
}

limiter := queryfrontend.NewLimiter(
conf.queryRangeConfig.maxQueryParallelism,
conf.queryRangeConfig.maxQueryLength,
conf.queryRangeConfig.respCacheConfig.cacheMaxFreshness,
)

tripperWare, err := queryfrontend.NewTripperWare(
limiter,
queryrange.PrometheusCodec,
queryrange.PrometheusResponseExtractor{},
conf.queryRangeConfig.disableStepAlign,
conf.queryRangeConfig.cacheResults,
conf.queryRangeConfig.splitInterval,
conf.queryRangeConfig.maxRetries,
reg,
logger,
)
if err != nil {
return err
}

fe.Wrap(tripperWare)

httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)

// Start metrics HTTP server.
{
router := route.New()

api := v1.NewAPI(logger)
api.Register(router.WithPrefix("/api/v1"), fe.Handler().ServeHTTP)

srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(conf.http.bindAddress),
httpserver.WithGracePeriod(time.Duration(conf.http.gracePeriod)),
)
srv.Handle("/", router)

g.Add(func() error {
statusProber.Healthy()

return srv.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
defer statusProber.NotHealthy(err)

srv.Shutdown(err)
})
}

return nil
}
7 changes: 5 additions & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
level.Warn(logger).Log("msg", "different values for --web.route-prefix and --web.external-prefix detected, web UI may not work without a reverse-proxy.")
}

promql.SetDefaultEvaluationInterval(time.Duration(*defaultEvaluationInterval))

flagsMap := getFlagsMap(cmd.Model().Flags)

return runQuery(
Expand Down Expand Up @@ -190,6 +188,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
time.Duration(*instantDefaultMaxSourceResolution),
*strictStores,
component.Query,
time.Duration(*defaultEvaluationInterval),
)
}
}
Expand Down Expand Up @@ -235,6 +234,7 @@ func runQuery(
instantDefaultMaxSourceResolution time.Duration,
strictStores []string,
comp component.Component,
defaultEvaluationInterval time.Duration,
) error {
// TODO(bplotka in PR #513 review): Move arguments into struct.
duplicatedStores := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -305,6 +305,9 @@ func runQuery(
// TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703.
MaxSamples: math.MaxInt32,
Timeout: queryTimeout,
NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 {
return defaultEvaluationInterval.Milliseconds()
},
},
)
)
Expand Down
3 changes: 1 addition & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ type sidecarConfig struct {
limitMinTime thanosmodel.TimeOrDurationValue
}

func (sc *sidecarConfig) registerFlag(cmd *kingpin.CmdClause) *sidecarConfig {
func (sc *sidecarConfig) registerFlag(cmd *kingpin.CmdClause) {
sc.http.registerFlag(cmd)
sc.grpc.registerFlag(cmd)
sc.prometheus.registerFlag(cmd)
Expand All @@ -424,5 +424,4 @@ func (sc *sidecarConfig) registerFlag(cmd *kingpin.CmdClause) *sidecarConfig {
sc.shipper.registerFlag(cmd)
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
return sc
}
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/cespare/xxhash v1.1.0
github.com/chromedp/cdproto v0.0.0-20200424080200-0de008e41fa0
github.com/chromedp/chromedp v0.5.3
github.com/cortexproject/cortex v0.6.1-0.20200228110116-92ab6cbe0995
github.com/cortexproject/cortex v1.2.1-0.20200728113518-da30af7f98c4
github.com/davecgh/go-spew v1.1.1
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fatih/structtag v1.1.0
Expand Down Expand Up @@ -49,10 +49,10 @@ require (
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.10.0
github.com/prometheus/prometheus v1.8.2-0.20200707115909-30505a202a4c
github.com/sercand/kuberesolver v2.4.0+incompatible // indirect
github.com/prometheus/prometheus v1.8.2-0.20200728044242-348ff4285ffa
github.com/uber/jaeger-client-go v2.24.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible
github.com/weaveworks/common v0.0.0-20200625145055-4b1847531bc9
go.elastic.co/apm v1.5.0
go.elastic.co/apm/module/apmot v1.5.0
go.uber.org/atomic v1.6.0
Expand All @@ -76,7 +76,8 @@ require (
// See https://github.com/thanos-io/thanos/issues/1415
replace (
// Make sure Cortex is not forcing us to some other Prometheus version.
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200714083622-823b218e1b2e
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200722151933-4a8531a64b32
google.golang.org/grpc => google.golang.org/grpc v1.29.1
k8s.io/klog => k8s.io/klog v0.3.1
k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30
)
Expand Down
Loading

0 comments on commit 884a415

Please sign in to comment.