Update mimir-prometheus (grafana#7293)
* Update mimir-prometheus

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Do not shard histogram_avg()

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Re-vendored

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fix TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fix TestGroupCompactE2E

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fixed TestMultitenantCompactor_OutOfOrderCompaction

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Reworked TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Simplify TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Use filepath.Join() instead of hardcoding path separator

Signed-off-by: Marco Pracucci <marco@pracucci.com>

---------

Signed-off-by: Marco Pracucci <marco@pracucci.com>
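
The `filepath.Join()` change in the last bullet is about portability; a minimal, hypothetical illustration (not taken from this diff, identifiers are made up):

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	prepareDir, id := "prepare", "01HNWYKRP02T1EW2AXEZXG12QA"
	// Fragile: hardcodes '/' and breaks on Windows, where the separator is '\'.
	fmt.Println(prepareDir + "/" + id)
	// Portable: uses the OS-specific separator and cleans the result.
	fmt.Println(filepath.Join(prepareDir, id))
}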
pracucci authored and beatkind committed Feb 13, 2024
1 parent 6e3ff83 commit 14d241a
Showing 31 changed files with 643 additions and 402 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -43,6 +43,7 @@
* [FEATURE] Querier / query-frontend: added `-querier.promql-experimental-functions-enabled` CLI flag (and respective YAML config option) to enable experimental PromQL functions. The experimental functions introduced are: `mad_over_time()`, `sort_by_label()` and `sort_by_label_desc()`. #7057
* [FEATURE] Alertmanager API: added `-alertmanager.grafana-alertmanager-compatibility-enabled` CLI flag (and respective YAML config option) to enable experimental API endpoints that support the migration of the Grafana Alertmanager. #7057
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-strict-mode-enabled` to control support for any UTF-8 character as part of Alertmanager configuration/API matchers and labels. Its default value is `false`. #6898
* [FEATURE] Querier: added `histogram_avg()` function support to PromQL. #7293
* [ENHANCEMENT] Store-gateway: add no-compact details column on store-gateway tenants admin UI. #6848
* [ENHANCEMENT] PromQL: ignore small errors for bucketQuantile #6766
* [ENHANCEMENT] Distributor: improve efficiency of some errors #6785
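For the `histogram_avg()` entry above: per the upstream PromQL semantics, `histogram_avg(v)` applied to a native histogram is equivalent to `histogram_sum(v) / histogram_count(v)`; for example, `histogram_avg(rate(request_duration_seconds[5m]))` (metric name illustrative) returns the average observed value over the last 5 minutes.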
2 changes: 1 addition & 1 deletion go.mod
@@ -256,7 +256,7 @@ require (
)

// Using a fork of Prometheus with Mimir-specific changes.
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240129161136-6383c86b1da4
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240205112357-84eae046431c

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet:
4 changes: 2 additions & 2 deletions go.sum
@@ -552,8 +552,8 @@ github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPft
github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/mimir-prometheus v0.0.0-20240129161136-6383c86b1da4 h1:cU5K0ITFhJiSxDaDn8qvjCmd0xsDPDtizz57ZU5kaeM=
github.com/grafana/mimir-prometheus v0.0.0-20240129161136-6383c86b1da4/go.mod h1:IMaHPfxuCuOjlEoyUE0AG7FGZLEwCg6WHXHLNQwPrJQ=
github.com/grafana/mimir-prometheus v0.0.0-20240205112357-84eae046431c h1:jrkiLy8SjrLKbCeF1SCq6bKfdM2bR5/1hlnx+wwxAPQ=
github.com/grafana/mimir-prometheus v0.0.0-20240205112357-84eae046431c/go.mod h1:IMaHPfxuCuOjlEoyUE0AG7FGZLEwCg6WHXHLNQwPrJQ=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
github.com/grafana/pyroscope-go/godeltaprof v0.1.6 h1:nEdZ8louGAplSvIJi1HVp7kWvFvdiiYg3COLlTwJiFo=
132 changes: 4 additions & 128 deletions pkg/compactor/bucket_compactor_e2e_test.go
@@ -28,16 +28,12 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/storage/tsdb/block"
@@ -333,26 +329,16 @@ func TestGroupCompactE2E(t *testing.T) {
labels.FromStrings("a", "7"),
},
},
}, []blockgenSpec{
{
numSamples: 100, mint: 0, maxt: 499, extLset: extLabels, res: 124,
series: []labels.Labels{
labels.FromStrings("a", "1"),
labels.FromStrings("a", "1", "b", "2"),
labels.FromStrings("a", "3"),
labels.FromStrings("a", "4"),
},
},
})

require.NoError(t, bComp.Compact(ctx, 0), 0)
assert.Equal(t, 5.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion))
assert.Equal(t, 1.0, promtest.ToFloat64(metrics.blocksMarkedForNoCompact.WithLabelValues(block.OutOfOrderChunksNoCompactReason)))
assert.Equal(t, 0.0, promtest.ToFloat64(metrics.blocksMarkedForNoCompact.WithLabelValues(block.OutOfOrderChunksNoCompactReason)))
assert.Equal(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures))
assert.Equal(t, 2.0, promtest.ToFloat64(metrics.groupCompactions))
assert.Equal(t, 3.0, promtest.ToFloat64(metrics.groupCompactionRunsStarted))
assert.Equal(t, 2.0, promtest.ToFloat64(metrics.groupCompactionRunsStarted))
assert.Equal(t, 2.0, promtest.ToFloat64(metrics.groupCompactionRunsCompleted))
assert.Equal(t, 1.0, promtest.ToFloat64(metrics.groupCompactionRunsFailed))
assert.Equal(t, 0.0, promtest.ToFloat64(metrics.groupCompactionRunsFailed))

_, err = os.Stat(dir)
assert.True(t, os.IsNotExist(err), "dir %s should be removed after compaction.", dir)
@@ -364,7 +350,6 @@ func TestGroupCompactE2E(t *testing.T) {
metas[4].ULID: false,
metas[5].ULID: false,
metas[8].ULID: false,
metas[9].ULID: false,
}
others := map[string]block.Meta{}
require.NoError(t, bkt.Iter(ctx, "", func(n string) error {
@@ -436,7 +421,7 @@ type blockgenSpec struct {
res int64
}

func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec, blocksWithOutOfOrderChunks []blockgenSpec) (metas []*block.Meta) {
func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) (metas []*block.Meta) {
prepareDir := t.TempDir()

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
@@ -447,15 +432,6 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec, b
metas = append(metas, meta)
require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), nil))
}
for _, b := range blocksWithOutOfOrderChunks {
id, meta := createBlock(ctx, t, prepareDir, b)

err := putOutOfOrderIndex(filepath.Join(prepareDir, id.String()), b.mint, b.maxt)
require.NoError(t, err)

metas = append(metas, meta)
require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), nil))
}

return metas
}
@@ -747,103 +723,3 @@ func createBlockWithOptions(

return id, nil
}

var indexFilename = "index"

type indexWriterSeries struct {
labels labels.Labels
chunks []chunks.Meta // series file offset of chunks
}

type indexWriterSeriesSlice []*indexWriterSeries

// putOutOfOrderIndex updates the index in blockDir with an index containing an out-of-order chunk
// copied from https://github.com/prometheus/prometheus/blob/b1ed4a0a663d0c62526312311c7529471abbc565/tsdb/index/index_test.go#L346
func putOutOfOrderIndex(blockDir string, minTime int64, maxTime int64) error {

if minTime >= maxTime || minTime+4 >= maxTime {
return fmt.Errorf("minTime must be at least 4 less than maxTime to not create overlapping chunks")
}

lbls := []labels.Labels{
labels.FromStrings("lbl1", "1"),
}

// Sort labels as the index writer expects series in sorted order.
sort.Sort(labels.Slice(lbls))

symbols := map[string]struct{}{}
for _, lset := range lbls {
lset.Range(func(l labels.Label) {
symbols[l.Name] = struct{}{}
symbols[l.Value] = struct{}{}
})
}

var input indexWriterSeriesSlice

// Generate ChunkMetas for every label set.
for _, lset := range lbls {
var metas []chunks.Meta
// only need two chunks that are out-of-order
chk1 := chunks.Meta{
MinTime: maxTime - 2,
MaxTime: maxTime - 1,
Ref: chunks.ChunkRef(rand.Uint64()),
Chunk: chunkenc.NewXORChunk(),
}
metas = append(metas, chk1)
chk2 := chunks.Meta{
MinTime: minTime + 1,
MaxTime: minTime + 2,
Ref: chunks.ChunkRef(rand.Uint64()),
Chunk: chunkenc.NewXORChunk(),
}
metas = append(metas, chk2)

input = append(input, &indexWriterSeries{
labels: lset,
chunks: metas,
})
}

iw, err := index.NewWriter(context.Background(), filepath.Join(blockDir, indexFilename))
if err != nil {
return err
}

syms := []string{}
for s := range symbols {
syms = append(syms, s)
}
slices.Sort(syms)
for _, s := range syms {
if err := iw.AddSymbol(s); err != nil {
return err
}
}

// Population procedure as done by compaction.
var (
postings = index.NewMemPostings()
values = map[string]map[string]struct{}{}
)

for i, s := range input {
if err := iw.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...); err != nil {
return err
}

s.labels.Range(func(l labels.Label) {
valset, ok := values[l.Name]
if !ok {
valset = map[string]struct{}{}
values[l.Name] = valset
}
valset[l.Value] = struct{}{}
})
postings.Add(storage.SeriesRef(i), s.labels)
}

return iw.Close()
}
83 changes: 60 additions & 23 deletions pkg/compactor/compactor_test.go
@@ -2181,57 +2181,94 @@ func stopServiceFn(t *testing.T, serv services.Service) func() {
}

func TestMultitenantCompactor_OutOfOrderCompaction(t *testing.T) {
// Generate a single block with out of order chunks.
specs := []*block.SeriesSpec{
{
Labels: labels.FromStrings("case", "out_of_order"),
Chunks: []chunks.Meta{
must(chunks.ChunkFromSamples([]chunks.Sample{newSample(20, 20, nil, nil), newSample(21, 21, nil, nil)})),
must(chunks.ChunkFromSamples([]chunks.Sample{newSample(10, 10, nil, nil), newSample(11, 11, nil, nil)})),
// Extend block to cover 2h.
must(chunks.ChunkFromSamples([]chunks.Sample{newSample(0, 0, nil, nil), newSample(2*time.Hour.Milliseconds()-1, 0, nil, nil)})),
const user = "user"

var (
ctx = context.Background()
storageDir = t.TempDir()
fixtureDir = filepath.Join("fixtures", "test-ooo-compaction")
)

// Utility function originally used to generate the blocks with out-of-order
// chunks used by this test. The blocks were generated after commenting out the
// checks done by the TSDB block Writer that prevent writing OOO chunks.
_ = func() {
specs := []*block.SeriesSpec{
{
Labels: labels.FromStrings("case", "out_of_order"),
Chunks: []chunks.Meta{
must(chunks.ChunkFromSamples([]chunks.Sample{newSample(20, 20, nil, nil), newSample(21, 21, nil, nil)})),
must(chunks.ChunkFromSamples([]chunks.Sample{newSample(10, 10, nil, nil), newSample(11, 11, nil, nil)})),
// Extend block to cover 2h.
must(chunks.ChunkFromSamples([]chunks.Sample{newSample(0, 0, nil, nil), newSample(2*time.Hour.Milliseconds()-1, 0, nil, nil)})),
},
},
},
}
}

const user = "user"
_, err := block.GenerateBlockFromSpec(fixtureDir, specs)
require.NoError(t, err)

storageDir := t.TempDir()
// We need two blocks to start compaction.
meta1, err := block.GenerateBlockFromSpec(filepath.Join(storageDir, user), specs)
require.NoError(t, err)
meta2, err := block.GenerateBlockFromSpec(filepath.Join(storageDir, user), specs)
require.NoError(t, err)
_, err = block.GenerateBlockFromSpec(fixtureDir, specs)
require.NoError(t, err)
}

bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)
userBkt := bucket.NewUserBucketClient(user, bkt, nil)

// Copy blocks from fixtures dir to the test bucket.
var metas []*block.Meta

entries, err := os.ReadDir(fixtureDir)
require.NoError(t, err)

for _, entry := range entries {
if !entry.IsDir() {
continue
}

blockDir := filepath.Join(fixtureDir, entry.Name())

blockID, err := ulid.Parse(entry.Name())
require.NoErrorf(t, err, "parsing block ID from directory name %q", entry.Name())

meta, err := block.ReadMetaFromDir(blockDir)
require.NoErrorf(t, err, "reading meta from block at &s", blockDir)

require.NoError(t, block.Upload(ctx, log.NewNopLogger(), userBkt, filepath.Join(fixtureDir, blockID.String()), meta))

metas = append(metas, meta)
}

// We expect 2 blocks to have been copied.
require.Len(t, metas, 2)

cfg := prepareConfig(t)
c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bkt)

tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*block.Meta{meta1, meta2}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return(metas, nil)

// Start the compactor
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
require.NoError(t, services.StartAndAwaitRunning(ctx, c))

// Wait until a compaction run has been completed.
test.Poll(t, 10*time.Second, 1.0, func() interface{} {
return prom_testutil.ToFloat64(c.compactionRunsCompleted)
})

// Stop the compactor.
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
require.NoError(t, services.StopAndAwaitTerminated(ctx, c))

// Verify that compactor has found block with out of order chunks, and this block is now marked for no-compaction.
r := regexp.MustCompile("level=info component=compactor user=user msg=\"block has been marked for no compaction\" block=([0-9A-Z]+)")
matches := r.FindStringSubmatch(logs.String())
require.Len(t, matches, 2) // Entire string match + single group match.

skippedBlock := matches[1]
require.True(t, skippedBlock == meta1.ULID.String() || skippedBlock == meta2.ULID.String())
require.True(t, skippedBlock == metas[0].ULID.String() || skippedBlock == metas[1].ULID.String())

m := &block.NoCompactMark{}
require.NoError(t, block.ReadMarker(context.Background(), log.NewNopLogger(), objstore.WithNoopInstr(bkt), path.Join(user, skippedBlock), m))
require.NoError(t, block.ReadMarker(ctx, log.NewNopLogger(), objstore.WithNoopInstr(bkt), path.Join(user, skippedBlock), m))
require.Equal(t, skippedBlock, m.ID.String())
require.NotZero(t, m.NoCompactTime)
require.Equal(t, block.NoCompactReason(block.OutOfOrderChunksNoCompactReason), m.Reason)
Expand Down
Binary file not shown.
Binary file not shown.
pkg/compactor/fixtures/test-ooo-compaction/01HNWYKRP02T1EW2AXEZXG12QA/meta.json
@@ -0,0 +1,22 @@
{
"ulid": "01HNWYKRP02T1EW2AXEZXG12QA",
"minTime": 0,
"maxTime": 7200000,
"stats": {},
"compaction": {
"level": 1,
"sources": [
"01HNWYKRP02T1EW2AXEZXG12QA"
]
},
"version": 1,
"out_of_order": false,
"thanos": {
"version": 1,
"labels": null,
"downsample": {
"resolution": 0
},
"source": ""
}
}
Binary file not shown.
Binary file not shown.
pkg/compactor/fixtures/test-ooo-compaction/01HNWYKS6MZWPPV0Q3FNEK4GHZ/meta.json
@@ -0,0 +1,22 @@
{
"ulid": "01HNWYKS6MZWPPV0Q3FNEK4GHZ",
"minTime": 0,
"maxTime": 7200000,
"stats": {},
"compaction": {
"level": 1,
"sources": [
"01HNWYKS6MZWPPV0Q3FNEK4GHZ"
]
},
"version": 1,
"out_of_order": false,
"thanos": {
"version": 1,
"labels": null,
"downsample": {
"resolution": 0
},
"source": ""
}
}
3 changes: 3 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/parallel.go
@@ -34,6 +34,9 @@ var NonParallelFuncs = []string{
"sort",
"time",
"vector",

// The following function may be parallelized using a strategy similar to avg().
"histogram_avg",
}

// FuncsWithDefaultTimeArg is the list of functions that extract date information from a variadic list of params,
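
To illustrate how a deny-list like `NonParallelFuncs` can be consulted, here is a minimal, hypothetical sketch (not Mimir's actual middleware) that refuses to shard any query calling `histogram_avg()`, assuming the upstream PromQL parser API:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/promql/parser"
)

// Hypothetical deny-list mirroring the NonParallelFuncs entry above.
var nonParallelFuncs = map[string]struct{}{
	"histogram_avg": {},
}

// isShardable walks the query AST and reports false if any function
// call hits the deny-list, so the caller would run the query unsharded.
func isShardable(query string) (bool, error) {
	expr, err := parser.ParseExpr(query)
	if err != nil {
		return false, err
	}
	shardable := true
	parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error {
		if call, ok := node.(*parser.Call); ok {
			if _, denied := nonParallelFuncs[call.Func.Name]; denied {
				shardable = false
			}
		}
		return nil
	})
	return shardable, nil
}

func main() {
	ok, err := isShardable(`histogram_avg(rate(request_duration[5m]))`)
	fmt.Println(ok, err) // false <nil>: histogram_avg() must not be sharded
}

The real astmapper does considerably more (rewriting shardable legs into per-shard queries); the sketch only shows the deny-list check.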