From 7789133f12990bf8301d6b8a79e87e15a22b8ae6 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 9 Aug 2020 00:12:00 -0400 Subject: [PATCH] add response cache config Signed-off-by: Ben Ye --- cmd/thanos/query-frontend.go | 62 +++++++++++------------------ go.mod | 4 +- go.sum | 4 +- pkg/queryfrontend/cache.go | 25 ------------ pkg/queryfrontend/cache/factory.go | 52 ++++++++++++++++++++++++ pkg/queryfrontend/cache/inmemory.go | 40 +++++++++++++++++++ pkg/queryfrontend/roundtrip.go | 7 ++-- pkg/queryfrontend/roundtrip_test.go | 28 +++++++++++-- test/e2e/e2ethanos/services.go | 20 ++++++---- test/e2e/query_frontend_test.go | 12 +++++- 10 files changed, 169 insertions(+), 85 deletions(-) delete mode 100644 pkg/queryfrontend/cache.go create mode 100644 pkg/queryfrontend/cache/factory.go create mode 100644 pkg/queryfrontend/cache/inmemory.go diff --git a/cmd/thanos/query-frontend.go b/cmd/thanos/query-frontend.go index 6d1f3eaeeb0..f23e1365f96 100644 --- a/cmd/thanos/query-frontend.go +++ b/cmd/thanos/query-frontend.go @@ -6,7 +6,6 @@ package main import ( "time" - "github.com/alecthomas/units" "github.com/cortexproject/cortex/pkg/querier/frontend" "github.com/cortexproject/cortex/pkg/querier/queryrange" "github.com/go-kit/kit/log" @@ -21,9 +20,11 @@ import ( v1 "github.com/thanos-io/thanos/pkg/api/queryfrontend" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/queryfrontend" + "github.com/thanos-io/thanos/pkg/queryfrontend/cache" httpserver "github.com/thanos-io/thanos/pkg/server/http" ) @@ -37,44 +38,15 @@ type queryFrontendConfig struct { } type queryRangeConfig struct { - respCacheConfig responseCacheConfig - cacheResults bool + respCacheConfig extflag.PathOrContent + cacheMaxFreshness time.Duration splitInterval model.Duration maxRetries int maxQueryParallelism int maxQueryLength model.Duration } -type responseCacheConfig struct { - cacheMaxFreshness time.Duration - fifoCache fifoCacheConfig -} - -func (c *responseCacheConfig) registerFlag(cmd *kingpin.CmdClause) { - c.fifoCache.registerFlag(cmd) - cmd.Flag("query-range.response-cache-max-freshness", "Most recent allowed cacheable result, to prevent caching very recent results that might still be in flux."). - Default("1m").DurationVar(&c.cacheMaxFreshness) -} - -// fifoCacheConfig defines configurations for Cortex fifo cache. -type fifoCacheConfig struct { - maxSizeBytes units.Base2Bytes - maxSizeItems int - ttl time.Duration -} - -func (c *fifoCacheConfig) registerFlag(cmd *kingpin.CmdClause) { - cmd.Flag("fifocache.max-size-bytes", "Maximum memory size of the cache in bytes. A unit suffix (KB, MB, GB) may be applied.").BytesVar(&c.maxSizeBytes) - cmd.Flag("fifocache.max-size-items", "Maximum number of entries in the cache.").Default("0").IntVar(&c.maxSizeItems) - cmd.Flag("fifocache.ttl", "The expiry duration for the cache.").Default("0").DurationVar(&c.ttl) -} - func (c *queryRangeConfig) registerFlag(cmd *kingpin.CmdClause) { - c.respCacheConfig.registerFlag(cmd) - - cmd.Flag("query-range.cache-results", "Cache query range results.").Default("false"). - BoolVar(&c.cacheResults) - cmd.Flag("query-range.split-interval", "Split queries by an interval and execute in parallel, 0 disables it."). Default("24h").SetValue(&c.splitInterval) @@ -86,6 +58,11 @@ func (c *queryRangeConfig) registerFlag(cmd *kingpin.CmdClause) { cmd.Flag("query-range.max-query-parallelism", "Maximum number of queries will be scheduled in parallel by the frontend."). Default("14").IntVar(&c.maxQueryParallelism) + + cmd.Flag("query-range.response-cache-max-freshness", "Most recent allowed cacheable result, to prevent caching very recent results that might still be in flux."). + Default("1m").DurationVar(&c.cacheMaxFreshness) + + c.respCacheConfig = *extflag.RegisterPathOrContent(cmd, "query-range.response-cache-config", "YAML file that contains response cache configuration.", false) } func (c *queryFrontendConfig) registerFlag(cmd *kingpin.CmdClause) { @@ -109,7 +86,6 @@ func registerQueryFrontend(m map[string]setupFunc, app *kingpin.Application) { conf.registerFlag(cmd) m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { - return runQueryFrontend(g, logger, reg, conf, comp) } } @@ -139,20 +115,28 @@ func runQueryFrontend( limits := queryfrontend.NewLimits( conf.queryRangeConfig.maxQueryParallelism, time.Duration(conf.queryRangeConfig.maxQueryLength), - conf.queryRangeConfig.respCacheConfig.cacheMaxFreshness, + conf.queryRangeConfig.cacheMaxFreshness, ) - // TODO(yeya24): support other cache when available. - // Using Cortex fifo cache temporarily. - fifoCache := conf.queryRangeConfig.respCacheConfig.fifoCache - cacheConfig := queryfrontend.NewFifoCacheConfig(fifoCache.maxSizeBytes.String(), fifoCache.maxSizeItems, fifoCache.ttl) + respCacheContentYaml, err := conf.queryRangeConfig.respCacheConfig.Content() + if err != nil { + return errors.Wrap(err, "get content of response cache configuration") + } + var cacheConfig *queryrange.ResultsCacheConfig + if len(respCacheContentYaml) > 0 { + cacheConfig, err = cache.NewResponseCacheConfig(respCacheContentYaml) + if err != nil { + return errors.Wrap(err, "create response cache") + } + } + + // TODO(yeya24): support other cache when available. tripperWare, err := queryfrontend.NewTripperWare( limits, cacheConfig, queryrange.PrometheusCodec, queryrange.PrometheusResponseExtractor{}, - conf.queryRangeConfig.cacheResults, time.Duration(conf.queryRangeConfig.splitInterval), conf.queryRangeConfig.maxRetries, reg, diff --git a/go.mod b/go.mod index 97334f9f12e..e2c16084311 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/prometheus/client_golang v1.7.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.11.1 - github.com/prometheus/prometheus v1.8.2-0.20200805082714-e0cf219f0de2 + github.com/prometheus/prometheus v1.8.2-0.20200807135816-2899773b0159 github.com/uber/jaeger-client-go v2.25.0+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible github.com/weaveworks/common v0.0.0-20200625145055-4b1847531bc9 @@ -76,7 +76,7 @@ require ( // See https://github.com/thanos-io/thanos/issues/1415 replace ( // Make sure Cortex is not forcing us to some other Prometheus version. - github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200805082714-e0cf219f0de2 + github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200807135816-2899773b0159 google.golang.org/grpc => google.golang.org/grpc v1.29.1 k8s.io/klog => k8s.io/klog v0.3.1 k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 diff --git a/go.sum b/go.sum index 871c95a1416..6bd907019da 100644 --- a/go.sum +++ b/go.sum @@ -883,8 +883,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= -github.com/prometheus/prometheus v1.8.2-0.20200805082714-e0cf219f0de2 h1:eQ88y1vfbXuclr6B04jYTmhc6ydXlBUSIaXCjEs0osk= -github.com/prometheus/prometheus v1.8.2-0.20200805082714-e0cf219f0de2/go.mod h1:i1KZsZmyDTJRvnR7zE8z/u2v+tkpPjoiPpnWp6nwhr0= +github.com/prometheus/prometheus v1.8.2-0.20200807135816-2899773b0159 h1:mT66e//l/+QugUat5A42YvIPxlsS6O/yr9UtjlDhPYw= +github.com/prometheus/prometheus v1.8.2-0.20200807135816-2899773b0159/go.mod h1:zfAqy/MwhMFajB9E2n12/9gG2fvofIE9uKDtlZCDxqs= github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1 h1:+kGqA4dNN5hn7WwvKdzHl0rdN5AEkbNZd0VjRltAiZg= github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= diff --git a/pkg/queryfrontend/cache.go b/pkg/queryfrontend/cache.go deleted file mode 100644 index a25af558db3..00000000000 --- a/pkg/queryfrontend/cache.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package queryfrontend - -import ( - "time" - - cortexcache "github.com/cortexproject/cortex/pkg/chunk/cache" - "github.com/cortexproject/cortex/pkg/querier/queryrange" -) - -// NewFifoCacheConfig creates Cortex fifo cache config. -func NewFifoCacheConfig(maxSizeBytes string, maxSizeItems int, ttl time.Duration) queryrange.ResultsCacheConfig { - return queryrange.ResultsCacheConfig{ - CacheConfig: cortexcache.Config{ - EnableFifoCache: true, - Fifocache: cortexcache.FifoCacheConfig{ - MaxSizeBytes: maxSizeBytes, - MaxSizeItems: maxSizeItems, - Validity: ttl, - }, - }, - } -} diff --git a/pkg/queryfrontend/cache/factory.go b/pkg/queryfrontend/cache/factory.go new file mode 100644 index 00000000000..5282e079bd8 --- /dev/null +++ b/pkg/queryfrontend/cache/factory.go @@ -0,0 +1,52 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package cache + +import ( + "fmt" + "strings" + + "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/pkg/errors" + "gopkg.in/yaml.v2" +) + +type ResponseCacheProvider string + +// TODO: add other cache providers when available. +const ( + INMEMORY ResponseCacheProvider = "IN-MEMORY" +) + +// ResponseCacheConfig specifies the response cache config. +type ResponseCacheConfig struct { + Type ResponseCacheProvider `yaml:"type"` + Config interface{} `yaml:"config"` +} + +func NewResponseCacheConfig(confContentYaml []byte) (*queryrange.ResultsCacheConfig, error) { + cacheConfig := &ResponseCacheConfig{} + if err := yaml.UnmarshalStrict(confContentYaml, cacheConfig); err != nil { + return nil, errors.Wrap(err, "parsing config YAML file") + } + + backendConfig, err := yaml.Marshal(cacheConfig.Config) + if err != nil { + return nil, errors.Wrap(err, "marshal content of cache backend configuration") + } + + var resultsCacheConf *queryrange.ResultsCacheConfig + switch strings.ToUpper(string(cacheConfig.Type)) { + case string(INMEMORY): + resultsCacheConf, err = newInMemoryResponseCacheConfig(backendConfig) + default: + return nil, errors.Errorf("response cache with type %s is not supported", cacheConfig.Type) + } + + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("create %s response cache", cacheConfig.Type)) + } + + return resultsCacheConf, nil +} diff --git a/pkg/queryfrontend/cache/inmemory.go b/pkg/queryfrontend/cache/inmemory.go new file mode 100644 index 00000000000..a55b46c88fa --- /dev/null +++ b/pkg/queryfrontend/cache/inmemory.go @@ -0,0 +1,40 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package cache + +import ( + "time" + + cortexcache "github.com/cortexproject/cortex/pkg/chunk/cache" + "github.com/cortexproject/cortex/pkg/querier/queryrange" + "gopkg.in/yaml.v2" +) + +type InMemoryResponseCacheConfig struct { + // MaxSize represents overall maximum number of bytes cache can contain. + MaxSize string `yaml:"max_size"` + // MaxSizeItems represents the maximum number of entries in the cache. + MaxSizeItems int `yaml:"max_size_items"` + // Validity represents the expiry duration for the cache. + Validity time.Duration `yaml:"validity"` +} + +// newInMemoryResponseCacheConfig is an adapter function +// to creates Cortex fifo results cache config. +func newInMemoryResponseCacheConfig(conf []byte) (*queryrange.ResultsCacheConfig, error) { + var config InMemoryResponseCacheConfig + if err := yaml.Unmarshal(conf, &config); err != nil { + return nil, err + } + return &queryrange.ResultsCacheConfig{ + CacheConfig: cortexcache.Config{ + EnableFifoCache: true, + Fifocache: cortexcache.FifoCacheConfig{ + MaxSizeBytes: config.MaxSize, + MaxSizeItems: config.MaxSizeItems, + Validity: config.Validity, + }, + }, + }, nil +} diff --git a/pkg/queryfrontend/roundtrip.go b/pkg/queryfrontend/roundtrip.go index 86008ec5844..487a9a00e6d 100644 --- a/pkg/queryfrontend/roundtrip.go +++ b/pkg/queryfrontend/roundtrip.go @@ -26,10 +26,9 @@ const ( func NewTripperWare( limits queryrange.Limits, - cacheConfig queryrange.ResultsCacheConfig, + cacheConfig *queryrange.ResultsCacheConfig, codec queryrange.Codec, cacheExtractor queryrange.Extractor, - cacheResults bool, splitQueryInterval time.Duration, maxRetries int, reg prometheus.Registerer, @@ -62,10 +61,10 @@ func NewTripperWare( ) } - if cacheResults { + if cacheConfig != nil { queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware( logger, - cacheConfig, + *cacheConfig, constSplitter(splitQueryInterval), limits, codec, diff --git a/pkg/queryfrontend/roundtrip_test.go b/pkg/queryfrontend/roundtrip_test.go index daaa2d55ee6..f9fba4f2160 100644 --- a/pkg/queryfrontend/roundtrip_test.go +++ b/pkg/queryfrontend/roundtrip_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + cortexcache "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/querier/queryrange" "github.com/go-kit/kit/log" "github.com/prometheus/prometheus/pkg/timestamp" @@ -104,8 +105,18 @@ func TestRoundTripRetryMiddleware(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { - cache := NewFifoCacheConfig("1MB", 1000, time.Minute) - tpw, err := NewTripperWare(&fakeLimits{}, cache, queryrange.PrometheusCodec, nil, false, + cacheConf := &queryrange.ResultsCacheConfig{ + CacheConfig: cortexcache.Config{ + EnableFifoCache: true, + Fifocache: cortexcache.FifoCacheConfig{ + MaxSizeBytes: "2KB", + MaxSizeItems: 1000, + Validity: time.Hour, + }, + }, + } + + tpw, err := NewTripperWare(&fakeLimits{}, cacheConf, queryrange.PrometheusCodec, nil, time.Hour, tc.maxRetries, nil, log.NewNopLogger()) testutil.Ok(t, err) @@ -188,8 +199,17 @@ func TestRoundTripSplitIntervalMiddleware(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { - cache := NewFifoCacheConfig("1MB", 1000, time.Minute) - tpw, err := NewTripperWare(&fakeLimits{}, cache, queryrange.PrometheusCodec, nil, false, + cacheConf := &queryrange.ResultsCacheConfig{ + CacheConfig: cortexcache.Config{ + EnableFifoCache: true, + Fifocache: cortexcache.FifoCacheConfig{ + MaxSizeBytes: "2KB", + MaxSizeItems: 1000, + Validity: time.Hour, + }, + }, + } + tpw, err := NewTripperWare(&fakeLimits{}, cacheConf, queryrange.PrometheusCodec, nil, tc.splitInterval, 0, nil, log.NewNopLogger()) testutil.Ok(t, err) diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 1707dd31c32..28bfc780682 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -23,6 +23,7 @@ import ( "github.com/thanos-io/thanos/pkg/alert" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/query" + "github.com/thanos-io/thanos/pkg/queryfrontend/cache" "github.com/thanos-io/thanos/pkg/receive" ) @@ -405,15 +406,18 @@ func NewCompactor(sharedDir string, name string, bucketConfig client.BucketConfi return compactor, nil } -func NewQueryFrontend(sharedDir string, name string, downstreamURL string) (*e2e.HTTPService, error) { +func NewQueryFrontend(name string, downstreamURL string, respCacheConf cache.ResponseCacheConfig) (*e2e.HTTPService, error) { + respCacheConfigBytes, err := yaml.Marshal(respCacheConf) + if err != nil { + return nil, errors.Wrapf(err, "generate response cache config file: %v", respCacheConf) + } + args := e2e.BuildArgs(map[string]string{ - "--debug.name": fmt.Sprintf("query-frontend-%s", name), - "--http-address": ":8080", - "--query-frontend.downstream-url": downstreamURL, - "--query-range.cache-results": "", - "--log.level": logLevel, - "--fifocache.max-size-bytes": "2KB", - "--fifocache.ttl": "6h", + "--debug.name": fmt.Sprintf("query-frontend-%s", name), + "--http-address": ":8080", + "--query-frontend.downstream-url": downstreamURL, + "--log.level": logLevel, + "--query-range.response-cache-config": string(respCacheConfigBytes), }) queryFrontend := e2e.NewHTTPService( diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index c0cf4b73605..309f0ca967a 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -5,6 +5,7 @@ package e2e_test import ( "context" + "github.com/thanos-io/thanos/pkg/queryfrontend/cache" "testing" "time" @@ -33,7 +34,16 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) - queryFrontend, err := e2ethanos.NewQueryFrontend(s.SharedDir(), "1", "http://"+q.NetworkHTTPEndpoint()) + inMemoryCacheConfig := cache.ResponseCacheConfig{ + Type: cache.INMEMORY, + Config: cache.InMemoryResponseCacheConfig{ + MaxSize: "2KB", + MaxSizeItems: 1000, + Validity: time.Hour, + }, + } + + queryFrontend, err := e2ethanos.NewQueryFrontend("1", "http://"+q.NetworkHTTPEndpoint(), inMemoryCacheConfig) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(queryFrontend))