From 4c58233b261ceedc6367cdda2bff76b509e8ec27 Mon Sep 17 00:00:00 2001 From: Bartek Plotka Date: Wed, 2 Jan 2019 19:21:28 +0000 Subject: [PATCH] Upgraded Prometheus and TSDB deps. This PR does not add anything, however by upgrading Prometheus from 2.3.2 to master tip and same for TSDB it may affects few things. Bigger packages we use and changes I found manually: * prometheus/prometheus/discovery/file * [ENHANCEMENT] Discovery: Improve performance of previously slow updates of changes of targets. #4526 ?? cc @ivan-valkov * [BUGFIX] Wait for service discovery to stop before exiting #4508 ?? * prometheus/prometheus/promql: * [BUGFIX] PromQL: Fix a goroutine leak in the lexer/parser. #4858 * [BUGFIX] Change max/min over_time to handle NaNs properly. #438 * [BUGFIX] Check label name for `count_values` PromQL function. #4585 * [BUGFIX] Ensure that vectors and matrices do not contain identical label-sets. #4589 * [ENHANCEMENT] Optimize PromQL aggregations #4248 * [BUGFIX] Only add LookbackDelta to vector selectors #4399 * [BUGFIX] Reduce floating point errors in stddev and related functions #4533 * prometheus/prometheus/rules: * New metrics exposed! (prometheus evaluation!) * [ENHANCEMENT] Rules: Error out at load time for invalid templates, rather than at evaluation time. #4537 * prometheus/tsdb/index: Index reader optimizations. There are things/fixes we may reuse in next PRs (TODO create gh issues for those): * api changes (warnings support on Prometheus UI and Query API) * UI fixes: * [ENHANCEMENT] Web UI: Support console queries at specific times. #4764 * [ENHANCEMENT] Web UI: Avoid browser spell-checking in expression field. #472 * Use notifier package once https://github.com/prometheus/prometheus/pull/5025 is merged. * Ruler UI fixes: * [ENHANCEMENT] Show rule evaluation errors in UI #4457 Follopw up issues we can fix in further PRs: * QueryAPI has now api/v1/labels that Thanos does not yet support. Created issue with details: https://github.com/improbable-eng/thanos/issues/702 * https://github.com/improbable-eng/thanos/issues/703 Signed-off-by: Bartek Plotka --- Gopkg.lock | 38 +- Gopkg.toml | 7 +- cmd/thanos/bucket.go | 10 +- cmd/thanos/downsample.go | 10 +- cmd/thanos/query.go | 11 +- cmd/thanos/rule.go | 11 +- cmd/thanos/sidecar.go | 4 +- pkg/block/block.go | 133 +------ pkg/block/blockmeta/meta.go | 142 ++++++++ pkg/block/index.go | 89 ++++- pkg/block/index_test.go | 46 +++ pkg/compact/compact.go | 38 +- pkg/compact/compact_e2e_test.go | 34 +- pkg/compact/downsample/downsample.go | 22 +- pkg/compact/downsample/downsample_test.go | 8 +- pkg/compact/retention_test.go | 8 +- pkg/query/api/v1.go | 7 +- pkg/query/api/v1_test.go | 55 +-- pkg/query/querier.go | 19 +- pkg/query/querier_test.go | 2 +- pkg/query/test_print.go | 34 ++ pkg/shipper/shipper.go | 18 +- pkg/shipper/shipper_e2e_test.go | 7 +- pkg/shipper/shipper_test.go | 14 +- pkg/store/bucket.go | 71 ++-- pkg/store/bucket_profile_test.go | 408 ++++++++++++++++++++++ pkg/testutil/prometheus.go | 13 +- pkg/verifier/index_issue.go | 4 +- 28 files changed, 944 insertions(+), 319 deletions(-) create mode 100644 pkg/block/blockmeta/meta.go create mode 100644 pkg/block/index_test.go create mode 100644 pkg/query/test_print.go create mode 100644 pkg/store/bucket_profile_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 94d9dfb1be6..57e6d89a262 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -199,12 +199,12 @@ revision = "2e65f85255dbc3072edf28d6b5b8efc472979f5a" [[projects]] - branch = "master" - digest = "1:9abc49f39e3e23e262594bb4fb70abf74c0c99e94f99153f43b143805e850719" + digest = "1:cea4aa2038169ee558bf507d5ea02c94ca85bcca28a4c7bb99fd59b31e43a686" name = "github.com/google/go-querystring" packages = ["query"] pruneopts = "" - revision = "53e6ce116135b80d037921a7fdd5138cf32d7a8a" + revision = "44c6ddd0a2342c386950e880b658017258da92fc" + version = "v1.0.0" [[projects]] digest = "1:e097a364f4e8d8d91b9b9eeafb992d3796a41fde3eb548c1a87eb9d9f60725cf" @@ -335,12 +335,12 @@ version = "v0.3.0" [[projects]] - digest = "1:82b912465c1da0668582a7d1117339c278e786c2536b3c3623029a0c7141c2d0" + digest = "1:84c28d9899cc4e00c38042d345cea8819275a5a62403a58530cac67022894776" name = "github.com/mattn/go-runewidth" packages = ["."] pruneopts = "" - revision = "ce7b0b5c7b45a81508558cd1dba6bb1e4ddb51bb" - version = "v0.0.3" + revision = "3ee7d812e62a0804a7d0a324e0249ca2db3476d3" + version = "v0.0.4" [[projects]] digest = "1:49a8b01a6cd6558d504b65608214ca40a78000e1b343ed0da5c6a9ccd83d6d30" @@ -390,20 +390,12 @@ version = "v0.11.0" [[projects]] - digest = "1:912349f5cf927bf96dca709623631ace7db723f07c70c4d56cfc22d9a667ed16" + digest = "1:b09858acd58e0873236c7b96903e3ec4e238d5de644c08bd8e712fa2d3d51ad2" name = "github.com/mozillazg/go-httpheader" packages = ["."] pruneopts = "" - revision = "4e5d6424981844faafc4b0649036b2e0395bdf99" - version = "v0.2.0" - -[[projects]] - branch = "master" - digest = "1:3adc46876d4d0e4d5bbcfcc44c2116b95d7a5c966e2ee92a219488547fd453f2" - name = "github.com/nightlyone/lockfile" - packages = ["."] - pruneopts = "" - revision = "0ad87eef1443f64d3d8c50da647e2b1552851124" + revision = "61f2392c3317b60616c9dcb10d0a4cfef131fe62" + version = "v0.2.1" [[projects]] digest = "1:94e9081cc450d2cdf4e6886fc2c06c07272f86477df2d74ee5931951fa3d2577" @@ -508,11 +500,12 @@ revision = "05ee40e3a273f7245e8777337fc7b46e533a9a92" [[projects]] - digest = "1:b5ff9852eabe841003da4b0a4b742a2878c722dda6481003432344f633a814fc" + digest = "1:912a82611430bfd1e597e76aac99cac6fd34094a07b07d7c5996cf51a21e7e07" name = "github.com/prometheus/prometheus" packages = [ "discovery/file", "discovery/targetgroup", + "pkg/gate", "pkg/labels", "pkg/rulefmt", "pkg/textparse", @@ -528,11 +521,10 @@ "util/testutil", ] pruneopts = "" - revision = "71af5e29e815795e9dd14742ee7725682fa14b7b" - version = "v2.3.2" + revision = "6e08029b56ae17c49e133d92a2792f6f119f2cbd" [[projects]] - digest = "1:216dcf26fbfb3f36f286ca3306882a157c51648e4b5d4f3a9e9c719faea6ea58" + digest = "1:0a03b362c09b1186dd53330881430c7c2c26ba07806ebad861f2aa99d9c5c6ae" name = "github.com/prometheus/tsdb" packages = [ ".", @@ -541,9 +533,11 @@ "fileutil", "index", "labels", + "wal", ] pruneopts = "" - revision = "bd832fc8274e8fe63999ac749daaaff9d881241f" + revision = "fd04e0963c04a1fbd891be7552b50f58e357f75c" + source = "github.com/bwplotka/tsdb" [[projects]] branch = "master" diff --git a/Gopkg.toml b/Gopkg.toml index 98bbc9494dc..e639afa84d9 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -37,7 +37,8 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"] name = "github.com/prometheus/common" [[constraint]] - version = "v2.3.2" + # TODO(bwplotka): Move to released version once our recent fixes will be merged & released. + revision = "6e08029b56ae17c49e133d92a2792f6f119f2cbd" name = "github.com/prometheus/prometheus" [[override]] @@ -46,7 +47,9 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"] [[constraint]] name = "github.com/prometheus/tsdb" - revision = "bd832fc8274e8fe63999ac749daaaff9d881241f" + # TODO(bwplotka): Move to upstream version once https://github.com/prometheus/tsdb/pull/492 is merged. + revision = "fd04e0963c04a1fbd891be7552b50f58e357f75c" + source = "github.com/bwplotka/tsdb" [[constraint]] branch = "master" diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 01b9b54bf68..eca845231bf 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -10,10 +10,9 @@ import ( "text/template" "time" - "github.com/prometheus/tsdb/labels" - "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/verifier" @@ -23,6 +22,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/labels" "golang.org/x/text/language" "golang.org/x/text/message" "gopkg.in/alecthomas/kingpin.v2" @@ -254,7 +254,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin defer cancel() // Getting Metas. - var blockMetas []*block.Meta + var blockMetas []*blockmeta.Meta if err = bkt.Iter(ctx, "", func(name string) error { id, ok := block.IsBlockDir(name) if !ok { @@ -277,7 +277,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin } } -func printTable(blockMetas []*block.Meta, selectorLabels labels.Labels, sortBy []string) error { +func printTable(blockMetas []*blockmeta.Meta, selectorLabels labels.Labels, sortBy []string) error { header := inspectColumns var lines [][]string @@ -355,7 +355,7 @@ func getKeysAlphabetically(labels map[string]string) []string { // matchesSelector checks if blockMeta contains every label from // the selector with the correct value -func matchesSelector(blockMeta *block.Meta, selectorLabels labels.Labels) bool { +func matchesSelector(blockMeta *blockmeta.Meta, selectorLabels labels.Labels) bool { for _, l := range selectorLabels { if v, ok := blockMeta.Thanos.Labels[l.Name]; !ok || v != l.Value { return false diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index c00aadd43db..dddd1e91afb 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -8,11 +8,10 @@ import ( "path/filepath" "time" - "github.com/prometheus/tsdb/chunkenc" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" "github.com/improbable-eng/thanos/pkg/compact/downsample" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/client" @@ -23,6 +22,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/chunkenc" "gopkg.in/alecthomas/kingpin.v2" ) @@ -105,7 +105,7 @@ func downsampleBucket( if err := os.MkdirAll(dir, 0777); err != nil { return errors.Wrap(err, "create dir") } - var metas []*block.Meta + var metas []*blockmeta.Meta err := bkt.Iter(ctx, "", func(name string) error { id, ok := block.IsBlockDir(name) @@ -119,7 +119,7 @@ func downsampleBucket( } defer runutil.CloseWithLogOnErr(logger, rc, "block reader") - var m block.Meta + var m blockmeta.Meta if err := json.NewDecoder(rc).Decode(&m); err != nil { return errors.Wrap(err, "decode meta") } @@ -201,7 +201,7 @@ func downsampleBucket( return nil } -func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *block.Meta, dir string, resolution int64) error { +func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *blockmeta.Meta, dir string, resolution int64) error { begin := time.Now() bdir := filepath.Join(dir, m.ULID.String()) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index bf6cfa64fe8..9e6c5ac19ef 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -296,7 +296,16 @@ func runQuery( return stores.Get(), nil }, selectorLset) queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel) - engine = promql.NewEngine(logger, reg, maxConcurrentQueries, queryTimeout) + engine = promql.NewEngine( + promql.EngineOpts{ + Logger: logger, + Reg: reg, + MaxConcurrent: maxConcurrentQueries, + // TODO(bwplotka): Expose this as a flag: https://github.com/improbable-eng/thanos/issues/703 + MaxSamples: math.MaxInt32, + Timeout: queryTimeout, + }, + ) ) // Periodically update the store set with the addresses we see in our cluster. { diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 03fed102d2e..35d81858c17 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -19,15 +19,14 @@ import ( "syscall" "time" - "github.com/improbable-eng/thanos/pkg/extprom" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/alert" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/discovery/cache" "github.com/improbable-eng/thanos/pkg/discovery/dns" + "github.com/improbable-eng/thanos/pkg/extprom" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/shipper" @@ -290,7 +289,7 @@ func runRule( ctx, cancel := context.WithCancel(context.Background()) ctx = tracing.ContextWithTracer(ctx, tracer) - notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) error { + notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) { res := make([]*alert.Alert, 0, len(alerts)) for _, alrt := range alerts { // Only send actually firing alerts. @@ -309,8 +308,6 @@ func runRule( res = append(res, a) } alertQ.Push(res) - - return nil } mgr = rules.NewManager(&rules.ManagerOptions{ Context: ctx, @@ -579,7 +576,7 @@ func runRule( } }() - s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, block.RulerSource) + s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, blockmeta.RulerSource) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 61f493f93a8..1662a310fa8 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -14,7 +14,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/reloader" @@ -252,7 +252,7 @@ func runSidecar( } }() - s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, block.SidecarSource) + s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, blockmeta.SidecarSource) ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { diff --git a/pkg/block/block.go b/pkg/block/block.go index 118b7ea96c8..e6750e0881c 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -5,11 +5,12 @@ package block import ( "context" "encoding/json" - "io/ioutil" "os" "path" "path/filepath" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" + "fmt" "github.com/go-kit/kit/log" @@ -17,8 +18,6 @@ import ( "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" - "github.com/prometheus/tsdb" - "github.com/prometheus/tsdb/fileutil" ) const ( @@ -33,103 +32,6 @@ const ( DebugMetas = "debug/metas" ) -type SourceType string - -const ( - UnknownSource SourceType = "" - SidecarSource SourceType = "sidecar" - CompactorSource SourceType = "compactor" - CompactorRepairSource SourceType = "compactor.repair" - RulerSource SourceType = "ruler" - BucketRepairSource SourceType = "bucket.repair" - TestSource SourceType = "test" -) - -// Meta describes the a block's meta. It wraps the known TSDB meta structure and -// extends it by Thanos-specific fields. -type Meta struct { - Version int `json:"version"` - - tsdb.BlockMeta - - Thanos ThanosMeta `json:"thanos"` -} - -// ThanosMeta holds block meta information specific to Thanos. -type ThanosMeta struct { - Labels map[string]string `json:"labels"` - Downsample ThanosDownsampleMeta `json:"downsample"` - - // Source is a real upload source of the block. - Source SourceType `json:"source"` -} - -type ThanosDownsampleMeta struct { - Resolution int64 `json:"resolution"` -} - -// WriteMetaFile writes the given meta into /meta.json. -func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error { - // Make any changes to the file appear atomic. - path := filepath.Join(dir, MetaFilename) - tmp := path + ".tmp" - - f, err := os.Create(tmp) - if err != nil { - return err - } - - enc := json.NewEncoder(f) - enc.SetIndent("", "\t") - - if err := enc.Encode(meta); err != nil { - runutil.CloseWithLogOnErr(logger, f, "close meta") - return err - } - if err := f.Close(); err != nil { - return err - } - return renameFile(logger, tmp, path) -} - -// ReadMetaFile reads the given meta from /meta.json. -func ReadMetaFile(dir string) (*Meta, error) { - b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename)) - if err != nil { - return nil, err - } - var m Meta - - if err := json.Unmarshal(b, &m); err != nil { - return nil, err - } - if m.Version != 1 { - return nil, errors.Errorf("unexpected meta file version %d", m.Version) - } - return &m, nil -} - -func renameFile(logger log.Logger, from, to string) error { - if err := os.RemoveAll(to); err != nil { - return err - } - if err := os.Rename(from, to); err != nil { - return err - } - - // Directory was renamed; sync parent dir to persist rename. - pdir, err := fileutil.OpenDir(filepath.Dir(to)) - if err != nil { - return err - } - - if err = fileutil.Fsync(pdir); err != nil { - runutil.CloseWithLogOnErr(logger, pdir, "close dir") - return err - } - return pdir.Close() -} - // Download downloads directory that is mean to be block directory. func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string) error { if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), dst); err != nil { @@ -169,7 +71,7 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st return errors.Wrap(err, "not a block dir") } - meta, err := ReadMetaFile(bdir) + meta, err := blockmeta.Read(bdir) if err != nil { // No meta or broken meta file. return errors.Wrap(err, "read meta") @@ -216,16 +118,16 @@ func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error { } // DownloadMeta downloads only meta file from bucket by block ID. -func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (Meta, error) { +func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (blockmeta.Meta, error) { rc, err := bkt.Get(ctx, path.Join(id.String(), MetaFilename)) if err != nil { - return Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String()) + return blockmeta.Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String()) } defer runutil.CloseWithLogOnErr(logger, rc, "download meta bucket client") - var m Meta + var m blockmeta.Meta if err := json.NewDecoder(rc).Decode(&m); err != nil { - return Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String()) + return blockmeta.Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String()) } return m, nil } @@ -234,24 +136,3 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) { id, err := ulid.Parse(filepath.Base(path)) return id, err == nil } - -// InjectThanosMeta sets Thanos meta to the block meta JSON and saves it to the disk. -// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata. -func InjectThanosMeta(logger log.Logger, bdir string, meta ThanosMeta, downsampledMeta *tsdb.BlockMeta) (*Meta, error) { - newMeta, err := ReadMetaFile(bdir) - if err != nil { - return nil, errors.Wrap(err, "read new meta") - } - newMeta.Thanos = meta - - // While downsampling we need to copy original compaction. - if downsampledMeta != nil { - newMeta.Compaction = downsampledMeta.Compaction - } - - if err := WriteMetaFile(logger, bdir, newMeta); err != nil { - return nil, errors.Wrap(err, "write new meta") - } - - return newMeta, nil -} diff --git a/pkg/block/blockmeta/meta.go b/pkg/block/blockmeta/meta.go new file mode 100644 index 00000000000..d470b4ec2c7 --- /dev/null +++ b/pkg/block/blockmeta/meta.go @@ -0,0 +1,142 @@ +package blockmeta + +// blockmeta package is implements writing and reading wrapped meta.json where Thanos puts its metadata. +// Those metadata contains external labels, downsampling resolution and source type. +// This package is minimal and separated because it usited by testutils which limits test helpers we can use in +// this package. + +import ( + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/pkg/errors" + "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/fileutil" +) + +type SourceType string + +const ( + UnknownSource SourceType = "" + SidecarSource SourceType = "sidecar" + CompactorSource SourceType = "compactor" + CompactorRepairSource SourceType = "compactor.repair" + RulerSource SourceType = "ruler" + BucketRepairSource SourceType = "bucket.repair" + TestSource SourceType = "test" +) + +const ( + // MetaFilename is the known JSON filename for meta information. + MetaFilename = "meta.json" +) + +// Meta describes the a block's meta. It wraps the known TSDB meta structure and +// extends it by Thanos-specific fields. +type Meta struct { + Version int `json:"version"` + + tsdb.BlockMeta + + Thanos Thanos `json:"thanos"` +} + +// Thanos holds block meta information specific to Thanos. +type Thanos struct { + Labels map[string]string `json:"labels"` + Downsample ThanosDownsample `json:"downsample"` + + // Source is a real upload source of the block. + Source SourceType `json:"source"` +} + +type ThanosDownsample struct { + Resolution int64 `json:"resolution"` +} + +// InjectThanos sets Thanos meta to the block meta JSON and saves it to the disk. +// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata. +func InjectThanos(logger log.Logger, bdir string, meta Thanos, downsampledMeta *tsdb.BlockMeta) (*Meta, error) { + newMeta, err := Read(bdir) + if err != nil { + return nil, errors.Wrap(err, "read new meta") + } + newMeta.Thanos = meta + + // While downsampling we need to copy original compaction. + if downsampledMeta != nil { + newMeta.Compaction = downsampledMeta.Compaction + } + + if err := Write(logger, bdir, newMeta); err != nil { + return nil, errors.Wrap(err, "write new meta") + } + + return newMeta, nil +} + +// Write writes the given meta into /meta.json. +func Write(logger log.Logger, dir string, meta *Meta) error { + // Make any changes to the file appear atomic. + path := filepath.Join(dir, MetaFilename) + tmp := path + ".tmp" + + f, err := os.Create(tmp) + if err != nil { + return err + } + + enc := json.NewEncoder(f) + enc.SetIndent("", "\t") + + if err := enc.Encode(meta); err != nil { + runutil.CloseWithLogOnErr(logger, f, "close meta") + return err + } + if err := f.Close(); err != nil { + return err + } + return renameFile(logger, tmp, path) +} + +func renameFile(logger log.Logger, from, to string) error { + if err := os.RemoveAll(to); err != nil { + return err + } + if err := os.Rename(from, to); err != nil { + return err + } + + // Directory was renamed; sync parent dir to persist rename. + pdir, err := fileutil.OpenDir(filepath.Dir(to)) + if err != nil { + return err + } + + if err = fileutil.Fsync(pdir); err != nil { + runutil.CloseWithLogOnErr(logger, pdir, "close dir") + return err + } + return pdir.Close() +} + +// Read reads the given meta from /meta.json. +func Read(dir string) (*Meta, error) { + b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename)) + if err != nil { + return nil, err + } + var m Meta + + if err := json.Unmarshal(b, &m); err != nil { + return nil, err + } + if m.Version != 1 { + return nil, errors.Errorf("unexpected meta file version %d", m.Version) + } + return &m, nil +} diff --git a/pkg/block/index.go b/pkg/block/index.go index 2249863b2d2..2d088953fc8 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -11,6 +11,10 @@ import ( "strings" "time" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" + + "github.com/prometheus/tsdb/fileutil" + "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" @@ -36,23 +40,84 @@ type indexCache struct { Postings []postingsRange } +type realByteSlice []byte + +func (b realByteSlice) Len() int { + return len(b) +} + +func (b realByteSlice) Range(start, end int) []byte { + return b[start:end] +} + +func (b realByteSlice) Sub(start, end int) index.ByteSlice { + return b[start:end] +} + +func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) { + version := int(b.Range(4, 5)[0]) + + if version != 1 && version != 2 { + return nil, errors.Errorf("unknown index file version %d", version) + } + + toc, err := index.NewTOCFromByteSlice(b) + if err != nil { + return nil, errors.Wrap(err, "read TOC") + } + + symbolsV2, symbolsV1, err := index.ReadSymbols(b, version, int(toc.Symbols)) + if err != nil { + return nil, errors.Wrap(err, "read symbols") + } + + symbolsTable := make(map[uint32]string, len(symbolsV1)+len(symbolsV2)) + for o, s := range symbolsV1 { + symbolsTable[o] = s + } + for o, s := range symbolsV2 { + symbolsTable[uint32(o)] = s + } + + return symbolsTable, nil +} + // WriteIndexCache writes a cache file containing the first lookup stages // for an index file. -func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error { +func WriteIndexCache(logger log.Logger, indexFn string, fn string) error { + indexFile, err := fileutil.OpenMmapFile(indexFn) + if err != nil { + return errors.Wrapf(err, "open mmap index file %s", indexFn) + } + defer runutil.CloseWithLogOnErr(logger, indexFile, "close index cache mmap file from %s", indexFn) + + b := realByteSlice(indexFile.Bytes()) + indexr, err := index.NewReader(b) + if err != nil { + return errors.Wrap(err, "open index reader") + } + defer runutil.CloseWithLogOnErr(logger, indexr, "load index cache reader") + + // We assume reader verified index already. + symbols, err := getSymbolTable(b) + if err != nil { + return err + } + f, err := os.Create(fn) if err != nil { - return errors.Wrap(err, "create file") + return errors.Wrap(err, "create index cache file") } defer runutil.CloseWithLogOnErr(logger, f, "index cache writer") v := indexCache{ - Version: r.Version(), - Symbols: r.SymbolTable(), + Version: indexr.Version(), + Symbols: symbols, LabelValues: map[string][]string{}, } // Extract label value indices. - lnames, err := r.LabelIndices() + lnames, err := indexr.LabelIndices() if err != nil { return errors.Wrap(err, "read label indices") } @@ -62,7 +127,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error { } ln := lns[0] - tpls, err := r.LabelValues(ln) + tpls, err := indexr.LabelValues(ln) if err != nil { return errors.Wrap(err, "get label values") } @@ -82,7 +147,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error { } // Extract postings ranges. - pranges, err := r.PostingsRanges() + pranges, err := indexr.PostingsRanges() if err != nil { return errors.Wrap(err, "read postings ranges") } @@ -164,7 +229,7 @@ func VerifyIndex(logger log.Logger, fn string, minTime int64, maxTime int64) err } type Stats struct { - // TotalSeries represents total number of series in block. + // TotalSeries represnts total number of series in block. TotalSeries int // OutOfOrderSeries represents number of series that have out of order chunks. OutOfOrderSeries int @@ -346,7 +411,7 @@ type ignoreFnType func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) ( // - removes all near "complete" outside chunks introduced by https://github.com/prometheus/tsdb/issues/347. // Fixable inconsistencies are resolved in the new block. // TODO(bplotka): https://github.com/improbable-eng/thanos/issues/378 -func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) { +func Repair(logger log.Logger, dir string, id ulid.ULID, source blockmeta.SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) { if len(ignoreChkFns) == 0 { return resid, errors.New("no ignore chunk function specified") } @@ -355,7 +420,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno entropy := rand.New(rand.NewSource(time.Now().UnixNano())) resid = ulid.MustNew(ulid.Now(), entropy) - meta, err := ReadMetaFile(bdir) + meta, err := blockmeta.Read(bdir) if err != nil { return resid, errors.Wrap(err, "read meta file") } @@ -405,7 +470,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno if err := rewrite(indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil { return resid, errors.Wrap(err, "rewrite block") } - if err := WriteMetaFile(logger, resdir, &resmeta); err != nil { + if err := blockmeta.Write(logger, resdir, &resmeta); err != nil { return resid, err } return resid, nil @@ -494,7 +559,7 @@ OUTER: func rewrite( indexr tsdb.IndexReader, chunkr tsdb.ChunkReader, indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter, - meta *Meta, + meta *blockmeta.Meta, ignoreChkFns []ignoreFnType, ) error { symbols, err := indexr.Symbols() diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go new file mode 100644 index 00000000000..80c10e8e6ed --- /dev/null +++ b/pkg/block/index_test.go @@ -0,0 +1,46 @@ +package block + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/prometheus/tsdb/labels" +) + +func TestWriteReadIndexCache(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "test-compact-prepare") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + b, err := testutil.CreateBlock(tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, nil, 124) + testutil.Ok(t, err) + + fn := filepath.Join(tmpDir, "index.cache.json") + testutil.Ok(t, WriteIndexCache(log.NewNopLogger(), filepath.Join(tmpDir, b.String(), "index"), fn)) + + version, symbols, lvals, postings, err := ReadIndexCache(log.NewNopLogger(), fn) + testutil.Ok(t, err) + + testutil.Equals(t, 2, version) + testutil.Equals(t, 6, len(symbols)) + testutil.Equals(t, 2, len(lvals)) + + vals, ok := lvals["a"] + testutil.Assert(t, ok, "") + testutil.Equals(t, []string{"1", "2", "3", "4"}, vals) + + vals, ok = lvals["b"] + testutil.Assert(t, ok, "") + testutil.Equals(t, []string{"1"}, vals) + testutil.Equals(t, 6, len(postings)) +} diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 544de920eac..fe95ccfd719 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" + "io/ioutil" "github.com/go-kit/kit/log" @@ -39,7 +41,7 @@ type Syncer struct { bkt objstore.Bucket syncDelay time.Duration mtx sync.Mutex - blocks map[ulid.ULID]*block.Meta + blocks map[ulid.ULID]*blockmeta.Meta metrics *syncerMetrics } @@ -130,7 +132,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket logger: logger, reg: reg, syncDelay: syncDelay, - blocks: map[ulid.ULID]*block.Meta{}, + blocks: map[ulid.ULID]*blockmeta.Meta{}, bkt: bkt, metrics: newSyncerMetrics(reg), }, nil @@ -185,9 +187,9 @@ func (c *Syncer) syncMetas(ctx context.Context) error { // NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks. // TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377 if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) && - meta.Thanos.Source != block.BucketRepairSource && - meta.Thanos.Source != block.CompactorSource && - meta.Thanos.Source != block.CompactorRepairSource { + meta.Thanos.Source != blockmeta.BucketRepairSource && + meta.Thanos.Source != blockmeta.CompactorSource && + meta.Thanos.Source != blockmeta.CompactorRepairSource { level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) return nil @@ -214,7 +216,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error { // GroupKey returns a unique identifier for the group the block belongs to. It considers // the downsampling resolution and the block's labels. -func GroupKey(meta block.Meta) string { +func GroupKey(meta blockmeta.Meta) string { return groupKey(meta.Thanos.Downsample.Resolution, labels.FromMap(meta.Thanos.Labels)) } @@ -381,7 +383,7 @@ type Group struct { labels labels.Labels resolution int64 mtx sync.Mutex - blocks map[ulid.ULID]*block.Meta + blocks map[ulid.ULID]*blockmeta.Meta compactions prometheus.Counter compactionFailures prometheus.Counter groupGarbageCollectedBlocks prometheus.Counter @@ -405,7 +407,7 @@ func newGroup( bkt: bkt, labels: lset, resolution: resolution, - blocks: map[ulid.ULID]*block.Meta{}, + blocks: map[ulid.ULID]*blockmeta.Meta{}, compactions: compactions, compactionFailures: compactionFailures, groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, @@ -419,7 +421,7 @@ func (cg *Group) Key() string { } // Add the block with the given meta to the group. -func (cg *Group) Add(meta *block.Meta) error { +func (cg *Group) Add(meta *blockmeta.Meta) error { cg.mtx.Lock() defer cg.mtx.Unlock() @@ -541,7 +543,7 @@ func IsRetryError(err error) bool { return ok } -func (cg *Group) areBlocksOverlapping(include *block.Meta, excludeDirs ...string) error { +func (cg *Group) areBlocksOverlapping(include *blockmeta.Meta, excludeDirs ...string) error { var ( metas []tsdb.BlockMeta exclude = map[ulid.ULID]struct{}{} @@ -597,12 +599,12 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, return retry(errors.Wrapf(err, "download block %s", ie.id)) } - meta, err := block.ReadMetaFile(bdir) + meta, err := blockmeta.Read(bdir) if err != nil { return errors.Wrapf(err, "read meta from %s", bdir) } - resid, err := block.Repair(logger, tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk) + resid, err := block.Repair(logger, tmpdir, ie.id, blockmeta.CompactorRepairSource, block.IgnoreIssue347OutsideChunk) if err != nil { return errors.Wrapf(err, "repair failed for block %s", ie.id) } @@ -647,7 +649,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( if err := os.MkdirAll(bdir, 0777); err != nil { return compID, errors.Wrap(err, "create planning block dir") } - if err := block.WriteMetaFile(cg.logger, bdir, meta); err != nil { + if err := blockmeta.Write(cg.logger, bdir, meta); err != nil { return compID, errors.Wrap(err, "write planning meta file") } } @@ -670,7 +672,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( begin := time.Now() for _, pdir := range plan { - meta, err := block.ReadMetaFile(pdir) + meta, err := blockmeta.Read(pdir) if err != nil { return compID, errors.Wrapf(err, "read meta from %s", pdir) } @@ -718,7 +720,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( begin = time.Now() - compID, err = comp.Compact(dir, plan...) + compID, err = comp.Compact(dir, plan, nil) if err != nil { return compID, halt(errors.Wrapf(err, "compact blocks %v", plan)) } @@ -727,10 +729,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( bdir := filepath.Join(dir, compID.String()) - newMeta, err := block.InjectThanosMeta(cg.logger, bdir, block.ThanosMeta{ + newMeta, err := blockmeta.InjectThanos(cg.logger, bdir, blockmeta.Thanos{ Labels: cg.labels.Map(), - Downsample: block.ThanosDownsampleMeta{Resolution: cg.resolution}, - Source: block.CompactorSource, + Downsample: blockmeta.ThanosDownsample{Resolution: cg.resolution}, + Source: blockmeta.CompactorSource, }, nil) if err != nil { return compID, errors.Wrapf(err, "failed to finalize the block %s", bdir) diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 74df73cc6f5..266fb93cdf3 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -13,6 +13,8 @@ import ( "testing" "time" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" + "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/objstore" @@ -37,13 +39,13 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { // After the first synchronization the first 5 should be dropped and the // last 5 be loaded from the bucket. var ids []ulid.ULID - var metas []*block.Meta + var metas []*blockmeta.Meta for i := 0; i < 15; i++ { id, err := ulid.New(uint64(i), nil) testutil.Ok(t, err) - var meta block.Meta + var meta blockmeta.Meta meta.Version = 1 meta.ULID = id @@ -56,7 +58,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { for _, m := range metas[5:] { var buf bytes.Buffer testutil.Ok(t, json.NewEncoder(&buf).Encode(&m)) - testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), blockmeta.MetaFilename), &buf)) } groups, err := sy.Groups() @@ -79,11 +81,11 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { // Generate 10 source block metas and construct higher level blocks // that are higher compactions of them. - var metas []*block.Meta + var metas []*blockmeta.Meta var ids []ulid.ULID for i := 0; i < 10; i++ { - var m block.Meta + var m blockmeta.Meta m.Version = 1 m.ULID = ulid.MustNew(uint64(i), nil) @@ -94,28 +96,28 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { metas = append(metas, &m) } - var m1 block.Meta + var m1 blockmeta.Meta m1.Version = 1 m1.ULID = ulid.MustNew(100, nil) m1.Compaction.Level = 2 m1.Compaction.Sources = ids[:4] m1.Thanos.Downsample.Resolution = 0 - var m2 block.Meta + var m2 blockmeta.Meta m2.Version = 1 m2.ULID = ulid.MustNew(200, nil) m2.Compaction.Level = 2 m2.Compaction.Sources = ids[4:8] // last two source IDs is not part of a level 2 block. m2.Thanos.Downsample.Resolution = 0 - var m3 block.Meta + var m3 blockmeta.Meta m3.Version = 1 m3.ULID = ulid.MustNew(300, nil) m3.Compaction.Level = 3 m3.Compaction.Sources = ids[:9] // last source ID is not part of level 3 block. m3.Thanos.Downsample.Resolution = 0 - var m4 block.Meta + var m4 blockmeta.Meta m4.Version = 14 m4.ULID = ulid.MustNew(400, nil) m4.Compaction.Level = 2 @@ -127,7 +129,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { fmt.Println("create", m.ULID) var buf bytes.Buffer testutil.Ok(t, json.NewEncoder(&buf).Encode(&m)) - testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), blockmeta.MetaFilename), &buf)) } // Do one initial synchronization with the bucket. @@ -173,7 +175,7 @@ func TestGroup_Compact_e2e(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() - var metas []*block.Meta + var metas []*blockmeta.Meta extLset := labels.Labels{{Name: "e1", Value: "1"}} b1, err := testutil.CreateBlock(prepareDir, []labels.Labels{ {{Name: "a", Value: "1"}}, @@ -183,7 +185,7 @@ func TestGroup_Compact_e2e(t *testing.T) { }, 100, 0, 1000, extLset, 124) testutil.Ok(t, err) - meta, err := block.ReadMetaFile(filepath.Join(prepareDir, b1.String())) + meta, err := blockmeta.Read(filepath.Join(prepareDir, b1.String())) testutil.Ok(t, err) metas = append(metas, meta) @@ -196,7 +198,7 @@ func TestGroup_Compact_e2e(t *testing.T) { testutil.Ok(t, err) // Mix order to make sure compact is able to deduct min time / max time. - meta, err = block.ReadMetaFile(filepath.Join(prepareDir, b3.String())) + meta, err = blockmeta.Read(filepath.Join(prepareDir, b3.String())) testutil.Ok(t, err) metas = append(metas, meta) @@ -204,7 +206,7 @@ func TestGroup_Compact_e2e(t *testing.T) { b2, err := testutil.CreateBlock(prepareDir, []labels.Labels{}, 100, 1001, 2000, extLset, 124) testutil.Ok(t, err) - meta, err = block.ReadMetaFile(filepath.Join(prepareDir, b2.String())) + meta, err = blockmeta.Read(filepath.Join(prepareDir, b2.String())) testutil.Ok(t, err) metas = append(metas, meta) @@ -217,7 +219,7 @@ func TestGroup_Compact_e2e(t *testing.T) { }, 100, 3001, 4000, extLset, 124) testutil.Ok(t, err) - meta, err = block.ReadMetaFile(filepath.Join(prepareDir, freshB.String())) + meta, err = blockmeta.Read(filepath.Join(prepareDir, freshB.String())) testutil.Ok(t, err) metas = append(metas, meta) @@ -263,7 +265,7 @@ func TestGroup_Compact_e2e(t *testing.T) { resDir := filepath.Join(dir, id.String()) testutil.Ok(t, block.Download(ctx, log.NewNopLogger(), bkt, id, resDir)) - meta, err = block.ReadMetaFile(resDir) + meta, err = blockmeta.Read(resDir) testutil.Ok(t, err) testutil.Equals(t, int64(0), meta.MinTime) diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 305f72021c0..5b69b3dc7ff 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -5,7 +5,8 @@ import ( "path/filepath" "sort" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/tsdb/chunkenc" @@ -31,7 +32,7 @@ const ( // Downsample downsamples the given block. It writes a new block into dir and returns its ID. func Downsample( logger log.Logger, - origMeta *block.Meta, + origMeta *blockmeta.Meta, b tsdb.BlockReader, dir string, resolution int64, @@ -125,18 +126,18 @@ func Downsample( if err != nil { return id, errors.Wrap(err, "create compactor") } - id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime) + id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime, &origMeta.BlockMeta) if err != nil { return id, errors.Wrap(err, "compact head") } bdir := filepath.Join(dir, id.String()) - var tmeta block.ThanosMeta + var tmeta blockmeta.Thanos tmeta = origMeta.Thanos - tmeta.Source = block.CompactorSource + tmeta.Source = blockmeta.CompactorSource tmeta.Downsample.Resolution = resolution - _, err = block.InjectThanosMeta(logger, bdir, tmeta, &origMeta.BlockMeta) + _, err = blockmeta.InjectThanos(logger, bdir, tmeta, &origMeta.BlockMeta) if err != nil { return id, errors.Wrapf(err, "failed to finalize the block %s", bdir) } @@ -228,13 +229,20 @@ func (b *memBlock) Chunks() (tsdb.ChunkReader, error) { } func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) { - return tsdb.EmptyTombstoneReader(), nil + return emptyTombstoneReader{}, nil } func (b *memBlock) Close() error { return nil } +type emptyTombstoneReader struct{} + +func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil } +func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil } +func (emptyTombstoneReader) Total() uint64 { return 0 } +func (emptyTombstoneReader) Close() error { return nil } + // currentWindow returns the end timestamp of the window that t falls into. func currentWindow(t, r int64) int64 { // The next timestamp is the next number after s.t that's aligned with window. diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index d3844784162..1d82dabe77a 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -7,6 +7,8 @@ import ( "path/filepath" "testing" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/tsdb/chunks" @@ -59,7 +61,7 @@ func TestDownsampleRaw(t *testing.T) { }, }, } - testDownsample(t, input, &block.Meta{}, 100) + testDownsample(t, input, &blockmeta.Meta{}, 100) } func TestDownsampleAggr(t *testing.T) { @@ -96,7 +98,7 @@ func TestDownsampleAggr(t *testing.T) { }, }, } - var meta block.Meta + var meta blockmeta.Meta meta.Thanos.Downsample.Resolution = 10 testDownsample(t, input, &meta, 500) @@ -123,7 +125,7 @@ type downsampleTestSet struct { // testDownsample inserts the input into a block and invokes the downsampler with the given resolution. // The chunk ranges within the input block are aligned at 500 time units. -func testDownsample(t *testing.T, data []*downsampleTestSet, meta *block.Meta, resolution int64) { +func testDownsample(t *testing.T, data []*downsampleTestSet, meta *blockmeta.Meta, resolution int64) { t.Helper() dir, err := ioutil.TempDir("", "downsample-raw") diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index 0aef91c697b..3ec3b9c50bf 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" "github.com/improbable-eng/thanos/pkg/compact" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/inmem" @@ -253,15 +253,15 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) { t.Helper() - meta1 := block.Meta{ + meta1 := blockmeta.Meta{ Version: 1, BlockMeta: tsdb.BlockMeta{ ULID: ulid.MustParse(id), MinTime: minTime.Unix() * 1000, MaxTime: maxTime.Unix() * 1000, }, - Thanos: block.ThanosMeta{ - Downsample: block.ThanosDownsampleMeta{ + Thanos: blockmeta.Thanos{ + Downsample: blockmeta.ThanosDownsample{ Resolution: resolutionLevel, }, }, diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index 23e5b0a801b..cf693dd2fbe 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -527,15 +527,16 @@ func (api *API) series(r *http.Request) (interface{}, []error, *apiError) { var sets []storage.SeriesSet for _, mset := range matcherSets { - s, err := q.Select(&storage.SelectParams{}, mset...) + s, _, err := q.Select(&storage.SelectParams{}, mset...) if err != nil { return nil, nil, &apiError{errorExec, err} } sets = append(sets, s) } - set := storage.NewMergeSeriesSet(sets) - metrics := []labels.Labels{} + set := storage.NewMergeSeriesSet(sets, nil) + + var metrics []labels.Labels for set.Next() { metrics = append(metrics, set.At().Labels()) } diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index 4afef1d0548..bcf17907b06 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -333,7 +333,7 @@ func TestEndpoints(t *testing.T) { "start": []string{"-2"}, "end": []string{"-1"}, }, - response: []labels.Labels{}, + response: []labels.Labels(nil), }, // Start and end after series ends. { @@ -343,7 +343,7 @@ func TestEndpoints(t *testing.T) { "start": []string{"100000"}, "end": []string{"100001"}, }, - response: []labels.Labels{}, + response: []labels.Labels(nil), }, // Start before series starts, end after series ends. { @@ -409,33 +409,38 @@ func TestEndpoints(t *testing.T) { } for _, test := range tests { - // Build a context with the correct request params. - ctx := context.Background() - for p, v := range test.params { - ctx = route.WithParam(ctx, p, v) - } - t.Logf("run query %q", test.query.Encode()) + if ok := t.Run(test.query.Encode(), func(t *testing.T) { + // Build a context with the correct request params. + ctx := context.Background() + for p, v := range test.params { + ctx = route.WithParam(ctx, p, v) + } - req, err := http.NewRequest("ANY", fmt.Sprintf("http://example.com?%s", test.query.Encode()), nil) - if err != nil { - t.Fatal(err) - } - resp, _, apiErr := test.endpoint(req.WithContext(ctx)) - if apiErr != nil { - if test.errType == errorNone { - t.Fatalf("Unexpected error: %s", apiErr) + req, err := http.NewRequest("ANY", fmt.Sprintf("http://example.com?%s", test.query.Encode()), nil) + if err != nil { + t.Fatal(err) } - if test.errType != apiErr.typ { - t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.typ) + resp, _, apiErr := test.endpoint(req.WithContext(ctx)) + if apiErr != nil { + if test.errType == errorNone { + t.Fatalf("Unexpected error: %s", apiErr) + } + if test.errType != apiErr.typ { + t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.typ) + } + return } - continue - } - if apiErr == nil && test.errType != errorNone { - t.Fatalf("Expected error of type %q but got none", test.errType) - } - if !reflect.DeepEqual(resp, test.response) { - t.Fatalf("Response does not match, expected:\n%+v\ngot:\n%+v", test.response, resp) + if apiErr == nil && test.errType != errorNone { + t.Fatalf("Expected error of type %q but got none", test.errType) + } + + if !reflect.DeepEqual(resp, test.response) { + t.Fatalf("Response does not match, expected:\n%+v\ngot:\n%+v", test.response, resp) + } + }); !ok { + return } + } } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 6e962f64727..819ff3ac2a1 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -169,13 +169,13 @@ func aggrsFromFunc(f string) ([]storepb.Aggr, resAggr) { return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}, resAggrAvg } -func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, error) { +func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { span, ctx := tracing.StartSpan(q.ctx, "querier_select") defer span.Finish() sms, err := translateMatchers(ms...) if err != nil { - return nil, errors.Wrap(err, "convert matchers") + return nil, nil, errors.Wrap(err, "convert matchers") } queryAggrs, resAggr := aggrsFromFunc(params.Func) @@ -189,10 +189,12 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s Aggregates: queryAggrs, PartialResponseDisabled: !q.partialResponse, }, resp); err != nil { - return nil, errors.Wrap(err, "proxy Series()") + return nil, nil, errors.Wrap(err, "proxy Series()") } for _, w := range resp.warnings { + // NOTE(bwplotka): We could use warnings return arguments here, however need reporter anyway for LabelValues and LabelNames method, + // so we choose to be consistent and keep reporter. q.warningReporter(errors.New(w)) } @@ -203,7 +205,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s maxt: q.maxt, set: newStoreSeriesSet(resp.seriesSet), aggr: resAggr, - }, nil + }, nil, nil } // TODO(fabxc): this could potentially pushed further down into the store API @@ -220,7 +222,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s // The merged series set assembles all potentially-overlapping time ranges // of the same series into a single one. The series are ordered so that equal series // from different replicas are sequential. We can now deduplicate those. - return newDedupSeriesSet(set, q.replicaLabel), nil + return newDedupSeriesSet(set, q.replicaLabel), nil, nil } // sortDedupLabels resorts the set so that the same series with different replica @@ -245,6 +247,7 @@ func sortDedupLabels(set []storepb.Series, replicaLabel string) { }) } +// LabelValues returns all potential values for a label name. func (q *querier) LabelValues(name string) ([]string, error) { span, ctx := tracing.StartSpan(q.ctx, "querier_label_values") defer span.Finish() @@ -261,6 +264,12 @@ func (q *querier) LabelValues(name string) ([]string, error) { return resp.Values, nil } +// LabelNames returns all the unique label names present in the block in sorted order. +// TODO(bwplotka): Consider adding labelNames to thanos Query API https://github.com/improbable-eng/thanos/issues/702. +func (q *querier) LabelNames() ([]string, error) { + return nil, errors.New("not implemented") +} + func (q *querier) Close() error { q.cancel() return nil diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 46b51a7f3ef..980d837213e 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -36,7 +36,7 @@ func TestQuerier_Series(t *testing.T) { q := newQuerier(context.Background(), nil, 1, 300, "", testProxy, false, 0, true, nil) defer func() { testutil.Ok(t, q.Close()) }() - res, err := q.Select(&storage.SelectParams{}) + res, _, err := q.Select(&storage.SelectParams{}) testutil.Ok(t, err) expected := []struct { diff --git a/pkg/query/test_print.go b/pkg/query/test_print.go new file mode 100644 index 00000000000..70bc292439b --- /dev/null +++ b/pkg/query/test_print.go @@ -0,0 +1,34 @@ +package query + +import ( + "fmt" + + "github.com/prometheus/prometheus/storage" +) + +type printSeriesSet struct { + set storage.SeriesSet +} + +func newPrintSeriesSet(set storage.SeriesSet) storage.SeriesSet { + return &printSeriesSet{set: set} +} + +func (s *printSeriesSet) Next() bool { + return s.set.Next() +} + +func (s *printSeriesSet) At() storage.Series { + at := s.set.At() + fmt.Println("Series", at.Labels()) + + i := at.Iterator() + for i.Next() { + fmt.Println(i.At()) + } + return at +} + +func (s *printSeriesSet) Err() error { + return s.set.Err() +} diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 5c1df9a9b7e..542799ce5e7 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -11,6 +11,8 @@ import ( "path" "path/filepath" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" @@ -69,7 +71,7 @@ type Shipper struct { metrics *metrics bucket objstore.Bucket labels func() labels.Labels - source block.SourceType + source blockmeta.SourceType } // New creates a new shipper that detects new TSDB blocks in dir and uploads them @@ -80,7 +82,7 @@ func New( dir string, bucket objstore.Bucket, lbls func() labels.Labels, - source block.SourceType, + source blockmeta.SourceType, ) *Shipper { if logger == nil { logger = log.NewNopLogger() @@ -114,7 +116,7 @@ func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) { minTime = math.MaxInt64 maxSyncTime = math.MinInt64 - if err := s.iterBlockMetas(func(m *block.Meta) error { + if err := s.iterBlockMetas(func(m *blockmeta.Meta) error { if m.MinTime < minTime { minTime = m.MinTime } @@ -158,7 +160,7 @@ func (s *Shipper) Sync(ctx context.Context) { // TODO(bplotka): If there are no blocks in the system check for WAL dir to ensure we have actually // access to real TSDB dir (!). - if err = s.iterBlockMetas(func(m *block.Meta) error { + if err = s.iterBlockMetas(func(m *blockmeta.Meta) error { // Do not sync a block if we already uploaded it. If it is no longer found in the bucket, // it was generally removed by the compaction process. if _, ok := hasUploaded[m.ULID]; !ok { @@ -180,7 +182,7 @@ func (s *Shipper) Sync(ctx context.Context) { } } -func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) { +func (s *Shipper) sync(ctx context.Context, meta *blockmeta.Meta) (err error) { dir := filepath.Join(s.dir, meta.ULID.String()) // We only ship of the first compacted block level. @@ -225,7 +227,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) { meta.Thanos.Labels = lset.Map() } meta.Thanos.Source = s.source - if err := block.WriteMetaFile(s.logger, updir, meta); err != nil { + if err := blockmeta.Write(s.logger, updir, meta); err != nil { return errors.Wrap(err, "write meta file") } return block.Upload(ctx, s.logger, s.bucket, updir) @@ -234,7 +236,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) { // iterBlockMetas calls f with the block meta for each block found in dir. It logs // an error and continues if it cannot access a meta.json file. // If f returns an error, the function returns with the same error. -func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error { +func (s *Shipper) iterBlockMetas(f func(m *blockmeta.Meta) error) error { names, err := fileutil.ReadDir(s.dir) if err != nil { return errors.Wrap(err, "read dir") @@ -253,7 +255,7 @@ func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error { if !fi.IsDir() { continue } - m, err := block.ReadMetaFile(dir) + m, err := blockmeta.Read(dir) if err != nil { level.Warn(s.logger).Log("msg", "reading meta file failed", "err", err) continue diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index 8f0e70ecd65..f23930bec71 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/objtesting" "github.com/improbable-eng/thanos/pkg/testutil" @@ -32,7 +33,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) { }() extLset := labels.FromStrings("prometheus", "prom-1") - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, block.TestSource) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, blockmeta.TestSource) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -54,7 +55,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) { testutil.Ok(t, os.Mkdir(tmp, 0777)) - meta := block.Meta{ + meta := blockmeta.Meta{ BlockMeta: tsdb.BlockMeta{ MinTime: timestamp.FromTime(now.Add(time.Duration(i) * time.Hour)), MaxTime: timestamp.FromTime(now.Add((time.Duration(i) * time.Hour) + 1)), @@ -62,7 +63,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) { } meta.Version = 1 meta.ULID = id - meta.Thanos.Source = block.TestSource + meta.Thanos.Source = blockmeta.TestSource metab, err := json.Marshal(&meta) testutil.Ok(t, err) diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go index 368c5e92b99..5adbb32734d 100644 --- a/pkg/shipper/shipper_test.go +++ b/pkg/shipper/shipper_test.go @@ -2,15 +2,13 @@ package shipper import ( "io/ioutil" - "os" - "testing" - "math" - + "os" "path" + "testing" "github.com/go-kit/kit/log" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" "github.com/improbable-eng/thanos/pkg/testutil" "github.com/oklog/ulid" "github.com/prometheus/tsdb" @@ -23,7 +21,7 @@ func TestShipperTimestamps(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - s := New(nil, nil, dir, nil, nil, block.TestSource) + s := New(nil, nil, dir, nil, nil, blockmeta.TestSource) // Missing thanos meta file. _, _, err = s.Timestamps() @@ -41,7 +39,7 @@ func TestShipperTimestamps(t *testing.T) { id1 := ulid.MustNew(1, nil) testutil.Ok(t, os.Mkdir(path.Join(dir, id1.String()), os.ModePerm)) - testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), path.Join(dir, id1.String()), &block.Meta{ + testutil.Ok(t, blockmeta.Write(log.NewNopLogger(), path.Join(dir, id1.String()), &blockmeta.Meta{ Version: 1, BlockMeta: tsdb.BlockMeta{ ULID: id1, @@ -56,7 +54,7 @@ func TestShipperTimestamps(t *testing.T) { id2 := ulid.MustNew(2, nil) testutil.Ok(t, os.Mkdir(path.Join(dir, id2.String()), os.ModePerm)) - testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), path.Join(dir, id2.String()), &block.Meta{ + testutil.Ok(t, blockmeta.Write(log.NewNopLogger(), path.Join(dir, id2.String()), &blockmeta.Meta{ Version: 1, BlockMeta: tsdb.BlockMeta{ ULID: id2, diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e121e9d4081..a7de1cf9766 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -16,6 +16,8 @@ import ( "sync" "time" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" @@ -459,6 +461,7 @@ func (s *bucketSeriesSet) Err() error { return s.err } +// blockSeries return requested series from given index and chunk readers. func (s *BucketStore) blockSeries( ctx context.Context, ulid ulid.ULID, @@ -488,7 +491,6 @@ func (s *BucketStore) blockSeries( } // Get result postings list by resolving the postings tree. - // TODO(bwplotka): Users are seeing panics here, because of lazyPosting being not loaded by preloadPostings. ps, err := index.ExpandPostings(lazyPostings) if err != nil { return nil, stats, errors.Wrap(err, "expand postings") @@ -504,7 +506,8 @@ func (s *BucketStore) blockSeries( } } - // Preload all series index data + // Preload all series index data. + // TODO(bwplotka): Consider not keeping all series in memory all the time. if err := indexr.preloadSeries(ps); err != nil { return nil, stats, errors.Wrap(err, "preload series") } @@ -1001,7 +1004,7 @@ func (s *bucketBlockSet) labelMatchers(matchers ...labels.Matcher) ([]labels.Mat type bucketBlock struct { logger log.Logger bucket objstore.BucketReader - meta *block.Meta + meta *blockmeta.Meta dir string indexCache *indexCache chunkPool *pool.BytesPool @@ -1065,7 +1068,7 @@ func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error { } else if err != nil { return err } - meta, err := block.ReadMetaFile(b.dir) + meta, err := blockmeta.Read(b.dir) if err != nil { return errors.Wrap(err, "read meta.json") } @@ -1095,19 +1098,15 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) { } }() - indexr, err := index.NewFileReader(fn) - if err != nil { - return errors.Wrap(err, "open index reader") - } - defer runutil.CloseWithLogOnErr(b.logger, indexr, "load index cache reader") + // Create index cache adhoc. - if err := block.WriteIndexCache(b.logger, cachefn, indexr); err != nil { + if err := block.WriteIndexCache(b.logger, fn, cachefn); err != nil { return errors.Wrap(err, "write index cache") } b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cachefn) if err != nil { - return errors.Wrap(err, "read index cache") + return errors.Wrap(err, "read fresh index cache") } return nil } @@ -1179,15 +1178,22 @@ func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketB logger: logger, ctx: ctx, block: block, - dec: &index.Decoder{}, stats: &queryStats{}, cache: cache, loadedSeries: map[uint64][]byte{}, } - r.dec.SetSymbolTable(r.block.symbols) + r.dec = &index.Decoder{LookupSymbol: r.lookupSymbol} return r } +func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) { + s, ok := r.block.symbols[o] + if !ok { + return "", errors.Errorf("bucketIndexReader: unknown symbol offset %d", o) + } + return s, nil +} + func (r *bucketIndexReader) preloadPostings() error { const maxGapSize = 512 * 1024 @@ -1270,23 +1276,24 @@ func (r *bucketIndexReader) loadPostings(ctx context.Context, postings []*lazyPo return nil } -func (r *bucketIndexReader) preloadSeries(ids []uint64) error { +func (r *bucketIndexReader) preloadSeries(refs []uint64) error { const maxSeriesSize = 64 * 1024 const maxGapSize = 512 * 1024 - var newIDs []uint64 + var newRefs []uint64 - for _, id := range ids { - if b, ok := r.cache.series(r.block.meta.ULID, id); ok { - r.loadedSeries[id] = b + for _, ref := range refs { + if b, ok := r.cache.series(r.block.meta.ULID, ref); ok { + r.loadedSeries[ref] = b continue } - newIDs = append(newIDs, id) + newRefs = append(newRefs, ref) } - ids = newIDs + refs = newRefs - parts := partitionRanges(len(ids), func(i int) (start, end uint64) { - return ids[i], ids[i] + maxSeriesSize + // Combine multiple close byte ranges to not be rate-limited from object storage. + parts := partitionRanges(len(refs), func(i int) (start, end uint64) { + return refs[i], refs[i] + maxSeriesSize }, maxGapSize) var g run.Group @@ -1295,7 +1302,7 @@ func (r *bucketIndexReader) preloadSeries(ids []uint64) error { i, j := p[0], p[1] g.Add(func() error { - return r.loadSeries(ctx, ids[i:j], ids[i], ids[j-1]+maxSeriesSize) + return r.loadSeries(ctx, refs[i:j], refs[i], refs[j-1]+maxSeriesSize) }, func(err error) { if err != nil { cancel() @@ -1305,7 +1312,7 @@ func (r *bucketIndexReader) preloadSeries(ids []uint64) error { return g.Run() } -func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start, end uint64) error { +func (r *bucketIndexReader) loadSeries(ctx context.Context, refs []uint64, start, end uint64) error { begin := time.Now() b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start)) @@ -1317,12 +1324,12 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start, defer r.mtx.Unlock() r.stats.seriesFetchCount++ - r.stats.seriesFetched += len(ids) + r.stats.seriesFetched += len(refs) r.stats.seriesFetchDurationSum += time.Since(begin) r.stats.seriesFetchedSizeSum += int(end - start) - for _, id := range ids { - c := b[id-start:] + for _, ref := range refs { + c := b[ref-start:] l, n := binary.Uvarint(c) if n < 1 { @@ -1332,8 +1339,8 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start, return errors.Errorf("invalid remaining size %d, expected %d", len(c), n+int(l)) } c = c[n : n+int(l)] - r.loadedSeries[id] = c - r.cache.setSeries(r.block.meta.ULID, id, c) + r.loadedSeries[ref] = c + r.cache.setSeries(r.block.meta.ULID, ref, c) } return nil } @@ -1421,6 +1428,7 @@ func (r *bucketIndexReader) SortedPostings(p index.Postings) index.Postings { // Series populates the given labels and chunk metas for the series identified // by the reference. // Returns ErrNotFound if the ref does not resolve to a known series. +// prealoadSeries needs to be invoked first to have this method return loaded results. func (r *bucketIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error { b, ok := r.loadedSeries[ref] if !ok { @@ -1438,6 +1446,11 @@ func (r *bucketIndexReader) LabelIndices() ([][]string, error) { return nil, errors.New("not implemented") } +// LabelNames returns all the unique label names present in the index in sorted order. +func (r *bucketIndexReader) LabelNames() ([]string, error) { + return nil, errors.New("not implemented") +} + // Close released the underlying resources of the reader. func (r *bucketIndexReader) Close() error { r.block.pendingReaders.Done() diff --git a/pkg/store/bucket_profile_test.go b/pkg/store/bucket_profile_test.go new file mode 100644 index 00000000000..942231418f8 --- /dev/null +++ b/pkg/store/bucket_profile_test.go @@ -0,0 +1,408 @@ +package store + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "runtime/pprof" + "sync" + "testing" + "time" + + "github.com/oklog/ulid" + + "github.com/improbable-eng/thanos/pkg/objstore/inmem" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/improbable-eng/thanos/pkg/store/storepb" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/tsdb/labels" +) + +func saveHeap(t *testing.T, name string) { + time.Sleep(500 * time.Millisecond) + runtime.GC() + f, err := os.OpenFile("heap-"+name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm) + testutil.Ok(t, err) + + defer f.Close() + testutil.Ok(t, pprof.WriteHeapProfile(f)) +} + +func TestBucketStore_PROFILE(t *testing.T) { + bkt := inmem.NewBucket() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dir, err := ioutil.TempDir("", "test_bucketstore_e2e") + testutil.Ok(t, err) + //defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + series := []labels.Labels{ + labels.FromStrings("a", "1", "b", "1"), + labels.FromStrings("a", "1", "b", "2"), + labels.FromStrings("a", "2", "b", "1"), + labels.FromStrings("a", "2", "b", "2"), + labels.FromStrings("a", "1", "c", "1"), + labels.FromStrings("a", "1", "c", "2"), + labels.FromStrings("a", "2", "c", "1"), + labels.FromStrings("a", "2", "c", "2"), + } + extLset := labels.FromStrings("ext1", "value1") + + start := time.Now() + now := start + + var ids []ulid.ULID + for i := 0; i < 3; i++ { + mint := timestamp.FromTime(now) + now = now.Add(2 * time.Hour) + maxt := timestamp.FromTime(now) + + // Create two blocks per time slot. Only add 10 samples each so only one chunk + // gets created each. This way we can easily verify we got 10 chunks per series below. + id1, err := testutil.CreateBlock(dir, series[:4], 10, mint, maxt, extLset, 0) + testutil.Ok(t, err) + id2, err := testutil.CreateBlock(dir, series[4:], 10, mint, maxt, extLset, 0) + testutil.Ok(t, err) + + ids = append(ids, id1, id2) + dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String()) + + // Add labels to the meta of the second block. + meta, err := block.ReadMetaFile(dir2) + testutil.Ok(t, err) + meta.Thanos.Labels = map[string]string{"ext2": "value2"} + testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), dir2, meta)) + + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir1)) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir2)) + + testutil.Ok(t, os.RemoveAll(dir1)) + testutil.Ok(t, os.RemoveAll(dir2)) + } + + store, err := NewBucketStore(nil, nil, bkt, dir, 100, 0, false) + testutil.Ok(t, err) + + ctx, _ = context.WithTimeout(ctx, 30*time.Second) + + if err := runutil.Retry(100*time.Millisecond, ctx.Done(), func() error { + if err := store.SyncBlocks(ctx); err != nil { + return err + } + if store.numBlocks() < 6 { + return errors.New("not all blocks loaded") + } + return nil + }); err != nil && errors.Cause(err) != context.Canceled { + t.Error(err) + t.FailNow() + } + testutil.Ok(t, err) + + pbseries := [][]storepb.Label{ + {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, + } + + srv := newStoreSeriesServer(ctx) + + err = store.Series(&storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"}, + }, + MinTime: timestamp.FromTime(start), + MaxTime: timestamp.FromTime(now), + }, srv) + testutil.Ok(t, err) + testutil.Equals(t, len(pbseries), len(srv.SeriesSet)) + + g := sync.WaitGroup{} + + // NO REPRO + go func() { + g.Add(1) + time.Sleep(10 * time.Millisecond) + // Simulate deleted blocks without sync (compaction!) + testutil.Ok(t, block.Delete(ctx, bkt, ids[2])) + time.Sleep(10 * time.Millisecond) + store.SyncBlocks(ctx) + store.SyncBlocks(ctx) + + g.Done() + }() + + for i := 0; i < 1000; i++ { + go func() { + g.Add(1) + srv := newStoreSeriesServer(ctx) + + err = store.Series(&storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"}, + }, + MinTime: timestamp.FromTime(start), + MaxTime: timestamp.FromTime(now), + }, srv) + fmt.Println(err) + //testutil.Ok(t, err) + //testutil.Equals(t, len(pbseries), len(srv.SeriesSet)) + + g.Done() + }() + } + time.Sleep(10 * time.Millisecond) + for i := 0; i < 1000; i++ { + go func() { + g.Add(1) + srv := newStoreSeriesServer(ctx) + + err = store.Series(&storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"}, + }, + MinTime: timestamp.FromTime(start), + MaxTime: timestamp.FromTime(now), + }, srv) + fmt.Println(err) + //testutil.Ok(t, err) + //testutil.Equals(t, len(pbseries), len(srv.SeriesSet)) + + g.Done() + }() + } + + g.Wait() + + //for i, s := range srv.SeriesSet { + // testutil.Equals(t, pbseries[i], s.Labels) + // testutil.Equals(t, 3, len(s.Chunks)) + //} + + saveHeap(t, "2") +} + +/* +================== +WARNING: DATA RACE +Read at 0x00c4201c22f8 by goroutine 75: + github.com/improbable-eng/thanos/pkg/pool.(*BytesPool).Put() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/pool/pool.go:83 +0x14c + github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).Close() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1570 +0x115 + github.com/improbable-eng/thanos/pkg/runutil.CloseWithLogOnErr() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/runutil/runutil.go:60 +0x59 + github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:811 +0x2d7e + github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:140 +0x4e6 + +Previous write at 0x00c4201c22f8 by goroutine 25: + sync/atomic.AddInt64() + /usr/local/go/src/runtime/race_amd64.s:276 +0xb + github.com/improbable-eng/thanos/pkg/pool.(*BytesPool).Get() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/pool/pool.go:65 +0x1ad + github.com/improbable-eng/thanos/pkg/store.(*bucketBlock).readChunkRange() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1130 +0x95 + github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).loadChunks() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1499 +0xe7 + github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload.func3() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1485 +0x23a + github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34 + +Goroutine 75 (running) created at: + github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e + testing.tRunner() + /usr/local/go/src/testing/testing.go:777 +0x16d + +Goroutine 25 (finished) created at: + github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:37 +0x10b + github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1493 +0x6f0 + github.com/improbable-eng/thanos/pkg/store.(*BucketStore).blockSeries() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:572 +0x1109 + github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series.func1() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:721 +0x1e7 + github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34 +================== + +================== +WARNING: DATA RACE +Write at 0x00c42029c2fc by goroutine 10: + internal/race.Write() + /usr/local/go/src/internal/race/race.go:41 +0x38 + sync.(*WaitGroup).Wait() + /usr/local/go/src/sync/waitgroup.go:127 +0xf3 + github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:164 +0x2442 + testing.tRunner() + /usr/local/go/src/testing/testing.go:777 +0x16d + +Previous read at 0x00c42029c2fc by goroutine 74: + internal/race.Read() + /usr/local/go/src/internal/race/race.go:37 +0x38 + sync.(*WaitGroup).Add() + /usr/local/go/src/sync/waitgroup.go:70 +0x16e + github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:137 +0x5c + +Goroutine 10 (running) created at: + testing.(*T).Run() + /usr/local/go/src/testing/testing.go:824 +0x564 + testing.runTests.func1() + /usr/local/go/src/testing/testing.go:1063 +0xa4 + testing.tRunner() + /usr/local/go/src/testing/testing.go:777 +0x16d + testing.runTests() + /usr/local/go/src/testing/testing.go:1061 +0x4e1 + testing.(*M).Run() + /usr/local/go/src/testing/testing.go:978 +0x2cd + main.main() + _testmain.go:70 +0x22a + +Goroutine 74 (running) created at: + github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e + testing.tRunner() + /usr/local/go/src/testing/testing.go:777 +0x16d +================== +================== +WARNING: DATA RACE +Write at 0x00c4202647b0 by goroutine 230: + runtime.mapdelete_faststr() + /usr/local/go/src/runtime/hashmap_fast.go:883 +0x0 + github.com/improbable-eng/thanos/pkg/objstore/inmem.(*Bucket).Delete() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/inmem/inmem.go:138 +0x69 + github.com/improbable-eng/thanos/pkg/objstore.DeleteDir.func1() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/objstore.go:102 +0x113 + github.com/improbable-eng/thanos/pkg/objstore/inmem.(*Bucket).Iter() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/inmem/inmem.go:76 +0x616 + github.com/improbable-eng/thanos/pkg/objstore.DeleteDir() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/objstore.go:97 +0x10c + github.com/improbable-eng/thanos/pkg/block.Delete() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/block/block.go:215 +0x7f + github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func3() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:157 +0xae + +Previous read at 0x00c4202647b0 by goroutine 85: + runtime.mapaccess2_faststr() + /usr/local/go/src/runtime/hashmap_fast.go:261 +0x0 + github.com/improbable-eng/thanos/pkg/objstore/inmem.(*Bucket).GetRange() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/inmem/inmem.go:103 +0x9b + github.com/improbable-eng/thanos/pkg/store.(*bucketBlock).readChunkRange() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1136 +0x255 + github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).loadChunks() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1499 +0xe7 + github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload.func3() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1485 +0x23a + github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34 + +Goroutine 230 (running) created at: + github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:154 +0x2431 + testing.tRunner() + /usr/local/go/src/testing/testing.go:777 +0x16d + +Goroutine 85 (finished) created at: + github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:37 +0x10b + github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1493 +0x6f0 + github.com/improbable-eng/thanos/pkg/store.(*BucketStore).blockSeries() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:572 +0x1109 + github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series.func1() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:721 +0x1e7 + github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34 +================== +================== +WARNING: DATA RACE +Read at 0x00c4200d4978 by goroutine 76: + github.com/improbable-eng/thanos/pkg/pool.(*BytesPool).Put() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/pool/pool.go:83 +0x14c + github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).Close() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1570 +0x115 + github.com/improbable-eng/thanos/pkg/runutil.CloseWithLogOnErr() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/runutil/runutil.go:60 +0x59 + github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:811 +0x2d7e + github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:140 +0x4e6 + +Previous write at 0x00c4200d4978 by goroutine 365: + sync/atomic.AddInt64() + /usr/local/go/src/runtime/race_amd64.s:276 +0xb + github.com/improbable-eng/thanos/pkg/pool.(*BytesPool).Get() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/pool/pool.go:65 +0x1ad + github.com/improbable-eng/thanos/pkg/store.(*bucketBlock).readChunkRange() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1130 +0x95 + github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).loadChunks() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1499 +0xe7 + github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload.func3() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1485 +0x23a + github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34 + +Goroutine 76 (running) created at: + github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e + testing.tRunner() + /usr/local/go/src/testing/testing.go:777 +0x16d + +Goroutine 365 (finished) created at: + github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:37 +0x10b + github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1493 +0x6f0 + github.com/improbable-eng/thanos/pkg/store.(*BucketStore).blockSeries() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:572 +0x1109 + github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series.func1() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:721 +0x1e7 + github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34 +================== + +================== +WARNING: DATA RACE +Write at 0x00c4200837d0 by goroutine 77: + github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:140 +0x50b + +Previous write at 0x00c4200837d0 by goroutine 76: + github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:140 +0x50b + +Goroutine 77 (running) created at: + github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e + testing.tRunner() + /usr/local/go/src/testing/testing.go:777 +0x16d + +Goroutine 76 (finished) created at: + github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE() + /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e + testing.tRunner() + /usr/local/go/src/testing/testing.go:777 +0x16d +================== + +*/ diff --git a/pkg/testutil/prometheus.go b/pkg/testutil/prometheus.go index 807b61f8a05..6c5729531b8 100644 --- a/pkg/testutil/prometheus.go +++ b/pkg/testutil/prometheus.go @@ -12,8 +12,9 @@ import ( "syscall" "time" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" + "github.com/go-kit/kit/log" - "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -188,7 +189,7 @@ func CreateBlock( extLset labels.Labels, resolution int64, ) (id ulid.ULID, err error) { - h, err := tsdb.NewHead(nil, nil, tsdb.NopWAL(), 10000000000) + h, err := tsdb.NewHead(nil, nil, nil, 10000000000) if err != nil { return id, errors.Wrap(err, "create head block") } @@ -238,15 +239,15 @@ func CreateBlock( return id, errors.Wrap(err, "create compactor") } - id, err = c.Write(dir, h, mint, maxt) + id, err = c.Write(dir, h, mint, maxt, nil) if err != nil { return id, errors.Wrap(err, "write block") } - if _, err = block.InjectThanosMeta(log.NewNopLogger(), filepath.Join(dir, id.String()), block.ThanosMeta{ + if _, err = blockmeta.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), blockmeta.Thanos{ Labels: extLset.Map(), - Downsample: block.ThanosDownsampleMeta{Resolution: resolution}, - Source: block.TestSource, + Downsample: blockmeta.ThanosDownsample{Resolution: resolution}, + Source: blockmeta.TestSource, }, nil); err != nil { return id, errors.Wrap(err, "finalize block") } diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go index 54a20703d45..3e96ccb2062 100644 --- a/pkg/verifier/index_issue.go +++ b/pkg/verifier/index_issue.go @@ -8,6 +8,8 @@ import ( "path" "path/filepath" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" @@ -94,7 +96,7 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac logger, tmpdir, id, - block.BucketRepairSource, + blockmeta.BucketRepairSource, block.IgnoreCompleteOutsideChunk, block.IgnoreDuplicateOutsideChunk, block.IgnoreIssue347OutsideChunk,