Skip to content

Commit

Permalink
Added min-time limitaiton for sidecar. This allows optionally storing…
Browse files Browse the repository at this point in the history
… longer retention time on Prometheus.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Oct 10, 2019
1 parent aaaa95f commit bbd0999
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ jobs:
test:
docker:
# Build by Thanos make docker-ci
- image: quay.io/thanos/thanos-ci:v0.1.0
- image: quay.io/thanos/thanos-ci:v0.2.0
working_directory: /go/src/github.com/thanos-io/thanos
environment:
GO111MODULE: 'on'
Expand Down
9 changes: 5 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ FILES_TO_FMT ?= $(shell find . -path ./vendor -prune -o -name '*.go' -print

DOCKER_IMAGE_REPO ?= quay.io/thanos/thanos
DOCKER_IMAGE_TAG ?= $(subst /,-,$(shell git rev-parse --abbrev-ref HEAD))-$(shell date +%Y-%m-%d)-$(shell git rev-parse --short HEAD)
DOCKER_CI_TAG ?= test

TMP_GOPATH ?= /tmp/thanos-go
GOBIN ?= $(firstword $(subst :, ,${GOPATH}))/bin
Expand Down Expand Up @@ -45,8 +46,8 @@ ME ?= $(shell whoami)
# Referenced by github.com/thanos-io/thanos/blob/master/docs/getting_started.md#prometheus

# Limited prom version, because testing was not possible. This should fix it: https://github.com/thanos-io/thanos/issues/758
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0

ALERTMANAGER_VERSION ?= v0.15.2
ALERTMANAGER ?= $(GOBIN)/alertmanager-$(ALERTMANAGER_VERSION)
Expand Down Expand Up @@ -234,8 +235,8 @@ docker-ci: install-deps
@cp -r $(GOBIN)/* ./tmp/bin
@docker build -t thanos-ci -f Dockerfile.thanos-ci .
@echo ">> pushing thanos-ci image"
@docker tag "thanos-ci" "quay.io/thanos/thanos-ci:v0.1.0"
@docker push "quay.io/thanos/thanos-ci:v0.1.0"
@docker tag "thanos-ci" "quay.io/thanos/thanos-ci:$(DOCKER_CI_TAG)"
@docker push "quay.io/thanos/thanos-ci:$(DOCKER_CI_TAG)"

# tooling deps. TODO(bwplotka): Pin them all to certain version!
.PHONY: check-git
Expand Down
21 changes: 18 additions & 3 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
thanosmodel "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
Expand Down Expand Up @@ -56,6 +57,9 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {

uploadCompacted := cmd.Flag("shipper.upload-compacted", "[Experimental] If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus.").Default("false").Hidden().Bool()

minTime := thanosmodel.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))

m[component.Sidecar.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
rl := reloader.New(
log.With(logger, "component", "reloader"),
Expand All @@ -64,6 +68,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
*reloaderCfgOutputFile,
*reloaderRuleDirs,
)

return runSidecar(
g,
logger,
Expand All @@ -80,6 +85,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
rl,
*uploadCompacted,
component.Sidecar,
*minTime,
)
}
}
Expand All @@ -100,14 +106,17 @@ func runSidecar(
reloader *reloader.Reloader,
uploadCompacted bool,
comp component.Component,
limitMinTime thanosmodel.TimeOrDurationValue,
) error {
var m = &promMetadata{
promURL: promURL,

// Start out with the full time range. The shipper will constrain it later.
// TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled.
mint: 0,
mint: limitMinTime.PrometheusTimestamp(),
maxt: math.MaxInt64,

limitMinTime: limitMinTime,
}

confContentYaml, err := objStoreConfig.Content()
Expand Down Expand Up @@ -285,9 +294,9 @@ func runSidecar(
minTime, _, err := s.Timestamps()
if err != nil {
level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
} else {
m.UpdateTimestamps(minTime, math.MaxInt64)
return nil
}
m.UpdateTimestamps(minTime, math.MaxInt64)
return nil
})
}, func(error) {
Expand Down Expand Up @@ -341,6 +350,8 @@ type promMetadata struct {
mint int64
maxt int64
labels labels.Labels

limitMinTime thanosmodel.TimeOrDurationValue
}

func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
Expand All @@ -360,6 +371,10 @@ func (s *promMetadata) UpdateTimestamps(mint int64, maxt int64) {
s.mtx.Lock()
defer s.mtx.Unlock()

if mint < s.limitMinTime.PrometheusTimestamp() {
mint = s.limitMinTime.PrometheusTimestamp()
}

s.mint = mint
s.maxt = maxt
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()

minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store serves only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))

maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store will serve only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
Expand Down
7 changes: 7 additions & 0 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,12 @@ Flags:
contains object store configuration. See format
details:
https://thanos.io/storage.md/#configuration
--min-time=0000-01-01T00:00:00Z
Start of time range limit to serve. Thanos
sidecar will serve only metrics, which happened
later than this value. Option can be a constant
time in RFC3339 format or time duration
relative to current time, such as -1d or 2h45m.
Valid duration units are ms, s, m, h, d, w, y.

```
16 changes: 8 additions & 8 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,16 @@ Flags:
from object storage.
--min-time=0000-01-01T00:00:00Z
Start of time range limit to serve. Thanos
Store serves only metrics, which happened later
than this value. Option can be a constant time
in RFC3339 format or time duration relative to
current time, such as -1d or 2h45m. Valid
duration units are ms, s, m, h, d, w, y.
Store will serve only metrics, which happened
later than this value. Option can be a constant
time in RFC3339 format or time duration
relative to current time, such as -1d or 2h45m.
Valid duration units are ms, s, m, h, d, w, y.
--max-time=9999-12-31T23:59:59Z
End of time range limit to serve. Thanos Store
serves only blocks, which happened eariler than
this value. Option can be a constant time in
RFC3339 format or time duration relative to
will serve only blocks, which happened eariler
than this value. Option can be a constant time
in RFC3339 format or time duration relative to
current time, such as -1d or 2h45m. Valid
duration units are ms, s, m, h, d, w, y.

Expand Down
6 changes: 6 additions & 0 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
}

// Don't ask for more than available time. This includes potential `minTime` flag limit.
availableMinTime, _ := p.timestamps()
if r.MinTime < availableMinTime {
r.MinTime = availableMinTime
}

q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime}

for _, m := range newMatchers {
Expand Down
42 changes: 37 additions & 5 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

limitMinT := int64(0)
proxy, err := NewPrometheusStore(nil, nil, u, component.Sidecar,
func() labels.Labels {
return labels.FromStrings("region", "eu-west")
}, nil)
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return limitMinT, -1 }) // Maxt does not matter.
testutil.Ok(t, err)

// Query all three samples except for the first one. Since we round up queried data
// to seconds, we can test whether the extra sample gets stripped properly.
{
// Query all three samples except for the first one. Since we round up queried data
// to seconds, we can test whether the extra sample gets stripped properly.
srv := newStoreSeriesServer(ctx)
testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{
MinTime: baseT + 101,
Expand Down Expand Up @@ -92,6 +92,38 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
testutil.Equals(t, []sample{{baseT + 200, 2}, {baseT + 300, 3}}, samples)

}
// Query all samples, but limit mint time to exclude the first one.
{
limitMinT = baseT + 101
srv := newStoreSeriesServer(ctx)
testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{
MinTime: 0,
MaxTime: baseT + 300,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
},
}, srv))
// Revert for next cases.
limitMinT = 0

testutil.Equals(t, 1, len(srv.SeriesSet))

testutil.Equals(t, []storepb.Label{
{Name: "a", Value: "b"},
{Name: "region", Value: "eu-west"},
}, srv.SeriesSet[0].Labels)

testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks))

c := srv.SeriesSet[0].Chunks[0]
testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type)

chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data)
testutil.Ok(t, err)

samples := expandChunk(chk.Iterator(nil))
testutil.Equals(t, []sample{{baseT + 200, 2}, {baseT + 300, 3}}, samples)
}
// Querying by external labels only.
{
srv := newStoreSeriesServer(ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/testutil/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

const (
defaultPrometheusVersion = "v2.9.2"
defaultPrometheusVersion = "v2.13.0"
defaultAlertmanagerVersion = "v0.15.2"
defaultMinioVersion = "RELEASE.2018-10-06T00-15-16Z"

Expand Down

0 comments on commit bbd0999

Please sign in to comment.