diff --git a/Gopkg.lock b/Gopkg.lock index 94d9dfb1be..b01347f47f 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -199,12 +199,12 @@ revision = "2e65f85255dbc3072edf28d6b5b8efc472979f5a" [[projects]] - branch = "master" - digest = "1:9abc49f39e3e23e262594bb4fb70abf74c0c99e94f99153f43b143805e850719" + digest = "1:cea4aa2038169ee558bf507d5ea02c94ca85bcca28a4c7bb99fd59b31e43a686" name = "github.com/google/go-querystring" packages = ["query"] pruneopts = "" - revision = "53e6ce116135b80d037921a7fdd5138cf32d7a8a" + revision = "44c6ddd0a2342c386950e880b658017258da92fc" + version = "v1.0.0" [[projects]] digest = "1:e097a364f4e8d8d91b9b9eeafb992d3796a41fde3eb548c1a87eb9d9f60725cf" @@ -335,12 +335,12 @@ version = "v0.3.0" [[projects]] - digest = "1:82b912465c1da0668582a7d1117339c278e786c2536b3c3623029a0c7141c2d0" + digest = "1:84c28d9899cc4e00c38042d345cea8819275a5a62403a58530cac67022894776" name = "github.com/mattn/go-runewidth" packages = ["."] pruneopts = "" - revision = "ce7b0b5c7b45a81508558cd1dba6bb1e4ddb51bb" - version = "v0.0.3" + revision = "3ee7d812e62a0804a7d0a324e0249ca2db3476d3" + version = "v0.0.4" [[projects]] digest = "1:49a8b01a6cd6558d504b65608214ca40a78000e1b343ed0da5c6a9ccd83d6d30" @@ -390,20 +390,12 @@ version = "v0.11.0" [[projects]] - digest = "1:912349f5cf927bf96dca709623631ace7db723f07c70c4d56cfc22d9a667ed16" + digest = "1:b09858acd58e0873236c7b96903e3ec4e238d5de644c08bd8e712fa2d3d51ad2" name = "github.com/mozillazg/go-httpheader" packages = ["."] pruneopts = "" - revision = "4e5d6424981844faafc4b0649036b2e0395bdf99" - version = "v0.2.0" - -[[projects]] - branch = "master" - digest = "1:3adc46876d4d0e4d5bbcfcc44c2116b95d7a5c966e2ee92a219488547fd453f2" - name = "github.com/nightlyone/lockfile" - packages = ["."] - pruneopts = "" - revision = "0ad87eef1443f64d3d8c50da647e2b1552851124" + revision = "61f2392c3317b60616c9dcb10d0a4cfef131fe62" + version = "v0.2.1" [[projects]] digest = "1:94e9081cc450d2cdf4e6886fc2c06c07272f86477df2d74ee5931951fa3d2577" @@ -508,11 +500,12 @@ revision = "05ee40e3a273f7245e8777337fc7b46e533a9a92" [[projects]] - digest = "1:b5ff9852eabe841003da4b0a4b742a2878c722dda6481003432344f633a814fc" + digest = "1:43ad0a170d6f826f8dd63244960384eb205e75423a444f7afbf145425f287227" name = "github.com/prometheus/prometheus" packages = [ "discovery/file", "discovery/targetgroup", + "pkg/gate", "pkg/labels", "pkg/rulefmt", "pkg/textparse", @@ -528,11 +521,10 @@ "util/testutil", ] pruneopts = "" - revision = "71af5e29e815795e9dd14742ee7725682fa14b7b" - version = "v2.3.2" + revision = "3bd41cc92c7800cc6072171bd4237406126fa169" [[projects]] - digest = "1:216dcf26fbfb3f36f286ca3306882a157c51648e4b5d4f3a9e9c719faea6ea58" + digest = "1:00780e2d7a870f4de3da0d854cc419170c81cf82e6f2e802a3543fcf54c1867d" name = "github.com/prometheus/tsdb" packages = [ ".", @@ -541,9 +533,11 @@ "fileutil", "index", "labels", + "wal", ] pruneopts = "" - revision = "bd832fc8274e8fe63999ac749daaaff9d881241f" + revision = "10ba228e6baa4811818e04b1ab9b48110bb43d7b" + version = "v0.4.0" [[projects]] branch = "master" diff --git a/Gopkg.toml b/Gopkg.toml index 98bbc9494d..057fa27ac4 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -37,7 +37,8 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"] name = "github.com/prometheus/common" [[constraint]] - version = "v2.3.2" + # TODO(bwplotka): Move to a released version once our recent fixes are released (v2.7.0). + revision = "3bd41cc92c7800cc6072171bd4237406126fa169" name = "github.com/prometheus/prometheus" [[override]] @@ -46,7 +47,7 @@ ignored = 
["github.com/improbable-eng/thanos/benchmark/*"] [[constraint]] name = "github.com/prometheus/tsdb" - revision = "bd832fc8274e8fe63999ac749daaaff9d881241f" + version = "v0.4.0" [[constraint]] branch = "master" diff --git a/Makefile b/Makefile index 9bd2cd077a..3c8b8341a6 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,7 @@ PROMU_VERSION ?= 264dc36af9ea3103255063497636bd5713e3e9c1 # E2e test deps. # Referenced by github.com/improbable-eng/thanos/blob/master/docs/getting_started.md#prometheus -SUPPORTED_PROM_VERSIONS ?=v2.0.0 v2.2.1 v2.3.2 v2.4.3 v2.5.0 +SUPPORTED_PROM_VERSIONS ?=v2.2.1 v2.3.2 v2.4.3 v2.5.0 ALERTMANAGER_VERSION ?=v0.15.2 MINIO_SERVER_VERSION ?=RELEASE.2018-10-06T00-15-16Z diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 01b9b54bf6..ff68feee2a 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -10,10 +10,9 @@ import ( "text/template" "time" - "github.com/prometheus/tsdb/labels" - "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/verifier" @@ -23,6 +22,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/labels" "golang.org/x/text/language" "golang.org/x/text/message" "gopkg.in/alecthomas/kingpin.v2" @@ -254,7 +254,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin defer cancel() // Getting Metas. - var blockMetas []*block.Meta + var blockMetas []*metadata.Meta if err = bkt.Iter(ctx, "", func(name string) error { id, ok := block.IsBlockDir(name) if !ok { @@ -277,7 +277,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin } } -func printTable(blockMetas []*block.Meta, selectorLabels labels.Labels, sortBy []string) error { +func printTable(blockMetas []*metadata.Meta, selectorLabels labels.Labels, sortBy []string) error { header := inspectColumns var lines [][]string @@ -355,7 +355,7 @@ func getKeysAlphabetically(labels map[string]string) []string { // matchesSelector checks if blockMeta contains every label from // the selector with the correct value -func matchesSelector(blockMeta *block.Meta, selectorLabels labels.Labels) bool { +func matchesSelector(blockMeta *metadata.Meta, selectorLabels labels.Labels) bool { for _, l := range selectorLabels { if v, ok := blockMeta.Thanos.Labels[l.Name]; !ok || v != l.Value { return false diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index c00aadd43d..5f3846ff50 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -8,11 +8,10 @@ import ( "path/filepath" "time" - "github.com/prometheus/tsdb/chunkenc" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/compact/downsample" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/client" @@ -23,6 +22,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/chunkenc" "gopkg.in/alecthomas/kingpin.v2" ) @@ -105,7 +105,7 @@ func downsampleBucket( if err := os.MkdirAll(dir, 0777); err != nil { return errors.Wrap(err, "create dir") } - var metas []*block.Meta + var metas 
[]*metadata.Meta err := bkt.Iter(ctx, "", func(name string) error { id, ok := block.IsBlockDir(name) @@ -119,7 +119,7 @@ func downsampleBucket( } defer runutil.CloseWithLogOnErr(logger, rc, "block reader") - var m block.Meta + var m metadata.Meta if err := json.NewDecoder(rc).Decode(&m); err != nil { return errors.Wrap(err, "decode meta") } @@ -201,7 +201,7 @@ func downsampleBucket( return nil } -func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *block.Meta, dir string, resolution int64) error { +func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64) error { begin := time.Now() bdir := filepath.Join(dir, m.ULID.String()) @@ -224,7 +224,7 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu pool = downsample.NewPool() } - b, err := tsdb.OpenBlock(bdir, pool) + b, err := tsdb.OpenBlock(logger, bdir, pool) if err != nil { return errors.Wrapf(err, "open block %s", m.ULID) } diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index bf6cfa64fe..9e6c5ac19e 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -296,7 +296,16 @@ func runQuery( return stores.Get(), nil }, selectorLset) queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel) - engine = promql.NewEngine(logger, reg, maxConcurrentQueries, queryTimeout) + engine = promql.NewEngine( + promql.EngineOpts{ + Logger: logger, + Reg: reg, + MaxConcurrent: maxConcurrentQueries, + // TODO(bwplotka): Expose this as a flag: https://github.com/improbable-eng/thanos/issues/703 + MaxSamples: math.MaxInt32, + Timeout: queryTimeout, + }, + ) ) // Periodically update the store set with the addresses we see in our cluster. { diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 03fed102d2..56ad52f438 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -19,15 +19,14 @@ import ( "syscall" "time" - "github.com/improbable-eng/thanos/pkg/extprom" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/alert" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/discovery/cache" "github.com/improbable-eng/thanos/pkg/discovery/dns" + "github.com/improbable-eng/thanos/pkg/extprom" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/shipper" @@ -117,7 +116,6 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) MaxBlockDuration: *tsdbBlockDuration, Retention: *tsdbRetention, NoLockfile: true, - WALFlushInterval: 30 * time.Second, } lookupQueries := map[string]struct{}{} @@ -290,7 +288,7 @@ func runRule( ctx, cancel := context.WithCancel(context.Background()) ctx = tracing.ContextWithTracer(ctx, tracer) - notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) error { + notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) { res := make([]*alert.Alert, 0, len(alerts)) for _, alrt := range alerts { // Only send actually firing alerts. 
@@ -309,8 +307,6 @@ func runRule( res = append(res, a) } alertQ.Push(res) - - return nil } mgr = rules.NewManager(&rules.ManagerOptions{ Context: ctx, @@ -579,7 +575,7 @@ func runRule( } }() - s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, block.RulerSource) + s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 61f493f93a..bd462b2f65 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -14,7 +14,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/reloader" @@ -102,7 +102,7 @@ func runSidecar( reloader *reloader.Reloader, component string, ) error { - var metadata = &metadata{ + var m = &promMetadata{ promURL: promURL, // Start out with the full time range. The shipper will constrain it later. @@ -128,7 +128,7 @@ func runSidecar( // Blocking query of external labels before joining as a Source Peer into gossip. // We retry infinitely until we reach and fetch labels from our Prometheus. err := runutil.Retry(2*time.Second, ctx.Done(), func() error { - if err := metadata.UpdateLabels(ctx, logger); err != nil { + if err := m.UpdateLabels(ctx, logger); err != nil { level.Warn(logger).Log( "msg", "failed to fetch initial external labels. Is Prometheus running? Retrying", "err", err, @@ -145,14 +145,14 @@ func runSidecar( return errors.Wrap(err, "initial external labels query") } - if len(metadata.Labels()) == 0 { + if len(m.Labels()) == 0 { return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured") } // New gossip cluster. - mint, maxt := metadata.Timestamps() + mint, maxt := m.Timestamps() if err = peer.Join(cluster.PeerTypeSource, cluster.PeerMetadata{ - Labels: metadata.LabelsPB(), + Labels: m.LabelsPB(), MinTime: mint, MaxTime: maxt, }); err != nil { @@ -165,12 +165,12 @@ func runSidecar( iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second) defer iterCancel() - if err := metadata.UpdateLabels(iterCtx, logger); err != nil { + if err := m.UpdateLabels(iterCtx, logger); err != nil { level.Warn(logger).Log("msg", "heartbeat failed", "err", err) promUp.Set(0) } else { // Update gossip. 
- peer.SetLabels(metadata.LabelsPB()) + peer.SetLabels(m.LabelsPB()) promUp.Set(1) lastHeartbeat.Set(float64(time.Now().UnixNano()) / 1e9) @@ -204,7 +204,7 @@ func runSidecar( var client http.Client promStore, err := store.NewPrometheusStore( - logger, &client, promURL, metadata.Labels, metadata.Timestamps) + logger, &client, promURL, m.Labels, m.Timestamps) if err != nil { return errors.Wrap(err, "create Prometheus store") } @@ -252,7 +252,7 @@ func runSidecar( } }() - s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, block.SidecarSource) + s := shipper.New(logger, nil, dataDir, bkt, m.Labels, metadata.SidecarSource) ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { @@ -265,9 +265,9 @@ func runSidecar( if err != nil { level.Warn(logger).Log("msg", "reading timestamps failed", "err", err) } else { - metadata.UpdateTimestamps(minTime, math.MaxInt64) + m.UpdateTimestamps(minTime, math.MaxInt64) - mint, maxt := metadata.Timestamps() + mint, maxt := m.Timestamps() peer.SetTimestamps(mint, maxt) } return nil @@ -281,7 +281,7 @@ func runSidecar( return nil } -type metadata struct { +type promMetadata struct { promURL *url.URL mtx sync.Mutex @@ -290,7 +290,7 @@ type metadata struct { labels labels.Labels } -func (s *metadata) UpdateLabels(ctx context.Context, logger log.Logger) error { +func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error { elset, err := queryExternalLabels(ctx, logger, s.promURL) if err != nil { return err @@ -303,7 +303,7 @@ func (s *metadata) UpdateLabels(ctx context.Context, logger log.Logger) error { return nil } -func (s *metadata) UpdateTimestamps(mint int64, maxt int64) { +func (s *promMetadata) UpdateTimestamps(mint int64, maxt int64) { s.mtx.Lock() defer s.mtx.Unlock() @@ -311,14 +311,14 @@ func (s *metadata) UpdateTimestamps(mint int64, maxt int64) { s.maxt = maxt } -func (s *metadata) Labels() labels.Labels { +func (s *promMetadata) Labels() labels.Labels { s.mtx.Lock() defer s.mtx.Unlock() return s.labels } -func (s *metadata) LabelsPB() []storepb.Label { +func (s *promMetadata) LabelsPB() []storepb.Label { s.mtx.Lock() defer s.mtx.Unlock() @@ -332,7 +332,7 @@ func (s *metadata) LabelsPB() []storepb.Label { return lset } -func (s *metadata) Timestamps() (mint int64, maxt int64) { +func (s *promMetadata) Timestamps() (mint int64, maxt int64) { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/pkg/block/block.go b/pkg/block/block.go index 118b7ea96c..cdc52a3e08 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -5,11 +5,12 @@ package block import ( "context" "encoding/json" - "io/ioutil" "os" "path" "path/filepath" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "fmt" "github.com/go-kit/kit/log" @@ -17,8 +18,6 @@ import ( "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" - "github.com/prometheus/tsdb" - "github.com/prometheus/tsdb/fileutil" ) const ( @@ -33,103 +32,6 @@ const ( DebugMetas = "debug/metas" ) -type SourceType string - -const ( - UnknownSource SourceType = "" - SidecarSource SourceType = "sidecar" - CompactorSource SourceType = "compactor" - CompactorRepairSource SourceType = "compactor.repair" - RulerSource SourceType = "ruler" - BucketRepairSource SourceType = "bucket.repair" - TestSource SourceType = "test" -) - -// Meta describes the a block's meta. It wraps the known TSDB meta structure and -// extends it by Thanos-specific fields. 
-type Meta struct { - Version int `json:"version"` - - tsdb.BlockMeta - - Thanos ThanosMeta `json:"thanos"` -} - -// ThanosMeta holds block meta information specific to Thanos. -type ThanosMeta struct { - Labels map[string]string `json:"labels"` - Downsample ThanosDownsampleMeta `json:"downsample"` - - // Source is a real upload source of the block. - Source SourceType `json:"source"` -} - -type ThanosDownsampleMeta struct { - Resolution int64 `json:"resolution"` -} - -// WriteMetaFile writes the given meta into /meta.json. -func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error { - // Make any changes to the file appear atomic. - path := filepath.Join(dir, MetaFilename) - tmp := path + ".tmp" - - f, err := os.Create(tmp) - if err != nil { - return err - } - - enc := json.NewEncoder(f) - enc.SetIndent("", "\t") - - if err := enc.Encode(meta); err != nil { - runutil.CloseWithLogOnErr(logger, f, "close meta") - return err - } - if err := f.Close(); err != nil { - return err - } - return renameFile(logger, tmp, path) -} - -// ReadMetaFile reads the given meta from /meta.json. -func ReadMetaFile(dir string) (*Meta, error) { - b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename)) - if err != nil { - return nil, err - } - var m Meta - - if err := json.Unmarshal(b, &m); err != nil { - return nil, err - } - if m.Version != 1 { - return nil, errors.Errorf("unexpected meta file version %d", m.Version) - } - return &m, nil -} - -func renameFile(logger log.Logger, from, to string) error { - if err := os.RemoveAll(to); err != nil { - return err - } - if err := os.Rename(from, to); err != nil { - return err - } - - // Directory was renamed; sync parent dir to persist rename. - pdir, err := fileutil.OpenDir(filepath.Dir(to)) - if err != nil { - return err - } - - if err = fileutil.Fsync(pdir); err != nil { - runutil.CloseWithLogOnErr(logger, pdir, "close dir") - return err - } - return pdir.Close() -} - // Download downloads directory that is mean to be block directory. func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string) error { if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), dst); err != nil { @@ -169,7 +71,7 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st return errors.Wrap(err, "not a block dir") } - meta, err := ReadMetaFile(bdir) + meta, err := metadata.Read(bdir) if err != nil { // No meta or broken meta file. return errors.Wrap(err, "read meta") @@ -216,16 +118,16 @@ func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error { } // DownloadMeta downloads only meta file from bucket by block ID. 
-func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (Meta, error) { +func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) { rc, err := bkt.Get(ctx, path.Join(id.String(), MetaFilename)) if err != nil { - return Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String()) + return metadata.Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String()) } defer runutil.CloseWithLogOnErr(logger, rc, "download meta bucket client") - var m Meta + var m metadata.Meta if err := json.NewDecoder(rc).Decode(&m); err != nil { - return Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String()) + return metadata.Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String()) } return m, nil } @@ -234,24 +136,3 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) { id, err := ulid.Parse(filepath.Base(path)) return id, err == nil } - -// InjectThanosMeta sets Thanos meta to the block meta JSON and saves it to the disk. -// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata. -func InjectThanosMeta(logger log.Logger, bdir string, meta ThanosMeta, downsampledMeta *tsdb.BlockMeta) (*Meta, error) { - newMeta, err := ReadMetaFile(bdir) - if err != nil { - return nil, errors.Wrap(err, "read new meta") - } - newMeta.Thanos = meta - - // While downsampling we need to copy original compaction. - if downsampledMeta != nil { - newMeta.Compaction = downsampledMeta.Compaction - } - - if err := WriteMetaFile(logger, bdir, newMeta); err != nil { - return nil, errors.Wrap(err, "write new meta") - } - - return newMeta, nil -} diff --git a/pkg/block/index.go b/pkg/block/index.go index 2249863b2d..7669df223e 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -11,6 +11,10 @@ import ( "strings" "time" + "github.com/improbable-eng/thanos/pkg/block/metadata" + + "github.com/prometheus/tsdb/fileutil" + "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" @@ -36,23 +40,84 @@ type indexCache struct { Postings []postingsRange } +type realByteSlice []byte + +func (b realByteSlice) Len() int { + return len(b) +} + +func (b realByteSlice) Range(start, end int) []byte { + return b[start:end] +} + +func (b realByteSlice) Sub(start, end int) index.ByteSlice { + return b[start:end] +} + +func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) { + version := int(b.Range(4, 5)[0]) + + if version != 1 && version != 2 { + return nil, errors.Errorf("unknown index file version %d", version) + } + + toc, err := index.NewTOCFromByteSlice(b) + if err != nil { + return nil, errors.Wrap(err, "read TOC") + } + + symbolsV2, symbolsV1, err := index.ReadSymbols(b, version, int(toc.Symbols)) + if err != nil { + return nil, errors.Wrap(err, "read symbols") + } + + symbolsTable := make(map[uint32]string, len(symbolsV1)+len(symbolsV2)) + for o, s := range symbolsV1 { + symbolsTable[o] = s + } + for o, s := range symbolsV2 { + symbolsTable[uint32(o)] = s + } + + return symbolsTable, nil +} + // WriteIndexCache writes a cache file containing the first lookup stages // for an index file. 
-func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error { +func WriteIndexCache(logger log.Logger, indexFn string, fn string) error { + indexFile, err := fileutil.OpenMmapFile(indexFn) + if err != nil { + return errors.Wrapf(err, "open mmap index file %s", indexFn) + } + defer runutil.CloseWithLogOnErr(logger, indexFile, "close index cache mmap file from %s", indexFn) + + b := realByteSlice(indexFile.Bytes()) + indexr, err := index.NewReader(b) + if err != nil { + return errors.Wrap(err, "open index reader") + } + defer runutil.CloseWithLogOnErr(logger, indexr, "load index cache reader") + + // We assume the reader already verified the index. + symbols, err := getSymbolTable(b) + if err != nil { + return err + } + f, err := os.Create(fn) if err != nil { - return errors.Wrap(err, "create file") + return errors.Wrap(err, "create index cache file") } defer runutil.CloseWithLogOnErr(logger, f, "index cache writer") v := indexCache{ - Version: r.Version(), - Symbols: r.SymbolTable(), + Version: indexr.Version(), + Symbols: symbols, LabelValues: map[string][]string{}, } // Extract label value indices. - lnames, err := r.LabelIndices() + lnames, err := indexr.LabelIndices() if err != nil { return errors.Wrap(err, "read label indices") } @@ -62,7 +127,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error { } ln := lns[0] - tpls, err := r.LabelValues(ln) + tpls, err := indexr.LabelValues(ln) if err != nil { return errors.Wrap(err, "get label values") } @@ -82,7 +147,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error { } // Extract postings ranges. - pranges, err := r.PostingsRanges() + pranges, err := indexr.PostingsRanges() if err != nil { return errors.Wrap(err, "read postings ranges") } @@ -164,7 +229,7 @@ func VerifyIndex(logger log.Logger, fn string, minTime int64, maxTime int64) err } type Stats struct { - // TotalSeries represnts total number of series in block. + // TotalSeries represents total number of series in block. TotalSeries int // OutOfOrderSeries represents number of series that have out of order chunks. OutOfOrderSeries int @@ -346,7 +411,7 @@ type ignoreFnType func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) ( // - removes all near "complete" outside chunks introduced by https://github.com/prometheus/tsdb/issues/347. // Fixable inconsistencies are resolved in the new block. 
// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/378 -func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) { +func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) { if len(ignoreChkFns) == 0 { return resid, errors.New("no ignore chunk function specified") } @@ -355,7 +420,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno entropy := rand.New(rand.NewSource(time.Now().UnixNano())) resid = ulid.MustNew(ulid.Now(), entropy) - meta, err := ReadMetaFile(bdir) + meta, err := metadata.Read(bdir) if err != nil { return resid, errors.Wrap(err, "read meta file") } @@ -363,7 +428,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno return resid, errors.New("cannot repair downsampled block") } - b, err := tsdb.OpenBlock(bdir, nil) + b, err := tsdb.OpenBlock(logger, bdir, nil) if err != nil { return resid, errors.Wrap(err, "open block") } @@ -405,7 +470,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno if err := rewrite(indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil { return resid, errors.Wrap(err, "rewrite block") } - if err := WriteMetaFile(logger, resdir, &resmeta); err != nil { + if err := metadata.Write(logger, resdir, &resmeta); err != nil { return resid, err } return resid, nil @@ -494,7 +559,7 @@ OUTER: func rewrite( indexr tsdb.IndexReader, chunkr tsdb.ChunkReader, indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter, - meta *Meta, + meta *metadata.Meta, ignoreChkFns []ignoreFnType, ) error { symbols, err := indexr.Symbols() diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go new file mode 100644 index 0000000000..80c10e8e6e --- /dev/null +++ b/pkg/block/index_test.go @@ -0,0 +1,46 @@ +package block + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/prometheus/tsdb/labels" +) + +func TestWriteReadIndexCache(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "test-compact-prepare") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + b, err := testutil.CreateBlock(tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, nil, 124) + testutil.Ok(t, err) + + fn := filepath.Join(tmpDir, "index.cache.json") + testutil.Ok(t, WriteIndexCache(log.NewNopLogger(), filepath.Join(tmpDir, b.String(), "index"), fn)) + + version, symbols, lvals, postings, err := ReadIndexCache(log.NewNopLogger(), fn) + testutil.Ok(t, err) + + testutil.Equals(t, 2, version) + testutil.Equals(t, 6, len(symbols)) + testutil.Equals(t, 2, len(lvals)) + + vals, ok := lvals["a"] + testutil.Assert(t, ok, "") + testutil.Equals(t, []string{"1", "2", "3", "4"}, vals) + + vals, ok = lvals["b"] + testutil.Assert(t, ok, "") + testutil.Equals(t, []string{"1"}, vals) + testutil.Equals(t, 6, len(postings)) +} diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go new file mode 100644 index 0000000000..e4e87122c2 --- /dev/null +++ b/pkg/block/metadata/meta.go @@ -0,0 +1,142 @@ +package metadata + +// metadata package implements writing and reading wrapped meta.json where Thanos puts its metadata. 
+// That metadata contains external labels, downsampling resolution and source type. +// This package is minimal and separated because it is used by testutils, which limits the test helpers we can use in +// this package. + +import ( + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/pkg/errors" + "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/fileutil" +) + +type SourceType string + +const ( + UnknownSource SourceType = "" + SidecarSource SourceType = "sidecar" + CompactorSource SourceType = "compactor" + CompactorRepairSource SourceType = "compactor.repair" + RulerSource SourceType = "ruler" + BucketRepairSource SourceType = "bucket.repair" + TestSource SourceType = "test" +) + +const ( + // MetaFilename is the known JSON filename for meta information. + MetaFilename = "meta.json" +) + +// Meta describes a block's meta. It wraps the known TSDB meta structure and +// extends it by Thanos-specific fields. +type Meta struct { + Version int `json:"version"` + + tsdb.BlockMeta + + Thanos Thanos `json:"thanos"` +} + +// Thanos holds block meta information specific to Thanos. +type Thanos struct { + Labels map[string]string `json:"labels"` + Downsample ThanosDownsample `json:"downsample"` + + // Source is a real upload source of the block. + Source SourceType `json:"source"` +} + +type ThanosDownsample struct { + Resolution int64 `json:"resolution"` +} + +// InjectThanos sets Thanos meta to the block meta JSON and saves it to the disk. +// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata. +func InjectThanos(logger log.Logger, bdir string, meta Thanos, downsampledMeta *tsdb.BlockMeta) (*Meta, error) { + newMeta, err := Read(bdir) + if err != nil { + return nil, errors.Wrap(err, "read new meta") + } + newMeta.Thanos = meta + + // While downsampling we need to copy original compaction. + if downsampledMeta != nil { + newMeta.Compaction = downsampledMeta.Compaction + } + + if err := Write(logger, bdir, newMeta); err != nil { + return nil, errors.Wrap(err, "write new meta") + } + + return newMeta, nil +} + +// Write writes the given meta into /meta.json. +func Write(logger log.Logger, dir string, meta *Meta) error { + // Make any changes to the file appear atomic. + path := filepath.Join(dir, MetaFilename) + tmp := path + ".tmp" + + f, err := os.Create(tmp) + if err != nil { + return err + } + + enc := json.NewEncoder(f) + enc.SetIndent("", "\t") + + if err := enc.Encode(meta); err != nil { + runutil.CloseWithLogOnErr(logger, f, "close meta") + return err + } + if err := f.Close(); err != nil { + return err + } + return renameFile(logger, tmp, path) +} + +func renameFile(logger log.Logger, from, to string) error { + if err := os.RemoveAll(to); err != nil { + return err + } + if err := os.Rename(from, to); err != nil { + return err + } + + // Directory was renamed; sync parent dir to persist rename. + pdir, err := fileutil.OpenDir(filepath.Dir(to)) + if err != nil { + return err + } + + if err = fileutil.Fsync(pdir); err != nil { + runutil.CloseWithLogOnErr(logger, pdir, "close dir") + return err + } + return pdir.Close() +} + +// Read reads the given meta from /meta.json. 
+func Read(dir string) (*Meta, error) { + b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename)) + if err != nil { + return nil, err + } + var m Meta + + if err := json.Unmarshal(b, &m); err != nil { + return nil, err + } + if m.Version != 1 { + return nil, errors.Errorf("unexpected meta file version %d", m.Version) + } + return &m, nil +} diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 544de920ea..29302f2a1c 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "io/ioutil" "github.com/go-kit/kit/log" @@ -39,7 +41,7 @@ type Syncer struct { bkt objstore.Bucket syncDelay time.Duration mtx sync.Mutex - blocks map[ulid.ULID]*block.Meta + blocks map[ulid.ULID]*metadata.Meta metrics *syncerMetrics } @@ -130,7 +132,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket logger: logger, reg: reg, syncDelay: syncDelay, - blocks: map[ulid.ULID]*block.Meta{}, + blocks: map[ulid.ULID]*metadata.Meta{}, bkt: bkt, metrics: newSyncerMetrics(reg), }, nil @@ -185,9 +187,9 @@ func (c *Syncer) syncMetas(ctx context.Context) error { // NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks. // TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377 if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) && - meta.Thanos.Source != block.BucketRepairSource && - meta.Thanos.Source != block.CompactorSource && - meta.Thanos.Source != block.CompactorRepairSource { + meta.Thanos.Source != metadata.BucketRepairSource && + meta.Thanos.Source != metadata.CompactorSource && + meta.Thanos.Source != metadata.CompactorRepairSource { level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) return nil @@ -214,7 +216,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error { // GroupKey returns a unique identifier for the group the block belongs to. It considers // the downsampling resolution and the block's labels. -func GroupKey(meta block.Meta) string { +func GroupKey(meta metadata.Meta) string { return groupKey(meta.Thanos.Downsample.Resolution, labels.FromMap(meta.Thanos.Labels)) } @@ -381,7 +383,7 @@ type Group struct { labels labels.Labels resolution int64 mtx sync.Mutex - blocks map[ulid.ULID]*block.Meta + blocks map[ulid.ULID]*metadata.Meta compactions prometheus.Counter compactionFailures prometheus.Counter groupGarbageCollectedBlocks prometheus.Counter @@ -405,7 +407,7 @@ func newGroup( bkt: bkt, labels: lset, resolution: resolution, - blocks: map[ulid.ULID]*block.Meta{}, + blocks: map[ulid.ULID]*metadata.Meta{}, compactions: compactions, compactionFailures: compactionFailures, groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, @@ -419,7 +421,7 @@ func (cg *Group) Key() string { } // Add the block with the given meta to the group. 
-func (cg *Group) Add(meta *block.Meta) error { +func (cg *Group) Add(meta *metadata.Meta) error { cg.mtx.Lock() defer cg.mtx.Unlock() @@ -541,7 +543,7 @@ func IsRetryError(err error) bool { return ok } -func (cg *Group) areBlocksOverlapping(include *block.Meta, excludeDirs ...string) error { +func (cg *Group) areBlocksOverlapping(include *metadata.Meta, excludeDirs ...string) error { var ( metas []tsdb.BlockMeta exclude = map[ulid.ULID]struct{}{} @@ -566,6 +568,9 @@ func (cg *Group) areBlocksOverlapping(include *block.Meta, excludeDirs ...string metas = append(metas, include.BlockMeta) } + sort.Slice(metas, func(i, j int) bool { + return metas[i].MinTime < metas[j].MinTime + }) if overlaps := tsdb.OverlappingBlocks(metas); len(overlaps) > 0 { return errors.Errorf("overlaps found while gathering blocks. %s", overlaps) } @@ -597,12 +602,12 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, return retry(errors.Wrapf(err, "download block %s", ie.id)) } - meta, err := block.ReadMetaFile(bdir) + meta, err := metadata.Read(bdir) if err != nil { return errors.Wrapf(err, "read meta from %s", bdir) } - resid, err := block.Repair(logger, tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk) + resid, err := block.Repair(logger, tmpdir, ie.id, metadata.CompactorRepairSource, block.IgnoreIssue347OutsideChunk) if err != nil { return errors.Wrapf(err, "repair failed for block %s", ie.id) } @@ -647,7 +652,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( if err := os.MkdirAll(bdir, 0777); err != nil { return compID, errors.Wrap(err, "create planning block dir") } - if err := block.WriteMetaFile(cg.logger, bdir, meta); err != nil { + if err := metadata.Write(cg.logger, bdir, meta); err != nil { return compID, errors.Wrap(err, "write planning meta file") } } @@ -670,7 +675,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( begin := time.Now() for _, pdir := range plan { - meta, err := block.ReadMetaFile(pdir) + meta, err := metadata.Read(pdir) if err != nil { return compID, errors.Wrapf(err, "read meta from %s", pdir) } @@ -718,7 +723,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( begin = time.Now() - compID, err = comp.Compact(dir, plan...) 
+ compID, err = comp.Compact(dir, plan, nil) if err != nil { return compID, halt(errors.Wrapf(err, "compact blocks %v", plan)) } @@ -727,10 +732,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( bdir := filepath.Join(dir, compID.String()) - newMeta, err := block.InjectThanosMeta(cg.logger, bdir, block.ThanosMeta{ + newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{ Labels: cg.labels.Map(), - Downsample: block.ThanosDownsampleMeta{Resolution: cg.resolution}, - Source: block.CompactorSource, + Downsample: metadata.ThanosDownsample{Resolution: cg.resolution}, + Source: metadata.CompactorSource, }, nil) if err != nil { return compID, errors.Wrapf(err, "failed to finalize the block %s", bdir) diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 74df73cc6f..11213dc047 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math/rand" "os" "path" "path/filepath" @@ -15,12 +16,14 @@ import ( "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/objtesting" "github.com/improbable-eng/thanos/pkg/testutil" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" ) @@ -37,13 +40,13 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { // After the first synchronization the first 5 should be dropped and the // last 5 be loaded from the bucket. var ids []ulid.ULID - var metas []*block.Meta + var metas []*metadata.Meta for i := 0; i < 15; i++ { id, err := ulid.New(uint64(i), nil) testutil.Ok(t, err) - var meta block.Meta + var meta metadata.Meta meta.Version = 1 meta.ULID = id @@ -56,7 +59,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { for _, m := range metas[5:] { var buf bytes.Buffer testutil.Ok(t, json.NewEncoder(&buf).Encode(&m)) - testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf)) } groups, err := sy.Groups() @@ -79,11 +82,11 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { // Generate 10 source block metas and construct higher level blocks // that are higher compactions of them. - var metas []*block.Meta + var metas []*metadata.Meta var ids []ulid.ULID for i := 0; i < 10; i++ { - var m block.Meta + var m metadata.Meta m.Version = 1 m.ULID = ulid.MustNew(uint64(i), nil) @@ -94,28 +97,28 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { metas = append(metas, &m) } - var m1 block.Meta + var m1 metadata.Meta m1.Version = 1 m1.ULID = ulid.MustNew(100, nil) m1.Compaction.Level = 2 m1.Compaction.Sources = ids[:4] m1.Thanos.Downsample.Resolution = 0 - var m2 block.Meta + var m2 metadata.Meta m2.Version = 1 m2.ULID = ulid.MustNew(200, nil) m2.Compaction.Level = 2 m2.Compaction.Sources = ids[4:8] // last two source IDs is not part of a level 2 block. m2.Thanos.Downsample.Resolution = 0 - var m3 block.Meta + var m3 metadata.Meta m3.Version = 1 m3.ULID = ulid.MustNew(300, nil) m3.Compaction.Level = 3 m3.Compaction.Sources = ids[:9] // last source ID is not part of level 3 block. 
m3.Thanos.Downsample.Resolution = 0 - var m4 block.Meta + var m4 metadata.Meta m4.Version = 14 m4.ULID = ulid.MustNew(400, nil) m4.Compaction.Level = 2 @@ -127,7 +130,7 @@ fmt.Println("create", m.ULID) var buf bytes.Buffer testutil.Ok(t, json.NewEncoder(&buf).Encode(&m)) - testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf)) } // Do one initial synchronization with the bucket. @@ -173,7 +176,7 @@ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() - var metas []*block.Meta + var metas []*metadata.Meta extLset := labels.Labels{{Name: "e1", Value: "1"}} b1, err := testutil.CreateBlock(prepareDir, []labels.Labels{ {{Name: "a", Value: "1"}}, @@ -183,7 +186,7 @@ }, 100, 0, 1000, extLset, 124) testutil.Ok(t, err) - meta, err := block.ReadMetaFile(filepath.Join(prepareDir, b1.String())) + meta, err := metadata.Read(filepath.Join(prepareDir, b1.String())) testutil.Ok(t, err) metas = append(metas, meta) @@ -196,15 +199,18 @@ testutil.Ok(t, err) // Mix order to make sure compact is able to deduct min time / max time. - meta, err = block.ReadMetaFile(filepath.Join(prepareDir, b3.String())) + meta, err = metadata.Read(filepath.Join(prepareDir, b3.String())) testutil.Ok(t, err) metas = append(metas, meta) - // Empty block. This can happen when TSDB does not have any samples for min-block-size time. - b2, err := testutil.CreateBlock(prepareDir, []labels.Labels{}, 100, 1001, 2000, extLset, 124) + // Currently TSDB does not produce empty blocks (see: https://github.com/prometheus/tsdb/pull/374). However, before v2.7.0 it did, + // so we still want to mimic this case as closely as possible. + b2, err := createEmptyBlock(prepareDir, 1001, 2000, extLset, 124) testutil.Ok(t, err) + // blocks" count=3 mint=0 maxt=3000 ulid=01D1RQCRRJM77KQQ4GYDSC50GM sources="[01D1RQCRMNZBVHBPGRPG2M3NZQ 01D1RQCRPJMYN45T65YA1PRWB7 01D1RQCRNMTWJKTN5QQXFNKKH8]" + + meta, err = metadata.Read(filepath.Join(prepareDir, b2.String())) testutil.Ok(t, err) metas = append(metas, meta) @@ -217,7 +223,7 @@ }, 100, 3001, 4000, extLset, 124) testutil.Ok(t, err) - meta, err = block.ReadMetaFile(filepath.Join(prepareDir, freshB.String())) + meta, err = metadata.Read(filepath.Join(prepareDir, freshB.String())) testutil.Ok(t, err) metas = append(metas, meta) @@ -263,7 +269,7 @@ resDir := filepath.Join(dir, id.String()) testutil.Ok(t, block.Download(ctx, log.NewNopLogger(), bkt, id, resDir)) - meta, err = block.ReadMetaFile(resDir) + meta, err = metadata.Read(resDir) testutil.Ok(t, err) testutil.Equals(t, int64(0), meta.MinTime) @@ -294,3 +300,56 @@ testutil.Ok(t, err) }) } + +// createEmptyBlock produces an empty block like the ones TSDB produced before the fix: https://github.com/prometheus/tsdb/pull/374. 
+// (Prometheus pre v2.7.0) +func createEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels, resolution int64) (ulid.ULID, error) { + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + uid := ulid.MustNew(ulid.Now(), entropy) + + if err := os.Mkdir(path.Join(dir, uid.String()), os.ModePerm); err != nil { + return ulid.ULID{}, errors.Wrap(err, "create block dir") + } + + if err := os.Mkdir(path.Join(dir, uid.String(), "chunks"), os.ModePerm); err != nil { + return ulid.ULID{}, errors.Wrap(err, "create chunks dir") + } + + w, err := index.NewWriter(path.Join(dir, uid.String(), "index")) + if err != nil { + return ulid.ULID{}, errors.Wrap(err, "new index") + } + + if err := w.Close(); err != nil { + return ulid.ULID{}, errors.Wrap(err, "close index") + } + + m := tsdb.BlockMeta{ + Version: 1, + ULID: uid, + MinTime: mint, + MaxTime: maxt, + Compaction: tsdb.BlockMetaCompaction{ + Level: 1, + Sources: []ulid.ULID{uid}, + }, + } + b, err := json.Marshal(&m) + if err != nil { + return ulid.ULID{}, err + } + + if err := ioutil.WriteFile(path.Join(dir, uid.String(), "meta.json"), b, os.ModePerm); err != nil { + return ulid.ULID{}, errors.Wrap(err, "saving meta.json") + } + + if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, uid.String()), metadata.Thanos{ + Labels: extLset.Map(), + Downsample: metadata.ThanosDownsample{Resolution: resolution}, + Source: metadata.TestSource, + }, nil); err != nil { + return ulid.ULID{}, errors.Wrap(err, "finalize block") + } + + return uid, nil +} diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 305f72021c..f5afecdcd0 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -5,7 +5,8 @@ import ( "path/filepath" "sort" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/tsdb/chunkenc" @@ -31,7 +32,7 @@ const ( // Downsample downsamples the given block. It writes a new block into dir and returns its ID. 
func Downsample( logger log.Logger, - origMeta *block.Meta, + origMeta *metadata.Meta, b tsdb.BlockReader, dir string, resolution int64, @@ -113,6 +114,7 @@ func Downsample( origMeta.Thanos.Downsample.Resolution, resolution, ) + if err != nil { return id, errors.Wrap(err, "downsample aggregate block") } @@ -125,18 +127,18 @@ func Downsample( if err != nil { return id, errors.Wrap(err, "create compactor") } - id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime) + id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime, &origMeta.BlockMeta) if err != nil { return id, errors.Wrap(err, "compact head") } bdir := filepath.Join(dir, id.String()) - var tmeta block.ThanosMeta + var tmeta metadata.Thanos tmeta = origMeta.Thanos - tmeta.Source = block.CompactorSource + tmeta.Source = metadata.CompactorSource tmeta.Downsample.Resolution = resolution - _, err = block.InjectThanosMeta(logger, bdir, tmeta, &origMeta.BlockMeta) + _, err = metadata.InjectThanos(logger, bdir, tmeta, &origMeta.BlockMeta) if err != nil { return id, errors.Wrapf(err, "failed to finalize the block %s", bdir) } @@ -228,13 +230,20 @@ func (b *memBlock) Chunks() (tsdb.ChunkReader, error) { } func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) { - return tsdb.EmptyTombstoneReader(), nil + return emptyTombstoneReader{}, nil } func (b *memBlock) Close() error { return nil } +type emptyTombstoneReader struct{} + +func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil } +func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil } +func (emptyTombstoneReader) Total() uint64 { return 0 } +func (emptyTombstoneReader) Close() error { return nil } + // currentWindow returns the end timestamp of the window that t falls into. func currentWindow(t, r int64) int64 { // The next timestamp is the next number after s.t that's aligned with window. @@ -412,6 +421,7 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta { chks = append(chks, ab.encode()) } + return chks } diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index d384478416..bb2c38b17a 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -1,12 +1,15 @@ package downsample import ( + "github.com/prometheus/tsdb" "io/ioutil" "math" "os" "path/filepath" "testing" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/tsdb/chunks" @@ -59,7 +62,7 @@ func TestDownsampleRaw(t *testing.T) { }, }, } - testDownsample(t, input, &block.Meta{}, 100) + testDownsample(t, input, &metadata.Meta{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 250}}, 100) } func TestDownsampleAggr(t *testing.T) { @@ -96,8 +99,9 @@ func TestDownsampleAggr(t *testing.T) { }, }, } - var meta block.Meta + var meta metadata.Meta meta.Thanos.Downsample.Resolution = 10 + meta.BlockMeta = tsdb.BlockMeta{MinTime: 99, MaxTime: 1300} testDownsample(t, input, &meta, 500) } @@ -123,7 +127,7 @@ type downsampleTestSet struct { // testDownsample inserts the input into a block and invokes the downsampler with the given resolution. // The chunk ranges within the input block are aligned at 500 time units. 
-func testDownsample(t *testing.T, data []*downsampleTestSet, meta *block.Meta, resolution int64) { +func testDownsample(t *testing.T, data []*downsampleTestSet, meta *metadata.Meta, resolution int64) { t.Helper() dir, err := ioutil.TempDir("", "downsample-raw") diff --git a/pkg/compact/downsample/pool.go b/pkg/compact/downsample/pool.go index 9b199e7ab9..17094cd0c6 100644 --- a/pkg/compact/downsample/pool.go +++ b/pkg/compact/downsample/pool.go @@ -6,14 +6,14 @@ import ( "github.com/prometheus/tsdb/chunkenc" ) -// Pool is a memory pool of chunk objects, supporting Thanos aggr chunk encoding. +// Pool is a memory pool of chunk objects, supporting Thanos aggregated chunk encoding. // It maintains separate pools for xor and aggr chunks. type pool struct { wrapped chunkenc.Pool aggr sync.Pool } -// TODO(bplotka): Add reasonable limits to our sync pools them to detect OOMs early. +// TODO(bwplotka): Add reasonable limits to our sync pools to detect OOMs early. func NewPool() chunkenc.Pool { return &pool{ wrapped: chunkenc.NewPool(), @@ -51,6 +51,7 @@ func (p *pool) Put(c chunkenc.Chunk) error { // Clear []byte. *ac = AggrChunk(nil) p.aggr.Put(ac) + return nil } return p.wrapped.Put(c) diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index 0aef91c697..1a26b1e5db 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/compact" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/inmem" @@ -253,15 +253,15 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) { t.Helper() - meta1 := block.Meta{ + meta1 := metadata.Meta{ Version: 1, BlockMeta: tsdb.BlockMeta{ ULID: ulid.MustParse(id), MinTime: minTime.Unix() * 1000, MaxTime: maxTime.Unix() * 1000, }, - Thanos: block.ThanosMeta{ - Downsample: block.ThanosDownsampleMeta{ + Thanos: metadata.Thanos{ + Downsample: metadata.ThanosDownsample{ Resolution: resolutionLevel, }, }, diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index 23e5b0a801..cf693dd2fb 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -527,15 +527,16 @@ func (api *API) series(r *http.Request) (interface{}, []error, *apiError) { var sets []storage.SeriesSet for _, mset := range matcherSets { - s, err := q.Select(&storage.SelectParams{}, mset...) + s, _, err := q.Select(&storage.SelectParams{}, mset...) if err != nil { return nil, nil, &apiError{errorExec, err} } sets = append(sets, s) } - set := storage.NewMergeSeriesSet(sets) - metrics := []labels.Labels{} + set := storage.NewMergeSeriesSet(sets, nil) + + var metrics []labels.Labels for set.Next() { metrics = append(metrics, set.At().Labels()) } diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index 4afef1d054..bcf17907b0 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -333,7 +333,7 @@ func TestEndpoints(t *testing.T) { "start": []string{"-2"}, "end": []string{"-1"}, }, - response: []labels.Labels{}, + response: []labels.Labels(nil), }, // Start and end after series ends. 
{ @@ -343,7 +343,7 @@ func TestEndpoints(t *testing.T) { "start": []string{"100000"}, "end": []string{"100001"}, }, - response: []labels.Labels{}, + response: []labels.Labels(nil), }, // Start before series starts, end after series ends. { @@ -409,33 +409,38 @@ func TestEndpoints(t *testing.T) { } for _, test := range tests { - // Build a context with the correct request params. - ctx := context.Background() - for p, v := range test.params { - ctx = route.WithParam(ctx, p, v) - } - t.Logf("run query %q", test.query.Encode()) + if ok := t.Run(test.query.Encode(), func(t *testing.T) { + // Build a context with the correct request params. + ctx := context.Background() + for p, v := range test.params { + ctx = route.WithParam(ctx, p, v) + } - req, err := http.NewRequest("ANY", fmt.Sprintf("http://example.com?%s", test.query.Encode()), nil) - if err != nil { - t.Fatal(err) - } - resp, _, apiErr := test.endpoint(req.WithContext(ctx)) - if apiErr != nil { - if test.errType == errorNone { - t.Fatalf("Unexpected error: %s", apiErr) + req, err := http.NewRequest("ANY", fmt.Sprintf("http://example.com?%s", test.query.Encode()), nil) + if err != nil { + t.Fatal(err) } - if test.errType != apiErr.typ { - t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.typ) + resp, _, apiErr := test.endpoint(req.WithContext(ctx)) + if apiErr != nil { + if test.errType == errorNone { + t.Fatalf("Unexpected error: %s", apiErr) + } + if test.errType != apiErr.typ { + t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.typ) + } + return } - continue - } - if apiErr == nil && test.errType != errorNone { - t.Fatalf("Expected error of type %q but got none", test.errType) - } - if !reflect.DeepEqual(resp, test.response) { - t.Fatalf("Response does not match, expected:\n%+v\ngot:\n%+v", test.response, resp) + if apiErr == nil && test.errType != errorNone { + t.Fatalf("Expected error of type %q but got none", test.errType) + } + + if !reflect.DeepEqual(resp, test.response) { + t.Fatalf("Response does not match, expected:\n%+v\ngot:\n%+v", test.response, resp) + } + }); !ok { + return } + } } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 6e962f6472..819ff3ac2a 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -169,13 +169,13 @@ func aggrsFromFunc(f string) ([]storepb.Aggr, resAggr) { return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}, resAggrAvg } -func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, error) { +func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { span, ctx := tracing.StartSpan(q.ctx, "querier_select") defer span.Finish() sms, err := translateMatchers(ms...) if err != nil { - return nil, errors.Wrap(err, "convert matchers") + return nil, nil, errors.Wrap(err, "convert matchers") } queryAggrs, resAggr := aggrsFromFunc(params.Func) @@ -189,10 +189,12 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s Aggregates: queryAggrs, PartialResponseDisabled: !q.partialResponse, }, resp); err != nil { - return nil, errors.Wrap(err, "proxy Series()") + return nil, nil, errors.Wrap(err, "proxy Series()") } for _, w := range resp.warnings { + // NOTE(bwplotka): We could use warnings return arguments here, however need reporter anyway for LabelValues and LabelNames method, + // so we choose to be consistent and keep reporter. 
q.warningReporter(errors.New(w)) } @@ -203,7 +205,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s maxt: q.maxt, set: newStoreSeriesSet(resp.seriesSet), aggr: resAggr, - }, nil + }, nil, nil } // TODO(fabxc): this could potentially pushed further down into the store API @@ -220,7 +222,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s // The merged series set assembles all potentially-overlapping time ranges // of the same series into a single one. The series are ordered so that equal series // from different replicas are sequential. We can now deduplicate those. - return newDedupSeriesSet(set, q.replicaLabel), nil + return newDedupSeriesSet(set, q.replicaLabel), nil, nil } // sortDedupLabels resorts the set so that the same series with different replica @@ -245,6 +247,7 @@ func sortDedupLabels(set []storepb.Series, replicaLabel string) { }) } +// LabelValues returns all potential values for a label name. func (q *querier) LabelValues(name string) ([]string, error) { span, ctx := tracing.StartSpan(q.ctx, "querier_label_values") defer span.Finish() @@ -261,6 +264,12 @@ func (q *querier) LabelValues(name string) ([]string, error) { return resp.Values, nil } +// LabelNames returns all the unique label names present in the block in sorted order. +// TODO(bwplotka): Consider adding labelNames to thanos Query API https://github.com/improbable-eng/thanos/issues/702. +func (q *querier) LabelNames() ([]string, error) { + return nil, errors.New("not implemented") +} + func (q *querier) Close() error { q.cancel() return nil diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 46b51a7f3e..980d837213 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -36,7 +36,7 @@ func TestQuerier_Series(t *testing.T) { q := newQuerier(context.Background(), nil, 1, 300, "", testProxy, false, 0, true, nil) defer func() { testutil.Ok(t, q.Close()) }() - res, err := q.Select(&storage.SelectParams{}) + res, _, err := q.Select(&storage.SelectParams{}) testutil.Ok(t, err) expected := []struct { diff --git a/pkg/query/test_print.go b/pkg/query/test_print.go new file mode 100644 index 0000000000..70bc292439 --- /dev/null +++ b/pkg/query/test_print.go @@ -0,0 +1,34 @@ +package query + +import ( + "fmt" + + "github.com/prometheus/prometheus/storage" +) + +type printSeriesSet struct { + set storage.SeriesSet +} + +func newPrintSeriesSet(set storage.SeriesSet) storage.SeriesSet { + return &printSeriesSet{set: set} +} + +func (s *printSeriesSet) Next() bool { + return s.set.Next() +} + +func (s *printSeriesSet) At() storage.Series { + at := s.set.At() + fmt.Println("Series", at.Labels()) + + i := at.Iterator() + for i.Next() { + fmt.Println(i.At()) + } + return at +} + +func (s *printSeriesSet) Err() error { + return s.set.Err() +} diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 5c1df9a9b7..2163ec8b7a 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -11,6 +11,8 @@ import ( "path" "path/filepath" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" @@ -69,7 +71,7 @@ type Shipper struct { metrics *metrics bucket objstore.Bucket labels func() labels.Labels - source block.SourceType + source metadata.SourceType } // New creates a new shipper that detects new TSDB blocks in dir and uploads them @@ -80,7 +82,7 @@ func New( dir string, bucket objstore.Bucket, lbls func() 
-	source block.SourceType,
+	source metadata.SourceType,
 ) *Shipper {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -114,7 +116,7 @@ func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) {
 	minTime = math.MaxInt64
 	maxSyncTime = math.MinInt64
 
-	if err := s.iterBlockMetas(func(m *block.Meta) error {
+	if err := s.iterBlockMetas(func(m *metadata.Meta) error {
 		if m.MinTime < minTime {
 			minTime = m.MinTime
 		}
@@ -158,7 +160,7 @@ func (s *Shipper) Sync(ctx context.Context) {
 
 	// TODO(bplotka): If there are no blocks in the system, check for the WAL dir to ensure we actually
 	// have access to the real TSDB dir (!).
-	if err = s.iterBlockMetas(func(m *block.Meta) error {
+	if err = s.iterBlockMetas(func(m *metadata.Meta) error {
 		// Do not sync a block if we already uploaded it. If it is no longer found in the bucket,
 		// it was generally removed by the compaction process.
 		if _, ok := hasUploaded[m.ULID]; !ok {
@@ -180,7 +182,7 @@
 	}
 }
 
-func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
+func (s *Shipper) sync(ctx context.Context, meta *metadata.Meta) (err error) {
 	dir := filepath.Join(s.dir, meta.ULID.String())
 
 	// We only ship the first compacted block level.
@@ -225,7 +227,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
 		meta.Thanos.Labels = lset.Map()
 	}
 	meta.Thanos.Source = s.source
-	if err := block.WriteMetaFile(s.logger, updir, meta); err != nil {
+	if err := metadata.Write(s.logger, updir, meta); err != nil {
 		return errors.Wrap(err, "write meta file")
 	}
 	return block.Upload(ctx, s.logger, s.bucket, updir)
@@ -234,7 +236,7 @@
 
 // iterBlockMetas calls f with the block meta for each block found in dir. It logs
 // an error and continues if it cannot access a meta.json file.
 // If f returns an error, the function returns with the same error.
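+// For illustration only (hypothetical snippet, not part of this change), collecting the
+// ULIDs of all local blocks would look like:
+//
+//	var ids []ulid.ULID
+//	if err := s.iterBlockMetas(func(m *metadata.Meta) error {
+//		ids = append(ids, m.ULID)
+//		return nil
+//	}); err != nil {
+//		return errors.Wrap(err, "iter block metas")
+//	}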
-func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error {
+func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error {
 	names, err := fileutil.ReadDir(s.dir)
 	if err != nil {
 		return errors.Wrap(err, "read dir")
 	}
@@ -253,7 +255,7 @@ func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error {
 		if !fi.IsDir() {
 			continue
 		}
-		m, err := block.ReadMetaFile(dir)
+		m, err := metadata.Read(dir)
 		if err != nil {
 			level.Warn(s.logger).Log("msg", "reading meta file failed", "err", err)
 			continue
diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go
index 8f0e70ecd6..496d53322f 100644
--- a/pkg/shipper/shipper_e2e_test.go
+++ b/pkg/shipper/shipper_e2e_test.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/go-kit/kit/log"
 	"github.com/improbable-eng/thanos/pkg/block"
+	"github.com/improbable-eng/thanos/pkg/block/metadata"
 	"github.com/improbable-eng/thanos/pkg/objstore"
 	"github.com/improbable-eng/thanos/pkg/objstore/objtesting"
 	"github.com/improbable-eng/thanos/pkg/testutil"
@@ -32,7 +33,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) {
 		}()
 
 		extLset := labels.FromStrings("prometheus", "prom-1")
-		shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, block.TestSource)
+		shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource)
 
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()
@@ -54,7 +55,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) {
 
 			testutil.Ok(t, os.Mkdir(tmp, 0777))
 
-			meta := block.Meta{
+			meta := metadata.Meta{
 				BlockMeta: tsdb.BlockMeta{
 					MinTime: timestamp.FromTime(now.Add(time.Duration(i) * time.Hour)),
 					MaxTime: timestamp.FromTime(now.Add((time.Duration(i) * time.Hour) + 1)),
@@ -62,7 +63,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) {
 			}
 			meta.Version = 1
 			meta.ULID = id
-			meta.Thanos.Source = block.TestSource
+			meta.Thanos.Source = metadata.TestSource
 
 			metab, err := json.Marshal(&meta)
 			testutil.Ok(t, err)
diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go
index 368c5e92b9..150b5264db 100644
--- a/pkg/shipper/shipper_test.go
+++ b/pkg/shipper/shipper_test.go
@@ -2,15 +2,13 @@ package shipper
 
 import (
 	"io/ioutil"
-	"os"
-	"testing"
-
 	"math"
-
+	"os"
 	"path"
+	"testing"
 
 	"github.com/go-kit/kit/log"
-	"github.com/improbable-eng/thanos/pkg/block"
+	"github.com/improbable-eng/thanos/pkg/block/metadata"
 	"github.com/improbable-eng/thanos/pkg/testutil"
 	"github.com/oklog/ulid"
 	"github.com/prometheus/tsdb"
@@ -23,7 +21,7 @@ func TestShipperTimestamps(t *testing.T) {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
 
-	s := New(nil, nil, dir, nil, nil, block.TestSource)
+	s := New(nil, nil, dir, nil, nil, metadata.TestSource)
 
 	// Missing thanos meta file.
 	_, _, err = s.Timestamps()
@@ -41,7 +39,7 @@ func TestShipperTimestamps(t *testing.T) {
 
 	id1 := ulid.MustNew(1, nil)
 	testutil.Ok(t, os.Mkdir(path.Join(dir, id1.String()), os.ModePerm))
-	testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), path.Join(dir, id1.String()), &block.Meta{
+	testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id1.String()), &metadata.Meta{
 		Version: 1,
 		BlockMeta: tsdb.BlockMeta{
 			ULID: id1,
@@ -56,7 +54,7 @@ func TestShipperTimestamps(t *testing.T) {
 
 	id2 := ulid.MustNew(2, nil)
 	testutil.Ok(t, os.Mkdir(path.Join(dir, id2.String()), os.ModePerm))
-	testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), path.Join(dir, id2.String()), &block.Meta{
+	testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id2.String()), &metadata.Meta{
 		Version: 1,
 		BlockMeta: tsdb.BlockMeta{
 			ULID: id2,
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index e121e9d408..2614bfbdca 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -16,6 +16,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/improbable-eng/thanos/pkg/block/metadata"
+
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/improbable-eng/thanos/pkg/block"
@@ -459,6 +461,7 @@ func (s *bucketSeriesSet) Err() error {
 	return s.err
 }
 
+// blockSeries returns the requested series from the given index and chunk readers.
 func (s *BucketStore) blockSeries(
 	ctx context.Context,
 	ulid ulid.ULID,
@@ -488,7 +491,6 @@ func (s *BucketStore) blockSeries(
 	}
 
 	// Get result postings list by resolving the postings tree.
-	// TODO(bwplotka): Users are seeing panics here, because of lazyPosting being not loaded by preloadPostings.
 	ps, err := index.ExpandPostings(lazyPostings)
 	if err != nil {
 		return nil, stats, errors.Wrap(err, "expand postings")
@@ -504,7 +506,8 @@ func (s *BucketStore) blockSeries(
 		}
 	}
 
-	// Preload all series index data
+	// Preload all series index data.
+	// TODO(bwplotka): Consider not keeping all series in memory all the time.
 	if err := indexr.preloadSeries(ps); err != nil {
 		return nil, stats, errors.Wrap(err, "preload series")
 	}
@@ -1001,7 +1004,7 @@ func (s *bucketBlockSet) labelMatchers(matchers ...labels.Matcher) ([]labels.Mat
 type bucketBlock struct {
 	logger     log.Logger
 	bucket     objstore.BucketReader
-	meta       *block.Meta
+	meta       *metadata.Meta
 	dir        string
 	indexCache *indexCache
 	chunkPool  *pool.BytesPool
@@ -1065,7 +1068,7 @@ func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error {
 	} else if err != nil {
 		return err
 	}
-	meta, err := block.ReadMetaFile(b.dir)
+	meta, err := metadata.Read(b.dir)
 	if err != nil {
 		return errors.Wrap(err, "read meta.json")
 	}
@@ -1095,19 +1098,15 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) {
 		}
 	}()
 
-	indexr, err := index.NewFileReader(fn)
-	if err != nil {
-		return errors.Wrap(err, "open index reader")
-	}
-	defer runutil.CloseWithLogOnErr(b.logger, indexr, "load index cache reader")
+	// Create the index cache ad hoc.
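+	// (WriteIndexCache now takes the index file path and the cache path and opens its
+	// own reader internally, which is why the explicit index.NewFileReader above was dropped.)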
-	if err := block.WriteIndexCache(b.logger, cachefn, indexr); err != nil {
+	if err := block.WriteIndexCache(b.logger, fn, cachefn); err != nil {
 		return errors.Wrap(err, "write index cache")
 	}
 
 	b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cachefn)
 	if err != nil {
-		return errors.Wrap(err, "read index cache")
+		return errors.Wrap(err, "read fresh index cache")
 	}
 	return nil
 }
@@ -1179,15 +1178,22 @@ func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketB
 		logger:       logger,
 		ctx:          ctx,
 		block:        block,
-		dec:          &index.Decoder{},
 		stats:        &queryStats{},
 		cache:        cache,
 		loadedSeries: map[uint64][]byte{},
 	}
-	r.dec.SetSymbolTable(r.block.symbols)
+	r.dec = &index.Decoder{LookupSymbol: r.lookupSymbol}
 	return r
 }
 
+func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) {
+	s, ok := r.block.symbols[o]
+	if !ok {
+		return "", errors.Errorf("bucketIndexReader: unknown symbol offset %d", o)
+	}
+	return s, nil
+}
+
 func (r *bucketIndexReader) preloadPostings() error {
 	const maxGapSize = 512 * 1024
 
@@ -1270,23 +1276,24 @@ func (r *bucketIndexReader) loadPostings(ctx context.Context, postings []*lazyPo
 	return nil
 }
 
-func (r *bucketIndexReader) preloadSeries(ids []uint64) error {
+func (r *bucketIndexReader) preloadSeries(refs []uint64) error {
 	const maxSeriesSize = 64 * 1024
 	const maxGapSize = 512 * 1024
 
-	var newIDs []uint64
+	var newRefs []uint64
 
-	for _, id := range ids {
-		if b, ok := r.cache.series(r.block.meta.ULID, id); ok {
-			r.loadedSeries[id] = b
+	for _, ref := range refs {
+		if b, ok := r.cache.series(r.block.meta.ULID, ref); ok {
+			r.loadedSeries[ref] = b
 			continue
 		}
-		newIDs = append(newIDs, id)
+		newRefs = append(newRefs, ref)
 	}
-	ids = newIDs
+	refs = newRefs
 
-	parts := partitionRanges(len(ids), func(i int) (start, end uint64) {
-		return ids[i], ids[i] + maxSeriesSize
+	// Combine multiple nearby byte ranges to avoid being rate-limited by the object storage.
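+	// Illustration only (behaviour sketched under assumed values, not part of this change):
+	// with maxSeriesSize = 64KiB and maxGapSize = 512KiB, refs {1000, 100000, 10000000}
+	// yield the parts [0,2) and [2,3): the first two ranges are close enough to fetch in a
+	// single read, while the third is fetched separately.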
+	parts := partitionRanges(len(refs), func(i int) (start, end uint64) {
+		return refs[i], refs[i] + maxSeriesSize
 	}, maxGapSize)
 
 	var g run.Group
@@ -1295,7 +1302,7 @@ func (r *bucketIndexReader) preloadSeries(ids []uint64) error {
 		i, j := p[0], p[1]
 
 		g.Add(func() error {
-			return r.loadSeries(ctx, ids[i:j], ids[i], ids[j-1]+maxSeriesSize)
+			return r.loadSeries(ctx, refs[i:j], refs[i], refs[j-1]+maxSeriesSize)
 		}, func(err error) {
 			if err != nil {
 				cancel()
@@ -1305,7 +1312,7 @@ func (r *bucketIndexReader) preloadSeries(ids []uint64) error {
 	return g.Run()
 }
 
-func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start, end uint64) error {
+func (r *bucketIndexReader) loadSeries(ctx context.Context, refs []uint64, start, end uint64) error {
 	begin := time.Now()
 
 	b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
@@ -1317,12 +1324,12 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
 	defer r.mtx.Unlock()
 
 	r.stats.seriesFetchCount++
-	r.stats.seriesFetched += len(ids)
+	r.stats.seriesFetched += len(refs)
 	r.stats.seriesFetchDurationSum += time.Since(begin)
 	r.stats.seriesFetchedSizeSum += int(end - start)
 
-	for _, id := range ids {
-		c := b[id-start:]
+	for _, ref := range refs {
+		c := b[ref-start:]
 
 		l, n := binary.Uvarint(c)
 		if n < 1 {
@@ -1332,8 +1339,8 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
 			return errors.Errorf("invalid remaining size %d, expected %d", len(c), n+int(l))
 		}
 		c = c[n : n+int(l)]
-		r.loadedSeries[id] = c
-		r.cache.setSeries(r.block.meta.ULID, id, c)
+		r.loadedSeries[ref] = c
+		r.cache.setSeries(r.block.meta.ULID, ref, c)
 	}
 	return nil
 }
@@ -1421,6 +1428,7 @@ func (r *bucketIndexReader) SortedPostings(p index.Postings) index.Postings {
 
 // Series populates the given labels and chunk metas for the series identified
 // by the reference.
 // Returns ErrNotFound if the ref does not resolve to a known series.
+// preloadSeries needs to be invoked first for this method to return loaded results.
 func (r *bucketIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
 	b, ok := r.loadedSeries[ref]
 	if !ok {
@@ -1438,6 +1446,11 @@ func (r *bucketIndexReader) LabelIndices() ([][]string, error) {
 	return nil, errors.New("not implemented")
 }
 
+// LabelNames returns all the unique label names present in the index in sorted order.
+func (r *bucketIndexReader) LabelNames() ([]string, error) {
+	return nil, errors.New("not implemented")
+}
+
 // Close releases the underlying resources of the reader.
 func (r *bucketIndexReader) Close() error {
 	r.block.pendingReaders.Done()
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index b5bbd6692a..9b0d536f78 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/go-kit/kit/log"
 	"github.com/improbable-eng/thanos/pkg/block"
+	"github.com/improbable-eng/thanos/pkg/block/metadata"
 	"github.com/improbable-eng/thanos/pkg/objstore"
 	"github.com/improbable-eng/thanos/pkg/objstore/objtesting"
 	"github.com/improbable-eng/thanos/pkg/runutil"
@@ -66,10 +67,10 @@ func TestBucketStore_e2e(t *testing.T) {
 		dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String())
 
 		// Add labels to the meta of the second block.
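+		// (The meta helpers moved from the block package to block/metadata in this change,
+		// hence metadata.Read / metadata.Write below.)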
-		meta, err := block.ReadMetaFile(dir2)
+		meta, err := metadata.Read(dir2)
 		testutil.Ok(t, err)
 
 		meta.Thanos.Labels = map[string]string{"ext2": "value2"}
-		testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), dir2, meta))
+		testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta))
 
 		testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir1))
 		testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir2))
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index fa23dfc6f8..ef531b81a5 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -4,13 +4,11 @@ import (
 	"testing"
 	"time"
 
-	"github.com/oklog/ulid"
-
-	"github.com/improbable-eng/thanos/pkg/compact/downsample"
-
 	"github.com/fortytw2/leaktest"
-	"github.com/improbable-eng/thanos/pkg/block"
+	"github.com/improbable-eng/thanos/pkg/block/metadata"
+	"github.com/improbable-eng/thanos/pkg/compact/downsample"
 	"github.com/improbable-eng/thanos/pkg/testutil"
+	"github.com/oklog/ulid"
 	"github.com/prometheus/tsdb/labels"
 )
 
@@ -41,7 +39,7 @@ func TestBucketBlockSet_addGet(t *testing.T) {
 	}
 
 	for _, in := range input {
-		var m block.Meta
+		var m metadata.Meta
 		m.Thanos.Downsample.Resolution = in.window
 		m.MinTime = in.mint
 		m.MaxTime = in.maxt
@@ -102,7 +100,7 @@ func TestBucketBlockSet_addGet(t *testing.T) {
 		var exp []*bucketBlock
 		for _, b := range c.res {
-			var m block.Meta
+			var m metadata.Meta
 			m.Thanos.Downsample.Resolution = b.window
 			m.MinTime = b.mint
 			m.MaxTime = b.maxt
@@ -129,7 +127,7 @@ func TestBucketBlockSet_remove(t *testing.T) {
 	}
 
 	for _, in := range input {
-		var m block.Meta
+		var m metadata.Meta
 		m.ULID = in.id
 		m.MinTime = in.mint
 		m.MaxTime = in.maxt
diff --git a/pkg/testutil/prometheus.go b/pkg/testutil/prometheus.go
index 807b61f8a0..7cf869f4d5 100644
--- a/pkg/testutil/prometheus.go
+++ b/pkg/testutil/prometheus.go
@@ -12,8 +12,9 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/improbable-eng/thanos/pkg/block/metadata"
+
 	"github.com/go-kit/kit/log"
-	"github.com/improbable-eng/thanos/pkg/block"
 	"github.com/improbable-eng/thanos/pkg/runutil"
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
@@ -157,7 +158,7 @@ func (p *Prometheus) SetConfig(s string) (err error) {
 
 // Stop terminates Prometheus and cleans up its data directory.
 func (p *Prometheus) Stop() error {
 	if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil {
-		return errors.Wrapf(err, "failed to Prometheus. Kill it manually and cleanr %s dir", p.db.Dir())
+		return errors.Wrapf(err, "failed to stop Prometheus. Kill it manually and clean %s dir", p.db.Dir())
 	}
 	time.Sleep(time.Second / 2)
@@ -188,7 +189,7 @@ func CreateBlock(
 	extLset labels.Labels,
 	resolution int64,
 ) (id ulid.ULID, err error) {
-	h, err := tsdb.NewHead(nil, nil, tsdb.NopWAL(), 10000000000)
+	h, err := tsdb.NewHead(nil, nil, nil, 10000000000)
 	if err != nil {
 		return id, errors.Wrap(err, "create head block")
 	}
@@ -238,15 +239,15 @@ func CreateBlock(
 		return id, errors.Wrap(err, "create compactor")
 	}
 
-	id, err = c.Write(dir, h, mint, maxt)
+	id, err = c.Write(dir, h, mint, maxt, nil)
 	if err != nil {
 		return id, errors.Wrap(err, "write block")
 	}
 
-	if _, err = block.InjectThanosMeta(log.NewNopLogger(), filepath.Join(dir, id.String()), block.ThanosMeta{
+	if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), metadata.Thanos{
 		Labels:     extLset.Map(),
-		Downsample: block.ThanosDownsampleMeta{Resolution: resolution},
-		Source:     block.TestSource,
+		Downsample: metadata.ThanosDownsample{Resolution: resolution},
+		Source:     metadata.TestSource,
 	}, nil); err != nil {
 		return id, errors.Wrap(err, "finalize block")
 	}
@@ -256,4 +257,4 @@ func CreateBlock(
 	}
 
 	return id, nil
-}
+}
\ No newline at end of file
diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go
index 54a20703d4..72100b15bc 100644
--- a/pkg/verifier/index_issue.go
+++ b/pkg/verifier/index_issue.go
@@ -8,6 +8,8 @@ import (
 	"path"
 	"path/filepath"
 
+	"github.com/improbable-eng/thanos/pkg/block/metadata"
+
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/improbable-eng/thanos/pkg/block"
@@ -94,7 +96,7 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac
 			logger,
 			tmpdir,
 			id,
-			block.BucketRepairSource,
+			metadata.BucketRepairSource,
 			block.IgnoreCompleteOutsideChunk,
 			block.IgnoreDuplicateOutsideChunk,
 			block.IgnoreIssue347OutsideChunk,