From 7349a0873ab7db3205c041fb79ec836cd223b8e9 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Thu, 14 May 2020 18:30:31 +0100 Subject: [PATCH 1/8] Fixed make docs; Updated last disprepancies. (#2611) Signed-off-by: Bartlomiej Plotka --- Dockerfile | 2 +- Makefile | 12 +++++---- docs/components/compact.md | 3 +-- docs/components/store.md | 50 ++++++++++++++++++++++++++++++++++---- scripts/genflagdocs.sh | 7 +++--- 5 files changed, 58 insertions(+), 16 deletions(-) diff --git a/Dockerfile b/Dockerfile index bf71a154f3..51cf401fb0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM quay.io/prometheus/busybox:latest LABEL maintainer="The Thanos Authors" -COPY thanos /bin/thanos +COPY /thanos_tmp_for_docker /bin/thanos ENTRYPOINT [ "/bin/thanos" ] diff --git a/Makefile b/Makefile index 46a3f6ed53..7ca3472902 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,3 @@ -PREFIX ?= $(shell pwd) FILES_TO_FMT ?= $(shell find . -path ./vendor -prune -o -name '*.go' -print) DOCKER_IMAGE_REPO ?= quay.io/thanos/thanos @@ -176,8 +175,8 @@ react-app-start: $(REACT_APP_NODE_MODULES_PATH) .PHONY: build build: ## Builds Thanos binary using `promu`. build: check-git deps $(PROMU) - @echo ">> building binaries $(GOBIN)" - @$(PROMU) build --prefix $(PREFIX) + @echo ">> building Thanos binary in $(GOBIN)" + @$(PROMU) build --prefix $(GOBIN) .PHONY: crossbuild crossbuild: ## Builds all binaries for all platforms. @@ -193,8 +192,11 @@ deps: ## Ensures fresh go.mod and go.sum. .PHONY: docker docker: ## Builds 'thanos' docker with no tag. docker: build + @echo ">> copying Thanos from $(GOBIN) to ./thanos_tmp_for_docker" + @cp $(GOBIN)/thanos ./thanos_tmp_for_docker @echo ">> building docker image 'thanos'" @docker build -t "thanos" . + @rm ./thanos_tmp_for_docker .PHONY: docker-multi-stage docker-multi-stage: ## Builds 'thanos' docker image using multi-stage. @@ -212,13 +214,13 @@ docker-push: .PHONY: docs docs: ## Regenerates flags in docs for all thanos commands. docs: $(EMBEDMD) build - @EMBEDMD_BIN="$(EMBEDMD)" SED_BIN="$(SED)" scripts/genflagdocs.sh + @EMBEDMD_BIN="$(EMBEDMD)" SED_BIN="$(SED)" THANOS_BIN="$(GOBIN)/thanos" scripts/genflagdocs.sh @find . -type f -name "*.md" | SED_BIN="$(SED)" xargs scripts/cleanup-white-noise.sh .PHONY: check-docs check-docs: ## checks docs against discrepancy with flags, links, white noise. check-docs: $(EMBEDMD) $(LICHE) build - @EMBEDMD_BIN="$(EMBEDMD)" SED_BIN="$(SED)" scripts/genflagdocs.sh check + @EMBEDMD_BIN="$(EMBEDMD)" SED_BIN="$(SED)" THANOS_BIN="$(GOBIN)/thanos" scripts/genflagdocs.sh check @$(LICHE) --recursive docs --exclude "(couchdb.apache.org/bylaws.html|cloud.tencent.com|alibabacloud.com|zoom.us)" --document-root . @$(LICHE) --exclude "goreportcard.com" --document-root . *.md @find . -type f -name "*.md" | SED_BIN="$(SED)" xargs scripts/cleanup-white-noise.sh diff --git a/docs/components/compact.md b/docs/components/compact.md index 4272a29cf1..1ca48e14a8 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -74,8 +74,7 @@ In order to achieve this co-ordination, blocks are not deleted directly. 
Instead ## Flags -[embedmd]: # "flags/compact.txt $" - +[embedmd]:# (flags/compact.txt $) ```$ usage: thanos compact [] diff --git a/docs/components/store.md b/docs/components/store.md index fa9b1546dd..74a70f348d 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -27,7 +27,7 @@ In general about 1MB of local disk space is required per TSDB block stored in th ## Flags -[embedmd]: # "flags/store.txt $" +[embedmd]:# (flags/store.txt $) ```$ usage: thanos store [] @@ -137,11 +137,51 @@ Flags: Prometheus relabel-config syntax. See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config - --consistency-delay=30m Minimum age of all blocks before they are being read. + --consistency-delay=0s Minimum age of all blocks before they are being + read. Set it to safe value (e.g 30m) if your + object storage is eventually consistent. GCS + and S3 are (roughly) strongly consistent. --ignore-deletion-marks-delay=24h - Duration after which the blocks marked for deletion will be filtered out while fetching blocks. - The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. If delete-delay duration is provided to compactor or bucket verify component, it will upload deletion-mark.json file to mark after what duration the block should be deleted rather than deleting the block straight away. - If delete-delay is non-zero for compactor or bucket verify component, ignore-deletion-marks-delay should be set to (delete-delay)/2 so that blocks marked for deletion are filtered out while fetching blocks before being deleted from bucket. Default is 24h, half of the default value for --delete-delay on compactor. + Duration after which the blocks marked for + deletion will be filtered out while fetching + blocks. The idea of ignore-deletion-marks-delay + is to ignore blocks that are marked for + deletion with some delay. This ensures store + can still serve blocks that are meant to be + deleted but do not have a replacement yet. If + delete-delay duration is provided to compactor + or bucket verify component, it will upload + deletion-mark.json file to mark after what + duration the block should be deleted rather + than deleting the block straight away. If + delete-delay is non-zero for compactor or + bucket verify component, + ignore-deletion-marks-delay should be set to + (delete-delay)/2 so that blocks marked for + deletion are filtered out while fetching blocks + before being deleted from bucket. Default is + 24h, half of the default value for + --delete-delay on compactor. + --web.external-prefix="" Static prefix for all HTML links and redirect + URLs in the bucket web UI interface. Actual + endpoints are still served on / or the + web.route-prefix. This allows thanos bucket web + UI to be served behind a reverse proxy that + strips a URL sub-path. + --web.prefix-header="" Name of HTTP request header used for dynamic + prefixing of UI links and redirects. This + option is ignored if web.external-prefix + argument is set. Security risk: enable this + option only if a reverse proxy in front of + thanos is resetting the header. The + --web.prefix-header=X-Forwarded-Prefix option + can be useful, for example, if Thanos UI is + served via Traefik reverse proxy with + PathPrefixStrip option enabled, which sends the + stripped prefix value in X-Forwarded-Prefix + header. 
This allows thanos UI to be served on a + sub-path. + ``` ## Time based partitioning diff --git a/scripts/genflagdocs.sh b/scripts/genflagdocs.sh index 5e4543ce51..108f8920f0 100755 --- a/scripts/genflagdocs.sh +++ b/scripts/genflagdocs.sh @@ -6,6 +6,7 @@ set -u EMBEDMD_BIN=${EMBEDMD_BIN:-embedmd} SED_BIN=${SED_BIN:-sed} +THANOS_BIN=${THANOS_BIN:-${GOBIN}/thanos} function docs { # If check arg was passed, instead of the docs generation verifies if docs coincide with the codebase. @@ -38,17 +39,17 @@ CHECK=${1:-} commands=("compact" "query" "rule" "sidecar" "store" "tools") for x in "${commands[@]}"; do - ./thanos "${x}" --help &> "docs/components/flags/${x}.txt" + ${THANOS_BIN} "${x}" --help &> "docs/components/flags/${x}.txt" done toolsCommands=("bucket" "rules-check") for x in "${toolsCommands[@]}"; do - ./thanos tools "${x}" --help &> "docs/components/flags/tools_${x}.txt" + ${THANOS_BIN} tools "${x}" --help &> "docs/components/flags/tools_${x}.txt" done toolsBucketCommands=("verify" "ls" "inspect" "web" "replicate" "downsample") for x in "${toolsBucketCommands[@]}"; do - ./thanos tools bucket "${x}" --help &> "docs/components/flags/tools_bucket_${x}.txt" + ${THANOS_BIN} tools bucket "${x}" --help &> "docs/components/flags/tools_bucket_${x}.txt" done # Remove white noise. From ecf78247e920698e3c6942cde51f71b01ef5f2d6 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Fri, 15 May 2020 10:28:29 +0200 Subject: [PATCH 2/8] mixin: Alert on receive not uploading recent data (#2612) Signed-off-by: Frederic Branczyk --- examples/alerts/alerts.md | 8 ++++++++ examples/alerts/alerts.yaml | 8 ++++++++ mixin/thanos/alerts/receive.libsonnet | 11 +++++++++++ 3 files changed, 27 insertions(+) diff --git a/examples/alerts/alerts.md b/examples/alerts/alerts.md index 61098e7cee..76c435af22 100644 --- a/examples/alerts/alerts.md +++ b/examples/alerts/alerts.md @@ -447,6 +447,14 @@ rules: for: 5m labels: severity: warning +- alert: ThanosReceiveNoUpload + annotations: + message: Thanos Receive {{$labels.job}} has not uploaded latest data to object + storage. + expr: increase(thanos_shipper_uploads_total{job=~"thanos-receive.*"}[2h]) == 0 + for: 30m + labels: + severity: warning ``` ## Replicate diff --git a/examples/alerts/alerts.yaml b/examples/alerts/alerts.yaml index f9c3d09945..d83c1e107d 100644 --- a/examples/alerts/alerts.yaml +++ b/examples/alerts/alerts.yaml @@ -210,6 +210,14 @@ groups: for: 5m labels: severity: warning + - alert: ThanosReceiveNoUpload + annotations: + message: Thanos Receive {{$labels.job}} has not uploaded latest data to object + storage. 
+ expr: increase(thanos_shipper_uploads_total{job=~"thanos-receive.*"}[2h]) == 0 + for: 30m + labels: + severity: warning - name: thanos-sidecar.rules rules: - alert: ThanosSidecarPrometheusDown diff --git a/mixin/thanos/alerts/receive.libsonnet b/mixin/thanos/alerts/receive.libsonnet index a52bc193de..9750681cf7 100644 --- a/mixin/thanos/alerts/receive.libsonnet +++ b/mixin/thanos/alerts/receive.libsonnet @@ -93,6 +93,17 @@ severity: 'warning', }, }, + { + alert: 'ThanosReceiveNoUpload', + annotations: { + message: 'Thanos Receive {{$labels.job}} has not uploaded latest data to object storage.', + }, + expr: 'increase(thanos_shipper_uploads_total{%(selector)s}[2h]) == 0' % thanos.receive, + 'for': '30m', + labels: { + severity: 'warning', + }, + }, ], }, ], From 2659848c3bc167522e255f67b99edd49606ff4b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Fri, 15 May 2020 16:39:49 +0200 Subject: [PATCH 3/8] Metadata caching in bucket (#2579) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added caching for Iter. Signed-off-by: Peter Štibraný * Added cache for Exists call for meta-files. Signed-off-by: Peter Štibraný * Added cache for reading block metadata files. Signed-off-by: Peter Štibraný * Make caching bucket configurable with different caches for different type of objects. Signed-off-by: Peter Štibraný * Fixed tests. Signed-off-by: Peter Štibraný * Added caching for ObjectSize. Enabled caching of index. Signed-off-by: Peter Štibraný * Lint feedback. Signed-off-by: Peter Štibraný * Use single set of metrics for all operations. Signed-off-by: Peter Štibraný * Constants. Signed-off-by: Peter Štibraný * Use operation specific config. Generic configuration is only for user. Signed-off-by: Peter Štibraný * Fix typo, make lint happy. Signed-off-by: Peter Štibraný * Simplify constants. Signed-off-by: Peter Štibraný * Simplify caching configuration. Signed-off-by: Peter Štibraný * Refactor cache configuration. Configuration is now passed to the cache when created. Signed-off-by: Peter Štibraný * Review feedback. Signed-off-by: Peter Štibraný * Fix operationRequests and operationHits for getRange. Signed-off-by: Peter Štibraný * Make codec for Iter results configurable. Signed-off-by: Peter Štibraný * Added header. Signed-off-by: Peter Štibraný * Renamed "dir" config to "blocks-iter". Signed-off-by: Peter Štibraný * Bump default values for meta exists/doesntExist ttls. Signed-off-by: Peter Štibraný * Removed example how cache could be configured for index. Signed-off-by: Peter Štibraný * Address review feedback. Signed-off-by: Peter Štibraný * Get now implements streaming reader, and buffers object in memory. Signed-off-by: Peter Štibraný * Added test for partial read. Signed-off-by: Peter Štibraný * Removed unused function. 
Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 434 ++++++++++++++++------ pkg/store/cache/caching_bucket_config.go | 208 +++++++++++ pkg/store/cache/caching_bucket_factory.go | 78 +++- pkg/store/cache/caching_bucket_test.go | 350 ++++++++++++++++- 4 files changed, 935 insertions(+), 135 deletions(-) create mode 100644 pkg/store/cache/caching_bucket_config.go diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 2ce7e6dd11..c725719a73 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -4,16 +4,19 @@ package storecache import ( + "bytes" "context" "encoding/binary" + "encoding/json" "fmt" "io" "io/ioutil" - "regexp" + "strconv" "sync" "time" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -28,86 +31,82 @@ import ( const ( originCache = "cache" originBucket = "bucket" -) - -type CachingBucketConfig struct { - // Basic unit used to cache chunks. - ChunkSubrangeSize int64 `yaml:"chunk_subrange_size"` - - // Maximum number of GetRange requests issued by this bucket for single GetRange call. Zero or negative value = unlimited. - MaxChunksGetRangeRequests int `yaml:"max_chunks_get_range_requests"` - // TTLs for various cache items. - ChunkObjectSizeTTL time.Duration `yaml:"chunk_object_size_ttl"` - ChunkSubrangeTTL time.Duration `yaml:"chunk_subrange_ttl"` -} + opGet = "get" + opGetRange = "getrange" + opIter = "iter" + opExists = "exists" + opObjectSize = "objectsize" +) -func DefaultCachingBucketConfig() CachingBucketConfig { - return CachingBucketConfig{ - ChunkSubrangeSize: 16000, // Equal to max chunk size. - ChunkObjectSizeTTL: 24 * time.Hour, - ChunkSubrangeTTL: 24 * time.Hour, - MaxChunksGetRangeRequests: 3, - } -} +var errObjNotFound = errors.Errorf("object not found") -// Bucket implementation that provides some caching features, using knowledge about how Thanos accesses data. +// CachingBucket implementation that provides some caching features, based on passed configuration. type CachingBucket struct { objstore.Bucket - cache cache.Cache - - config CachingBucketConfig - + cfg *CachingBucketConfig logger log.Logger - requestedChunkBytes prometheus.Counter - fetchedChunkBytes *prometheus.CounterVec - refetchedChunkBytes *prometheus.CounterVec + requestedGetRangeBytes *prometheus.CounterVec + fetchedGetRangeBytes *prometheus.CounterVec + refetchedGetRangeBytes *prometheus.CounterVec - objectSizeRequests prometheus.Counter - objectSizeHits prometheus.Counter + operationConfigs map[string][]*operationConfig + operationRequests *prometheus.CounterVec + operationHits *prometheus.CounterVec } -func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConfig, logger log.Logger, reg prometheus.Registerer) (*CachingBucket, error) { +// NewCachingBucket creates new caching bucket with provided configuration. Configuration should not be +// changed after creating caching bucket. 
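+// A minimal sketch of typical wiring (this mirrors what NewCachingBucketFromYaml
+// sets up; the matchers, TTLs and limits shown are that factory's defaults):
+//
+//	cfg := NewCachingBucketConfig()
+//	cfg.CacheGetRange("chunks", c, isTSDBChunkFile, 16000, 24*time.Hour, 24*time.Hour, 3)
+//	cfg.CacheIter("blocks-iter", c, isBlocksRootDir, 5*time.Minute, JSONIterCodec{})
+//	cb, err := NewCachingBucket(bucket, cfg, logger, reg)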
+func NewCachingBucket(b objstore.Bucket, cfg *CachingBucketConfig, logger log.Logger, reg prometheus.Registerer) (*CachingBucket, error) { if b == nil { return nil, errors.New("bucket is nil") } - if c == nil { - return nil, errors.New("cache is nil") - } cb := &CachingBucket{ Bucket: b, - config: chunks, - cache: c, + cfg: cfg, logger: logger, - requestedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_store_bucket_cache_requested_chunk_bytes_total", - Help: "Total number of requested bytes for chunk data.", - }), - fetchedChunkBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_store_bucket_cache_fetched_chunk_bytes_total", - Help: "Total number of fetched chunk bytes. Data from bucket is then stored to cache.", - }, []string{"origin"}), - refetchedChunkBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_store_bucket_cache_refetched_chunk_bytes_total", - Help: "Total number of chunk bytes re-fetched from storage, despite being in cache already.", - }, []string{"origin"}), - objectSizeRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_store_bucket_cache_objectsize_requests_total", - Help: "Number of object size requests for objects.", - }), - objectSizeHits: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_store_bucket_cache_objectsize_hits_total", - Help: "Number of object size hits for objects.", - }), - } - - cb.fetchedChunkBytes.WithLabelValues(originBucket) - cb.fetchedChunkBytes.WithLabelValues(originCache) - cb.refetchedChunkBytes.WithLabelValues(originCache) + operationConfigs: map[string][]*operationConfig{}, + + requestedGetRangeBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_getrange_requested_bytes_total", + Help: "Total number of bytes requested via GetRange.", + }, []string{"config"}), + fetchedGetRangeBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_getrange_fetched_bytes_total", + Help: "Total number of bytes fetched because of GetRange operation. 
Data from bucket is then stored to cache.", + }, []string{"origin", "config"}), + refetchedGetRangeBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_getrange_refetched_bytes_total", + Help: "Total number of bytes re-fetched from storage because of GetRange operation, despite being in cache already.", + }, []string{"origin", "config"}), + + operationRequests: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_operation_requests_total", + Help: "Number of requested operations matching given config.", + }, []string{"operation", "config"}), + operationHits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_operation_hits_total", + Help: "Number of operations served from cache for given config.", + }, []string{"operation", "config"}), + } + + for op, names := range cfg.allConfigNames() { + for _, n := range names { + cb.operationRequests.WithLabelValues(op, n) + cb.operationHits.WithLabelValues(op, n) + + if op == opGetRange { + cb.requestedGetRangeBytes.WithLabelValues(n) + cb.fetchedGetRangeBytes.WithLabelValues(originCache, n) + cb.fetchedGetRangeBytes.WithLabelValues(originBucket, n) + cb.refetchedGetRangeBytes.WithLabelValues(originCache, n) + } + } + } return cb, nil } @@ -132,35 +131,183 @@ func (cb *CachingBucket) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailur return cb.WithExpectedErrs(expectedFunc) } -var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`) +func (cb *CachingBucket) Iter(ctx context.Context, dir string, f func(string) error) error { + cfgName, cfg := cb.cfg.findIterConfig(dir) + if cfg == nil { + return cb.Bucket.Iter(ctx, dir, f) + } -func isTSDBChunkFile(name string) bool { - return chunksMatcher.MatchString(name) + cb.operationRequests.WithLabelValues(opIter, cfgName).Inc() + + key := cachingKeyIter(dir) + data := cfg.cache.Fetch(ctx, []string{key}) + if data[key] != nil { + list, err := cfg.codec.Decode(data[key]) + if err == nil { + cb.operationHits.WithLabelValues(opIter, cfgName).Inc() + for _, n := range list { + if err := f(n); err != nil { + return err + } + } + return nil + } + level.Warn(cb.logger).Log("msg", "failed to decode cached Iter result", "key", key, "err", err) + } + + // Iteration can take a while (esp. since it calls function), and iterTTL is generally low. + // We will compute TTL based time when iteration started. 
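+	// E.g. with the factory default 5m TTL for "blocks-iter", an Iter taking 30s caches its result for ~4m30s.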
+ iterTime := time.Now() + var list []string + err := cb.Bucket.Iter(ctx, dir, func(s string) error { + list = append(list, s) + return f(s) + }) + + remainingTTL := cfg.ttl - time.Since(iterTime) + if err == nil && remainingTTL > 0 { + data, encErr := cfg.codec.Encode(list) + if encErr == nil { + cfg.cache.Store(ctx, map[string][]byte{key: data}, remainingTTL) + return nil + } + level.Warn(cb.logger).Log("msg", "failed to encode Iter result", "key", key, "err", encErr) + } + return err +} + +func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error) { + cfgName, cfg := cb.cfg.findExistConfig(name) + if cfg == nil { + return cb.Bucket.Exists(ctx, name) + } + + cb.operationRequests.WithLabelValues(opExists, cfgName).Inc() + + key := cachingKeyExists(name) + hits := cfg.cache.Fetch(ctx, []string{key}) + + if ex := hits[key]; ex != nil { + exists, err := strconv.ParseBool(string(ex)) + if err == nil { + cb.operationHits.WithLabelValues(opExists, cfgName).Inc() + return exists, nil + } + level.Warn(cb.logger).Log("msg", "unexpected cached 'exists' value", "key", key, "val", string(ex)) + } + + existsTime := time.Now() + ok, err := cb.Bucket.Exists(ctx, name) + if err == nil { + storeExistsCacheEntry(ctx, key, ok, existsTime, cfg.cache, cfg.existsTTL, cfg.doesntExistTTL) + } + + return ok, err +} + +func storeExistsCacheEntry(ctx context.Context, cachingKey string, exists bool, ts time.Time, cache cache.Cache, existsTTL, doesntExistTTL time.Duration) { + var ttl time.Duration + if exists { + ttl = existsTTL - time.Since(ts) + } else { + ttl = doesntExistTTL - time.Since(ts) + } + + if ttl > 0 { + cache.Store(ctx, map[string][]byte{cachingKey: []byte(strconv.FormatBool(exists))}, ttl) + } +} + +func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + cfgName, cfg := cb.cfg.findGetConfig(name) + if cfg == nil { + return cb.Bucket.Get(ctx, name) + } + + cb.operationRequests.WithLabelValues(opGet, cfgName).Inc() + + contentKey := cachingKeyContent(name) + existsKey := cachingKeyExists(name) + + hits := cfg.cache.Fetch(ctx, []string{contentKey, existsKey}) + if hits[contentKey] != nil { + cb.operationHits.WithLabelValues(opGet, cfgName).Inc() + return ioutil.NopCloser(bytes.NewReader(hits[contentKey])), nil + } + + // If we know that file doesn't exist, we can return that. Useful for deletion marks. + if ex := hits[existsKey]; ex != nil { + if exists, err := strconv.ParseBool(string(ex)); err == nil && !exists { + cb.operationHits.WithLabelValues(opGet, cfgName).Inc() + return nil, errObjNotFound + } + } + + getTime := time.Now() + reader, err := cb.Bucket.Get(ctx, name) + if err != nil { + if cb.Bucket.IsObjNotFoundErr(err) { + // Cache that object doesn't exist. 
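+			// Subsequent Gets can then serve errObjNotFound from cache until doesntExistTTL expires.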
+ storeExistsCacheEntry(ctx, existsKey, false, getTime, cfg.cache, cfg.existsTTL, cfg.doesntExistTTL) + } + + return nil, err + } + + storeExistsCacheEntry(ctx, existsKey, true, getTime, cfg.cache, cfg.existsTTL, cfg.doesntExistTTL) + return &getReader{ + c: cfg.cache, + ctx: ctx, + r: reader, + buf: new(bytes.Buffer), + startTime: getTime, + ttl: cfg.contentTTL, + cacheKey: contentKey, + maxSize: cfg.maxCacheableSize, + }, nil +} + +func (cb *CachingBucket) IsObjNotFoundErr(err error) bool { + return err == errObjNotFound || cb.Bucket.IsObjNotFoundErr(err) } func (cb *CachingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - if isTSDBChunkFile(name) && off >= 0 && length > 0 { - var ( - r io.ReadCloser - err error - ) - tracing.DoInSpan(ctx, "cachingbucket_getrange_chunkfile", func(ctx context.Context) { - r, err = cb.getRangeChunkFile(ctx, name, off, length) - }) - return r, err + if off < 0 || length <= 0 { + return cb.Bucket.GetRange(ctx, name, off, length) } - return cb.Bucket.GetRange(ctx, name, off, length) + cfgName, cfg := cb.cfg.findGetRangeConfig(name) + if cfg == nil { + return cb.Bucket.GetRange(ctx, name, off, length) + } + + var ( + r io.ReadCloser + err error + ) + tracing.DoInSpan(ctx, "cachingbucket_getrange", func(ctx context.Context) { + r, err = cb.cachedGetRange(ctx, name, off, length, cfgName, cfg) + }) + return r, err } -func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, ttl time.Duration) (uint64, error) { +func (cb *CachingBucket) ObjectSize(ctx context.Context, name string) (uint64, error) { + cfgName, cfg := cb.cfg.findObjectSizeConfig(name) + if cfg == nil { + return cb.Bucket.ObjectSize(ctx, name) + } + + return cb.cachedObjectSize(ctx, name, cfgName, cfg.cache, cfg.ttl) +} + +func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, cfgName string, cache cache.Cache, ttl time.Duration) (uint64, error) { key := cachingKeyObjectSize(name) - cb.objectSizeRequests.Add(1) + cb.operationRequests.WithLabelValues(opObjectSize, cfgName).Inc() - hits := cb.cache.Fetch(ctx, []string{key}) + hits := cache.Fetch(ctx, []string{key}) if s := hits[key]; len(s) == 8 { - cb.objectSizeHits.Add(1) + cb.operationHits.WithLabelValues(opObjectSize, cfgName).Inc() return binary.BigEndian.Uint64(s), nil } @@ -171,17 +318,18 @@ func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, ttl var buf [8]byte binary.BigEndian.PutUint64(buf[:], size) - cb.cache.Store(ctx, map[string][]byte{key: buf[:]}, ttl) + cache.Store(ctx, map[string][]byte{key: buf[:]}, ttl) return size, nil } -func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) { - cb.requestedChunkBytes.Add(float64(length)) +func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) { + cb.operationRequests.WithLabelValues(opGetRange, cfgName).Inc() + cb.requestedGetRangeBytes.WithLabelValues(cfgName).Add(float64(length)) - size, err := cb.cachedObjectSize(ctx, name, cb.config.ChunkObjectSizeTTL) + size, err := cb.cachedObjectSize(ctx, name, cfgName, cfg.cache, cfg.objectSizeTTL) if err != nil { - return nil, errors.Wrapf(err, "failed to get size of chunk file: %s", name) + return nil, errors.Wrapf(err, "failed to get size of object: %s", name) } // If length goes over object size, adjust length. We use it later to limit number of read bytes. 
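 	// E.g. for offset=100 and length=200 on a 150-byte object, length becomes 50.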
@@ -190,30 +338,32 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off } // Start and end range are subrange-aligned offsets into object, that we're going to read. - startRange := (offset / cb.config.ChunkSubrangeSize) * cb.config.ChunkSubrangeSize - endRange := ((offset + length) / cb.config.ChunkSubrangeSize) * cb.config.ChunkSubrangeSize - if (offset+length)%cb.config.ChunkSubrangeSize > 0 { - endRange += cb.config.ChunkSubrangeSize + startRange := (offset / cfg.subrangeSize) * cfg.subrangeSize + endRange := ((offset + length) / cfg.subrangeSize) * cfg.subrangeSize + if (offset+length)%cfg.subrangeSize > 0 { + endRange += cfg.subrangeSize } // The very last subrange in the object may have length that is not divisible by subrange size. - lastSubrangeOffset := endRange - cb.config.ChunkSubrangeSize - lastSubrangeLength := int(cb.config.ChunkSubrangeSize) + lastSubrangeOffset := endRange - cfg.subrangeSize + lastSubrangeLength := int(cfg.subrangeSize) if uint64(endRange) > size { - lastSubrangeOffset = (int64(size) / cb.config.ChunkSubrangeSize) * cb.config.ChunkSubrangeSize + lastSubrangeOffset = (int64(size) / cfg.subrangeSize) * cfg.subrangeSize lastSubrangeLength = int(int64(size) - lastSubrangeOffset) } - numSubranges := (endRange - startRange) / cb.config.ChunkSubrangeSize + numSubranges := (endRange - startRange) / cfg.subrangeSize offsetKeys := make(map[int64]string, numSubranges) keys := make([]string, 0, numSubranges) - for off := startRange; off < endRange; off += cb.config.ChunkSubrangeSize { - end := off + cb.config.ChunkSubrangeSize + totalRequestedBytes := int64(0) + for off := startRange; off < endRange; off += cfg.subrangeSize { + end := off + cfg.subrangeSize if end > int64(size) { end = int64(size) } + totalRequestedBytes += (end - off) k := cachingKeyObjectSubrange(name, off, end) keys = append(keys, k) @@ -221,44 +371,47 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off } // Try to get all subranges from the cache. - hits := cb.cache.Fetch(ctx, keys) + totalCachedBytes := int64(0) + hits := cfg.cache.Fetch(ctx, keys) for _, b := range hits { - cb.fetchedChunkBytes.WithLabelValues(originCache).Add(float64(len(b))) + totalCachedBytes += int64(len(b)) } + cb.fetchedGetRangeBytes.WithLabelValues(originCache, cfgName).Add(float64(totalCachedBytes)) + cb.operationHits.WithLabelValues(opGetRange, cfgName).Add(float64(len(hits)) / float64(len(keys))) if len(hits) < len(keys) { if hits == nil { hits = map[string][]byte{} } - err := cb.fetchMissingChunkSubranges(ctx, name, startRange, endRange, offsetKeys, hits, lastSubrangeOffset, lastSubrangeLength) + err := cb.fetchMissingSubranges(ctx, name, startRange, endRange, offsetKeys, hits, lastSubrangeOffset, lastSubrangeLength, cfgName, cfg) if err != nil { return nil, err } } - return ioutil.NopCloser(newSubrangesReader(cb.config.ChunkSubrangeSize, offsetKeys, hits, offset, length)), nil + return ioutil.NopCloser(newSubrangesReader(cfg.subrangeSize, offsetKeys, hits, offset, length)), nil } type rng struct { start, end int64 } -// fetchMissingChunkSubranges fetches missing subranges, stores them into "hits" map +// fetchMissingSubranges fetches missing subranges, stores them into "hits" map // and into cache as well (using provided cacheKeys). 
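 // Adjacent missing subranges are merged; with maxSubRequests > 0 they are fetched in at most that many GetRange calls.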
-func (cb *CachingBucket) fetchMissingChunkSubranges(ctx context.Context, name string, startRange, endRange int64, cacheKeys map[int64]string, hits map[string][]byte, lastSubrangeOffset int64, lastSubrangeLength int) error { +func (cb *CachingBucket) fetchMissingSubranges(ctx context.Context, name string, startRange, endRange int64, cacheKeys map[int64]string, hits map[string][]byte, lastSubrangeOffset int64, lastSubrangeLength int, cfgName string, cfg *getRangeConfig) error { // Ordered list of missing sub-ranges. var missing []rng - for off := startRange; off < endRange; off += cb.config.ChunkSubrangeSize { + for off := startRange; off < endRange; off += cfg.subrangeSize { if hits[cacheKeys[off]] == nil { - missing = append(missing, rng{start: off, end: off + cb.config.ChunkSubrangeSize}) + missing = append(missing, rng{start: off, end: off + cfg.subrangeSize}) } } missing = mergeRanges(missing, 0) // Merge adjacent ranges. // Keep merging until we have only max number of ranges (= requests). - for limit := cb.config.ChunkSubrangeSize; cb.config.MaxChunksGetRangeRequests > 0 && len(missing) > cb.config.MaxChunksGetRangeRequests; limit = limit * 2 { + for limit := cfg.subrangeSize; cfg.maxSubRequests > 0 && len(missing) > cfg.maxSubRequests; limit = limit * 2 { missing = mergeRanges(missing, limit) } @@ -275,7 +428,7 @@ func (cb *CachingBucket) fetchMissingChunkSubranges(ctx context.Context, name st } defer runutil.CloseWithLogOnErr(cb.logger, r, "fetching range [%d, %d]", m.start, m.end) - for off := m.start; off < m.end && gctx.Err() == nil; off += cb.config.ChunkSubrangeSize { + for off := m.start; off < m.end && gctx.Err() == nil; off += cfg.subrangeSize { key := cacheKeys[off] if key == "" { return errors.Errorf("fetching range [%d, %d]: caching key for offset %d not found", m.start, m.end, off) @@ -288,7 +441,7 @@ func (cb *CachingBucket) fetchMissingChunkSubranges(ctx context.Context, name st // if object length isn't divisible by subrange size. subrangeData = make([]byte, lastSubrangeLength) } else { - subrangeData = make([]byte, cb.config.ChunkSubrangeSize) + subrangeData = make([]byte, cfg.subrangeSize) } _, err := io.ReadFull(r, subrangeData) if err != nil { @@ -304,10 +457,10 @@ func (cb *CachingBucket) fetchMissingChunkSubranges(ctx context.Context, name st hitsMutex.Unlock() if storeToCache { - cb.fetchedChunkBytes.WithLabelValues(originBucket).Add(float64(len(subrangeData))) - cb.cache.Store(gctx, map[string][]byte{key: subrangeData}, cb.config.ChunkSubrangeTTL) + cb.fetchedGetRangeBytes.WithLabelValues(originBucket, cfgName).Add(float64(len(subrangeData))) + cfg.cache.Store(gctx, map[string][]byte{key: subrangeData}, cfg.subrangeTTL) } else { - cb.refetchedChunkBytes.WithLabelValues(originCache).Add(float64(len(subrangeData))) + cb.refetchedGetRangeBytes.WithLabelValues(originCache, cfgName).Add(float64(len(subrangeData))) } } @@ -344,6 +497,18 @@ func cachingKeyObjectSubrange(name string, start int64, end int64) string { return fmt.Sprintf("subrange:%s:%d:%d", name, start, end) } +func cachingKeyIter(name string) string { + return fmt.Sprintf("iter:%s", name) +} + +func cachingKeyExists(name string) string { + return fmt.Sprintf("exists:%s", name) +} + +func cachingKeyContent(name string) string { + return fmt.Sprintf("content:%s", name) +} + // Reader implementation that uses in-memory subranges. 
type subrangesReader struct { subrangeSize int64 @@ -409,3 +574,56 @@ func (c *subrangesReader) subrangeAt(offset int64) ([]byte, error) { } return b, nil } + +type getReader struct { + c cache.Cache + ctx context.Context + r io.ReadCloser + buf *bytes.Buffer + startTime time.Time + ttl time.Duration + cacheKey string + maxSize int +} + +func (g *getReader) Close() error { + // We don't know if entire object was read, don't store it here. + g.buf = nil + return g.r.Close() +} + +func (g *getReader) Read(p []byte) (n int, err error) { + n, err = g.r.Read(p) + if n > 0 && g.buf != nil { + if g.buf.Len()+n <= g.maxSize { + g.buf.Write(p[:n]) + } else { + // Object is larger than max size, stop caching. + g.buf = nil + } + } + + if err == io.EOF && g.buf != nil { + remainingTTL := g.ttl - time.Since(g.startTime) + if remainingTTL > 0 { + g.c.Store(g.ctx, map[string][]byte{g.cacheKey: g.buf.Bytes()}, remainingTTL) + } + // Clear reference, to avoid doing another Store on next read. + g.buf = nil + } + + return n, err +} + +// JSONIterCodec encodes iter results into JSON. Suitable for root dir. +type JSONIterCodec struct{} + +func (jic JSONIterCodec) Encode(files []string) ([]byte, error) { + return json.Marshal(files) +} + +func (jic JSONIterCodec) Decode(data []byte) ([]string, error) { + var list []string + err := json.Unmarshal(data, &list) + return list, err +} diff --git a/pkg/store/cache/caching_bucket_config.go b/pkg/store/cache/caching_bucket_config.go new file mode 100644 index 0000000000..dce0350fdf --- /dev/null +++ b/pkg/store/cache/caching_bucket_config.go @@ -0,0 +1,208 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache + +import ( + "time" + + "github.com/thanos-io/thanos/pkg/cache" +) + +// Codec for encoding and decoding results of Iter call. +type IterCodec interface { + Encode(files []string) ([]byte, error) + Decode(cachedData []byte) ([]string, error) +} + +// CachingBucketConfig contains low-level configuration for individual bucket operations. +// This is not exposed to the user, but it is expected that code sets up individual +// operations based on user-provided configuration. +type CachingBucketConfig struct { + get map[string]*getConfig + iter map[string]*iterConfig + exists map[string]*existsConfig + getRange map[string]*getRangeConfig + objectSize map[string]*objectSizeConfig +} + +func NewCachingBucketConfig() *CachingBucketConfig { + return &CachingBucketConfig{ + get: map[string]*getConfig{}, + iter: map[string]*iterConfig{}, + exists: map[string]*existsConfig{}, + getRange: map[string]*getRangeConfig{}, + objectSize: map[string]*objectSizeConfig{}, + } +} + +// Generic config for single operation. +type operationConfig struct { + matcher func(name string) bool + cache cache.Cache +} + +// Operation-specific configs. 
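+// Each embeds operationConfig (cache client plus object-name matcher) and adds per-operation TTLs.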
+type iterConfig struct { + operationConfig + ttl time.Duration + codec IterCodec +} + +type existsConfig struct { + operationConfig + existsTTL time.Duration + doesntExistTTL time.Duration +} + +type getConfig struct { + existsConfig + contentTTL time.Duration + maxCacheableSize int +} + +type getRangeConfig struct { + operationConfig + subrangeSize int64 + maxSubRequests int + objectSizeTTL time.Duration + subrangeTTL time.Duration +} + +type objectSizeConfig struct { + operationConfig + ttl time.Duration +} + +func newOperationConfig(cache cache.Cache, matcher func(string) bool) operationConfig { + if cache == nil { + panic("cache") + } + if matcher == nil { + panic("matcher") + } + + return operationConfig{ + matcher: matcher, + cache: cache, + } +} + +// CacheIter configures caching of "Iter" operation for matching directories. +func (cfg *CachingBucketConfig) CacheIter(configName string, cache cache.Cache, matcher func(string) bool, ttl time.Duration, codec IterCodec) { + cfg.iter[configName] = &iterConfig{ + operationConfig: newOperationConfig(cache, matcher), + ttl: ttl, + codec: codec, + } +} + +// CacheGet configures caching of "Get" operation for matching files. Content of the object is cached, as well as whether object exists or not. +func (cfg *CachingBucketConfig) CacheGet(configName string, cache cache.Cache, matcher func(string) bool, maxCacheableSize int, contentTTL, existsTTL, doesntExistTTL time.Duration) { + cfg.get[configName] = &getConfig{ + existsConfig: existsConfig{ + operationConfig: newOperationConfig(cache, matcher), + existsTTL: existsTTL, + doesntExistTTL: doesntExistTTL, + }, + contentTTL: contentTTL, + maxCacheableSize: maxCacheableSize, + } +} + +// CacheExists configures caching of "Exists" operation for matching files. Negative values are cached as well. +func (cfg *CachingBucketConfig) CacheExists(configName string, cache cache.Cache, matcher func(string) bool, existsTTL, doesntExistTTL time.Duration) { + cfg.exists[configName] = &existsConfig{ + operationConfig: newOperationConfig(cache, matcher), + existsTTL: existsTTL, + doesntExistTTL: doesntExistTTL, + } +} + +// CacheGetRange configures caching of "GetRange" operation. Subranges (aligned on subrange size) are cached individually. +// Since caching operation needs to know the object size to compute correct subranges, object size is cached as well. +// Single "GetRange" requests can result in multiple smaller GetRange sub-requests issued on the underlying bucket. +// MaxSubRequests specifies how many such subrequests may be issued. Values <= 0 mean there is no limit (requests +// for adjacent missing subranges are still merged). +func (cfg *CachingBucketConfig) CacheGetRange(configName string, cache cache.Cache, matcher func(string) bool, subrangeSize int64, objectSizeTTL, subrangeTTL time.Duration, maxSubRequests int) { + cfg.getRange[configName] = &getRangeConfig{ + operationConfig: newOperationConfig(cache, matcher), + subrangeSize: subrangeSize, + objectSizeTTL: objectSizeTTL, + subrangeTTL: subrangeTTL, + maxSubRequests: maxSubRequests, + } +} + +// CacheObjectSize configures caching of "ObjectSize" operation for matching files. 
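+// Cached sizes are stored as 8-byte big-endian values (see cachedObjectSize).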
+func (cfg *CachingBucketConfig) CacheObjectSize(configName string, cache cache.Cache, matcher func(name string) bool, ttl time.Duration) { + cfg.objectSize[configName] = &objectSizeConfig{ + operationConfig: newOperationConfig(cache, matcher), + ttl: ttl, + } +} + +func (cfg *CachingBucketConfig) allConfigNames() map[string][]string { + result := map[string][]string{} + for n := range cfg.get { + result[opGet] = append(result[opGet], n) + } + for n := range cfg.iter { + result[opIter] = append(result[opIter], n) + } + for n := range cfg.exists { + result[opExists] = append(result[opExists], n) + } + for n := range cfg.getRange { + result[opGetRange] = append(result[opGetRange], n) + } + for n := range cfg.objectSize { + result[opObjectSize] = append(result[opObjectSize], n) + } + return result +} + +func (cfg *CachingBucketConfig) findIterConfig(dir string) (string, *iterConfig) { + for n, cfg := range cfg.iter { + if cfg.matcher(dir) { + return n, cfg + } + } + return "", nil +} + +func (cfg *CachingBucketConfig) findExistConfig(name string) (string, *existsConfig) { + for n, cfg := range cfg.exists { + if cfg.matcher(name) { + return n, cfg + } + } + return "", nil +} + +func (cfg *CachingBucketConfig) findGetConfig(name string) (string, *getConfig) { + for n, cfg := range cfg.get { + if cfg.matcher(name) { + return n, cfg + } + } + return "", nil +} + +func (cfg *CachingBucketConfig) findGetRangeConfig(name string) (string, *getRangeConfig) { + for n, cfg := range cfg.getRange { + if cfg.matcher(name) { + return n, cfg + } + } + return "", nil +} + +func (cfg *CachingBucketConfig) findObjectSizeConfig(name string) (string, *objectSizeConfig) { + for n, cfg := range cfg.objectSize { + if cfg.matcher(name) { + return n, cfg + } + } + return "", nil +} diff --git a/pkg/store/cache/caching_bucket_factory.go b/pkg/store/cache/caching_bucket_factory.go index 0bc0a78458..3dbd60276e 100644 --- a/pkg/store/cache/caching_bucket_factory.go +++ b/pkg/store/cache/caching_bucket_factory.go @@ -4,38 +4,71 @@ package storecache import ( + "regexp" + "strings" + "time" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "gopkg.in/yaml.v2" + "github.com/thanos-io/thanos/pkg/block/metadata" cache "github.com/thanos-io/thanos/pkg/cache" "github.com/thanos-io/thanos/pkg/cacheutil" + "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" ) // BucketCacheProvider is a type used to evaluate all bucket cache providers. type BucketCacheProvider string -const ( - MemcachedBucketCacheProvider BucketCacheProvider = "memcached" // Memcached cache-provider for caching bucket. -) +const MemcachedBucketCacheProvider BucketCacheProvider = "memcached" // Memcached cache-provider for caching bucket. -// CachingBucketWithBackendConfig is a configuration of caching bucket used by Store component. -type CachingBucketWithBackendConfig struct { +// CachingWithBackendConfig is a configuration of caching bucket used by Store component. +type CachingWithBackendConfig struct { Type BucketCacheProvider `yaml:"backend"` BackendConfig interface{} `yaml:"backend_config"` - CachingBucketConfig CachingBucketConfig `yaml:"caching_config"` + // Basic unit used to cache chunks. + ChunkSubrangeSize int64 `yaml:"chunk_subrange_size"` + + // Maximum number of GetRange requests issued by this bucket for single GetRange call. Zero or negative value = unlimited. 
+ MaxChunksGetRangeRequests int `yaml:"max_chunks_get_range_requests"` + + // TTLs for various cache items. + ChunkObjectSizeTTL time.Duration `yaml:"chunk_object_size_ttl"` + ChunkSubrangeTTL time.Duration `yaml:"chunk_subrange_ttl"` + + // How long to cache result of Iter call in root directory. + BlocksIterTTL time.Duration `yaml:"blocks_iter_ttl"` + + // Config for Exists and Get operations for metadata files. + MetafileExistsTTL time.Duration `yaml:"metafile_exists_ttl"` + MetafileDoesntExistTTL time.Duration `yaml:"metafile_doesnt_exist_ttl"` + MetafileContentTTL time.Duration `yaml:"metafile_content_ttl"` + MetafileMaxSize model.Bytes `yaml:"metafile_max_size"` +} + +func (cfg *CachingWithBackendConfig) Defaults() { + cfg.ChunkSubrangeSize = 16000 // Equal to max chunk size. + cfg.ChunkObjectSizeTTL = 24 * time.Hour + cfg.ChunkSubrangeTTL = 24 * time.Hour + cfg.MaxChunksGetRangeRequests = 3 + cfg.BlocksIterTTL = 5 * time.Minute + cfg.MetafileExistsTTL = 2 * time.Hour + cfg.MetafileDoesntExistTTL = 15 * time.Minute + cfg.MetafileContentTTL = 24 * time.Hour + cfg.MetafileMaxSize = 1024 * 1024 // Equal to default MaxItemSize in memcached client. } // NewCachingBucketFromYaml uses YAML configuration to create new caching bucket. func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { level.Info(logger).Log("msg", "loading caching bucket configuration") - config := &CachingBucketWithBackendConfig{} - config.CachingBucketConfig = DefaultCachingBucketConfig() + config := &CachingWithBackendConfig{} + config.Defaults() if err := yaml.UnmarshalStrict(yamlContent, config); err != nil { return nil, errors.Wrap(err, "parsing config YAML file") @@ -60,5 +93,32 @@ func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger return nil, errors.Errorf("unsupported cache type: %s", config.Type) } - return NewCachingBucket(bucket, c, config.CachingBucketConfig, logger, reg) + cfg := NewCachingBucketConfig() + + // Configure cache. + cfg.CacheGetRange("chunks", c, isTSDBChunkFile, config.ChunkSubrangeSize, config.ChunkObjectSizeTTL, config.ChunkSubrangeTTL, config.MaxChunksGetRangeRequests) + cfg.CacheExists("meta.jsons", c, isMetaFile, config.MetafileExistsTTL, config.MetafileDoesntExistTTL) + cfg.CacheGet("meta.jsons", c, isMetaFile, int(config.MetafileMaxSize), config.MetafileContentTTL, config.MetafileExistsTTL, config.MetafileDoesntExistTTL) + + // Cache Iter requests for root. 
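+	// Note that isBlocksRootDir matches only the empty name, i.e. the bucket root listing.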
+ cfg.CacheIter("blocks-iter", c, isBlocksRootDir, config.BlocksIterTTL, JSONIterCodec{}) + + cb, err := NewCachingBucket(bucket, cfg, logger, reg) + if err != nil { + return nil, err + } + + return cb, nil +} + +var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`) + +func isTSDBChunkFile(name string) bool { return chunksMatcher.MatchString(name) } + +func isMetaFile(name string) bool { + return strings.HasSuffix(name, "/"+metadata.MetaFilename) || strings.HasSuffix(name, "/"+metadata.DeletionMarkFilename) +} + +func isBlocksRootDir(name string) bool { + return name == "" } diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go index 2bbc37c887..d392d9066e 100644 --- a/pkg/store/cache/caching_bucket_test.go +++ b/pkg/store/cache/caching_bucket_test.go @@ -9,6 +9,8 @@ import ( "fmt" "io" "io/ioutil" + "sort" + "strings" "sync" "testing" "time" @@ -17,10 +19,13 @@ import ( promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/testutil" ) -func TestCachingBucket(t *testing.T) { +const testFilename = "/random_object" + +func TestChunksCaching(t *testing.T) { length := int64(1024 * 1024) subrangeSize := int64(16000) // All tests are based on this value. @@ -35,7 +40,7 @@ func TestCachingBucket(t *testing.T) { testutil.Ok(t, inmem.Upload(context.Background(), name, bytes.NewReader(data))) // We reuse cache between tests (!) - cache := &mockCache{cache: make(map[string][]byte)} + cache := newMockCache() // Warning, these tests must be run in order, they depend cache state from previous test. for _, tc := range []struct { @@ -106,7 +111,7 @@ func TestCachingBucket(t *testing.T) { expectedFetchedBytes: length, expectedCachedBytes: 0, // Cache is flushed. init: func() { - cache.cache = map[string][]byte{} // Flush cache. 
+ cache.flush() }, }, @@ -217,24 +222,21 @@ func TestCachingBucket(t *testing.T) { tc.init() } - cfg := DefaultCachingBucketConfig() - cfg.ChunkSubrangeSize = subrangeSize - cfg.MaxChunksGetRangeRequests = tc.maxGetRangeRequests + cfg := NewCachingBucketConfig() + cfg.CacheGetRange("chunks", cache, isTSDBChunkFile, subrangeSize, time.Hour, time.Hour, tc.maxGetRangeRequests) - cachingBucket, err := NewCachingBucket(inmem, cache, cfg, nil, nil) + cachingBucket, err := NewCachingBucket(inmem, cfg, nil, nil) testutil.Ok(t, err) verifyGetRange(t, cachingBucket, name, tc.offset, tc.length, tc.expectedLength) - testutil.Equals(t, tc.expectedCachedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedChunkBytes.WithLabelValues(originCache)))) - testutil.Equals(t, tc.expectedFetchedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedChunkBytes.WithLabelValues(originBucket)))) - testutil.Equals(t, tc.expectedRefetchedBytes, int64(promtest.ToFloat64(cachingBucket.refetchedChunkBytes.WithLabelValues(originCache)))) + testutil.Equals(t, tc.expectedCachedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedGetRangeBytes.WithLabelValues(originCache, "chunks")))) + testutil.Equals(t, tc.expectedFetchedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedGetRangeBytes.WithLabelValues(originBucket, "chunks")))) + testutil.Equals(t, tc.expectedRefetchedBytes, int64(promtest.ToFloat64(cachingBucket.refetchedGetRangeBytes.WithLabelValues(originCache, "chunks")))) }) } } func verifyGetRange(t *testing.T, cachingBucket *CachingBucket, name string, offset, length int64, expectedLength int64) { - t.Helper() - r, err := cachingBucket.GetRange(context.Background(), name, offset, length) testutil.Ok(t, err) @@ -249,16 +251,29 @@ func verifyGetRange(t *testing.T, cachingBucket *CachingBucket, name string, off } } +type cacheItem struct { + data []byte + exp time.Time +} + type mockCache struct { mu sync.Mutex - cache map[string][]byte + cache map[string]cacheItem +} + +func newMockCache() *mockCache { + c := &mockCache{} + c.flush() + return c } -func (m *mockCache) Store(_ context.Context, data map[string][]byte, _ time.Duration) { +func (m *mockCache) Store(_ context.Context, data map[string][]byte, ttl time.Duration) { m.mu.Lock() defer m.mu.Unlock() + + exp := time.Now().Add(ttl) for key, val := range data { - m.cache[key] = val + m.cache[key] = cacheItem{data: val, exp: exp} } } @@ -268,16 +283,21 @@ func (m *mockCache) Fetch(_ context.Context, keys []string) map[string][]byte { found := make(map[string][]byte, len(keys)) + now := time.Now() for _, k := range keys { v, ok := m.cache[k] - if ok { - found[k] = v + if ok && now.Before(v.exp) { + found[k] = v.data } } return found } +func (m *mockCache) flush() { + m.cache = map[string]cacheItem{} +} + func TestMergeRanges(t *testing.T) { for ix, tc := range []struct { input []rng @@ -315,7 +335,11 @@ func TestMergeRanges(t *testing.T) { func TestInvalidOffsetAndLength(t *testing.T) { b := &testBucket{objstore.NewInMemBucket()} - c, err := NewCachingBucket(b, &mockCache{cache: make(map[string][]byte)}, DefaultCachingBucketConfig(), nil, nil) + + cfg := NewCachingBucketConfig() + cfg.CacheGetRange("chunks", newMockCache(), func(string) bool { return true }, 10000, time.Hour, time.Hour, 3) + + c, err := NewCachingBucket(b, cfg, nil, nil) testutil.Ok(t, err) r, err := c.GetRange(context.Background(), "test", -1, 1000) @@ -342,3 +366,293 @@ func (b *testBucket) GetRange(ctx context.Context, name string, off, length int6 return b.InMemBucket.GetRange(ctx, name, off, 
length) } + +func TestCachedIter(t *testing.T) { + inmem := objstore.NewInMemBucket() + testutil.Ok(t, inmem.Upload(context.Background(), "/file-1", strings.NewReader("hej"))) + testutil.Ok(t, inmem.Upload(context.Background(), "/file-2", strings.NewReader("ahoj"))) + testutil.Ok(t, inmem.Upload(context.Background(), "/file-3", strings.NewReader("hello"))) + testutil.Ok(t, inmem.Upload(context.Background(), "/file-4", strings.NewReader("ciao"))) + + allFiles := []string{"/file-1", "/file-2", "/file-3", "/file-4"} + + // We reuse cache between tests (!) + cache := newMockCache() + + const cfgName = "dirs" + cfg := NewCachingBucketConfig() + cfg.CacheIter(cfgName, cache, func(string) bool { return true }, 5*time.Minute, JSONIterCodec{}) + + cb, err := NewCachingBucket(inmem, cfg, nil, nil) + testutil.Ok(t, err) + + verifyIter(t, cb, allFiles, false, cfgName) + + testutil.Ok(t, inmem.Upload(context.Background(), "/file-5", strings.NewReader("nazdar"))) + verifyIter(t, cb, allFiles, true, cfgName) // Iter returns old response. + + cache.flush() + allFiles = append(allFiles, "/file-5") + verifyIter(t, cb, allFiles, false, cfgName) + + cache.flush() + + e := errors.Errorf("test error") + + // This iteration returns false. Result will not be cached. + testutil.Equals(t, e, cb.Iter(context.Background(), "/", func(_ string) error { + return e + })) + + // Nothing cached now. + verifyIter(t, cb, allFiles, false, cfgName) +} + +func verifyIter(t *testing.T, cb *CachingBucket, expectedFiles []string, expectedCache bool, cfgName string) { + hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opIter, cfgName))) + + col := iterCollector{} + testutil.Ok(t, cb.Iter(context.Background(), "/", col.collect)) + + hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opIter, cfgName))) + + sort.Strings(col.items) + testutil.Equals(t, expectedFiles, col.items) + + expectedHitsDiff := 0 + if expectedCache { + expectedHitsDiff = 1 + } + + testutil.Equals(t, expectedHitsDiff, hitsAfter-hitsBefore) +} + +type iterCollector struct { + items []string +} + +func (it *iterCollector) collect(s string) error { + it.items = append(it.items, s) + return nil +} + +func TestExists(t *testing.T) { + inmem := objstore.NewInMemBucket() + + // We reuse cache between tests (!) + cache := newMockCache() + + cfg := NewCachingBucketConfig() + const cfgName = "test" + cfg.CacheExists(cfgName, cache, matchAll, 10*time.Minute, 2*time.Minute) + + cb, err := NewCachingBucket(inmem, cfg, nil, nil) + testutil.Ok(t, err) + + verifyExists(t, cb, testFilename, false, false, cfgName) + + testutil.Ok(t, inmem.Upload(context.Background(), testFilename, strings.NewReader("hej"))) + verifyExists(t, cb, testFilename, false, true, cfgName) // Reused cache result. + cache.flush() + verifyExists(t, cb, testFilename, true, false, cfgName) + + testutil.Ok(t, inmem.Delete(context.Background(), testFilename)) + verifyExists(t, cb, testFilename, true, true, cfgName) // Reused cache result. + cache.flush() + verifyExists(t, cb, testFilename, false, false, cfgName) +} + +func TestExistsCachingDisabled(t *testing.T) { + inmem := objstore.NewInMemBucket() + + // We reuse cache between tests (!) 
+ cache := newMockCache() + + cfg := NewCachingBucketConfig() + const cfgName = "test" + cfg.CacheExists(cfgName, cache, func(string) bool { return false }, 10*time.Minute, 2*time.Minute) + + cb, err := NewCachingBucket(inmem, cfg, nil, nil) + testutil.Ok(t, err) + + verifyExists(t, cb, testFilename, false, false, cfgName) + + testutil.Ok(t, inmem.Upload(context.Background(), testFilename, strings.NewReader("hej"))) + verifyExists(t, cb, testFilename, true, false, cfgName) + + testutil.Ok(t, inmem.Delete(context.Background(), testFilename)) + verifyExists(t, cb, testFilename, false, false, cfgName) +} + +func verifyExists(t *testing.T, cb *CachingBucket, file string, exists bool, fromCache bool, cfgName string) { + t.Helper() + hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opExists, cfgName))) + ok, err := cb.Exists(context.Background(), file) + testutil.Ok(t, err) + testutil.Equals(t, exists, ok) + hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opExists, cfgName))) + + if fromCache { + testutil.Equals(t, 1, hitsAfter-hitsBefore) + } else { + testutil.Equals(t, 0, hitsAfter-hitsBefore) + } +} + +func TestGet(t *testing.T) { + inmem := objstore.NewInMemBucket() + + // We reuse cache between tests (!) + cache := newMockCache() + + cfg := NewCachingBucketConfig() + const cfgName = "metafile" + cfg.CacheGet(cfgName, cache, matchAll, 1024, 10*time.Minute, 10*time.Minute, 2*time.Minute) + cfg.CacheExists(cfgName, cache, matchAll, 10*time.Minute, 2*time.Minute) + + cb, err := NewCachingBucket(inmem, cfg, nil, nil) + testutil.Ok(t, err) + + verifyGet(t, cb, testFilename, nil, false, cfgName) + verifyExists(t, cb, testFilename, false, true, cfgName) + + data := []byte("hello world") + testutil.Ok(t, inmem.Upload(context.Background(), testFilename, bytes.NewBuffer(data))) + + // Even if file is now uploaded, old data is served from cache. + verifyGet(t, cb, testFilename, nil, true, cfgName) + verifyExists(t, cb, testFilename, false, true, cfgName) + + cache.flush() + + verifyGet(t, cb, testFilename, data, false, cfgName) + verifyGet(t, cb, testFilename, data, true, cfgName) + verifyExists(t, cb, testFilename, true, true, cfgName) +} + +func TestGetTooBigObject(t *testing.T) { + inmem := objstore.NewInMemBucket() + + // We reuse cache between tests (!) + cache := newMockCache() + + cfg := NewCachingBucketConfig() + const cfgName = "metafile" + // Only allow 5 bytes to be cached. + cfg.CacheGet(cfgName, cache, matchAll, 5, 10*time.Minute, 10*time.Minute, 2*time.Minute) + cfg.CacheExists(cfgName, cache, matchAll, 10*time.Minute, 2*time.Minute) + + cb, err := NewCachingBucket(inmem, cfg, nil, nil) + testutil.Ok(t, err) + + data := []byte("hello world") + testutil.Ok(t, inmem.Upload(context.Background(), testFilename, bytes.NewBuffer(data))) + + // Object is too big, so it will not be stored to cache on first read. 
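+	// Both reads below therefore hit the bucket (fromCache=false); only the exists flag is cached.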
+ verifyGet(t, cb, testFilename, data, false, cfgName) + verifyGet(t, cb, testFilename, data, false, cfgName) + verifyExists(t, cb, testFilename, true, true, cfgName) +} + +func TestGetPartialRead(t *testing.T) { + inmem := objstore.NewInMemBucket() + + cache := newMockCache() + + cfg := NewCachingBucketConfig() + const cfgName = "metafile" + cfg.CacheGet(cfgName, cache, matchAll, 1024, 10*time.Minute, 10*time.Minute, 2*time.Minute) + cfg.CacheExists(cfgName, cache, matchAll, 10*time.Minute, 2*time.Minute) + + cb, err := NewCachingBucket(inmem, cfg, nil, nil) + testutil.Ok(t, err) + + data := []byte("hello world") + testutil.Ok(t, inmem.Upload(context.Background(), testFilename, bytes.NewBuffer(data))) + + // Read only few bytes from data. + r, err := cb.Get(context.Background(), testFilename) + testutil.Ok(t, err) + _, err = r.Read(make([]byte, 1)) + testutil.Ok(t, err) + testutil.Ok(t, r.Close()) + + // Object wasn't cached as it wasn't fully read. + verifyGet(t, cb, testFilename, data, false, cfgName) + // VerifyGet read object, so now it's cached. + verifyGet(t, cb, testFilename, data, true, cfgName) +} + +func verifyGet(t *testing.T, cb *CachingBucket, file string, expectedData []byte, cacheUsed bool, cfgName string) { + hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opGet, cfgName))) + + r, err := cb.Get(context.Background(), file) + if expectedData == nil { + testutil.Assert(t, cb.IsObjNotFoundErr(err)) + + hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opGet, cfgName))) + if cacheUsed { + testutil.Equals(t, 1, hitsAfter-hitsBefore) + } else { + testutil.Equals(t, 0, hitsAfter-hitsBefore) + } + } else { + testutil.Ok(t, err) + defer runutil.CloseWithLogOnErr(nil, r, "verifyGet") + data, err := ioutil.ReadAll(r) + testutil.Ok(t, err) + testutil.Equals(t, expectedData, data) + + hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opGet, cfgName))) + if cacheUsed { + testutil.Equals(t, 1, hitsAfter-hitsBefore) + } else { + testutil.Equals(t, 0, hitsAfter-hitsBefore) + } + } +} + +func TestObjectSize(t *testing.T) { + inmem := objstore.NewInMemBucket() + + // We reuse cache between tests (!) + cache := newMockCache() + + cfg := NewCachingBucketConfig() + const cfgName = "test" + cfg.CacheObjectSize(cfgName, cache, matchAll, time.Minute) + + cb, err := NewCachingBucket(inmem, cfg, nil, nil) + testutil.Ok(t, err) + + verifyObjectSize(t, cb, testFilename, -1, false, cfgName) + verifyObjectSize(t, cb, testFilename, -1, false, cfgName) // ObjectSize doesn't cache non-existent files. 
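+	// (cachedObjectSize stores a size only on success, so negative lookups are never cached.)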
+ + data := []byte("hello world") + testutil.Ok(t, inmem.Upload(context.Background(), testFilename, bytes.NewBuffer(data))) + + verifyObjectSize(t, cb, testFilename, len(data), false, cfgName) + verifyObjectSize(t, cb, testFilename, len(data), true, cfgName) +} + +func verifyObjectSize(t *testing.T, cb *CachingBucket, file string, expectedLength int, cacheUsed bool, cfgName string) { + t.Helper() + hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opObjectSize, cfgName))) + + length, err := cb.ObjectSize(context.Background(), file) + if expectedLength < 0 { + testutil.Assert(t, cb.IsObjNotFoundErr(err)) + } else { + testutil.Ok(t, err) + testutil.Equals(t, uint64(expectedLength), length) + + hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opObjectSize, cfgName))) + if cacheUsed { + testutil.Equals(t, 1, hitsAfter-hitsBefore) + } else { + testutil.Equals(t, 0, hitsAfter-hitsBefore) + } + } +} + +func matchAll(string) bool { return true } From a3e8b0229bac7208ba7aac1d6199de8647a0a098 Mon Sep 17 00:00:00 2001 From: Thota Harshitha Chowdary <37958850+Harshitha1234@users.noreply.github.com> Date: Sat, 16 May 2020 16:02:10 +0530 Subject: [PATCH 4/8] Updated the help message for --data-dir flag (#2601) * Update store.go Signed-off-by: Harshitha Chowdary * Update store.go Signed-off-by: Harshitha Chowdary * Update cmd/thanos/store.go Co-authored-by: Bartlomiej Plotka Signed-off-by: Harshitha Chowdary * Updated the docs folder Signed-off-by: Harshitha Chowdary Co-authored-by: Bartlomiej Plotka --- cmd/thanos/store.go | 2 +- docs/components/store.md | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 9425e36dda..fb21439c31 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -45,7 +45,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { httpBindAddr, httpGracePeriod := regHTTPFlags(cmd) grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd) - dataDir := cmd.Flag("data-dir", "Data directory in which to cache remote blocks."). + dataDir := cmd.Flag("data-dir", "Local data directory used for caching purposes (index-header, in-mem cache items and meta.jsons). If removed, no data will be lost, just store will have to rebuild the cache. NOTE: Putting raw blocks here will not cause the store to read them. For such use cases use Prometheus + sidecar."). Default("./data").String() indexCacheSize := cmd.Flag("index-cache-size", "Maximum size of items held in the in-memory index cache. Ignored if --index-cache.config or --index-cache.config-file option is specified."). diff --git a/docs/components/store.md b/docs/components/store.md index 74a70f348d..9043ee6a35 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -68,7 +68,13 @@ Flags: TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert) - --data-dir="./data" Data directory in which to cache remote blocks. + --data-dir="./data" Local data directory used for caching purposes + (index-header, in-mem cache items and + meta.jsons). If removed, no data will be lost, + just store will have to rebuild the cache. + NOTE: Putting raw blocks here will not cause + the store to read them. For such use cases use + Prometheus + sidecar. --index-cache-size=250MB Maximum size of items held in the in-memory index cache. Ignored if --index-cache.config or --index-cache.config-file option is specified. 
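The caching-bucket tests in the patch above call a `newMockCache()` helper whose definition falls outside this diff. As a rough illustration only, and assuming a Fetch/Store-style cache interface similar to the one in Thanos' cache package (the real helper and interface in the tree may differ), a minimal in-memory stand-in could look like this:

```go
package storecache

import (
	"context"
	"sync"
	"time"
)

// mockCache is a hypothetical in-memory stand-in for the cache used by the
// tests above. The interface it satisfies is an assumption for this sketch.
type mockCache struct {
	mu    sync.Mutex
	cache map[string][]byte
}

func newMockCache() *mockCache {
	return &mockCache{cache: map[string][]byte{}}
}

// Store saves all given key/value pairs; TTL handling is omitted in this sketch.
func (m *mockCache) Store(_ context.Context, data map[string][]byte, _ time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for k, v := range data {
		m.cache[k] = v
	}
}

// Fetch returns only the keys present in the cache, so callers can tell
// hits from misses.
func (m *mockCache) Fetch(_ context.Context, keys []string) map[string][]byte {
	m.mu.Lock()
	defer m.mu.Unlock()
	found := map[string][]byte{}
	for _, k := range keys {
		if v, ok := m.cache[k]; ok {
			found[k] = v
		}
	}
	return found
}

// flush drops all entries, matching the cache.flush() call in TestGet.
func (m *mockCache) flush() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.cache = map[string][]byte{}
}
```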
From 334a41b4caf2206e606a21efbfa8f572ba167792 Mon Sep 17 00:00:00 2001 From: Lili Cosic Date: Sun, 17 May 2020 15:08:34 +0200 Subject: [PATCH 5/8] rule: Fix bug when rules were out of sync (#2615) Co-authored-by: johncming Signed-off-by: Lili Cosic Co-authored-by: johncming --- CHANGELOG.md | 1 + cmd/thanos/rule.go | 13 ++++++++--- pkg/rule/rule.go | 21 ++++++++++++++--- pkg/rule/rule_test.go | 53 ++++++++++++++++++++++++++++++++++++++----- 4 files changed, 76 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 855c3518a0..3fe32a75fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2536](https://github.com/thanos-io/thanos/pull/2536) minio-go: Fixed AWS STS endpoint url to https for Web Identity providers on AWS EKS - [#2501](https://github.com/thanos-io/thanos/pull/2501) Query: gracefully handle additional fields in `SeriesResponse` protobuf message that may be added in the future. - [#2568](https://github.com/thanos-io/thanos/pull/2568) Query: does not close the connection of strict, static nodes if establishing a connection had succeeded but Info() call failed +- [#2615](https://github.com/thanos-io/thanos/pull/2615) Rule: Fix bugs where rules were out of sync. ### Added diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index d49409627b..21272a3fc7 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -763,8 +763,9 @@ func reloadRules(logger log.Logger, metrics *RuleMetrics) error { level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ",")) var ( - errs tsdberrors.MultiError - files []string + errs tsdberrors.MultiError + files []string + seenFiles = make(map[string]struct{}) ) for _, pat := range ruleFiles { fs, err := filepath.Glob(pat) @@ -774,7 +775,13 @@ func reloadRules(logger log.Logger, continue } - files = append(files, fs...) + for _, fp := range fs { + if _, ok := seenFiles[fp]; ok { + continue + } + files = append(files, fp) + seenFiles[fp] = struct{}{} + } } level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files)) diff --git a/pkg/rule/rule.go b/pkg/rule/rule.go index 8491478c13..70c442d5d9 100644 --- a/pkg/rule/rule.go +++ b/pkg/rule/rule.go @@ -70,17 +70,17 @@ func (m *Manager) SetRuleManager(s storepb.PartialResponseStrategy, mgr *rules.M func (m *Manager) RuleGroups() []Group { m.mtx.RLock() defer m.mtx.RUnlock() - var res []Group + var groups []Group for s, r := range m.mgrs { for _, group := range r.RuleGroups() { - res = append(res, Group{ + groups = append(groups, Group{ Group: group, PartialResponseStrategy: s, originalFile: m.ruleFiles[group.File()], }) } } - return res + return groups } func (m *Manager) AlertingRules() []AlertingRule { @@ -216,6 +216,21 @@ func (m *Manager) Update(evalInterval time.Duration, files []string) error { continue } } + + // Remove the rules from a manager when its strategy no longer has any rule files.
+ for s, mgr := range m.mgrs { + if _, ok := filesByStrategy[s]; ok { + continue + } + + if len(mgr.RuleGroups()) == 0 { + continue + } + + if err := mgr.Update(evalInterval, []string{}, nil); err != nil { + errs = append(errs, err) + } + } m.ruleFiles = ruleFiles m.mtx.Unlock() diff --git a/pkg/rule/rule_test.go b/pkg/rule/rule_test.go index d238db10d4..7c503efd6e 100644 --- a/pkg/rule/rule_test.go +++ b/pkg/rule/rule_test.go @@ -66,14 +66,17 @@ groups: Appendable: nopAppendable{}, } thanosRuleMgr := NewManager(dir) - ruleMgr := rules.NewManager(&opts) - thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_ABORT, ruleMgr) - thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_WARN, ruleMgr) + ruleMgrAbort := rules.NewManager(&opts) + ruleMgrWarn := rules.NewManager(&opts) + thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_ABORT, ruleMgrAbort) + thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_WARN, ruleMgrWarn) - testutil.Ok(t, thanosRuleMgr.Update(10*time.Second, []string{filepath.Join(dir, "rule.yaml")})) + ruleMgrAbort.Run() + ruleMgrWarn.Run() + defer ruleMgrAbort.Stop() + defer ruleMgrWarn.Stop() - ruleMgr.Run() - defer ruleMgr.Stop() + testutil.Ok(t, thanosRuleMgr.Update(10*time.Second, []string{filepath.Join(dir, "rule.yaml")})) select { case <-time.After(2 * time.Minute): @@ -225,6 +228,44 @@ groups: } } +func TestUpdateAfterClear(t *testing.T) { + dir, err := ioutil.TempDir("", "test_rule_rule_groups") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + testutil.Ok(t, ioutil.WriteFile(filepath.Join(dir, "no_strategy.yaml"), []byte(` +groups: +- name: "something1" + rules: + - alert: "some" + expr: "up" +`), os.ModePerm)) + + opts := rules.ManagerOptions{ + Logger: log.NewLogfmtLogger(os.Stderr), + } + m := NewManager(dir) + ruleMgrAbort := rules.NewManager(&opts) + ruleMgrWarn := rules.NewManager(&opts) + m.SetRuleManager(storepb.PartialResponseStrategy_ABORT, ruleMgrAbort) + m.SetRuleManager(storepb.PartialResponseStrategy_WARN, ruleMgrWarn) + + ruleMgrAbort.Run() + ruleMgrWarn.Run() + defer ruleMgrAbort.Stop() + defer ruleMgrWarn.Stop() + + err = m.Update(1*time.Second, []string{ + filepath.Join(dir, "no_strategy.yaml"), + }) + testutil.Ok(t, err) + testutil.Equals(t, 1, len(m.RuleGroups())) + + err = m.Update(1*time.Second, []string{}) + testutil.Ok(t, err) + testutil.Equals(t, 0, len(m.RuleGroups())) +} + func TestRuleGroupMarshalYAML(t *testing.T) { const expected = `groups: - name: something1 From 11c75aeb908fd1b680cf4ec480100f2976acc0e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Krupa?= Date: Mon, 18 May 2020 07:38:31 +0200 Subject: [PATCH 6/8] Mixins: Align schema with other mixins (#2619) * mixin/thanos: align mixin schema with other mixins - promote default.libsonnet to main config file - include config.libsonnet in mixin.libsonnet - include jsonnetfile.json - allow generating directly from mixin/thanos directory Signed-off-by: paulfantom * mixin: move one directory higher Signed-off-by: paulfantom --- Makefile | 13 ++++++------- mixin/{thanos => }/README.md | 4 ++-- mixin/alerts.jsonnet | 3 +++ mixin/{thanos => }/alerts/absent.libsonnet | 0 mixin/{thanos => }/alerts/alerts.libsonnet | 0 .../{thanos => }/alerts/bucket_replicate.libsonnet | 0 mixin/{thanos => }/alerts/compact.libsonnet | 0 mixin/{thanos => }/alerts/query.libsonnet | 0 mixin/{thanos => }/alerts/receive.libsonnet | 0 mixin/{thanos => }/alerts/rule.libsonnet | 0 mixin/{thanos => }/alerts/sidecar.libsonnet | 0 mixin/{thanos => 
}/alerts/store.libsonnet | 0 .../{thanos/defaults.libsonnet => config.libsonnet} | 0 mixin/dashboards.jsonnet | 9 +++++++++ .../dashboards/bucket_replicate.libsonnet | 0 mixin/{thanos => }/dashboards/compact.libsonnet | 0 mixin/{thanos => }/dashboards/dashboards.libsonnet | 0 mixin/{thanos => }/dashboards/defaults.libsonnet | 0 mixin/{thanos => }/dashboards/overview.libsonnet | 0 mixin/{thanos => }/dashboards/query.libsonnet | 0 mixin/{thanos => }/dashboards/receive.libsonnet | 0 mixin/{thanos => }/dashboards/rule.libsonnet | 0 mixin/{thanos => }/dashboards/sidecar.libsonnet | 0 mixin/{thanos => }/dashboards/store.libsonnet | 0 mixin/jsonnetfile.json | 9 --------- mixin/jsonnetfile.lock.json | 9 --------- .../lib/thanos-grafana-builder/README.md | 0 .../lib/thanos-grafana-builder/builder.libsonnet | 0 .../lib/thanos-grafana-builder/grpc.libsonnet | 0 .../lib/thanos-grafana-builder/http.libsonnet | 0 .../lib/thanos-grafana-builder/slo.libsonnet | 0 mixin/{thanos => }/mixin.libsonnet | 3 ++- mixin/rules.jsonnet | 3 +++ mixin/{thanos => }/rules/bucket_replicate.libsonnet | 0 mixin/{thanos => }/rules/query.libsonnet | 0 mixin/{thanos => }/rules/receive.libsonnet | 0 mixin/{thanos => }/rules/rules.libsonnet | 0 mixin/{thanos => }/rules/store.libsonnet | 0 mixin/separated_alerts.jsonnet | 7 +++++++ mixin/thanos/alerts.jsonnet | 4 ---- mixin/thanos/dashboards.jsonnet | 9 --------- mixin/thanos/rules.jsonnet | 4 ---- mixin/thanos/separated_alerts.jsonnet | 7 ------- 43 files changed, 32 insertions(+), 52 deletions(-) rename mixin/{thanos => }/README.md (97%) create mode 100644 mixin/alerts.jsonnet rename mixin/{thanos => }/alerts/absent.libsonnet (100%) rename mixin/{thanos => }/alerts/alerts.libsonnet (100%) rename mixin/{thanos => }/alerts/bucket_replicate.libsonnet (100%) rename mixin/{thanos => }/alerts/compact.libsonnet (100%) rename mixin/{thanos => }/alerts/query.libsonnet (100%) rename mixin/{thanos => }/alerts/receive.libsonnet (100%) rename mixin/{thanos => }/alerts/rule.libsonnet (100%) rename mixin/{thanos => }/alerts/sidecar.libsonnet (100%) rename mixin/{thanos => }/alerts/store.libsonnet (100%) rename mixin/{thanos/defaults.libsonnet => config.libsonnet} (100%) create mode 100644 mixin/dashboards.jsonnet rename mixin/{thanos => }/dashboards/bucket_replicate.libsonnet (100%) rename mixin/{thanos => }/dashboards/compact.libsonnet (100%) rename mixin/{thanos => }/dashboards/dashboards.libsonnet (100%) rename mixin/{thanos => }/dashboards/defaults.libsonnet (100%) rename mixin/{thanos => }/dashboards/overview.libsonnet (100%) rename mixin/{thanos => }/dashboards/query.libsonnet (100%) rename mixin/{thanos => }/dashboards/receive.libsonnet (100%) rename mixin/{thanos => }/dashboards/rule.libsonnet (100%) rename mixin/{thanos => }/dashboards/sidecar.libsonnet (100%) rename mixin/{thanos => }/dashboards/store.libsonnet (100%) rename mixin/{thanos => }/lib/thanos-grafana-builder/README.md (100%) rename mixin/{thanos => }/lib/thanos-grafana-builder/builder.libsonnet (100%) rename mixin/{thanos => }/lib/thanos-grafana-builder/grpc.libsonnet (100%) rename mixin/{thanos => }/lib/thanos-grafana-builder/http.libsonnet (100%) rename mixin/{thanos => }/lib/thanos-grafana-builder/slo.libsonnet (100%) rename mixin/{thanos => }/mixin.libsonnet (56%) create mode 100644 mixin/rules.jsonnet rename mixin/{thanos => }/rules/bucket_replicate.libsonnet (100%) rename mixin/{thanos => }/rules/query.libsonnet (100%) rename mixin/{thanos => }/rules/receive.libsonnet (100%) rename mixin/{thanos => 
}/rules/rules.libsonnet (100%) rename mixin/{thanos => }/rules/store.libsonnet (100%) create mode 100644 mixin/separated_alerts.jsonnet delete mode 100644 mixin/thanos/alerts.jsonnet delete mode 100644 mixin/thanos/dashboards.jsonnet delete mode 100644 mixin/thanos/rules.jsonnet delete mode 100644 mixin/thanos/separated_alerts.jsonnet diff --git a/Makefile b/Makefile index 7ca3472902..a9deed3041 100644 --- a/Makefile +++ b/Makefile @@ -58,8 +58,7 @@ PROMTOOL ?= $(GOBIN)/promtool-$(PROMTOOL_VERSION) # systems gsed won't be installed, so will use sed as expected. SED ?= $(shell which gsed 2>/dev/null || which sed) -MIXIN_ROOT ?= mixin -THANOS_MIXIN ?= mixin/thanos +THANOS_MIXIN ?= mixin JSONNET_VENDOR_DIR ?= mixin/vendor WEB_DIR ?= website @@ -404,20 +403,20 @@ examples/tmp: $(JSONNET) -J ${JSONNET_VENDOR_DIR} -m examples/tmp/ ${THANOS_MIXIN}/separated_alerts.jsonnet | xargs -I{} sh -c 'cat {} | $(GOJSONTOYAML) > {}.yaml; rm -f {}' -- {} .PHONY: examples/dashboards # to keep examples/dashboards/dashboards.md. -examples/dashboards: $(JSONNET) ${THANOS_MIXIN}/mixin.libsonnet ${THANOS_MIXIN}/defaults.libsonnet ${THANOS_MIXIN}/dashboards/* +examples/dashboards: $(JSONNET) ${THANOS_MIXIN}/mixin.libsonnet ${THANOS_MIXIN}/config.libsonnet ${THANOS_MIXIN}/dashboards/* -rm -rf examples/dashboards/*.json $(JSONNET) -J ${JSONNET_VENDOR_DIR} -m examples/dashboards ${THANOS_MIXIN}/dashboards.jsonnet -examples/alerts/alerts.yaml: $(JSONNET) $(GOJSONTOYAML) ${THANOS_MIXIN}/mixin.libsonnet ${THANOS_MIXIN}/defaults.libsonnet ${THANOS_MIXIN}/alerts/* +examples/alerts/alerts.yaml: $(JSONNET) $(GOJSONTOYAML) ${THANOS_MIXIN}/mixin.libsonnet ${THANOS_MIXIN}/config.libsonnet ${THANOS_MIXIN}/alerts/* $(JSONNET) ${THANOS_MIXIN}/alerts.jsonnet | $(GOJSONTOYAML) > $@ -examples/alerts/rules.yaml: $(JSONNET) $(GOJSONTOYAML) ${THANOS_MIXIN}/mixin.libsonnet ${THANOS_MIXIN}/defaults.libsonnet ${THANOS_MIXIN}/rules/* +examples/alerts/rules.yaml: $(JSONNET) $(GOJSONTOYAML) ${THANOS_MIXIN}/mixin.libsonnet ${THANOS_MIXIN}/config.libsonnet ${THANOS_MIXIN}/rules/* $(JSONNET) ${THANOS_MIXIN}/rules.jsonnet | $(GOJSONTOYAML) > $@ .PHONY: jsonnet-vendor -jsonnet-vendor: $(JSONNET_BUNDLER) $(MIXIN_ROOT)/jsonnetfile.json $(MIXIN_ROOT)/jsonnetfile.lock.json +jsonnet-vendor: $(JSONNET_BUNDLER) $(THANOS_MIXIN)/jsonnetfile.json $(THANOS_MIXIN)/jsonnetfile.lock.json rm -rf ${JSONNET_VENDOR_DIR} - cd ${MIXIN_ROOT} && $(JSONNET_BUNDLER) install + cd ${THANOS_MIXIN} && $(JSONNET_BUNDLER) install JSONNETFMT_CMD := $(JSONNETFMT) -n 2 --max-blank-lines 2 --string-style s --comment-style s diff --git a/mixin/thanos/README.md b/mixin/README.md similarity index 97% rename from mixin/thanos/README.md rename to mixin/README.md index 7f35050e50..64eb5523fd 100644 --- a/mixin/thanos/README.md +++ b/mixin/README.md @@ -54,9 +54,9 @@ $ jb update #### Configure -This project is intended to be used as a library. You can extend and customize dashboards and alerting rules by creating for own generators, such as the generators ([alerts.jsonnet](alerts.jsonnet) and [dashboards.jsonnet](dashboards.jsonnet)) that are use to create [examples](../../examples). Default parameters are collected in [defaults.libsonnet](defaults.libsonnet), feel free to modify and generate your own definitons. +This project is intended to be used as a library. 
You can extend and customize dashboards and alerting rules by creating your own generators, such as the generators ([alerts.jsonnet](alerts.jsonnet) and [dashboards.jsonnet](dashboards.jsonnet)) that are used to create [examples](../../examples). Default parameters are collected in [config.libsonnet](config.libsonnet), feel free to modify and generate your own definitions. -[embedmd]:# (defaults.libsonnet) +[embedmd]:# (config.libsonnet) ```libsonnet { query+:: { diff --git a/mixin/alerts.jsonnet b/mixin/alerts.jsonnet new file mode 100644 index 0000000000..23f4afea68 --- /dev/null +++ b/mixin/alerts.jsonnet @@ -0,0 +1,3 @@ +( + import 'mixin.libsonnet' +).prometheusAlerts diff --git a/mixin/thanos/alerts/absent.libsonnet b/mixin/alerts/absent.libsonnet similarity index 100% rename from mixin/thanos/alerts/absent.libsonnet rename to mixin/alerts/absent.libsonnet diff --git a/mixin/thanos/alerts/alerts.libsonnet b/mixin/alerts/alerts.libsonnet similarity index 100% rename from mixin/thanos/alerts/alerts.libsonnet rename to mixin/alerts/alerts.libsonnet diff --git a/mixin/thanos/alerts/bucket_replicate.libsonnet b/mixin/alerts/bucket_replicate.libsonnet similarity index 100% rename from mixin/thanos/alerts/bucket_replicate.libsonnet rename to mixin/alerts/bucket_replicate.libsonnet diff --git a/mixin/thanos/alerts/compact.libsonnet b/mixin/alerts/compact.libsonnet similarity index 100% rename from mixin/thanos/alerts/compact.libsonnet rename to mixin/alerts/compact.libsonnet diff --git a/mixin/thanos/alerts/query.libsonnet b/mixin/alerts/query.libsonnet similarity index 100% rename from mixin/thanos/alerts/query.libsonnet rename to mixin/alerts/query.libsonnet diff --git a/mixin/thanos/alerts/receive.libsonnet b/mixin/alerts/receive.libsonnet similarity index 100% rename from mixin/thanos/alerts/receive.libsonnet rename to mixin/alerts/receive.libsonnet diff --git a/mixin/thanos/alerts/rule.libsonnet b/mixin/alerts/rule.libsonnet similarity index 100% rename from mixin/thanos/alerts/rule.libsonnet rename to mixin/alerts/rule.libsonnet diff --git a/mixin/thanos/alerts/sidecar.libsonnet b/mixin/alerts/sidecar.libsonnet similarity index 100% rename from mixin/thanos/alerts/sidecar.libsonnet rename to mixin/alerts/sidecar.libsonnet diff --git a/mixin/thanos/alerts/store.libsonnet b/mixin/alerts/store.libsonnet similarity index 100% rename from mixin/thanos/alerts/store.libsonnet rename to mixin/alerts/store.libsonnet diff --git a/mixin/thanos/defaults.libsonnet b/mixin/config.libsonnet similarity index 100% rename from mixin/thanos/defaults.libsonnet rename to mixin/config.libsonnet diff --git a/mixin/dashboards.jsonnet b/mixin/dashboards.jsonnet new file mode 100644 index 0000000000..94353a6c15 --- /dev/null +++ b/mixin/dashboards.jsonnet @@ -0,0 +1,9 @@ +local dashboards = + ( + import 'mixin.libsonnet' + ).grafanaDashboards; + +{ + [name]: dashboards[name] + for name in std.objectFields(dashboards) +} diff --git a/mixin/thanos/dashboards/bucket_replicate.libsonnet b/mixin/dashboards/bucket_replicate.libsonnet similarity index 100% rename from mixin/thanos/dashboards/bucket_replicate.libsonnet rename to mixin/dashboards/bucket_replicate.libsonnet diff --git a/mixin/thanos/dashboards/compact.libsonnet b/mixin/dashboards/compact.libsonnet similarity index 100% rename from mixin/thanos/dashboards/compact.libsonnet rename to mixin/dashboards/compact.libsonnet diff --git a/mixin/thanos/dashboards/dashboards.libsonnet b/mixin/dashboards/dashboards.libsonnet similarity index 100% rename from
mixin/thanos/dashboards/dashboards.libsonnet rename to mixin/dashboards/dashboards.libsonnet diff --git a/mixin/thanos/dashboards/defaults.libsonnet b/mixin/dashboards/defaults.libsonnet similarity index 100% rename from mixin/thanos/dashboards/defaults.libsonnet rename to mixin/dashboards/defaults.libsonnet diff --git a/mixin/thanos/dashboards/overview.libsonnet b/mixin/dashboards/overview.libsonnet similarity index 100% rename from mixin/thanos/dashboards/overview.libsonnet rename to mixin/dashboards/overview.libsonnet diff --git a/mixin/thanos/dashboards/query.libsonnet b/mixin/dashboards/query.libsonnet similarity index 100% rename from mixin/thanos/dashboards/query.libsonnet rename to mixin/dashboards/query.libsonnet diff --git a/mixin/thanos/dashboards/receive.libsonnet b/mixin/dashboards/receive.libsonnet similarity index 100% rename from mixin/thanos/dashboards/receive.libsonnet rename to mixin/dashboards/receive.libsonnet diff --git a/mixin/thanos/dashboards/rule.libsonnet b/mixin/dashboards/rule.libsonnet similarity index 100% rename from mixin/thanos/dashboards/rule.libsonnet rename to mixin/dashboards/rule.libsonnet diff --git a/mixin/thanos/dashboards/sidecar.libsonnet b/mixin/dashboards/sidecar.libsonnet similarity index 100% rename from mixin/thanos/dashboards/sidecar.libsonnet rename to mixin/dashboards/sidecar.libsonnet diff --git a/mixin/thanos/dashboards/store.libsonnet b/mixin/dashboards/store.libsonnet similarity index 100% rename from mixin/thanos/dashboards/store.libsonnet rename to mixin/dashboards/store.libsonnet diff --git a/mixin/jsonnetfile.json b/mixin/jsonnetfile.json index 8f8d9d3559..45189debbf 100644 --- a/mixin/jsonnetfile.json +++ b/mixin/jsonnetfile.json @@ -17,15 +17,6 @@ } }, "version": "master" - }, - { - "source": { - "local": { - "directory": "thanos" - } - }, - "version": ".", - "name": "thanos-mixin" } ], "legacyImports": true diff --git a/mixin/jsonnetfile.lock.json b/mixin/jsonnetfile.lock.json index f2a1581a48..403c88e68a 100644 --- a/mixin/jsonnetfile.lock.json +++ b/mixin/jsonnetfile.lock.json @@ -19,15 +19,6 @@ }, "version": "f4c59f64f80442f871a06c91edf74d014b82acaf", "sum": "ELsYwK+kGdzX1mee2Yy+/b2mdO4Y503BOCDkFzwmGbE=" - }, - { - "source": { - "local": { - "directory": "thanos" - } - }, - "version": "", - "name": "thanos-mixin" } ], "legacyImports": false diff --git a/mixin/thanos/lib/thanos-grafana-builder/README.md b/mixin/lib/thanos-grafana-builder/README.md similarity index 100% rename from mixin/thanos/lib/thanos-grafana-builder/README.md rename to mixin/lib/thanos-grafana-builder/README.md diff --git a/mixin/thanos/lib/thanos-grafana-builder/builder.libsonnet b/mixin/lib/thanos-grafana-builder/builder.libsonnet similarity index 100% rename from mixin/thanos/lib/thanos-grafana-builder/builder.libsonnet rename to mixin/lib/thanos-grafana-builder/builder.libsonnet diff --git a/mixin/thanos/lib/thanos-grafana-builder/grpc.libsonnet b/mixin/lib/thanos-grafana-builder/grpc.libsonnet similarity index 100% rename from mixin/thanos/lib/thanos-grafana-builder/grpc.libsonnet rename to mixin/lib/thanos-grafana-builder/grpc.libsonnet diff --git a/mixin/thanos/lib/thanos-grafana-builder/http.libsonnet b/mixin/lib/thanos-grafana-builder/http.libsonnet similarity index 100% rename from mixin/thanos/lib/thanos-grafana-builder/http.libsonnet rename to mixin/lib/thanos-grafana-builder/http.libsonnet diff --git a/mixin/thanos/lib/thanos-grafana-builder/slo.libsonnet b/mixin/lib/thanos-grafana-builder/slo.libsonnet similarity index 100% rename from 
mixin/thanos/lib/thanos-grafana-builder/slo.libsonnet rename to mixin/lib/thanos-grafana-builder/slo.libsonnet diff --git a/mixin/thanos/mixin.libsonnet b/mixin/mixin.libsonnet similarity index 56% rename from mixin/thanos/mixin.libsonnet rename to mixin/mixin.libsonnet index 6590c396e4..8aa275ccbb 100644 --- a/mixin/thanos/mixin.libsonnet +++ b/mixin/mixin.libsonnet @@ -1,3 +1,4 @@ (import 'dashboards/dashboards.libsonnet') + (import 'alerts/alerts.libsonnet') + -(import 'rules/rules.libsonnet') +(import 'rules/rules.libsonnet') + +(import 'config.libsonnet') diff --git a/mixin/rules.jsonnet b/mixin/rules.jsonnet new file mode 100644 index 0000000000..1c9271ab76 --- /dev/null +++ b/mixin/rules.jsonnet @@ -0,0 +1,3 @@ +( + import 'mixin.libsonnet' +).prometheusRules diff --git a/mixin/thanos/rules/bucket_replicate.libsonnet b/mixin/rules/bucket_replicate.libsonnet similarity index 100% rename from mixin/thanos/rules/bucket_replicate.libsonnet rename to mixin/rules/bucket_replicate.libsonnet diff --git a/mixin/thanos/rules/query.libsonnet b/mixin/rules/query.libsonnet similarity index 100% rename from mixin/thanos/rules/query.libsonnet rename to mixin/rules/query.libsonnet diff --git a/mixin/thanos/rules/receive.libsonnet b/mixin/rules/receive.libsonnet similarity index 100% rename from mixin/thanos/rules/receive.libsonnet rename to mixin/rules/receive.libsonnet diff --git a/mixin/thanos/rules/rules.libsonnet b/mixin/rules/rules.libsonnet similarity index 100% rename from mixin/thanos/rules/rules.libsonnet rename to mixin/rules/rules.libsonnet diff --git a/mixin/thanos/rules/store.libsonnet b/mixin/rules/store.libsonnet similarity index 100% rename from mixin/thanos/rules/store.libsonnet rename to mixin/rules/store.libsonnet diff --git a/mixin/separated_alerts.jsonnet b/mixin/separated_alerts.jsonnet new file mode 100644 index 0000000000..329a427bf8 --- /dev/null +++ b/mixin/separated_alerts.jsonnet @@ -0,0 +1,7 @@ +{ + [group.name]: group + for group in + ( + import 'mixin.libsonnet' + ).prometheusAlerts.groups +} diff --git a/mixin/thanos/alerts.jsonnet b/mixin/thanos/alerts.jsonnet deleted file mode 100644 index 574da7f5a6..0000000000 --- a/mixin/thanos/alerts.jsonnet +++ /dev/null @@ -1,4 +0,0 @@ -( - (import 'mixin.libsonnet') + - (import 'defaults.libsonnet') -).prometheusAlerts diff --git a/mixin/thanos/dashboards.jsonnet b/mixin/thanos/dashboards.jsonnet deleted file mode 100644 index a9cd0bbfcf..0000000000 --- a/mixin/thanos/dashboards.jsonnet +++ /dev/null @@ -1,9 +0,0 @@ -local dashboards = ( - (import 'mixin.libsonnet') + - (import 'defaults.libsonnet') -).grafanaDashboards; - -{ - [name]: dashboards[name] - for name in std.objectFields(dashboards) -} diff --git a/mixin/thanos/rules.jsonnet b/mixin/thanos/rules.jsonnet deleted file mode 100644 index c50930e3b9..0000000000 --- a/mixin/thanos/rules.jsonnet +++ /dev/null @@ -1,4 +0,0 @@ -( - (import 'mixin.libsonnet') + - (import 'defaults.libsonnet') -).prometheusRules diff --git a/mixin/thanos/separated_alerts.jsonnet b/mixin/thanos/separated_alerts.jsonnet deleted file mode 100644 index 79402a0b3f..0000000000 --- a/mixin/thanos/separated_alerts.jsonnet +++ /dev/null @@ -1,7 +0,0 @@ -{ - [group.name]: group - for group in ( - (import 'mixin.libsonnet') + - (import 'defaults.libsonnet') - ).prometheusAlerts.groups -} From 6cfcb345d73c56f6bdfbf6b61a6fe8026f44cdb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Mon, 18 May 2020 09:44:09 +0200 Subject: [PATCH 7/8] Documentation and CHANGELOG.md entry for 
#2579. (#2618) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Documentation and CHANGELOG.md entry for #2579. Signed-off-by: Peter Štibraný * Document hidden option to configure caching bucket, to make it less hidden. Signed-off-by: Peter Štibraný * Removed mention of caching_config. Signed-off-by: Peter Štibraný * Added periods. Signed-off-by: Peter Štibraný --- CHANGELOG.md | 3 ++- docs/components/store.md | 30 ++++++++++++++++++++++-------- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fe32a75fd..8170660458 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2502](https://github.com/thanos-io/thanos/pull/2502) Added `hints` field to `SeriesResponse`. Hints in an opaque data structure that can be used to carry additional information from the store and its content is implementation specific. - [#2521](https://github.com/thanos-io/thanos/pull/2521) Sidecar: add `thanos_sidecar_reloader_reloads_failed_total`, `thanos_sidecar_reloader_reloads_total`, `thanos_sidecar_reloader_watch_errors_total`, `thanos_sidecar_reloader_watch_events_total` and `thanos_sidecar_reloader_watches` metrics. - [#2412](https://github.com/thanos-io/thanos/pull/2412) ui: add React UI from Prometheus upstream. Currently only accessible from Query component as only `/graph` endpoint is migrated. -- [#2532](https://github.com/thanos-io/thanos/pull/2532) Store: Added hidden option for experimental caching bucket, that can cache chunks into shared memcached. This can speed up querying and reduce number of requests to object storage. +- [#2532](https://github.com/thanos-io/thanos/pull/2532) Store: Added hidden option `--store.caching-bucket.config=` (or `--store.caching-bucket.config-file=`) for experimental caching bucket, that can cache chunks into shared memcached. This can speed up querying and reduce number of requests to object storage. +- [#2579](https://github.com/thanos-io/thanos/pull/2579) Store: Experimental caching bucket can now cache metadata as well. Config has changed from #2532. ### Changed diff --git a/docs/components/store.md b/docs/components/store.md index 9043ee6a35..66a1ecfafe 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -280,7 +280,9 @@ While the remaining settings are **optional**: ## Caching Bucket -Thanos Store Gateway supports a "caching bucket" with chunks caching to speed up loading of chunks from TSDB blocks. Currently only memcached "backend" is supported: +Thanos Store Gateway supports a "caching bucket" with chunks and metadata caching to speed up loading of chunks from TSDB blocks. To configure caching, one needs to use `--store.caching-bucket.config=` or `--store.caching-bucket.config-file=`. + +Currently only memcached "backend" is supported: ```yaml backend: memcached @@ -288,23 +290,35 @@ backend_config: addresses: - localhost:11211 -caching_config: - chunk_subrange_size: 16000 - max_chunks_get_range_requests: 3 - chunk_object_size_ttl: 24h - chunk_subrange_ttl: 24h +chunk_subrange_size: 16000 +max_chunks_get_range_requests: 3 +chunk_object_size_ttl: 24h +chunk_subrange_ttl: 24h +blocks_iter_ttl: 5m +metafile_exists_ttl: 2h +metafile_doesnt_exist_ttl: 15m +metafile_content_ttl: 24h +metafile_max_size: 1MiB ``` `backend_config` field for memcached supports all the same configuration as memcached for [index cache](#memcached-index-cache). 
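Conceptually, the caching bucket wraps the object-store bucket and consults the cache before touching object storage, falling back to the bucket and filling the cache on a miss. The following is a much-simplified sketch of that idea only, not the real `CachingBucket` implementation (which additionally handles subranges, TTLs, size limits and hit metrics); all names in it are illustrative:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// bucket is the minimal slice of an object-store interface used by this sketch.
type bucket interface {
	Get(ctx context.Context, name string) ([]byte, error)
}

// memBucket stands in for a real object-storage client.
type memBucket map[string][]byte

func (b memBucket) Get(_ context.Context, name string) ([]byte, error) {
	if v, ok := b[name]; ok {
		return v, nil
	}
	return nil, errors.New("object not found: " + name)
}

// cachingBucket serves Get from the cache when possible and fills it on a miss.
type cachingBucket struct {
	next  bucket
	cache map[string][]byte
}

func (c *cachingBucket) Get(ctx context.Context, name string) ([]byte, error) {
	if v, ok := c.cache[name]; ok {
		return v, nil // Cache hit: no request to object storage at all.
	}
	v, err := c.next.Get(ctx, name)
	if err != nil {
		return nil, err
	}
	c.cache[name] = v // Fill the cache for subsequent reads.
	return v, nil
}

func main() {
	inner := memBucket{"meta.json": []byte(`{"version":1}`)}
	cb := &cachingBucket{next: inner, cache: map[string][]byte{}}

	for i := 0; i < 2; i++ {
		v, err := cb.Get(context.Background(), "meta.json")
		fmt.Println(string(v), err) // The second read is served from the cache.
	}
}
```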
-`caching_config` is a configuration for chunks cache and supports the following optional settings: +Additional options to configure various aspects of the chunks cache are available: - `chunk_subrange_size`: size of each segment of a chunks object that is stored in the cache. This is the smallest unit the chunks cache works with. - `max_chunks_get_range_requests`: how many "get range" sub-requests the cache may perform to fetch missing subranges. - `chunk_object_size_ttl`: how long to keep information about chunk file length in the cache. - `chunk_subrange_ttl`: how long to keep individual subranges in the cache. -Note that chunks cache is an experimental feature, and these fields may be renamed or removed completely in the future. +The following options are used for metadata caching (meta.json files, deletion mark files, iteration results): + +- `blocks_iter_ttl`: how long to cache the result of iterating blocks. +- `metafile_exists_ttl`: how long to cache information about whether a meta.json or deletion mark file exists. +- `metafile_doesnt_exist_ttl`: how long to cache information about whether a meta.json or deletion mark file doesn't exist. +- `metafile_content_ttl`: how long to cache the content of meta.json and deletion mark files. +- `metafile_max_size`: maximum size of a cached meta.json or deletion mark file. Larger files are not cached. + +Note that the chunks and metadata caches are an experimental feature, and these fields may be renamed or removed completely in the future. ## Index Header From d1ef032b0ce483eebbcb4a593b740252ec765f6f Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 18 May 2020 10:00:30 +0100 Subject: [PATCH 8/8] querier: Adjust deduplication for counters when querying for PromQL rates. (#2548) * querier: Adjust deduplication for counters when querying for PromQL rates. Signed-off-by: Bartlomiej Plotka * Added CHANGELOG.md Signed-off-by: Bartlomiej Plotka * Improved description as suggested by Frederic. Signed-off-by: Bartlomiej Plotka * Moved err adjust to be per replica, not per already deduped value. Signed-off-by: Bartlomiej Plotka --- CHANGELOG.md | 6 +- pkg/compact/downsample/downsample.go | 27 +++-- pkg/compact/downsample/downsample_test.go | 12 +- pkg/query/iter.go | 140 ++++++++++++++++++---- pkg/query/querier.go | 2 +- pkg/query/querier_test.go | 82 +++++++++---- 6 files changed, 202 insertions(+), 67 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8170660458..b9ad0f628b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,9 +14,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Fixed - [#2536](https://github.com/thanos-io/thanos/pull/2536) minio-go: Fixed AWS STS endpoint url to https for Web Identity providers on AWS EKS -- [#2501](https://github.com/thanos-io/thanos/pull/2501) Query: gracefully handle additional fields in `SeriesResponse` protobuf message that may be added in the future. -- [#2568](https://github.com/thanos-io/thanos/pull/2568) Query: does not close the connection of strict, static nodes if establishing a connection had succeeded but Info() call failed +- [#2501](https://github.com/thanos-io/thanos/pull/2501) Query: Gracefully handle additional fields in `SeriesResponse` protobuf message that may be added in the future.
+- [#2568](https://github.com/thanos-io/thanos/pull/2568) Query: Does not close the connection of strict, static nodes if establishing a connection had succeeded but Info() call failed - [#2615](https://github.com/thanos-io/thanos/pull/2615) Rule: Fix bugs where rules were out of sync. +- [#2548](https://github.com/thanos-io/thanos/pull/2548) Query: Fixed rare cases of double counter reset accounting when querying `rate` with deduplication enabled. ### Added @@ -33,6 +34,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [2513](https://github.com/thanos-io/thanos/pull/2513) Tools: Moved `thanos bucket` commands to `thanos tools bucket`, also moved `thanos check rules` to `thanos tools rules-check`. `thanos tools rules-check` also takes rules by `--rules` repeated flag not argument anymore. +- [#2548](https://github.com/thanos-io/thanos/pull/2548/commits/53e69bd89b2b08c18df298eed7d90cb7179cc0ec) Store, Querier: remove duplicated chunks on StoreAPI. ## [v0.12.2](https://github.com/thanos-io/thanos/releases/tag/v0.12.2) - 2020.04.30 diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index d9c7f99f29..b1013ba8d2 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -348,12 +348,12 @@ func downsampleRawLoop(data []sample, resolution int64, numChunks int) []chunks. ab := newAggrChunkBuilder() - // Encode first raw value; see CounterSeriesIterator. + // Encode first raw value; see ApplyCounterResetsSeriesIterator. ab.apps[AggrCounter].Append(batch[0].t, batch[0].v) lastT := downsampleBatch(batch, resolution, ab.add) - // Encode last raw value; see CounterSeriesIterator. + // Encode last raw value; see ApplyCounterResetsSeriesIterator. ab.apps[AggrCounter].Append(lastT, batch[len(batch)-1].v) chks = append(chks, ab.encode()) @@ -525,7 +525,7 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch acs = append(acs, c.Iterator(reuseIt)) } *buf = (*buf)[:0] - it := NewCounterSeriesIterator(acs...) + it := NewApplyCounterResetsIterator(acs...) if err := expandChunkIterator(it, buf); err != nil { return chk, err @@ -538,7 +538,7 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch ab.chunks[AggrCounter] = chunkenc.NewXORChunk() ab.apps[AggrCounter], _ = ab.chunks[AggrCounter].Appender() - // Retain first raw value; see CounterSeriesIterator. + // Retain first raw value; see ApplyCounterResetsSeriesIterator. ab.apps[AggrCounter].Append((*buf)[0].t, (*buf)[0].v) lastT := downsampleBatch(*buf, resolution, func(t int64, a *aggregator) { @@ -550,7 +550,7 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch ab.apps[AggrCounter].Append(t, a.counter) }) - // Retain last raw value; see CounterSeriesIterator. + // Retain last raw value; see ApplyCounterResetsSeriesIterator. ab.apps[AggrCounter].Append(lastT, it.lastV) ab.mint = mint @@ -563,7 +563,7 @@ type sample struct { v float64 } -// CounterSeriesIterator generates monotonically increasing values by iterating +// ApplyCounterResetsSeriesIterator generates monotonically increasing values by iterating // over an ordered sequence of chunks, which should be raw or aggregated chunks // of counter values. The generated samples can be used by PromQL functions // like 'rate' that calculate differences between counter values. Stale Markers @@ -580,7 +580,7 @@ type sample struct { // It handles overlapped chunks (removes overlaps). 
// NOTE: It is important to deduplicate with care ensuring that you don't hit // issue https://github.com/thanos-io/thanos/issues/2401#issuecomment-621958839. -type CounterSeriesIterator struct { +type ApplyCounterResetsSeriesIterator struct { chks []chunkenc.Iterator i int // Current chunk. total int // Total number of processed samples. @@ -589,11 +589,11 @@ type CounterSeriesIterator struct { totalV float64 // Total counter state since beginning of series. } -func NewCounterSeriesIterator(chks ...chunkenc.Iterator) *CounterSeriesIterator { - return &CounterSeriesIterator{chks: chks} +func NewApplyCounterResetsIterator(chks ...chunkenc.Iterator) *ApplyCounterResetsSeriesIterator { + return &ApplyCounterResetsSeriesIterator{chks: chks} } -func (it *CounterSeriesIterator) Next() bool { +func (it *ApplyCounterResetsSeriesIterator) Next() bool { for { if it.i >= len(it.chks) { return false @@ -637,11 +637,12 @@ func (it *CounterSeriesIterator) Next() bool { } } -func (it *CounterSeriesIterator) At() (t int64, v float64) { +func (it *ApplyCounterResetsSeriesIterator) At() (t int64, v float64) { return it.lastT, it.totalV } -func (it *CounterSeriesIterator) Seek(x int64) bool { +func (it *ApplyCounterResetsSeriesIterator) Seek(x int64) bool { + // Don't use underlying Seek, but iterate over next to not miss counter resets. for { if t, _ := it.At(); t >= x { return true @@ -654,7 +655,7 @@ func (it *CounterSeriesIterator) Seek(x int64) bool { } } -func (it *CounterSeriesIterator) Err() error { +func (it *ApplyCounterResetsSeriesIterator) Err() error { if it.i >= len(it.chks) { return nil } diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index 049765faa5..47fe161a9a 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -60,7 +60,7 @@ func TestDownsampleCounterBoundaryReset(t *testing.T) { iters = append(iters, chk.Iterator(nil)) } - citer := NewCounterSeriesIterator(iters...) + citer := NewApplyCounterResetsIterator(iters...) for citer.Next() { t, v := citer.At() res = append(res, sample{t: t, v: v}) @@ -592,7 +592,7 @@ var ( } ) -func TestCounterAggegationIterator(t *testing.T) { +func TestApplyCounterResetsIterator(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() for _, tcase := range []struct { @@ -657,7 +657,7 @@ func TestCounterAggegationIterator(t *testing.T) { its = append(its, newSampleIterator(c)) } - x := NewCounterSeriesIterator(its...) + x := NewApplyCounterResetsIterator(its...) var res []sample for x.Next() { @@ -691,7 +691,7 @@ func TestCounterSeriesIteratorSeek(t *testing.T) { } var res []sample - x := NewCounterSeriesIterator(its...) + x := NewApplyCounterResetsIterator(its...) ok := x.Seek(150) testutil.Assert(t, ok, "Seek should return true") @@ -718,7 +718,7 @@ func TestCounterSeriesIteratorSeekExtendTs(t *testing.T) { its = append(its, newSampleIterator(c)) } - x := NewCounterSeriesIterator(its...) + x := NewApplyCounterResetsIterator(its...) ok := x.Seek(500) testutil.Assert(t, !ok, "Seek should return false") @@ -738,7 +738,7 @@ func TestCounterSeriesIteratorSeekAfterNext(t *testing.T) { } var res []sample - x := NewCounterSeriesIterator(its...) + x := NewApplyCounterResetsIterator(its...) 
x.Next() diff --git a/pkg/query/iter.go b/pkg/query/iter.go index 0067d08a3c..1ed3534ea7 100644 --- a/pkg/query/iter.go +++ b/pkg/query/iter.go @@ -210,7 +210,7 @@ func (s *chunkSeries) Iterator() storage.SeriesIterator { for _, c := range s.chunks { its = append(its, getFirstIterator(c.Counter, c.Raw)) } - sit = downsample.NewCounterSeriesIterator(its...) + sit = downsample.NewApplyCounterResetsIterator(its...) default: return errSeriesIterator{err: errors.Errorf("unexpected result aggregate type %v", s.aggrs)} } @@ -376,6 +376,7 @@ func (it *chunkSeriesIterator) Err() error { type dedupSeriesSet struct { set storage.SeriesSet replicaLabels map[string]struct{} + isCounter bool replicas []storage.Series lset labels.Labels @@ -383,8 +384,8 @@ type dedupSeriesSet struct { ok bool } -func newDedupSeriesSet(set storage.SeriesSet, replicaLabels map[string]struct{}) storage.SeriesSet { - s := &dedupSeriesSet{set: set, replicaLabels: replicaLabels} +func newDedupSeriesSet(set storage.SeriesSet, replicaLabels map[string]struct{}, isCounter bool) storage.SeriesSet { + s := &dedupSeriesSet{set: set, replicaLabels: replicaLabels, isCounter: isCounter} s.ok = s.set.Next() if s.ok { s.peek = s.set.At() @@ -447,7 +448,7 @@ func (s *dedupSeriesSet) At() storage.Series { // Clients may store the series, so we must make a copy of the slice before advancing. repl := make([]storage.Series, len(s.replicas)) copy(repl, s.replicas) - return newDedupSeries(s.lset, repl...) + return newDedupSeries(s.lset, repl, s.isCounter) } func (s *dedupSeriesSet) Err() error { @@ -464,44 +465,130 @@ func (s seriesWithLabels) Labels() labels.Labels { return s.lset } type dedupSeries struct { lset labels.Labels replicas []storage.Series + + isCounter bool } -func newDedupSeries(lset labels.Labels, replicas ...storage.Series) *dedupSeries { - return &dedupSeries{lset: lset, replicas: replicas} +func newDedupSeries(lset labels.Labels, replicas []storage.Series, isCounter bool) *dedupSeries { + return &dedupSeries{lset: lset, isCounter: isCounter, replicas: replicas} } func (s *dedupSeries) Labels() labels.Labels { return s.lset } -func (s *dedupSeries) Iterator() (it storage.SeriesIterator) { - it = s.replicas[0].Iterator() +func (s *dedupSeries) Iterator() storage.SeriesIterator { + var it adjustableSeriesIterator + if s.isCounter { + it = &counterErrAdjustSeriesIterator{SeriesIterator: s.replicas[0].Iterator()} + } else { + it = noopAdjustableSeriesIterator{SeriesIterator: s.replicas[0].Iterator()} + } + for _, o := range s.replicas[1:] { - it = newDedupSeriesIterator(it, o.Iterator()) + var replicaIter adjustableSeriesIterator + if s.isCounter { + replicaIter = &counterErrAdjustSeriesIterator{SeriesIterator: o.Iterator()} + } else { + replicaIter = noopAdjustableSeriesIterator{SeriesIterator: o.Iterator()} + } + it = newDedupSeriesIterator(it, replicaIter) } return it } +// adjustableSeriesIterator iterates over the data of a time series and allows adjusting the current value based on +// the last iterated value. +type adjustableSeriesIterator interface { + storage.SeriesIterator + + // adjustAtValue allows the implementation to adjust the current value, if needed, given the last emitted value. This is used by the counter + // implementation, which can adjust for an obsolete counter value.
+ adjustAtValue(lastValue float64) +} + +type noopAdjustableSeriesIterator struct { + storage.SeriesIterator +} + +func (it noopAdjustableSeriesIterator) adjustAtValue(float64) {} + +// counterErrAdjustSeriesIterator is an adjustableSeriesIterator used when we deduplicate counters. +// It makes sure we always adjust to the latest counter value seen across all replicas. +// Let's consider the following example: +// +// Replica 1 counter scrapes: 20 30 40 NaN - 0 5 +// Replica 2 counter scrapes: 25 35 45 NaN - 2 +// +// Now, since for downsampling purposes we account for the resets, our replicas look like this before entering the dedup iterator: +// +// Replica 1 counter total: 20 30 40 - - 40 45 +// Replica 2 counter total: 25 35 45 - - 47 +// +// Now, if at any point we switch our focus from replica 2 to replica 1, we will observe a lower value than the previous one, +// which will trigger a false-positive counter reset in PromQL. +// +// We mitigate this by invoking adjustAtValue, which adjusts the current value whenever the last emitted value is larger than the current one +// (a counter cannot go down). +// +// This is to mitigate https://github.com/thanos-io/thanos/issues/2401. +// TODO(bwplotka): Find a better deduplication algorithm that does not require knowing whether the given +// series is a counter or not: https://github.com/thanos-io/thanos/issues/2547. +type counterErrAdjustSeriesIterator struct { + storage.SeriesIterator + + errAdjust float64 +} + +func (it *counterErrAdjustSeriesIterator) adjustAtValue(lastValue float64) { + _, v := it.At() + if lastValue > v { + // This replica has an obsolete value (it did not see the correct "end" of the counter value before the app restart). Adjust. + it.errAdjust += lastValue - v + } +} + +func (it *counterErrAdjustSeriesIterator) At() (int64, float64) { + t, v := it.SeriesIterator.At() + return t, v + it.errAdjust +} + type dedupSeriesIterator struct { - a, b storage.SeriesIterator + a, b adjustableSeriesIterator + + aok, bok bool + + // TODO(bwplotka): Don't base on LastT, but on detected scrape interval. This will allow us to be more + // responsive to gaps: https://github.com/thanos-io/thanos/issues/981, let's do it in the next PR. + lastT int64 + lastV float64 - aok, bok bool - lastT int64 penA, penB int64 useA bool } -func newDedupSeriesIterator(a, b storage.SeriesIterator) *dedupSeriesIterator { +func newDedupSeriesIterator(a, b adjustableSeriesIterator) *dedupSeriesIterator { return &dedupSeriesIterator{ a: a, b: b, lastT: math.MinInt64, + lastV: float64(math.MinInt64), aok: a.Next(), bok: b.Next(), } } func (it *dedupSeriesIterator) Next() bool { + lastValue := it.lastV + lastUseA := it.useA + defer func() { + if it.useA != lastUseA { + // We switched replicas. + // Ensure the values are adjusted based on the last value before the switch. + it.adjustAtValue(lastValue) + } + }() + // Advance both iterators to at least the next highest timestamp plus the potential penalty. if it.aok { it.aok = it.a.Seek(it.lastT + 1 + it.penA) @@ -509,18 +596,19 @@ func (it *dedupSeriesIterator) Next() bool { if it.bok { it.bok = it.b.Seek(it.lastT + 1 + it.penB) } + + // Handle basic cases where one iterator is exhausted before the other. if !it.aok { it.useA = false if it.bok { - it.lastT, _ = it.b.At() + it.lastT, it.lastV = it.b.At() it.penB = 0 } return it.bok } if !it.bok { it.useA = true - it.lastT, _ = it.a.At() + it.lastT, it.lastV = it.a.At() it.penA = 0 return true } @@ -528,8 +616,8 @@ func (it *dedupSeriesIterator) Next() bool { // with the smaller timestamp.
// The applied penalty may have already skipped samples that would have resulted in an exaggerated sampling frequency. - ta, _ := it.a.At() - tb, _ := it.b.At() + ta, va := it.a.At() + tb, vb := it.b.At() it.useA = ta <= tb @@ -540,29 +628,41 @@ func (it *dedupSeriesIterator) Next() bool { // timestamp assignment. // If we don't know a delta yet, we pick 5000 as a constant, which is based on the knowledge // that timestamps are in milliseconds and sampling frequencies are typically multiple seconds long. - const initialPenality = 5000 + const initialPenalty = 5000 if it.useA { if it.lastT != math.MinInt64 { it.penB = 2 * (ta - it.lastT) } else { - it.penB = initialPenality + it.penB = initialPenalty } it.penA = 0 it.lastT = ta + it.lastV = va return true } if it.lastT != math.MinInt64 { it.penA = 2 * (tb - it.lastT) } else { - it.penA = initialPenality + it.penA = initialPenalty } it.penB = 0 it.lastT = tb + it.lastV = vb return true } +func (it *dedupSeriesIterator) adjustAtValue(lastValue float64) { + if it.aok { + it.a.adjustAtValue(lastValue) + } + if it.bok { + it.b.adjustAtValue(lastValue) + } +} + func (it *dedupSeriesIterator) Seek(t int64) bool { + // Don't use the underlying Seek; iterate with Next instead so we don't miss gaps. for { ts, _ := it.At() if ts > 0 && ts >= t { diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 791232597d..e437440ed7 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -228,7 +228,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s // The merged series set assembles all potentially-overlapping time ranges // of the same series into a single one. The series are ordered so that equal series // from different replicas are sequential. We can now deduplicate those.
- return newDedupSeriesSet(set, q.replicaLabels), warns, nil + return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), warns, nil } // sortDedupLabels re-sorts the set so that the same series with different replica diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 98402d49f5..7b49aaec7e 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -274,13 +274,13 @@ var ( {t: 1587691805791, v: 488036}, {t: 1587691820791, v: 488241}, {t: 1587691835791, v: 488411}, {t: 1587691850791, v: 488625}, {t: 1587691865791, v: 488868}, {t: 1587691880791, v: 489005}, {t: 1587691895791, v: 489237}, {t: 1587691910791, v: 489545}, {t: 1587691925791, v: 489750}, {t: 1587691940791, v: 489899}, {t: 1587691955791, v: 490048}, {t: 1587691970791, v: 490364}, {t: 1587691985791, v: 490485}, {t: 1587692000791, v: 490722}, {t: 1587692015791, v: 490866}, {t: 1587692030791, v: 491025}, {t: 1587692045791, v: 491286}, {t: 1587692060816, v: 491543}, {t: 1587692075791, v: 491787}, {t: 1587692090791, v: 492065}, {t: 1587692105791, v: 492223}, {t: 1587692120816, v: 492501}, {t: 1587692135791, v: 492767}, {t: 1587692150791, v: 492955}, {t: 1587692165791, v: 493194}, {t: 1587692180792, v: 493402}, {t: 1587692195791, v: 493647}, {t: 1587692210791, v: 493897}, {t: 1587692225791, v: 494117}, {t: 1587692240805, v: 494356}, {t: 1587692255791, v: 494620}, {t: 1587692270791, v: 494762}, {t: 1587692285791, v: 495001}, {t: 1587692300805, v: 495222}, {t: 1587692315791, v: 495393}, {t: 1587692330791, v: 495662}, - {t: 1587692345791, v: 495875}, {t: 1587692360801, v: 496082}, {t: 1587692375791, v: 496196}, {t: 1587692390791, v: 496245}, {t: 1587692405791, v: 496295}, {t: 1587692420791, v: 496365}, {t: 1587692435791, v: 496401}, {t: 1587692450791, v: 496452}, {t: 1587692465791, v: 496491}, {t: 1587692480791, v: 496544}, {t: 1587692542149, v: 496537}, {t: 1587692557139, v: 496633}, - {t: 1587692572139, v: 496844}, {t: 1587692587139, v: 497040}, {t: 1587692602144, v: 497257}, {t: 1587692617139, v: 497522}, {t: 1587692632139, v: 497710}, {t: 1587692647139, v: 497938}, {t: 1587692662154, v: 498172}, {t: 1587692677139, v: 498459}, {t: 1587692692139, v: 498635}, {t: 1587692707139, v: 498832}, {t: 1587692722139, v: 499014}, {t: 1587692737139, v: 499170}, - {t: 1587692752139, v: 499338}, {t: 1587692767139, v: 499511}, {t: 1587692782149, v: 499719}, {t: 1587692797139, v: 499973}, {t: 1587692812139, v: 500189}, {t: 1587692827139, v: 500359}, {t: 1587692842139, v: 500517}, {t: 1587692857139, v: 500727}, {t: 1587692872139, v: 500959}, {t: 1587692887139, v: 501178}, {t: 1587692902139, v: 501246}, {t: 1587692917153, v: 501404}, - {t: 1587692932139, v: 501663}, {t: 1587692947139, v: 501850}, {t: 1587692962139, v: 502103}, {t: 1587692977155, v: 502280}, {t: 1587692992139, v: 502562}, {t: 1587693007139, v: 502742}, {t: 1587693022139, v: 502931}, {t: 1587693037139, v: 503190}, {t: 1587693052139, v: 503428}, {t: 1587693067139, v: 503630}, {t: 1587693082139, v: 503873}, {t: 1587693097139, v: 504027}, - {t: 1587693112139, v: 504179}, {t: 1587693127139, v: 504362}, {t: 1587693142139, v: 504590}, {t: 1587693157139, v: 504741}, {t: 1587693172139, v: 505056}, {t: 1587693187139, v: 505244}, {t: 1587693202139, v: 505436}, {t: 1587693217139, v: 505635}, {t: 1587693232139, v: 505936}, {t: 1587693247155, v: 506088}, {t: 1587693262139, v: 506309}, {t: 1587693277139, v: 506524}, - {t: 1587693292139, v: 506800}, {t: 1587693307139, v: 507010}, {t: 1587693322139, v: 507286}, {t: 1587693337139, 
v: 507530}, {t: 1587693352139, v: 507781}, {t: 1587693367139, v: 507991}, {t: 1587693382139, v: 508310}, {t: 1587693397139, v: 508570}, {t: 1587693412139, v: 508770}, {t: 1587693427139, v: 508982}, {t: 1587693442163, v: 509274}, {t: 1587693457139, v: 509477}, - {t: 1587693472139, v: 509713}, {t: 1587693487139, v: 509972}, {t: 1587693502139, v: 510182}, {t: 1587693517139, v: 510498}, {t: 1587693532139, v: 510654}, {t: 1587693547139, v: 510859}, {t: 1587693562139, v: 511124}, {t: 1587693577139, v: 511314}, {t: 1587693592139, v: 511488}, + {t: 1587692345791, v: 495875}, {t: 1587692360801, v: 496082}, {t: 1587692375791, v: 496196}, {t: 1587692390791, v: 496245}, {t: 1587692405791, v: 496295}, {t: 1587692420791, v: 496365}, {t: 1587692435791, v: 496401}, {t: 1587692450791, v: 496452}, {t: 1587692465791, v: 496491}, {t: 1587692480791, v: 496544}, {t: 1587692542149, v: 496544}, {t: 1587692557139, v: 496640}, + {t: 1587692572139, v: 496851}, {t: 1587692587139, v: 497047}, {t: 1587692602144, v: 497264}, {t: 1587692617139, v: 497529}, {t: 1587692632139, v: 497717}, {t: 1587692647139, v: 497945}, {t: 1587692662154, v: 498179}, {t: 1587692677139, v: 498466}, {t: 1587692692139, v: 498642}, {t: 1587692707139, v: 498839}, {t: 1587692722139, v: 499021}, {t: 1587692737139, v: 499177}, + {t: 1587692752139, v: 499345}, {t: 1587692767139, v: 499518}, {t: 1587692782149, v: 499726}, {t: 1587692797139, v: 499980}, {t: 1587692812139, v: 500196}, {t: 1587692827139, v: 500366}, {t: 1587692842139, v: 500524}, {t: 1587692857139, v: 500734}, {t: 1587692872139, v: 500966}, {t: 1587692887139, v: 501185}, {t: 1587692902139, v: 501253}, {t: 1587692917153, v: 501411}, + {t: 1587692932139, v: 501670}, {t: 1587692947139, v: 501857}, {t: 1587692962139, v: 502110}, {t: 1587692977155, v: 502287}, {t: 1587692992139, v: 502569}, {t: 1587693007139, v: 502749}, {t: 1587693022139, v: 502938}, {t: 1587693037139, v: 503197}, {t: 1587693052139, v: 503435}, {t: 1587693067139, v: 503637}, {t: 1587693082139, v: 503880}, {t: 1587693097139, v: 504034}, + {t: 1587693112139, v: 504186}, {t: 1587693127139, v: 504369}, {t: 1587693142139, v: 504597}, {t: 1587693157139, v: 504748}, {t: 1587693172139, v: 505063}, {t: 1587693187139, v: 505251}, {t: 1587693202139, v: 505443}, {t: 1587693217139, v: 505642}, {t: 1587693232139, v: 505943}, {t: 1587693247155, v: 506095}, {t: 1587693262139, v: 506316}, {t: 1587693277139, v: 506531}, + {t: 1587693292139, v: 506807}, {t: 1587693307139, v: 507017}, {t: 1587693322139, v: 507293}, {t: 1587693337139, v: 507537}, {t: 1587693352139, v: 507788}, {t: 1587693367139, v: 507998}, {t: 1587693382139, v: 508317}, {t: 1587693397139, v: 508577}, {t: 1587693412139, v: 508777}, {t: 1587693427139, v: 508989}, {t: 1587693442163, v: 509281}, {t: 1587693457139, v: 509484}, + {t: 1587693472139, v: 509720}, {t: 1587693487139, v: 509979}, {t: 1587693502139, v: 510189}, {t: 1587693517139, v: 510505}, {t: 1587693532139, v: 510661}, {t: 1587693547139, v: 510866}, {t: 1587693562139, v: 511131}, {t: 1587693577139, v: 511321}, {t: 1587693592139, v: 511495}, } ) @@ -730,7 +730,6 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { }) // Regression test against https://github.com/thanos-io/thanos/issues/2401. // Rate + dedup can cause incorrectness. - // TODO(bwplotka): To fix in next PR. 
t.Run("dedup=true", func(t *testing.T) { expectedLset := labels.FromStrings( "action", "widget.json", "controller", "Projects::MergeRequests::ContentController", "env", "gprd", "environment", @@ -764,8 +763,8 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { {T: 1587691100000, V: 15.922807017543859}, {T: 1587691200000, V: 15.63157894736842}, {T: 1587691300000, V: 14.982456140350878}, {T: 1587691400000, V: 14.187259188557551}, {T: 1587691500000, V: 13.828070175438594}, {T: 1587691600000, V: 13.971929824561403}, {T: 1587691700000, V: 15.31994329585807}, {T: 1587691800000, V: 14.30877192982456}, {T: 1587691900000, V: 13.915789473684212}, {T: 1587692000000, V: 13.312280701754384}, {T: 1587692100000, V: 14.136842105263158}, {T: 1587692200000, V: 14.39298245614035}, - {T: 1587692300000, V: 15.014035087719297}, {T: 1587692400000, V: 14.112280701754386}, {T: 1587692500000, V: 9.421065148148147}, {T: 1587692600000, V: 1740.491873127187}, // Suddenly unexpected rate spike. - {T: 1587692700000, V: 1742.229734448992}, {T: 1587692800000, V: 11.918703026416258}, {T: 1587692900000, V: 13.75813610765101}, {T: 1587693000000, V: 13.087719298245615}, + {T: 1587692300000, V: 15.014035087719297}, {T: 1587692400000, V: 14.112280701754386}, {T: 1587692500000, V: 9.421065148148147}, {T: 1587692600000, V: 6.3736754978451735}, + {T: 1587692700000, V: 8.19632056099571}, {T: 1587692800000, V: 11.918703026416258}, {T: 1587692900000, V: 13.75813610765101}, {T: 1587693000000, V: 13.087719298245615}, {T: 1587693100000, V: 13.466666666666667}, {T: 1587693200000, V: 14.028070175438595}, {T: 1587693300000, V: 14.23859649122807}, {T: 1587693400000, V: 15.407017543859647}, {T: 1587693500000, V: 15.915789473684208}, {T: 1587693600000, V: 15.712280701754386}, }}, @@ -786,8 +785,8 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { {Metric: expectedLset, Points: []promql.Point{ {T: 1587691800000, V: 14.464425770308123}, {T: 1587692300000, V: 14.763025210084033}, - {T: 1587692800000, V: 291.105652426067}, // Suddenly unexpected rate spike. - {T: 1587693300000, V: 290.89236811640285}, + {T: 1587692800000, V: 13.143575607888273}, + {T: 1587693300000, V: 12.930291298224086}, }}, }, vec) }) @@ -955,6 +954,7 @@ func TestDedupSeriesSet(t *testing.T) { input []series exp []series dedupLabels map[string]struct{} + isCounter bool }{ { // Single dedup label. @@ -1096,7 +1096,7 @@ func TestDedupSeriesSet(t *testing.T) { // Now, depending on what replica we look, we can see totally different counter value in total where total means // after accounting for counter resets. We account for that in downsample.CounterSeriesIterator, mainly because // we handle downsample Counter Aggregations specially (for detecting resets between chunks). - // TODO(bwplotka): Fix in next PR. + isCounter: true, input: []series{ { lset: labels.Labels{{Name: "replica", Value: "01"}}, @@ -1104,32 +1104,64 @@ func TestDedupSeriesSet(t *testing.T) { {10000, 8.0}, // Smaller timestamp, this will be chosen. CurrValue = 8.0. {20000, 9.0}, // Same. CurrValue = 9.0. // {Gap} app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator. - {50001, 9 + 1.0}}, // Next after 20000+1 has a bit higher than timestamp then in second series. Penalty 5000 will be added. + {50001, 9 + 1.0}, // Next after 20000+1 has a bit higher than timestamp then in second series. Penalty 5000 will be added. 
@@ -1104,32 +1104,64 @@
 						{10000, 8.0}, // Smaller timestamp, this will be chosen. CurrValue = 8.0.
 						{20000, 9.0}, // Same. CurrValue = 9.0.
 						// {Gap} app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator.
-						{50001, 9 + 1.0}}, // Next after 20000+1 has a bit higher than timestamp then in second series. Penalty 5000 will be added.
+						{50001, 9 + 1.0}, // Next sample after 20000+1; its timestamp is a bit higher than in the second series, so penalty 5000 will be added.
+						{60000, 9 + 2.0},
+						{70000, 9 + 3.0},
+						{80000, 9 + 4.0},
+						{90000, 9 + 5.0}, // This should now be taken; we expect 14 to be the correct value here.
+						{100000, 9 + 6.0},
+					},
 				},
 				{
 					lset: labels.Labels{{Name: "replica", Value: "02"}},
 					samples: []sample{
 						{10001, 8.0}, // Penalty 5000 will be added.
 						// 20001 was app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator. Penalty 2 * (20000 - 10000) will be added.
 						// 30001 no sample. Within penalty, ignored.
-						{45001, 8.0 + 0.5}, // Smaller timestamp, this will be chosen. CurrValue = 8.5 which is smaller than last chosen value.
+						{45001, 8 + 0.5}, // Smaller timestamp, this will be chosen. CurrValue = 8.5, which is smaller than the last chosen value.
+						{55001, 8 + 1.5},
+						{65001, 8 + 2.5},
+						// {Gap} app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator.
 					},
 				},
 			},
 			exp: []series{
 				{
-					lset: labels.Labels{},
-					// Outcome for rate: Double counter reset accounted.
-					samples: []sample{{10000, 8}, {20000, 9}, {45001, 8.5}},
+					lset: labels.Labels{},
+					samples: []sample{{10000, 8}, {20000, 9}, {45001, 9}, {55001, 10}, {65001, 11}, {90000, 14}, {100000, 15}},
 				},
 			},
 			dedupLabels: map[string]struct{}{
 				"replica": {},
 			},
 		},
+		{
+			// Same input, but as a non-counter it should not adjust anything.
+			isCounter: false,
+			input: []series{
+				{
+					lset: labels.Labels{{Name: "replica", Value: "01"}},
+					samples: []sample{
+						{10000, 8.0}, {20000, 9.0}, {50001, 9 + 1.0}, {60000, 9 + 2.0}, {70000, 9 + 3.0}, {80000, 9 + 4.0}, {90000, 9 + 5.0}, {100000, 9 + 6.0},
+					},
+				}, {
+					lset: labels.Labels{{Name: "replica", Value: "02"}},
+					samples: []sample{
+						{10001, 8.0}, {45001, 8 + 0.5}, {55001, 8 + 1.5}, {65001, 8 + 2.5},
+					},
+				},
+			},
+			exp: []series{
+				{
+					lset: labels.Labels{},
+					samples: []sample{{10000, 8}, {20000, 9}, {45001, 8.5}, {55001, 9.5}, {65001, 10.5}, {90000, 14}, {100000, 15}},
+				},
+			},
+			dedupLabels: map[string]struct{}{"replica": {}},
+		},
 		{
 			// Regression test on real data against https://github.com/thanos-io/thanos/issues/2401.
 			// Real data with stale marker after downsample.CounterSeriesIterator (required for downsampling + rate).
-			// TODO(bwplotka): Fix in next PR.
+			isCounter: true,
 			input: []series{
 				{
 					lset: labels.Labels{{Name: "replica", Value: "01"}},
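The remaining hunks thread `isCounter` into `newDedupSeriesSet` and wrap every mocked iterator in `noopAdjustableSeriesIterator`, suggesting the dedup iterator now requires the adjustment hook on both inputs. A plausible shape for that wrapper (the type name is taken from the diff; the definition below is an assumption, with `storage.SeriesIterator` being the Prometheus iterator interface of this era):

```go
package query // illustrative package name; not necessarily where dedup lives.

import "github.com/prometheus/prometheus/storage"

// noopAdjustableSeriesIterator satisfies the assumed adjustable-iterator
// contract without changing any values: the right behavior for gauges and
// for tests that are not counter-aware.
type noopAdjustableSeriesIterator struct {
	storage.SeriesIterator
}

// adjustAtValue deliberately does nothing: non-counter series need no
// correction when deduplication switches replicas.
func (noopAdjustableSeriesIterator) adjustAtValue(float64) {}
```

Keeping the hook a no-op for gauges means tests like `TestDedupSeriesIterator` and the benchmark below retain their old value-for-value behavior while satisfying the new interface.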
@@ -1199,7 +1231,7 @@ func TestDedupSeriesSet(t *testing.T) {
 	for _, tcase := range tests {
 		t.Run("", func(t *testing.T) {
-			dedupSet := newDedupSeriesSet(&mockedSeriesSet{series: tcase.input}, tcase.dedupLabels)
+			dedupSet := newDedupSeriesSet(&mockedSeriesSet{series: tcase.input}, tcase.dedupLabels, tcase.isCounter)
 			var ats []storage.Series
 			for dedupSet.Next() {
 				ats = append(ats, dedupSet.At())
@@ -1254,10 +1286,10 @@ func TestDedupSeriesIterator(t *testing.T) {
 	for i, c := range cases {
 		t.Logf("case %d:", i)
 		it := newDedupSeriesIterator(
-			newMockedSeriesIterator(c.a),
-			newMockedSeriesIterator(c.b),
+			noopAdjustableSeriesIterator{newMockedSeriesIterator(c.a)},
+			noopAdjustableSeriesIterator{newMockedSeriesIterator(c.b)},
 		)
-		res := expandSeries(t, it)
+		res := expandSeries(t, noopAdjustableSeriesIterator{it})
 		testutil.Equals(t, c.exp, res)
 	}
 }
@@ -1265,8 +1297,8 @@ func TestDedupSeriesIterator(t *testing.T) {
 func BenchmarkDedupSeriesIterator(b *testing.B) {
 	run := func(b *testing.B, s1, s2 []sample) {
 		it := newDedupSeriesIterator(
-			newMockedSeriesIterator(s1),
-			newMockedSeriesIterator(s2),
+			noopAdjustableSeriesIterator{newMockedSeriesIterator(s1)},
+			noopAdjustableSeriesIterator{newMockedSeriesIterator(s2)},
 		)
 		b.ResetTimer()
 		var total int64