diff --git a/Gopkg.lock b/Gopkg.lock
index 94d9dfb1be6..57e6d89a262 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -199,12 +199,12 @@
   revision = "2e65f85255dbc3072edf28d6b5b8efc472979f5a"

 [[projects]]
-  branch = "master"
-  digest = "1:9abc49f39e3e23e262594bb4fb70abf74c0c99e94f99153f43b143805e850719"
+  digest = "1:cea4aa2038169ee558bf507d5ea02c94ca85bcca28a4c7bb99fd59b31e43a686"
   name = "github.com/google/go-querystring"
   packages = ["query"]
   pruneopts = ""
-  revision = "53e6ce116135b80d037921a7fdd5138cf32d7a8a"
+  revision = "44c6ddd0a2342c386950e880b658017258da92fc"
+  version = "v1.0.0"

 [[projects]]
   digest = "1:e097a364f4e8d8d91b9b9eeafb992d3796a41fde3eb548c1a87eb9d9f60725cf"
@@ -335,12 +335,12 @@
   version = "v0.3.0"

 [[projects]]
-  digest = "1:82b912465c1da0668582a7d1117339c278e786c2536b3c3623029a0c7141c2d0"
+  digest = "1:84c28d9899cc4e00c38042d345cea8819275a5a62403a58530cac67022894776"
   name = "github.com/mattn/go-runewidth"
   packages = ["."]
   pruneopts = ""
-  revision = "ce7b0b5c7b45a81508558cd1dba6bb1e4ddb51bb"
-  version = "v0.0.3"
+  revision = "3ee7d812e62a0804a7d0a324e0249ca2db3476d3"
+  version = "v0.0.4"

 [[projects]]
   digest = "1:49a8b01a6cd6558d504b65608214ca40a78000e1b343ed0da5c6a9ccd83d6d30"
@@ -390,20 +390,12 @@
   version = "v0.11.0"

 [[projects]]
-  digest = "1:912349f5cf927bf96dca709623631ace7db723f07c70c4d56cfc22d9a667ed16"
+  digest = "1:b09858acd58e0873236c7b96903e3ec4e238d5de644c08bd8e712fa2d3d51ad2"
   name = "github.com/mozillazg/go-httpheader"
   packages = ["."]
   pruneopts = ""
-  revision = "4e5d6424981844faafc4b0649036b2e0395bdf99"
-  version = "v0.2.0"
-
-[[projects]]
-  branch = "master"
-  digest = "1:3adc46876d4d0e4d5bbcfcc44c2116b95d7a5c966e2ee92a219488547fd453f2"
-  name = "github.com/nightlyone/lockfile"
-  packages = ["."]
-  pruneopts = ""
-  revision = "0ad87eef1443f64d3d8c50da647e2b1552851124"
+  revision = "61f2392c3317b60616c9dcb10d0a4cfef131fe62"
+  version = "v0.2.1"

 [[projects]]
   digest = "1:94e9081cc450d2cdf4e6886fc2c06c07272f86477df2d74ee5931951fa3d2577"
@@ -508,11 +500,12 @@
   revision = "05ee40e3a273f7245e8777337fc7b46e533a9a92"

 [[projects]]
-  digest = "1:b5ff9852eabe841003da4b0a4b742a2878c722dda6481003432344f633a814fc"
+  digest = "1:912a82611430bfd1e597e76aac99cac6fd34094a07b07d7c5996cf51a21e7e07"
   name = "github.com/prometheus/prometheus"
   packages = [
     "discovery/file",
     "discovery/targetgroup",
+    "pkg/gate",
     "pkg/labels",
     "pkg/rulefmt",
     "pkg/textparse",
@@ -528,11 +521,10 @@
     "util/testutil",
   ]
   pruneopts = ""
-  revision = "71af5e29e815795e9dd14742ee7725682fa14b7b"
-  version = "v2.3.2"
+  revision = "6e08029b56ae17c49e133d92a2792f6f119f2cbd"

 [[projects]]
-  digest = "1:216dcf26fbfb3f36f286ca3306882a157c51648e4b5d4f3a9e9c719faea6ea58"
+  digest = "1:0a03b362c09b1186dd53330881430c7c2c26ba07806ebad861f2aa99d9c5c6ae"
   name = "github.com/prometheus/tsdb"
   packages = [
     ".",
@@ -541,9 +533,11 @@
     "fileutil",
     "index",
     "labels",
+    "wal",
   ]
   pruneopts = ""
-  revision = "bd832fc8274e8fe63999ac749daaaff9d881241f"
+  revision = "fd04e0963c04a1fbd891be7552b50f58e357f75c"
+  source = "github.com/bwplotka/tsdb"

 [[projects]]
   branch = "master"
diff --git a/Gopkg.toml b/Gopkg.toml
index 98bbc9494dc..e639afa84d9 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -37,7 +37,8 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"]
   name = "github.com/prometheus/common"

 [[constraint]]
-  version = "v2.3.2"
+  # TODO(bwplotka): Move to a released version once our recent fixes are merged & released.
+  revision = "6e08029b56ae17c49e133d92a2792f6f119f2cbd"
   name = "github.com/prometheus/prometheus"

 [[override]]
@@ -46,7 +47,9 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"]

 [[constraint]]
   name = "github.com/prometheus/tsdb"
-  revision = "bd832fc8274e8fe63999ac749daaaff9d881241f"
+  # TODO(bwplotka): Move to the upstream version once https://github.com/prometheus/tsdb/pull/492 is merged.
+  revision = "fd04e0963c04a1fbd891be7552b50f58e357f75c"
+  source = "github.com/bwplotka/tsdb"

 [[constraint]]
   branch = "master"
diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go
index 01b9b54bf68..eca845231bf 100644
--- a/cmd/thanos/bucket.go
+++ b/cmd/thanos/bucket.go
@@ -10,10 +10,9 @@ import (
   "text/template"
   "time"

-  "github.com/prometheus/tsdb/labels"
-
   "github.com/go-kit/kit/log"
   "github.com/improbable-eng/thanos/pkg/block"
+  "github.com/improbable-eng/thanos/pkg/block/blockmeta"
   "github.com/improbable-eng/thanos/pkg/objstore/client"
   "github.com/improbable-eng/thanos/pkg/runutil"
   "github.com/improbable-eng/thanos/pkg/verifier"
@@ -23,6 +22,7 @@ import (
   "github.com/opentracing/opentracing-go"
   "github.com/pkg/errors"
   "github.com/prometheus/client_golang/prometheus"
+  "github.com/prometheus/tsdb/labels"
   "golang.org/x/text/language"
   "golang.org/x/text/message"
   "gopkg.in/alecthomas/kingpin.v2"
@@ -254,7 +254,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
       defer cancel()

       // Getting Metas.
-      var blockMetas []*block.Meta
+      var blockMetas []*blockmeta.Meta
       if err = bkt.Iter(ctx, "", func(name string) error {
         id, ok := block.IsBlockDir(name)
         if !ok {
@@ -277,7 +277,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
   }
 }

-func printTable(blockMetas []*block.Meta, selectorLabels labels.Labels, sortBy []string) error {
+func printTable(blockMetas []*blockmeta.Meta, selectorLabels labels.Labels, sortBy []string) error {
   header := inspectColumns

   var lines [][]string
@@ -355,7 +355,7 @@ func getKeysAlphabetically(labels map[string]string) []string {

 // matchesSelector checks if blockMeta contains every label from
 // the selector with the correct value
-func matchesSelector(blockMeta *block.Meta, selectorLabels labels.Labels) bool {
+func matchesSelector(blockMeta *blockmeta.Meta, selectorLabels labels.Labels) bool {
   for _, l := range selectorLabels {
     if v, ok := blockMeta.Thanos.Labels[l.Name]; !ok || v != l.Value {
       return false
diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go
index c00aadd43db..dddd1e91afb 100644
--- a/cmd/thanos/downsample.go
+++ b/cmd/thanos/downsample.go
@@ -8,11 +8,10 @@ import (
   "path/filepath"
   "time"

-  "github.com/prometheus/tsdb/chunkenc"
-
   "github.com/go-kit/kit/log"
   "github.com/go-kit/kit/log/level"
   "github.com/improbable-eng/thanos/pkg/block"
+  "github.com/improbable-eng/thanos/pkg/block/blockmeta"
   "github.com/improbable-eng/thanos/pkg/compact/downsample"
   "github.com/improbable-eng/thanos/pkg/objstore"
   "github.com/improbable-eng/thanos/pkg/objstore/client"
@@ -23,6 +22,7 @@ import (
   "github.com/pkg/errors"
   "github.com/prometheus/client_golang/prometheus"
   "github.com/prometheus/tsdb"
+  "github.com/prometheus/tsdb/chunkenc"
   "gopkg.in/alecthomas/kingpin.v2"
 )

@@ -105,7 +105,7 @@ func downsampleBucket(
   if err := os.MkdirAll(dir, 0777); err != nil {
     return errors.Wrap(err, "create dir")
   }
-  var metas []*block.Meta
+  var metas []*blockmeta.Meta

   err := bkt.Iter(ctx, "", func(name string) error {
     id, ok := block.IsBlockDir(name)
@@ -119,7 +119,7 @@ func downsampleBucket(
     }
     defer runutil.CloseWithLogOnErr(logger, rc, "block reader")

-    var m block.Meta
+    var m blockmeta.Meta
     if err := json.NewDecoder(rc).Decode(&m); err != nil {
       return errors.Wrap(err, "decode meta")
     }
@@ -201,7 +201,7 @@ func downsampleBucket(
   return nil
 }

-func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *block.Meta, dir string, resolution int64) error {
+func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *blockmeta.Meta, dir string, resolution int64) error {
   begin := time.Now()
   bdir := filepath.Join(dir, m.ULID.String())

diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go
index bf6cfa64fe8..9e6c5ac19ef 100644
--- a/cmd/thanos/query.go
+++ b/cmd/thanos/query.go
@@ -296,7 +296,16 @@ func runQuery(
       return stores.Get(), nil
     }, selectorLset)
     queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
-    engine = promql.NewEngine(logger, reg, maxConcurrentQueries, queryTimeout)
+    engine = promql.NewEngine(
+      promql.EngineOpts{
+        Logger:        logger,
+        Reg:           reg,
+        MaxConcurrent: maxConcurrentQueries,
+        // TODO(bwplotka): Expose this as a flag: https://github.com/improbable-eng/thanos/issues/703
+        MaxSamples: math.MaxInt32,
+        Timeout:    queryTimeout,
+      },
+    )
   )
   // Periodically update the store set with the addresses we see in our cluster.
   {
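Note on the promql.NewEngine hunk above: with the pinned Prometheus revision, the engine is constructed from a single EngineOpts struct instead of positional arguments. A minimal sketch of the new construction, assuming this promql version; the concurrency limit and timeout values below are illustrative placeholders, not values taken from this PR:

    package main

    import (
        "math"
        "time"

        "github.com/go-kit/kit/log"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/prometheus/promql"
    )

    func newEngine(logger log.Logger, reg prometheus.Registerer) *promql.Engine {
        return promql.NewEngine(promql.EngineOpts{
            Logger:        logger,
            Reg:           reg,
            MaxConcurrent: 20,            // illustrative; runQuery passes maxConcurrentQueries here.
            MaxSamples:    math.MaxInt32, // effectively unlimited until exposed as a flag (issue 703).
            Timeout:       2 * time.Minute,
        })
    }
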
diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go
index 03fed102d2e..35d81858c17 100644
--- a/cmd/thanos/rule.go
+++ b/cmd/thanos/rule.go
@@ -19,15 +19,14 @@ import (
   "syscall"
   "time"

-  "github.com/improbable-eng/thanos/pkg/extprom"
-
   "github.com/go-kit/kit/log"
   "github.com/go-kit/kit/log/level"
   "github.com/improbable-eng/thanos/pkg/alert"
-  "github.com/improbable-eng/thanos/pkg/block"
+  "github.com/improbable-eng/thanos/pkg/block/blockmeta"
   "github.com/improbable-eng/thanos/pkg/cluster"
   "github.com/improbable-eng/thanos/pkg/discovery/cache"
   "github.com/improbable-eng/thanos/pkg/discovery/dns"
+  "github.com/improbable-eng/thanos/pkg/extprom"
   "github.com/improbable-eng/thanos/pkg/objstore/client"
   "github.com/improbable-eng/thanos/pkg/runutil"
   "github.com/improbable-eng/thanos/pkg/shipper"
@@ -290,7 +289,7 @@ func runRule(
   ctx, cancel := context.WithCancel(context.Background())
   ctx = tracing.ContextWithTracer(ctx, tracer)

-  notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) error {
+  notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
     res := make([]*alert.Alert, 0, len(alerts))
     for _, alrt := range alerts {
       // Only send actually firing alerts.
@@ -309,8 +308,6 @@ func runRule(
       res = append(res, a)
     }
     alertQ.Push(res)
-
-    return nil
   }
   mgr = rules.NewManager(&rules.ManagerOptions{
     Context: ctx,
@@ -579,7 +576,7 @@ func runRule(
       }
     }()

-    s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, block.RulerSource)
+    s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, blockmeta.RulerSource)

     ctx, cancel := context.WithCancel(context.Background())
diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go
index 61f493f93a8..1662a310fa8 100644
--- a/cmd/thanos/sidecar.go
+++ b/cmd/thanos/sidecar.go
@@ -14,7 +14,7 @@ import (

   "github.com/go-kit/kit/log"
   "github.com/go-kit/kit/log/level"
-  "github.com/improbable-eng/thanos/pkg/block"
+  "github.com/improbable-eng/thanos/pkg/block/blockmeta"
   "github.com/improbable-eng/thanos/pkg/cluster"
   "github.com/improbable-eng/thanos/pkg/objstore/client"
   "github.com/improbable-eng/thanos/pkg/reloader"
@@ -252,7 +252,7 @@ func runSidecar(
     }
   }()

-  s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, block.SidecarSource)
+  s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, blockmeta.SidecarSource)

   ctx, cancel := context.WithCancel(context.Background())
   g.Add(func() error {
diff --git a/pkg/block/block.go b/pkg/block/block.go
index 118b7ea96c8..e6750e0881c 100644
--- a/pkg/block/block.go
+++ b/pkg/block/block.go
@@ -5,11 +5,12 @@ package block
 import (
   "context"
   "encoding/json"
-  "io/ioutil"
   "os"
   "path"
   "path/filepath"

+  "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
   "fmt"

   "github.com/go-kit/kit/log"
@@ -17,8 +18,6 @@ import (
   "github.com/improbable-eng/thanos/pkg/runutil"
   "github.com/oklog/ulid"
   "github.com/pkg/errors"
-  "github.com/prometheus/tsdb"
-  "github.com/prometheus/tsdb/fileutil"
 )

 const (
@@ -33,103 +32,6 @@ const (
   DebugMetas = "debug/metas"
 )

-type SourceType string
-
-const (
-  UnknownSource         SourceType = ""
-  SidecarSource         SourceType = "sidecar"
-  CompactorSource       SourceType = "compactor"
-  CompactorRepairSource SourceType = "compactor.repair"
-  RulerSource           SourceType = "ruler"
-  BucketRepairSource    SourceType = "bucket.repair"
-  TestSource            SourceType = "test"
-)
-
-// Meta describes the a block's meta. It wraps the known TSDB meta structure and
-// extends it by Thanos-specific fields.
-type Meta struct {
-  Version int `json:"version"`
-
-  tsdb.BlockMeta
-
-  Thanos ThanosMeta `json:"thanos"`
-}
-
-// ThanosMeta holds block meta information specific to Thanos.
-type ThanosMeta struct {
-  Labels     map[string]string    `json:"labels"`
-  Downsample ThanosDownsampleMeta `json:"downsample"`
-
-  // Source is a real upload source of the block.
-  Source SourceType `json:"source"`
-}
-
-type ThanosDownsampleMeta struct {
-  Resolution int64 `json:"resolution"`
-}
-
-// WriteMetaFile writes the given meta into <dir>/meta.json.
-func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error {
-  // Make any changes to the file appear atomic.
-  path := filepath.Join(dir, MetaFilename)
-  tmp := path + ".tmp"
-
-  f, err := os.Create(tmp)
-  if err != nil {
-    return err
-  }
-
-  enc := json.NewEncoder(f)
-  enc.SetIndent("", "\t")
-
-  if err := enc.Encode(meta); err != nil {
-    runutil.CloseWithLogOnErr(logger, f, "close meta")
-    return err
-  }
-  if err := f.Close(); err != nil {
-    return err
-  }
-  return renameFile(logger, tmp, path)
-}
-
-// ReadMetaFile reads the given meta from <dir>/meta.json.
-func ReadMetaFile(dir string) (*Meta, error) {
-  b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename))
-  if err != nil {
-    return nil, err
-  }
-  var m Meta
-
-  if err := json.Unmarshal(b, &m); err != nil {
-    return nil, err
-  }
-  if m.Version != 1 {
-    return nil, errors.Errorf("unexpected meta file version %d", m.Version)
-  }
-  return &m, nil
-}
-
-func renameFile(logger log.Logger, from, to string) error {
-  if err := os.RemoveAll(to); err != nil {
-    return err
-  }
-  if err := os.Rename(from, to); err != nil {
-    return err
-  }
-
-  // Directory was renamed; sync parent dir to persist rename.
-  pdir, err := fileutil.OpenDir(filepath.Dir(to))
-  if err != nil {
-    return err
-  }
-
-  if err = fileutil.Fsync(pdir); err != nil {
-    runutil.CloseWithLogOnErr(logger, pdir, "close dir")
-    return err
-  }
-  return pdir.Close()
-}
-
 // Download downloads directory that is mean to be block directory.
 func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string) error {
   if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), dst); err != nil {
@@ -169,7 +71,7 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
     return errors.Wrap(err, "not a block dir")
   }

-  meta, err := ReadMetaFile(bdir)
+  meta, err := blockmeta.Read(bdir)
   if err != nil {
     // No meta or broken meta file.
     return errors.Wrap(err, "read meta")
@@ -216,16 +118,16 @@ func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error {
 }

 // DownloadMeta downloads only meta file from bucket by block ID.
-func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (Meta, error) {
+func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (blockmeta.Meta, error) {
   rc, err := bkt.Get(ctx, path.Join(id.String(), MetaFilename))
   if err != nil {
-    return Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String())
+    return blockmeta.Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String())
   }
   defer runutil.CloseWithLogOnErr(logger, rc, "download meta bucket client")

-  var m Meta
+  var m blockmeta.Meta
   if err := json.NewDecoder(rc).Decode(&m); err != nil {
-    return Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String())
+    return blockmeta.Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String())
   }
   return m, nil
 }
@@ -234,24 +136,3 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) {
   id, err := ulid.Parse(filepath.Base(path))
   return id, err == nil
 }
-
-// InjectThanosMeta sets Thanos meta to the block meta JSON and saves it to the disk.
-// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata.
-func InjectThanosMeta(logger log.Logger, bdir string, meta ThanosMeta, downsampledMeta *tsdb.BlockMeta) (*Meta, error) {
-  newMeta, err := ReadMetaFile(bdir)
-  if err != nil {
-    return nil, errors.Wrap(err, "read new meta")
-  }
-  newMeta.Thanos = meta
-
-  // While downsampling we need to copy original compaction.
-  if downsampledMeta != nil {
-    newMeta.Compaction = downsampledMeta.Compaction
-  }
-
-  if err := WriteMetaFile(logger, bdir, newMeta); err != nil {
-    return nil, errors.Wrap(err, "write new meta")
-  }
-
-  return newMeta, nil
-}
diff --git a/pkg/block/blockmeta/meta.go b/pkg/block/blockmeta/meta.go
new file mode 100644
index 00000000000..d470b4ec2c7
--- /dev/null
+++ b/pkg/block/blockmeta/meta.go
@@ -0,0 +1,142 @@
+package blockmeta
+
+// Package blockmeta implements writing and reading the wrapped meta.json where Thanos puts its metadata.
+// This metadata contains external labels, the downsampling resolution and the source type.
+// The package is minimal and kept separate because it is used by testutil, which limits the test helpers
+// we can use in this package.
+
+import (
+  "encoding/json"
+  "io/ioutil"
+  "os"
+  "path/filepath"
+
+  "github.com/go-kit/kit/log"
+  "github.com/improbable-eng/thanos/pkg/runutil"
+  "github.com/pkg/errors"
+  "github.com/prometheus/tsdb"
+  "github.com/prometheus/tsdb/fileutil"
+)
+
+type SourceType string
+
+const (
+  UnknownSource         SourceType = ""
+  SidecarSource         SourceType = "sidecar"
+  CompactorSource       SourceType = "compactor"
+  CompactorRepairSource SourceType = "compactor.repair"
+  RulerSource           SourceType = "ruler"
+  BucketRepairSource    SourceType = "bucket.repair"
+  TestSource            SourceType = "test"
+)
+
+const (
+  // MetaFilename is the known JSON filename for meta information.
+  MetaFilename = "meta.json"
+)
+
+// Meta describes a block's meta. It wraps the known TSDB meta structure and
+// extends it by Thanos-specific fields.
+type Meta struct {
+  Version int `json:"version"`
+
+  tsdb.BlockMeta
+
+  Thanos Thanos `json:"thanos"`
+}
+
+// Thanos holds block meta information specific to Thanos.
+type Thanos struct {
+  Labels     map[string]string `json:"labels"`
+  Downsample ThanosDownsample  `json:"downsample"`
+
+  // Source is a real upload source of the block.
+  Source SourceType `json:"source"`
+}
+
+type ThanosDownsample struct {
+  Resolution int64 `json:"resolution"`
+}
+
+// InjectThanos sets Thanos meta to the block meta JSON and saves it to the disk.
+// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata.
+func InjectThanos(logger log.Logger, bdir string, meta Thanos, downsampledMeta *tsdb.BlockMeta) (*Meta, error) {
+  newMeta, err := Read(bdir)
+  if err != nil {
+    return nil, errors.Wrap(err, "read new meta")
+  }
+  newMeta.Thanos = meta
+
+  // While downsampling we need to copy original compaction.
+  if downsampledMeta != nil {
+    newMeta.Compaction = downsampledMeta.Compaction
+  }
+
+  if err := Write(logger, bdir, newMeta); err != nil {
+    return nil, errors.Wrap(err, "write new meta")
+  }
+
+  return newMeta, nil
+}
+
+// Write writes the given meta into <dir>/meta.json.
+func Write(logger log.Logger, dir string, meta *Meta) error {
+  // Make any changes to the file appear atomic.
+  path := filepath.Join(dir, MetaFilename)
+  tmp := path + ".tmp"
+
+  f, err := os.Create(tmp)
+  if err != nil {
+    return err
+  }
+
+  enc := json.NewEncoder(f)
+  enc.SetIndent("", "\t")
+
+  if err := enc.Encode(meta); err != nil {
+    runutil.CloseWithLogOnErr(logger, f, "close meta")
+    return err
+  }
+  if err := f.Close(); err != nil {
+    return err
+  }
+  return renameFile(logger, tmp, path)
+}
+
+func renameFile(logger log.Logger, from, to string) error {
+  if err := os.RemoveAll(to); err != nil {
+    return err
+  }
+  if err := os.Rename(from, to); err != nil {
+    return err
+  }
+
+  // Directory was renamed; sync parent dir to persist rename.
+  pdir, err := fileutil.OpenDir(filepath.Dir(to))
+  if err != nil {
+    return err
+  }
+
+  if err = fileutil.Fsync(pdir); err != nil {
+    runutil.CloseWithLogOnErr(logger, pdir, "close dir")
+    return err
+  }
+  return pdir.Close()
+}
+
+// Read reads the given meta from <dir>/meta.json.
+func Read(dir string) (*Meta, error) {
+  b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename))
+  if err != nil {
+    return nil, err
+  }
+  var m Meta
+
+  if err := json.Unmarshal(b, &m); err != nil {
+    return nil, err
+  }
+  if m.Version != 1 {
+    return nil, errors.Errorf("unexpected meta file version %d", m.Version)
+  }
+  return &m, nil
+}
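To make the move concrete, here is a hedged sketch of how a component is expected to use the blockmeta API introduced above (Read, Write and the wrapped Thanos section); the block directory path is a placeholder:

    package main

    import (
        "github.com/go-kit/kit/log"
        "github.com/improbable-eng/thanos/pkg/block/blockmeta"
    )

    func markAsSidecarUpload(bdir string) error {
        logger := log.NewNopLogger()

        // Read <bdir>/meta.json: the TSDB meta plus the wrapped Thanos section.
        m, err := blockmeta.Read(bdir)
        if err != nil {
            return err
        }

        // Record the upload source and persist the file atomically again.
        m.Thanos.Source = blockmeta.SidecarSource
        return blockmeta.Write(logger, bdir, m)
    }

Callers that previously used block.ReadMetaFile, block.WriteMetaFile and block.InjectThanosMeta now go through blockmeta.Read, blockmeta.Write and blockmeta.InjectThanos, as the rest of this diff shows.
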
diff --git a/pkg/block/index.go b/pkg/block/index.go
index 2249863b2d2..2d088953fc8 100644
--- a/pkg/block/index.go
+++ b/pkg/block/index.go
@@ -11,6 +11,10 @@ import (
   "strings"
   "time"

+  "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
+  "github.com/prometheus/tsdb/fileutil"
+
   "github.com/go-kit/kit/log"
   "github.com/improbable-eng/thanos/pkg/runutil"
   "github.com/oklog/ulid"
@@ -36,23 +40,84 @@ type indexCache struct {
   Postings []postingsRange
 }

+type realByteSlice []byte
+
+func (b realByteSlice) Len() int {
+  return len(b)
+}
+
+func (b realByteSlice) Range(start, end int) []byte {
+  return b[start:end]
+}
+
+func (b realByteSlice) Sub(start, end int) index.ByteSlice {
+  return b[start:end]
+}
+
+func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) {
+  version := int(b.Range(4, 5)[0])
+
+  if version != 1 && version != 2 {
+    return nil, errors.Errorf("unknown index file version %d", version)
+  }
+
+  toc, err := index.NewTOCFromByteSlice(b)
+  if err != nil {
+    return nil, errors.Wrap(err, "read TOC")
+  }
+
+  symbolsV2, symbolsV1, err := index.ReadSymbols(b, version, int(toc.Symbols))
+  if err != nil {
+    return nil, errors.Wrap(err, "read symbols")
+  }
+
+  symbolsTable := make(map[uint32]string, len(symbolsV1)+len(symbolsV2))
+  for o, s := range symbolsV1 {
+    symbolsTable[o] = s
+  }
+  for o, s := range symbolsV2 {
+    symbolsTable[uint32(o)] = s
+  }
+
+  return symbolsTable, nil
+}
+
 // WriteIndexCache writes a cache file containing the first lookup stages
 // for an index file.
-func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error {
+func WriteIndexCache(logger log.Logger, indexFn string, fn string) error {
+  indexFile, err := fileutil.OpenMmapFile(indexFn)
+  if err != nil {
+    return errors.Wrapf(err, "open mmap index file %s", indexFn)
+  }
+  defer runutil.CloseWithLogOnErr(logger, indexFile, "close index cache mmap file from %s", indexFn)
+
+  b := realByteSlice(indexFile.Bytes())
+  indexr, err := index.NewReader(b)
+  if err != nil {
+    return errors.Wrap(err, "open index reader")
+  }
+  defer runutil.CloseWithLogOnErr(logger, indexr, "load index cache reader")
+
+  // We assume the reader already verified the index.
+  symbols, err := getSymbolTable(b)
+  if err != nil {
+    return err
+  }
+
   f, err := os.Create(fn)
   if err != nil {
-    return errors.Wrap(err, "create file")
+    return errors.Wrap(err, "create index cache file")
   }
   defer runutil.CloseWithLogOnErr(logger, f, "index cache writer")

   v := indexCache{
-    Version:     r.Version(),
-    Symbols:     r.SymbolTable(),
+    Version:     indexr.Version(),
+    Symbols:     symbols,
     LabelValues: map[string][]string{},
   }

   // Extract label value indices.
-  lnames, err := r.LabelIndices()
+  lnames, err := indexr.LabelIndices()
   if err != nil {
     return errors.Wrap(err, "read label indices")
   }
@@ -62,7 +127,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error {
     }
     ln := lns[0]

-    tpls, err := r.LabelValues(ln)
+    tpls, err := indexr.LabelValues(ln)
     if err != nil {
       return errors.Wrap(err, "get label values")
     }
@@ -82,7 +147,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error {
   }

   // Extract postings ranges.
-  pranges, err := r.PostingsRanges()
+  pranges, err := indexr.PostingsRanges()
   if err != nil {
     return errors.Wrap(err, "read postings ranges")
   }
@@ -164,7 +229,7 @@ func VerifyIndex(logger log.Logger, fn string, minTime int64, maxTime int64) err
 }

 type Stats struct {
-  // TotalSeries represnts total number of series in block.
+  // TotalSeries represents total number of series in block.
   TotalSeries int
   // OutOfOrderSeries represents number of series that have out of order chunks.
   OutOfOrderSeries int
@@ -346,7 +411,7 @@ type ignoreFnType func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) (
 // - removes all near "complete" outside chunks introduced by https://github.com/prometheus/tsdb/issues/347.
 // Fixable inconsistencies are resolved in the new block.
 // TODO(bplotka): https://github.com/improbable-eng/thanos/issues/378
-func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) {
+func Repair(logger log.Logger, dir string, id ulid.ULID, source blockmeta.SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) {
   if len(ignoreChkFns) == 0 {
     return resid, errors.New("no ignore chunk function specified")
   }
@@ -355,7 +420,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno
   entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
   resid = ulid.MustNew(ulid.Now(), entropy)

-  meta, err := ReadMetaFile(bdir)
+  meta, err := blockmeta.Read(bdir)
   if err != nil {
     return resid, errors.Wrap(err, "read meta file")
   }
@@ -405,7 +470,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source blockmeta.Sourc
   if err := rewrite(indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil {
     return resid, errors.Wrap(err, "rewrite block")
   }
-  if err := WriteMetaFile(logger, resdir, &resmeta); err != nil {
+  if err := blockmeta.Write(logger, resdir, &resmeta); err != nil {
     return resid, err
   }
   return resid, nil
@@ -494,7 +559,7 @@ OUTER:
 func rewrite(
   indexr tsdb.IndexReader, chunkr tsdb.ChunkReader, indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter,
-  meta *Meta,
+  meta *blockmeta.Meta,
   ignoreChkFns []ignoreFnType,
 ) error {
   symbols, err := indexr.Symbols()
diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go
new file mode 100644
index 00000000000..80c10e8e6ed
--- /dev/null
+++ b/pkg/block/index_test.go
@@ -0,0 +1,46 @@
+package block
+
+import (
+  "io/ioutil"
+  "os"
+  "path/filepath"
+  "testing"
+
+  "github.com/go-kit/kit/log"
+  "github.com/improbable-eng/thanos/pkg/testutil"
"github.com/prometheus/tsdb/labels" +) + +func TestWriteReadIndexCache(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "test-compact-prepare") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + b, err := testutil.CreateBlock(tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, nil, 124) + testutil.Ok(t, err) + + fn := filepath.Join(tmpDir, "index.cache.json") + testutil.Ok(t, WriteIndexCache(log.NewNopLogger(), filepath.Join(tmpDir, b.String(), "index"), fn)) + + version, symbols, lvals, postings, err := ReadIndexCache(log.NewNopLogger(), fn) + testutil.Ok(t, err) + + testutil.Equals(t, 2, version) + testutil.Equals(t, 6, len(symbols)) + testutil.Equals(t, 2, len(lvals)) + + vals, ok := lvals["a"] + testutil.Assert(t, ok, "") + testutil.Equals(t, []string{"1", "2", "3", "4"}, vals) + + vals, ok = lvals["b"] + testutil.Assert(t, ok, "") + testutil.Equals(t, []string{"1"}, vals) + testutil.Equals(t, 6, len(postings)) +} diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 544de920eac..fe95ccfd719 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" + "io/ioutil" "github.com/go-kit/kit/log" @@ -39,7 +41,7 @@ type Syncer struct { bkt objstore.Bucket syncDelay time.Duration mtx sync.Mutex - blocks map[ulid.ULID]*block.Meta + blocks map[ulid.ULID]*blockmeta.Meta metrics *syncerMetrics } @@ -130,7 +132,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket logger: logger, reg: reg, syncDelay: syncDelay, - blocks: map[ulid.ULID]*block.Meta{}, + blocks: map[ulid.ULID]*blockmeta.Meta{}, bkt: bkt, metrics: newSyncerMetrics(reg), }, nil @@ -185,9 +187,9 @@ func (c *Syncer) syncMetas(ctx context.Context) error { // NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks. // TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377 if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) && - meta.Thanos.Source != block.BucketRepairSource && - meta.Thanos.Source != block.CompactorSource && - meta.Thanos.Source != block.CompactorRepairSource { + meta.Thanos.Source != blockmeta.BucketRepairSource && + meta.Thanos.Source != blockmeta.CompactorSource && + meta.Thanos.Source != blockmeta.CompactorRepairSource { level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) return nil @@ -214,7 +216,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error { // GroupKey returns a unique identifier for the group the block belongs to. It considers // the downsampling resolution and the block's labels. 
-func GroupKey(meta block.Meta) string {
+func GroupKey(meta blockmeta.Meta) string {
   return groupKey(meta.Thanos.Downsample.Resolution, labels.FromMap(meta.Thanos.Labels))
 }
@@ -381,7 +383,7 @@ type Group struct {
   labels                      labels.Labels
   resolution                  int64
   mtx                         sync.Mutex
-  blocks                      map[ulid.ULID]*block.Meta
+  blocks                      map[ulid.ULID]*blockmeta.Meta
   compactions                 prometheus.Counter
   compactionFailures          prometheus.Counter
   groupGarbageCollectedBlocks prometheus.Counter
@@ -405,7 +407,7 @@ func newGroup(
     bkt:        bkt,
     labels:     lset,
     resolution: resolution,
-    blocks:     map[ulid.ULID]*block.Meta{},
+    blocks:     map[ulid.ULID]*blockmeta.Meta{},
     compactions:                 compactions,
     compactionFailures:          compactionFailures,
     groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
@@ -419,7 +421,7 @@ func (cg *Group) Key() string {
 }

 // Add the block with the given meta to the group.
-func (cg *Group) Add(meta *block.Meta) error {
+func (cg *Group) Add(meta *blockmeta.Meta) error {
   cg.mtx.Lock()
   defer cg.mtx.Unlock()
@@ -541,7 +543,7 @@ func IsRetryError(err error) bool {
   return ok
 }

-func (cg *Group) areBlocksOverlapping(include *block.Meta, excludeDirs ...string) error {
+func (cg *Group) areBlocksOverlapping(include *blockmeta.Meta, excludeDirs ...string) error {
   var (
     metas   []tsdb.BlockMeta
     exclude = map[ulid.ULID]struct{}{}
@@ -597,12 +599,12 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
     return retry(errors.Wrapf(err, "download block %s", ie.id))
   }

-  meta, err := block.ReadMetaFile(bdir)
+  meta, err := blockmeta.Read(bdir)
   if err != nil {
     return errors.Wrapf(err, "read meta from %s", bdir)
   }

-  resid, err := block.Repair(logger, tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk)
+  resid, err := block.Repair(logger, tmpdir, ie.id, blockmeta.CompactorRepairSource, block.IgnoreIssue347OutsideChunk)
   if err != nil {
     return errors.Wrapf(err, "repair failed for block %s", ie.id)
   }
@@ -647,7 +649,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
     if err := os.MkdirAll(bdir, 0777); err != nil {
       return compID, errors.Wrap(err, "create planning block dir")
     }
-    if err := block.WriteMetaFile(cg.logger, bdir, meta); err != nil {
+    if err := blockmeta.Write(cg.logger, bdir, meta); err != nil {
       return compID, errors.Wrap(err, "write planning meta file")
     }
   }
@@ -670,7 +672,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
   begin := time.Now()

   for _, pdir := range plan {
-    meta, err := block.ReadMetaFile(pdir)
+    meta, err := blockmeta.Read(pdir)
     if err != nil {
       return compID, errors.Wrapf(err, "read meta from %s", pdir)
     }
@@ -718,7 +720,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (

   begin = time.Now()

-  compID, err = comp.Compact(dir, plan...)
+  compID, err = comp.Compact(dir, plan, nil)
   if err != nil {
     return compID, halt(errors.Wrapf(err, "compact blocks %v", plan))
   }
@@ -727,10 +729,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (

   bdir := filepath.Join(dir, compID.String())

-  newMeta, err := block.InjectThanosMeta(cg.logger, bdir, block.ThanosMeta{
+  newMeta, err := blockmeta.InjectThanos(cg.logger, bdir, blockmeta.Thanos{
     Labels:     cg.labels.Map(),
-    Downsample: block.ThanosDownsampleMeta{Resolution: cg.resolution},
-    Source:     block.CompactorSource,
+    Downsample: blockmeta.ThanosDownsample{Resolution: cg.resolution},
+    Source:     blockmeta.CompactorSource,
   }, nil)
   if err != nil {
     return compID, errors.Wrapf(err, "failed to finalize the block %s", bdir)
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index 74df73cc6f5..266fb93cdf3 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -13,6 +13,8 @@ import (
   "testing"
   "time"

+  "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
   "github.com/go-kit/kit/log"
   "github.com/improbable-eng/thanos/pkg/block"
   "github.com/improbable-eng/thanos/pkg/objstore"
@@ -37,13 +39,13 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
   // After the first synchronization the first 5 should be dropped and the
   // last 5 be loaded from the bucket.
   var ids []ulid.ULID
-  var metas []*block.Meta
+  var metas []*blockmeta.Meta

   for i := 0; i < 15; i++ {
     id, err := ulid.New(uint64(i), nil)
     testutil.Ok(t, err)

-    var meta block.Meta
+    var meta blockmeta.Meta
     meta.Version = 1
     meta.ULID = id

@@ -56,7 +58,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
   for _, m := range metas[5:] {
     var buf bytes.Buffer
     testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
-    testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf))
+    testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), blockmeta.MetaFilename), &buf))
   }

   groups, err := sy.Groups()
@@ -79,11 +81,11 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {

   // Generate 10 source block metas and construct higher level blocks
   // that are higher compactions of them.
-  var metas []*block.Meta
+  var metas []*blockmeta.Meta
   var ids []ulid.ULID

   for i := 0; i < 10; i++ {
-    var m block.Meta
+    var m blockmeta.Meta

     m.Version = 1
     m.ULID = ulid.MustNew(uint64(i), nil)
@@ -94,28 +96,28 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
     metas = append(metas, &m)
   }

-  var m1 block.Meta
+  var m1 blockmeta.Meta
   m1.Version = 1
   m1.ULID = ulid.MustNew(100, nil)
   m1.Compaction.Level = 2
   m1.Compaction.Sources = ids[:4]
   m1.Thanos.Downsample.Resolution = 0

-  var m2 block.Meta
+  var m2 blockmeta.Meta
   m2.Version = 1
   m2.ULID = ulid.MustNew(200, nil)
   m2.Compaction.Level = 2
   m2.Compaction.Sources = ids[4:8] // last two source IDs is not part of a level 2 block.
   m2.Thanos.Downsample.Resolution = 0

-  var m3 block.Meta
+  var m3 blockmeta.Meta
   m3.Version = 1
   m3.ULID = ulid.MustNew(300, nil)
   m3.Compaction.Level = 3
   m3.Compaction.Sources = ids[:9] // last source ID is not part of level 3 block.
   m3.Thanos.Downsample.Resolution = 0

-  var m4 block.Meta
+  var m4 blockmeta.Meta
   m4.Version = 14
   m4.ULID = ulid.MustNew(400, nil)
   m4.Compaction.Level = 2
@@ -127,7 +129,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
     fmt.Println("create", m.ULID)
     var buf bytes.Buffer
     testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
-    testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf))
+    testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), blockmeta.MetaFilename), &buf))
   }

   // Do one initial synchronization with the bucket.
@@ -173,7 +175,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
   ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   defer cancel()

-  var metas []*block.Meta
+  var metas []*blockmeta.Meta
   extLset := labels.Labels{{Name: "e1", Value: "1"}}
   b1, err := testutil.CreateBlock(prepareDir, []labels.Labels{
     {{Name: "a", Value: "1"}},
@@ -183,7 +185,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
   }, 100, 0, 1000, extLset, 124)
   testutil.Ok(t, err)

-  meta, err := block.ReadMetaFile(filepath.Join(prepareDir, b1.String()))
+  meta, err := blockmeta.Read(filepath.Join(prepareDir, b1.String()))
   testutil.Ok(t, err)
   metas = append(metas, meta)

@@ -196,7 +198,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
   testutil.Ok(t, err)

   // Mix order to make sure compact is able to deduct min time / max time.
-  meta, err = block.ReadMetaFile(filepath.Join(prepareDir, b3.String()))
+  meta, err = blockmeta.Read(filepath.Join(prepareDir, b3.String()))
   testutil.Ok(t, err)
   metas = append(metas, meta)

@@ -204,7 +206,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
   b2, err := testutil.CreateBlock(prepareDir, []labels.Labels{}, 100, 1001, 2000, extLset, 124)
   testutil.Ok(t, err)

-  meta, err = block.ReadMetaFile(filepath.Join(prepareDir, b2.String()))
+  meta, err = blockmeta.Read(filepath.Join(prepareDir, b2.String()))
   testutil.Ok(t, err)
   metas = append(metas, meta)

@@ -217,7 +219,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
   }, 100, 3001, 4000, extLset, 124)
   testutil.Ok(t, err)

-  meta, err = block.ReadMetaFile(filepath.Join(prepareDir, freshB.String()))
+  meta, err = blockmeta.Read(filepath.Join(prepareDir, freshB.String()))
   testutil.Ok(t, err)
   metas = append(metas, meta)

@@ -263,7 +265,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
     resDir := filepath.Join(dir, id.String())
     testutil.Ok(t, block.Download(ctx, log.NewNopLogger(), bkt, id, resDir))

-    meta, err = block.ReadMetaFile(resDir)
+    meta, err = blockmeta.Read(resDir)
     testutil.Ok(t, err)

     testutil.Equals(t, int64(0), meta.MinTime)
diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go
index 305f72021c0..5b69b3dc7ff 100644
--- a/pkg/compact/downsample/downsample.go
+++ b/pkg/compact/downsample/downsample.go
@@ -5,7 +5,8 @@ import (
   "path/filepath"
   "sort"

-  "github.com/improbable-eng/thanos/pkg/block"
+  "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
   "github.com/prometheus/prometheus/pkg/value"
   "github.com/prometheus/tsdb/chunkenc"

@@ -31,7 +32,7 @@ const (
 // Downsample downsamples the given block. It writes a new block into dir and returns its ID.
 func Downsample(
   logger log.Logger,
-  origMeta *block.Meta,
+  origMeta *blockmeta.Meta,
   b tsdb.BlockReader,
   dir string,
   resolution int64,
@@ -125,18 +126,18 @@ func Downsample(
   if err != nil {
     return id, errors.Wrap(err, "create compactor")
   }
-  id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime)
+  id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime, &origMeta.BlockMeta)
   if err != nil {
     return id, errors.Wrap(err, "compact head")
   }
   bdir := filepath.Join(dir, id.String())

-  var tmeta block.ThanosMeta
+  var tmeta blockmeta.Thanos
   tmeta = origMeta.Thanos
-  tmeta.Source = block.CompactorSource
+  tmeta.Source = blockmeta.CompactorSource
   tmeta.Downsample.Resolution = resolution

-  _, err = block.InjectThanosMeta(logger, bdir, tmeta, &origMeta.BlockMeta)
+  _, err = blockmeta.InjectThanos(logger, bdir, tmeta, &origMeta.BlockMeta)
   if err != nil {
     return id, errors.Wrapf(err, "failed to finalize the block %s", bdir)
   }
@@ -228,13 +229,20 @@ func (b *memBlock) Chunks() (tsdb.ChunkReader, error) {
 }

 func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) {
-  return tsdb.EmptyTombstoneReader(), nil
+  return emptyTombstoneReader{}, nil
 }

 func (b *memBlock) Close() error {
   return nil
 }

+type emptyTombstoneReader struct{}
+
+func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error)        { return nil, nil }
+func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil }
+func (emptyTombstoneReader) Total() uint64                                 { return 0 }
+func (emptyTombstoneReader) Close() error                                  { return nil }
+
 // currentWindow returns the end timestamp of the window that t falls into.
 func currentWindow(t, r int64) int64 {
   // The next timestamp is the next number after s.t that's aligned with window.
diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go
index d3844784162..1d82dabe77a 100644
--- a/pkg/compact/downsample/downsample_test.go
+++ b/pkg/compact/downsample/downsample_test.go
@@ -7,6 +7,8 @@ import (
   "path/filepath"
   "testing"

+  "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
   "github.com/prometheus/prometheus/pkg/value"
   "github.com/prometheus/tsdb/chunks"

@@ -59,7 +61,7 @@ func TestDownsampleRaw(t *testing.T) {
       },
     },
   }
-  testDownsample(t, input, &block.Meta{}, 100)
+  testDownsample(t, input, &blockmeta.Meta{}, 100)
 }

 func TestDownsampleAggr(t *testing.T) {
@@ -96,7 +98,7 @@ func TestDownsampleAggr(t *testing.T) {
       },
     },
   }
-  var meta block.Meta
+  var meta blockmeta.Meta
   meta.Thanos.Downsample.Resolution = 10

   testDownsample(t, input, &meta, 500)
@@ -123,7 +125,7 @@ type downsampleTestSet struct {

 // testDownsample inserts the input into a block and invokes the downsampler with the given resolution.
 // The chunk ranges within the input block are aligned at 500 time units.
-func testDownsample(t *testing.T, data []*downsampleTestSet, meta *block.Meta, resolution int64) {
+func testDownsample(t *testing.T, data []*downsampleTestSet, meta *blockmeta.Meta, resolution int64) {
   t.Helper()

   dir, err := ioutil.TempDir("", "downsample-raw")
diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go
index 0aef91c697b..3ec3b9c50bf 100644
--- a/pkg/compact/retention_test.go
+++ b/pkg/compact/retention_test.go
@@ -9,7 +9,7 @@ import (
   "time"

   "github.com/go-kit/kit/log"
-  "github.com/improbable-eng/thanos/pkg/block"
+  "github.com/improbable-eng/thanos/pkg/block/blockmeta"
   "github.com/improbable-eng/thanos/pkg/compact"
   "github.com/improbable-eng/thanos/pkg/objstore"
   "github.com/improbable-eng/thanos/pkg/objstore/inmem"
@@ -253,15 +253,15 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
 func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) {
   t.Helper()

-  meta1 := block.Meta{
+  meta1 := blockmeta.Meta{
     Version: 1,
     BlockMeta: tsdb.BlockMeta{
       ULID:    ulid.MustParse(id),
       MinTime: minTime.Unix() * 1000,
       MaxTime: maxTime.Unix() * 1000,
     },
-    Thanos: block.ThanosMeta{
-      Downsample: block.ThanosDownsampleMeta{
+    Thanos: blockmeta.Thanos{
+      Downsample: blockmeta.ThanosDownsample{
         Resolution: resolutionLevel,
       },
     },
diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go
index 23e5b0a801b..cf693dd2fbe 100644
--- a/pkg/query/api/v1.go
+++ b/pkg/query/api/v1.go
@@ -527,15 +527,16 @@ func (api *API) series(r *http.Request) (interface{}, []error, *apiError) {

   var sets []storage.SeriesSet
   for _, mset := range matcherSets {
-    s, err := q.Select(&storage.SelectParams{}, mset...)
+    s, _, err := q.Select(&storage.SelectParams{}, mset...)
     if err != nil {
       return nil, nil, &apiError{errorExec, err}
     }
     sets = append(sets, s)
   }

-  set := storage.NewMergeSeriesSet(sets)
-  metrics := []labels.Labels{}
+  set := storage.NewMergeSeriesSet(sets, nil)
+
+  var metrics []labels.Labels
   for set.Next() {
     metrics = append(metrics, set.At().Labels())
   }
diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go
index 4afef1d0548..bcf17907b06 100644
--- a/pkg/query/api/v1_test.go
+++ b/pkg/query/api/v1_test.go
@@ -333,7 +333,7 @@ func TestEndpoints(t *testing.T) {
         "start": []string{"-2"},
         "end":   []string{"-1"},
       },
-      response: []labels.Labels{},
+      response: []labels.Labels(nil),
     },
     // Start and end after series ends.
     {
@@ -343,7 +343,7 @@ func TestEndpoints(t *testing.T) {
         "start": []string{"100000"},
         "end":   []string{"100001"},
       },
-      response: []labels.Labels{},
+      response: []labels.Labels(nil),
     },
     // Start before series starts, end after series ends.
     {
@@ -409,33 +409,38 @@ func TestEndpoints(t *testing.T) {
   }

   for _, test := range tests {
-    // Build a context with the correct request params.
-    ctx := context.Background()
-    for p, v := range test.params {
-      ctx = route.WithParam(ctx, p, v)
-    }
-    t.Logf("run query %q", test.query.Encode())
+    if ok := t.Run(test.query.Encode(), func(t *testing.T) {
+      // Build a context with the correct request params.
+      ctx := context.Background()
+      for p, v := range test.params {
+        ctx = route.WithParam(ctx, p, v)
+      }

-    req, err := http.NewRequest("ANY", fmt.Sprintf("http://example.com?%s", test.query.Encode()), nil)
-    if err != nil {
-      t.Fatal(err)
-    }
-    resp, _, apiErr := test.endpoint(req.WithContext(ctx))
-    if apiErr != nil {
-      if test.errType == errorNone {
-        t.Fatalf("Unexpected error: %s", apiErr)
+      req, err := http.NewRequest("ANY", fmt.Sprintf("http://example.com?%s", test.query.Encode()), nil)
+      if err != nil {
+        t.Fatal(err)
       }
-      if test.errType != apiErr.typ {
-        t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.typ)
+      resp, _, apiErr := test.endpoint(req.WithContext(ctx))
+      if apiErr != nil {
+        if test.errType == errorNone {
+          t.Fatalf("Unexpected error: %s", apiErr)
+        }
+        if test.errType != apiErr.typ {
+          t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.typ)
+        }
+        return
       }
-      continue
-    }
-    if apiErr == nil && test.errType != errorNone {
-      t.Fatalf("Expected error of type %q but got none", test.errType)
-    }
-    if !reflect.DeepEqual(resp, test.response) {
-      t.Fatalf("Response does not match, expected:\n%+v\ngot:\n%+v", test.response, resp)
+      if apiErr == nil && test.errType != errorNone {
+        t.Fatalf("Expected error of type %q but got none", test.errType)
+      }
+
+      if !reflect.DeepEqual(resp, test.response) {
+        t.Fatalf("Response does not match, expected:\n%+v\ngot:\n%+v", test.response, resp)
+      }
+    }); !ok {
+      return
     }
   }
 }
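The pkg/query/querier.go change that follows adopts the storage.Querier.Select contract from the pinned Prometheus revision, which returns (storage.SeriesSet, storage.Warnings, error). A hedged sketch of a caller under that assumption; this is illustrative, not code from the PR, and how warnings are surfaced is the caller's choice:

    package example

    import (
        "github.com/prometheus/prometheus/pkg/labels"
        "github.com/prometheus/prometheus/storage"
    )

    // labelsOf collects the label sets of all series matching the given matchers.
    func labelsOf(q storage.Querier, ms ...*labels.Matcher) ([]labels.Labels, error) {
        set, warnings, err := q.Select(&storage.SelectParams{}, ms...)
        if err != nil {
            return nil, err
        }
        _ = warnings // e.g. surface partial-response warnings to the user.

        var res []labels.Labels
        for set.Next() {
            res = append(res, set.At().Labels())
        }
        return res, set.Err()
    }
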
diff --git a/pkg/query/querier.go b/pkg/query/querier.go
index 6e962f64727..819ff3ac2a1 100644
--- a/pkg/query/querier.go
+++ b/pkg/query/querier.go
@@ -169,13 +169,13 @@ func aggrsFromFunc(f string) ([]storepb.Aggr, resAggr) {
   return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}, resAggrAvg
 }

-func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, error) {
+func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
   span, ctx := tracing.StartSpan(q.ctx, "querier_select")
   defer span.Finish()

   sms, err := translateMatchers(ms...)
   if err != nil {
-    return nil, errors.Wrap(err, "convert matchers")
+    return nil, nil, errors.Wrap(err, "convert matchers")
   }

   queryAggrs, resAggr := aggrsFromFunc(params.Func)
@@ -189,10 +189,12 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
     Aggregates:              queryAggrs,
     PartialResponseDisabled: !q.partialResponse,
   }, resp); err != nil {
-    return nil, errors.Wrap(err, "proxy Series()")
+    return nil, nil, errors.Wrap(err, "proxy Series()")
   }

   for _, w := range resp.warnings {
+    // NOTE(bwplotka): We could use the warnings return argument here, but we need the reporter anyway
+    // for the LabelValues and LabelNames methods, so we choose to be consistent and keep the reporter.
     q.warningReporter(errors.New(w))
   }

@@ -203,7 +205,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
     maxt: q.maxt,
     set:  newStoreSeriesSet(resp.seriesSet),
     aggr: resAggr,
-  }, nil
+  }, nil, nil
 }

 // TODO(fabxc): this could potentially pushed further down into the store API
@@ -220,7 +222,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
   // The merged series set assembles all potentially-overlapping time ranges
   // of the same series into a single one. The series are ordered so that equal series
   // from different replicas are sequential. We can now deduplicate those.
-  return newDedupSeriesSet(set, q.replicaLabel), nil
+  return newDedupSeriesSet(set, q.replicaLabel), nil, nil
 }

 // sortDedupLabels resorts the set so that the same series with different replica
@@ -245,6 +247,7 @@ func sortDedupLabels(set []storepb.Series, replicaLabel string) {
   })
 }

+// LabelValues returns all potential values for a label name.
 func (q *querier) LabelValues(name string) ([]string, error) {
   span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
   defer span.Finish()
@@ -261,6 +264,12 @@ func (q *querier) LabelValues(name string) ([]string, error) {
   return resp.Values, nil
 }

+// LabelNames returns all the unique label names present in the block in sorted order.
+// TODO(bwplotka): Consider adding labelNames to the Thanos Query API: https://github.com/improbable-eng/thanos/issues/702.
+func (q *querier) LabelNames() ([]string, error) {
+  return nil, errors.New("not implemented")
+}
+
 func (q *querier) Close() error {
   q.cancel()
   return nil
diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go
index 46b51a7f3ef..980d837213e 100644
--- a/pkg/query/querier_test.go
+++ b/pkg/query/querier_test.go
@@ -36,7 +36,7 @@ func TestQuerier_Series(t *testing.T) {
   q := newQuerier(context.Background(), nil, 1, 300, "", testProxy, false, 0, true, nil)
   defer func() { testutil.Ok(t, q.Close()) }()

-  res, err := q.Select(&storage.SelectParams{})
+  res, _, err := q.Select(&storage.SelectParams{})
   testutil.Ok(t, err)

   expected := []struct {
diff --git a/pkg/query/test_print.go b/pkg/query/test_print.go
new file mode 100644
index 00000000000..70bc292439b
--- /dev/null
+++ b/pkg/query/test_print.go
@@ -0,0 +1,34 @@
+package query
+
+import (
+  "fmt"
+
+  "github.com/prometheus/prometheus/storage"
+)
+
+type printSeriesSet struct {
+  set storage.SeriesSet
+}
+
+func newPrintSeriesSet(set storage.SeriesSet) storage.SeriesSet {
+  return &printSeriesSet{set: set}
+}
+
+func (s *printSeriesSet) Next() bool {
+  return s.set.Next()
+}
+
+func (s *printSeriesSet) At() storage.Series {
+  at := s.set.At()
+  fmt.Println("Series", at.Labels())
+
+  i := at.Iterator()
+  for i.Next() {
+    fmt.Println(i.At())
+  }
+  return at
+}
+
+func (s *printSeriesSet) Err() error {
+  return s.set.Err()
+}
diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go
index 5c1df9a9b7e..542799ce5e7 100644
--- a/pkg/shipper/shipper.go
+++ b/pkg/shipper/shipper.go
@@ -11,6 +11,8 @@ import (
   "path"
   "path/filepath"

+  "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
   "github.com/go-kit/kit/log"
   "github.com/go-kit/kit/log/level"
   "github.com/improbable-eng/thanos/pkg/block"
@@ -69,7 +71,7 @@ type Shipper struct {
   metrics *metrics
   bucket  objstore.Bucket
   labels  func() labels.Labels
-  source  block.SourceType
+  source  blockmeta.SourceType
 }

 // New creates a new shipper that detects new TSDB blocks in dir and uploads them
@@ -80,7 +82,7 @@ func New(
   dir string,
   bucket objstore.Bucket,
   lbls func() labels.Labels,
-  source block.SourceType,
+  source blockmeta.SourceType,
 ) *Shipper {
   if logger == nil {
     logger = log.NewNopLogger()
@@ -114,7 +116,7 @@ func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) {
   minTime = math.MaxInt64
   maxSyncTime = math.MinInt64

-  if err := s.iterBlockMetas(func(m *block.Meta) error {
+  if err := s.iterBlockMetas(func(m *blockmeta.Meta) error {
     if m.MinTime < minTime {
       minTime = m.MinTime
     }
@@ -158,7 +160,7 @@ func (s *Shipper) Sync(ctx context.Context) {

   // TODO(bplotka): If there are no blocks in the system check for WAL dir to ensure we have actually
   // access to real TSDB dir (!).
-  if err = s.iterBlockMetas(func(m *block.Meta) error {
+  if err = s.iterBlockMetas(func(m *blockmeta.Meta) error {
     // Do not sync a block if we already uploaded it. If it is no longer found in the bucket,
     // it was generally removed by the compaction process.
     if _, ok := hasUploaded[m.ULID]; !ok {
@@ -180,7 +182,7 @@ func (s *Shipper) Sync(ctx context.Context) {
   }
 }

-func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
+func (s *Shipper) sync(ctx context.Context, meta *blockmeta.Meta) (err error) {
   dir := filepath.Join(s.dir, meta.ULID.String())

   // We only ship of the first compacted block level.
@@ -225,7 +227,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
     meta.Thanos.Labels = lset.Map()
   }
   meta.Thanos.Source = s.source
-  if err := block.WriteMetaFile(s.logger, updir, meta); err != nil {
+  if err := blockmeta.Write(s.logger, updir, meta); err != nil {
     return errors.Wrap(err, "write meta file")
   }
   return block.Upload(ctx, s.logger, s.bucket, updir)
@@ -234,7 +236,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
 // iterBlockMetas calls f with the block meta for each block found in dir. It logs
 // an error and continues if it cannot access a meta.json file.
 // If f returns an error, the function returns with the same error.
-func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error {
+func (s *Shipper) iterBlockMetas(f func(m *blockmeta.Meta) error) error {
   names, err := fileutil.ReadDir(s.dir)
   if err != nil {
     return errors.Wrap(err, "read dir")
@@ -253,7 +255,7 @@ func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error {
     if !fi.IsDir() {
       continue
     }
-    m, err := block.ReadMetaFile(dir)
+    m, err := blockmeta.Read(dir)
     if err != nil {
       level.Warn(s.logger).Log("msg", "reading meta file failed", "err", err)
       continue
diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go
index 8f0e70ecd65..f23930bec71 100644
--- a/pkg/shipper/shipper_e2e_test.go
+++ b/pkg/shipper/shipper_e2e_test.go
@@ -14,6 +14,7 @@ import (

   "github.com/go-kit/kit/log"
   "github.com/improbable-eng/thanos/pkg/block"
+  "github.com/improbable-eng/thanos/pkg/block/blockmeta"
   "github.com/improbable-eng/thanos/pkg/objstore"
   "github.com/improbable-eng/thanos/pkg/objstore/objtesting"
   "github.com/improbable-eng/thanos/pkg/testutil"
@@ -32,7 +33,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) {
     }()

     extLset := labels.FromStrings("prometheus", "prom-1")
-    shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, block.TestSource)
+    shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, blockmeta.TestSource)

     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
@@ -54,7 +55,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) {

       testutil.Ok(t, os.Mkdir(tmp, 0777))

-      meta := block.Meta{
+      meta := blockmeta.Meta{
         BlockMeta: tsdb.BlockMeta{
           MinTime: timestamp.FromTime(now.Add(time.Duration(i) * time.Hour)),
           MaxTime: timestamp.FromTime(now.Add((time.Duration(i) * time.Hour) + 1)),
@@ -62,7 +63,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) {
       }
       meta.Version = 1
       meta.ULID = id
-      meta.Thanos.Source = block.TestSource
+      meta.Thanos.Source = blockmeta.TestSource

       metab, err := json.Marshal(&meta)
       testutil.Ok(t, err)
diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go
index 368c5e92b99..5adbb32734d 100644
--- a/pkg/shipper/shipper_test.go
+++ b/pkg/shipper/shipper_test.go
@@ -2,15 +2,13 @@ package shipper
@@ package shipper import ( "io/ioutil" - "os" - "testing" - "math" - + "os" "path" + "testing" "github.com/go-kit/kit/log" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" "github.com/improbable-eng/thanos/pkg/testutil" "github.com/oklog/ulid" "github.com/prometheus/tsdb" @@ -23,7 +21,7 @@ func TestShipperTimestamps(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - s := New(nil, nil, dir, nil, nil, block.TestSource) + s := New(nil, nil, dir, nil, nil, blockmeta.TestSource) // Missing thanos meta file. _, _, err = s.Timestamps() @@ -41,7 +39,7 @@ func TestShipperTimestamps(t *testing.T) { id1 := ulid.MustNew(1, nil) testutil.Ok(t, os.Mkdir(path.Join(dir, id1.String()), os.ModePerm)) - testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), path.Join(dir, id1.String()), &block.Meta{ + testutil.Ok(t, blockmeta.Write(log.NewNopLogger(), path.Join(dir, id1.String()), &blockmeta.Meta{ Version: 1, BlockMeta: tsdb.BlockMeta{ ULID: id1, @@ -56,7 +54,7 @@ func TestShipperTimestamps(t *testing.T) { id2 := ulid.MustNew(2, nil) testutil.Ok(t, os.Mkdir(path.Join(dir, id2.String()), os.ModePerm)) - testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), path.Join(dir, id2.String()), &block.Meta{ + testutil.Ok(t, blockmeta.Write(log.NewNopLogger(), path.Join(dir, id2.String()), &blockmeta.Meta{ Version: 1, BlockMeta: tsdb.BlockMeta{ ULID: id2, diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e121e9d4081..a7de1cf9766 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -16,6 +16,8 @@ import ( "sync" "time" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" @@ -459,6 +461,7 @@ func (s *bucketSeriesSet) Err() error { return s.err } +// blockSeries return requested series from given index and chunk readers. func (s *BucketStore) blockSeries( ctx context.Context, ulid ulid.ULID, @@ -488,7 +491,6 @@ func (s *BucketStore) blockSeries( } // Get result postings list by resolving the postings tree. - // TODO(bwplotka): Users are seeing panics here, because of lazyPosting being not loaded by preloadPostings. ps, err := index.ExpandPostings(lazyPostings) if err != nil { return nil, stats, errors.Wrap(err, "expand postings") @@ -504,7 +506,8 @@ func (s *BucketStore) blockSeries( } } - // Preload all series index data + // Preload all series index data. + // TODO(bwplotka): Consider not keeping all series in memory all the time. if err := indexr.preloadSeries(ps); err != nil { return nil, stats, errors.Wrap(err, "preload series") } @@ -1001,7 +1004,7 @@ func (s *bucketBlockSet) labelMatchers(matchers ...labels.Matcher) ([]labels.Mat type bucketBlock struct { logger log.Logger bucket objstore.BucketReader - meta *block.Meta + meta *blockmeta.Meta dir string indexCache *indexCache chunkPool *pool.BytesPool @@ -1065,7 +1068,7 @@ func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error { } else if err != nil { return err } - meta, err := block.ReadMetaFile(b.dir) + meta, err := blockmeta.Read(b.dir) if err != nil { return errors.Wrap(err, "read meta.json") } @@ -1095,19 +1098,15 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) { } }() - indexr, err := index.NewFileReader(fn) - if err != nil { - return errors.Wrap(err, "open index reader") - } - defer runutil.CloseWithLogOnErr(b.logger, indexr, "load index cache reader") + // Create index cache adhoc. 
-	if err := block.WriteIndexCache(b.logger, cachefn, indexr); err != nil {
+	if err := block.WriteIndexCache(b.logger, fn, cachefn); err != nil {
 		return errors.Wrap(err, "write index cache")
 	}
 
 	b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cachefn)
 	if err != nil {
-		return errors.Wrap(err, "read index cache")
+		return errors.Wrap(err, "read fresh index cache")
 	}
 	return nil
 }
@@ -1179,15 +1178,22 @@ func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketB
 		logger:       logger,
 		ctx:          ctx,
 		block:        block,
-		dec:          &index.Decoder{},
 		stats:        &queryStats{},
 		cache:        cache,
 		loadedSeries: map[uint64][]byte{},
 	}
-	r.dec.SetSymbolTable(r.block.symbols)
+	r.dec = &index.Decoder{LookupSymbol: r.lookupSymbol}
 	return r
 }
 
+func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) {
+	s, ok := r.block.symbols[o]
+	if !ok {
+		return "", errors.Errorf("bucketIndexReader: unknown symbol offset %d", o)
+	}
+	return s, nil
+}
+
 func (r *bucketIndexReader) preloadPostings() error {
 	const maxGapSize = 512 * 1024
 
@@ -1270,23 +1276,24 @@ func (r *bucketIndexReader) loadPostings(ctx context.Context, postings []*lazyPo
 	return nil
 }
 
-func (r *bucketIndexReader) preloadSeries(ids []uint64) error {
+func (r *bucketIndexReader) preloadSeries(refs []uint64) error {
 	const maxSeriesSize = 64 * 1024
 	const maxGapSize = 512 * 1024
 
-	var newIDs []uint64
+	var newRefs []uint64
 
-	for _, id := range ids {
-		if b, ok := r.cache.series(r.block.meta.ULID, id); ok {
-			r.loadedSeries[id] = b
+	for _, ref := range refs {
+		if b, ok := r.cache.series(r.block.meta.ULID, ref); ok {
+			r.loadedSeries[ref] = b
 			continue
 		}
-		newIDs = append(newIDs, id)
+		newRefs = append(newRefs, ref)
 	}
-	ids = newIDs
+	refs = newRefs
 
-	parts := partitionRanges(len(ids), func(i int) (start, end uint64) {
-		return ids[i], ids[i] + maxSeriesSize
+	// Combine multiple nearby byte ranges so that we are not rate-limited by the object storage.
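+	// partitionRanges merges ranges whose gap is below maxGapSize (512KB here)
+	// into a single part, trading some over-read for fewer GetRange calls.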
+	parts := partitionRanges(len(refs), func(i int) (start, end uint64) {
+		return refs[i], refs[i] + maxSeriesSize
 	}, maxGapSize)
 
 	var g run.Group
@@ -1295,7 +1302,7 @@ func (r *bucketIndexReader) preloadSeries(ids []uint64) error {
 		i, j := p[0], p[1]
 
 		g.Add(func() error {
-			return r.loadSeries(ctx, ids[i:j], ids[i], ids[j-1]+maxSeriesSize)
+			return r.loadSeries(ctx, refs[i:j], refs[i], refs[j-1]+maxSeriesSize)
 		}, func(err error) {
 			if err != nil {
 				cancel()
@@ -1305,7 +1312,7 @@ func (r *bucketIndexReader) preloadSeries(ids []uint64) error {
 	return g.Run()
 }
 
-func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start, end uint64) error {
+func (r *bucketIndexReader) loadSeries(ctx context.Context, refs []uint64, start, end uint64) error {
 	begin := time.Now()
 
 	b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
@@ -1317,12 +1324,12 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
 	defer r.mtx.Unlock()
 
 	r.stats.seriesFetchCount++
-	r.stats.seriesFetched += len(ids)
+	r.stats.seriesFetched += len(refs)
 	r.stats.seriesFetchDurationSum += time.Since(begin)
 	r.stats.seriesFetchedSizeSum += int(end - start)
 
-	for _, id := range ids {
-		c := b[id-start:]
+	for _, ref := range refs {
+		c := b[ref-start:]
 
 		l, n := binary.Uvarint(c)
 		if n < 1 {
@@ -1332,8 +1339,8 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
 			return errors.Errorf("invalid remaining size %d, expected %d", len(c), n+int(l))
 		}
 		c = c[n : n+int(l)]
-		r.loadedSeries[id] = c
-		r.cache.setSeries(r.block.meta.ULID, id, c)
+		r.loadedSeries[ref] = c
+		r.cache.setSeries(r.block.meta.ULID, ref, c)
 	}
 	return nil
 }
@@ -1421,6 +1428,7 @@ func (r *bucketIndexReader) SortedPostings(p index.Postings) index.Postings {
 // Series populates the given labels and chunk metas for the series identified
 // by the reference.
 // Returns ErrNotFound if the ref does not resolve to a known series.
+// preloadSeries must be invoked first for this method to return loaded results.
 func (r *bucketIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
 	b, ok := r.loadedSeries[ref]
 	if !ok {
@@ -1438,6 +1446,11 @@ func (r *bucketIndexReader) LabelIndices() ([][]string, error) {
 	return nil, errors.New("not implemented")
 }
 
+// LabelNames returns all the unique label names present in the index in sorted order.
+func (r *bucketIndexReader) LabelNames() ([]string, error) {
+	return nil, errors.New("not implemented")
+}
+
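+// Note: LabelNames is presumably required by the IndexReader interface of the
+// pinned tsdb revision; the bucket store itself does not call it.
+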
 // Close releases the underlying resources of the reader.
 func (r *bucketIndexReader) Close() error {
 	r.block.pendingReaders.Done()
diff --git a/pkg/store/bucket_profile_test.go b/pkg/store/bucket_profile_test.go
new file mode 100644
index 00000000000..942231418f8
--- /dev/null
+++ b/pkg/store/bucket_profile_test.go
@@ -0,0 +1,408 @@
+package store
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"runtime"
+	"runtime/pprof"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/oklog/ulid"
+
+	"github.com/improbable-eng/thanos/pkg/objstore/inmem"
+
+	"github.com/go-kit/kit/log"
+	"github.com/improbable-eng/thanos/pkg/block"
+	"github.com/improbable-eng/thanos/pkg/runutil"
+	"github.com/improbable-eng/thanos/pkg/store/storepb"
+	"github.com/improbable-eng/thanos/pkg/testutil"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/pkg/timestamp"
+	"github.com/prometheus/tsdb/labels"
+)
+
+func saveHeap(t *testing.T, name string) {
+	time.Sleep(500 * time.Millisecond)
+	runtime.GC()
+	f, err := os.OpenFile("heap-"+name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
+	testutil.Ok(t, err)
+
+	defer f.Close()
+	testutil.Ok(t, pprof.WriteHeapProfile(f))
+}
+
+func TestBucketStore_PROFILE(t *testing.T) {
+	bkt := inmem.NewBucket()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	dir, err := ioutil.TempDir("", "test_bucketstore_e2e")
+	testutil.Ok(t, err)
+	//defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
+
+	series := []labels.Labels{
+		labels.FromStrings("a", "1", "b", "1"),
+		labels.FromStrings("a", "1", "b", "2"),
+		labels.FromStrings("a", "2", "b", "1"),
+		labels.FromStrings("a", "2", "b", "2"),
+		labels.FromStrings("a", "1", "c", "1"),
+		labels.FromStrings("a", "1", "c", "2"),
+		labels.FromStrings("a", "2", "c", "1"),
+		labels.FromStrings("a", "2", "c", "2"),
+	}
+	extLset := labels.FromStrings("ext1", "value1")
+
+	start := time.Now()
+	now := start
+
+	var ids []ulid.ULID
+	for i := 0; i < 3; i++ {
+		mint := timestamp.FromTime(now)
+		now = now.Add(2 * time.Hour)
+		maxt := timestamp.FromTime(now)
+
+		// Create two blocks per time slot. Only add 10 samples each so only one chunk
+		// gets created each. This way we can easily verify we got 10 chunks per series below.
+		id1, err := testutil.CreateBlock(dir, series[:4], 10, mint, maxt, extLset, 0)
+		testutil.Ok(t, err)
+		id2, err := testutil.CreateBlock(dir, series[4:], 10, mint, maxt, extLset, 0)
+		testutil.Ok(t, err)
+
+		ids = append(ids, id1, id2)
+		dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String())
+
+		// Add labels to the meta of the second block.
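+		// The extra external label is what distinguishes the two blocks in the
+		// expected series (ext1 vs ext2) asserted below.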
+		meta, err := block.ReadMetaFile(dir2)
+		testutil.Ok(t, err)
+		meta.Thanos.Labels = map[string]string{"ext2": "value2"}
+		testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), dir2, meta))
+
+		testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir1))
+		testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir2))
+
+		testutil.Ok(t, os.RemoveAll(dir1))
+		testutil.Ok(t, os.RemoveAll(dir2))
+	}
+
+	store, err := NewBucketStore(nil, nil, bkt, dir, 100, 0, false)
+	testutil.Ok(t, err)
+
+	ctx, _ = context.WithTimeout(ctx, 30*time.Second)
+
+	if err := runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
+		if err := store.SyncBlocks(ctx); err != nil {
+			return err
+		}
+		if store.numBlocks() < 6 {
+			return errors.New("not all blocks loaded")
+		}
+		return nil
+	}); err != nil && errors.Cause(err) != context.Canceled {
+		t.Error(err)
+		t.FailNow()
+	}
+	testutil.Ok(t, err)
+
+	pbseries := [][]storepb.Label{
+		{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
+		{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
+		{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
+		{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
+		{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
+		{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
+		{{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
+		{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
+	}
+
+	srv := newStoreSeriesServer(ctx)
+
+	err = store.Series(&storepb.SeriesRequest{
+		Matchers: []storepb.LabelMatcher{
+			{Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"},
+		},
+		MinTime: timestamp.FromTime(start),
+		MaxTime: timestamp.FromTime(now),
+	}, srv)
+	testutil.Ok(t, err)
+	testutil.Equals(t, len(pbseries), len(srv.SeriesSet))
+
+	g := sync.WaitGroup{}
+
+	// NO REPRO
+	go func() {
+		g.Add(1)
+		time.Sleep(10 * time.Millisecond)
+		// Simulate deleted blocks without sync (compaction!)
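+		// The block is deleted from the bucket while Series requests are still in
+		// flight, mimicking the compactor removing source blocks mid-query.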
+		testutil.Ok(t, block.Delete(ctx, bkt, ids[2]))
+		time.Sleep(10 * time.Millisecond)
+		store.SyncBlocks(ctx)
+		store.SyncBlocks(ctx)
+
+		g.Done()
+	}()
+
+	for i := 0; i < 1000; i++ {
+		go func() {
+			g.Add(1)
+			srv := newStoreSeriesServer(ctx)
+
+			err = store.Series(&storepb.SeriesRequest{
+				Matchers: []storepb.LabelMatcher{
+					{Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"},
+				},
+				MinTime: timestamp.FromTime(start),
+				MaxTime: timestamp.FromTime(now),
+			}, srv)
+			fmt.Println(err)
+			//testutil.Ok(t, err)
+			//testutil.Equals(t, len(pbseries), len(srv.SeriesSet))
+
+			g.Done()
+		}()
+	}
+	time.Sleep(10 * time.Millisecond)
+	for i := 0; i < 1000; i++ {
+		go func() {
+			g.Add(1)
+			srv := newStoreSeriesServer(ctx)
+
+			err = store.Series(&storepb.SeriesRequest{
+				Matchers: []storepb.LabelMatcher{
+					{Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"},
+				},
+				MinTime: timestamp.FromTime(start),
+				MaxTime: timestamp.FromTime(now),
+			}, srv)
+			fmt.Println(err)
+			//testutil.Ok(t, err)
+			//testutil.Equals(t, len(pbseries), len(srv.SeriesSet))
+
+			g.Done()
+		}()
+	}
+
+	g.Wait()
+
+	//for i, s := range srv.SeriesSet {
+	//	testutil.Equals(t, pbseries[i], s.Labels)
+	//	testutil.Equals(t, 3, len(s.Chunks))
+	//}
+
+	saveHeap(t, "2")
+}
+
+/*
+==================
+WARNING: DATA RACE
+Read at 0x00c4201c22f8 by goroutine 75:
+  github.com/improbable-eng/thanos/pkg/pool.(*BytesPool).Put()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/pool/pool.go:83 +0x14c
+  github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).Close()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1570 +0x115
+  github.com/improbable-eng/thanos/pkg/runutil.CloseWithLogOnErr()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/runutil/runutil.go:60 +0x59
+  github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:811 +0x2d7e
+  github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:140 +0x4e6
+
+Previous write at 0x00c4201c22f8 by goroutine 25:
+  sync/atomic.AddInt64()
+      /usr/local/go/src/runtime/race_amd64.s:276 +0xb
+  github.com/improbable-eng/thanos/pkg/pool.(*BytesPool).Get()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/pool/pool.go:65 +0x1ad
+  github.com/improbable-eng/thanos/pkg/store.(*bucketBlock).readChunkRange()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1130 +0x95
+  github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).loadChunks()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1499 +0xe7
+  github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload.func3()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1485 +0x23a
+  github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34
+
+Goroutine 75 (running) created at:
+  github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e
+  testing.tRunner()
+      /usr/local/go/src/testing/testing.go:777 +0x16d
+
+Goroutine 25 (finished) created at:
+  github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:37 +0x10b
+  github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1493 +0x6f0
+  github.com/improbable-eng/thanos/pkg/store.(*BucketStore).blockSeries()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:572 +0x1109
+  github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series.func1()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:721 +0x1e7
+  github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34
+==================
+
+==================
+WARNING: DATA RACE
+Write at 0x00c42029c2fc by goroutine 10:
+  internal/race.Write()
+      /usr/local/go/src/internal/race/race.go:41 +0x38
+  sync.(*WaitGroup).Wait()
+      /usr/local/go/src/sync/waitgroup.go:127 +0xf3
+  github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:164 +0x2442
+  testing.tRunner()
+      /usr/local/go/src/testing/testing.go:777 +0x16d
+
+Previous read at 0x00c42029c2fc by goroutine 74:
+  internal/race.Read()
+      /usr/local/go/src/internal/race/race.go:37 +0x38
+  sync.(*WaitGroup).Add()
+      /usr/local/go/src/sync/waitgroup.go:70 +0x16e
+  github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:137 +0x5c
+
+Goroutine 10 (running) created at:
+  testing.(*T).Run()
+      /usr/local/go/src/testing/testing.go:824 +0x564
+  testing.runTests.func1()
+      /usr/local/go/src/testing/testing.go:1063 +0xa4
+  testing.tRunner()
+      /usr/local/go/src/testing/testing.go:777 +0x16d
+  testing.runTests()
+      /usr/local/go/src/testing/testing.go:1061 +0x4e1
+  testing.(*M).Run()
+      /usr/local/go/src/testing/testing.go:978 +0x2cd
+  main.main()
+      _testmain.go:70 +0x22a
+
+Goroutine 74 (running) created at:
+  github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e
+  testing.tRunner()
+      /usr/local/go/src/testing/testing.go:777 +0x16d
+==================
+==================
+WARNING: DATA RACE
+Write at 0x00c4202647b0 by goroutine 230:
+  runtime.mapdelete_faststr()
+      /usr/local/go/src/runtime/hashmap_fast.go:883 +0x0
+  github.com/improbable-eng/thanos/pkg/objstore/inmem.(*Bucket).Delete()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/inmem/inmem.go:138 +0x69
+  github.com/improbable-eng/thanos/pkg/objstore.DeleteDir.func1()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/objstore.go:102 +0x113
+  github.com/improbable-eng/thanos/pkg/objstore/inmem.(*Bucket).Iter()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/inmem/inmem.go:76 +0x616
+  github.com/improbable-eng/thanos/pkg/objstore.DeleteDir()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/objstore.go:97 +0x10c
+  github.com/improbable-eng/thanos/pkg/block.Delete()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/block/block.go:215 +0x7f
+  github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func3()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:157 +0xae
+
+Previous read at 0x00c4202647b0 by goroutine 85:
+  runtime.mapaccess2_faststr()
+      /usr/local/go/src/runtime/hashmap_fast.go:261 +0x0
+  github.com/improbable-eng/thanos/pkg/objstore/inmem.(*Bucket).GetRange()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/inmem/inmem.go:103 +0x9b
+  github.com/improbable-eng/thanos/pkg/store.(*bucketBlock).readChunkRange()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1136 +0x255
+  github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).loadChunks()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1499 +0xe7
+  github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload.func3()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1485 +0x23a
+  github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34
+
+Goroutine 230 (running) created at:
+  github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:154 +0x2431
+  testing.tRunner()
+      /usr/local/go/src/testing/testing.go:777 +0x16d
+
+Goroutine 85 (finished) created at:
+  github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:37 +0x10b
+  github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1493 +0x6f0
+  github.com/improbable-eng/thanos/pkg/store.(*BucketStore).blockSeries()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:572 +0x1109
+  github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series.func1()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:721 +0x1e7
+  github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34
+==================
+==================
+WARNING: DATA RACE
+Read at 0x00c4200d4978 by goroutine 76:
+  github.com/improbable-eng/thanos/pkg/pool.(*BytesPool).Put()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/pool/pool.go:83 +0x14c
+  github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).Close()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1570 +0x115
+  github.com/improbable-eng/thanos/pkg/runutil.CloseWithLogOnErr()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/runutil/runutil.go:60 +0x59
+  github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:811 +0x2d7e
+  github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:140 +0x4e6
+
+Previous write at 0x00c4200d4978 by goroutine 365:
+  sync/atomic.AddInt64()
+      /usr/local/go/src/runtime/race_amd64.s:276 +0xb
+  github.com/improbable-eng/thanos/pkg/pool.(*BytesPool).Get()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/pool/pool.go:65 +0x1ad
+  github.com/improbable-eng/thanos/pkg/store.(*bucketBlock).readChunkRange()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1130 +0x95
+  github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).loadChunks()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1499 +0xe7
+  github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload.func3()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1485 +0x23a
+  github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34
+
+Goroutine 76 (running) created at:
+  github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e
+  testing.tRunner()
+      /usr/local/go/src/testing/testing.go:777 +0x16d
+
+Goroutine 365 (finished) created at:
+  github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:37 +0x10b
+  github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1493 +0x6f0
+  github.com/improbable-eng/thanos/pkg/store.(*BucketStore).blockSeries()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:572 +0x1109
+  github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series.func1()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:721 +0x1e7
+  github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34
+==================
+
+==================
+WARNING: DATA RACE
+Write at 0x00c4200837d0 by goroutine 77:
+  github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:140 +0x50b
+
+Previous write at 0x00c4200837d0 by goroutine 76:
+  github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:140 +0x50b
+
+Goroutine 77 (running) created at:
+  github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e
+  testing.tRunner()
+      /usr/local/go/src/testing/testing.go:777 +0x16d
+
+Goroutine 76 (finished) created at:
+  github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+      /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e
+  testing.tRunner()
+      /usr/local/go/src/testing/testing.go:777 +0x16d
+==================
+
+*/
diff --git a/pkg/testutil/prometheus.go b/pkg/testutil/prometheus.go
index 807b61f8a05..6c5729531b8 100644
--- a/pkg/testutil/prometheus.go
+++ b/pkg/testutil/prometheus.go
@@ -12,8 +12,9 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
"github.com/go-kit/kit/log" - "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -188,7 +189,7 @@ func CreateBlock( extLset labels.Labels, resolution int64, ) (id ulid.ULID, err error) { - h, err := tsdb.NewHead(nil, nil, tsdb.NopWAL(), 10000000000) + h, err := tsdb.NewHead(nil, nil, nil, 10000000000) if err != nil { return id, errors.Wrap(err, "create head block") } @@ -238,15 +239,15 @@ func CreateBlock( return id, errors.Wrap(err, "create compactor") } - id, err = c.Write(dir, h, mint, maxt) + id, err = c.Write(dir, h, mint, maxt, nil) if err != nil { return id, errors.Wrap(err, "write block") } - if _, err = block.InjectThanosMeta(log.NewNopLogger(), filepath.Join(dir, id.String()), block.ThanosMeta{ + if _, err = blockmeta.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), blockmeta.Thanos{ Labels: extLset.Map(), - Downsample: block.ThanosDownsampleMeta{Resolution: resolution}, - Source: block.TestSource, + Downsample: blockmeta.ThanosDownsample{Resolution: resolution}, + Source: blockmeta.TestSource, }, nil); err != nil { return id, errors.Wrap(err, "finalize block") } diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go index 54a20703d45..3e96ccb2062 100644 --- a/pkg/verifier/index_issue.go +++ b/pkg/verifier/index_issue.go @@ -8,6 +8,8 @@ import ( "path" "path/filepath" + "github.com/improbable-eng/thanos/pkg/block/blockmeta" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" @@ -94,7 +96,7 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac logger, tmpdir, id, - block.BucketRepairSource, + blockmeta.BucketRepairSource, block.IgnoreCompleteOutsideChunk, block.IgnoreDuplicateOutsideChunk, block.IgnoreIssue347OutsideChunk,