Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update mimir-prometheus #7293

Merged
merged 9 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* [FEATURE] Querier / query-frontend: added `-querier.promql-experimental-functions-enabled` CLI flag (and respective YAML config option) to enable experimental PromQL functions. The experimental functions introduced are: `mad_over_time()`, `sort_by_label()` and `sort_by_label_desc()`. #7057
* [FEATURE] Alertmanager API: added `-alertmanager.grafana-alertmanager-compatibility-enabled` CLI flag (and respective YAML config option) to enable an experimental API endpoints that support the migration of the Grafana Alertmanager. #7057
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-strict-mode-enabled` to control support for any UTF-8 character as part of Alertmanager configuration/API matchers and labels. It's default value is set to `false`. #6898
* [FEATURE] Querier: added `histogram_avg()` function support to PromQL. #7293
* [ENHANCEMENT] Store-gateway: add no-compact details column on store-gateway tenants admin UI. #6848
* [ENHANCEMENT] PromQL: ignore small errors for bucketQuantile #6766
* [ENHANCEMENT] Distributor: improve efficiency of some errors #6785
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ require (
)

// Using a fork of Prometheus with Mimir-specific changes.
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240129161136-6383c86b1da4
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240205112357-84eae046431c

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet:
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -552,8 +552,8 @@ github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPft
github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/mimir-prometheus v0.0.0-20240129161136-6383c86b1da4 h1:cU5K0ITFhJiSxDaDn8qvjCmd0xsDPDtizz57ZU5kaeM=
github.com/grafana/mimir-prometheus v0.0.0-20240129161136-6383c86b1da4/go.mod h1:IMaHPfxuCuOjlEoyUE0AG7FGZLEwCg6WHXHLNQwPrJQ=
github.com/grafana/mimir-prometheus v0.0.0-20240205112357-84eae046431c h1:jrkiLy8SjrLKbCeF1SCq6bKfdM2bR5/1hlnx+wwxAPQ=
github.com/grafana/mimir-prometheus v0.0.0-20240205112357-84eae046431c/go.mod h1:IMaHPfxuCuOjlEoyUE0AG7FGZLEwCg6WHXHLNQwPrJQ=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
github.com/grafana/pyroscope-go/godeltaprof v0.1.6 h1:nEdZ8louGAplSvIJi1HVp7kWvFvdiiYg3COLlTwJiFo=
Expand Down
132 changes: 4 additions & 128 deletions pkg/compactor/bucket_compactor_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,12 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/storage/tsdb/block"
Expand Down Expand Up @@ -333,26 +329,16 @@ func TestGroupCompactE2E(t *testing.T) {
labels.FromStrings("a", "7"),
},
},
}, []blockgenSpec{
{
numSamples: 100, mint: 0, maxt: 499, extLset: extLabels, res: 124,
series: []labels.Labels{
labels.FromStrings("a", "1"),
labels.FromStrings("a", "1", "b", "2"),
labels.FromStrings("a", "3"),
labels.FromStrings("a", "4"),
},
},
})

require.NoError(t, bComp.Compact(ctx, 0), 0)
assert.Equal(t, 5.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion))
assert.Equal(t, 1.0, promtest.ToFloat64(metrics.blocksMarkedForNoCompact.WithLabelValues(block.OutOfOrderChunksNoCompactReason)))
assert.Equal(t, 0.0, promtest.ToFloat64(metrics.blocksMarkedForNoCompact.WithLabelValues(block.OutOfOrderChunksNoCompactReason)))
assert.Equal(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures))
assert.Equal(t, 2.0, promtest.ToFloat64(metrics.groupCompactions))
assert.Equal(t, 3.0, promtest.ToFloat64(metrics.groupCompactionRunsStarted))
assert.Equal(t, 2.0, promtest.ToFloat64(metrics.groupCompactionRunsStarted))
assert.Equal(t, 2.0, promtest.ToFloat64(metrics.groupCompactionRunsCompleted))
assert.Equal(t, 1.0, promtest.ToFloat64(metrics.groupCompactionRunsFailed))
assert.Equal(t, 0.0, promtest.ToFloat64(metrics.groupCompactionRunsFailed))

_, err = os.Stat(dir)
assert.True(t, os.IsNotExist(err), "dir %s should be remove after compaction.", dir)
Expand All @@ -364,7 +350,6 @@ func TestGroupCompactE2E(t *testing.T) {
metas[4].ULID: false,
metas[5].ULID: false,
metas[8].ULID: false,
metas[9].ULID: false,
}
others := map[string]block.Meta{}
require.NoError(t, bkt.Iter(ctx, "", func(n string) error {
Expand Down Expand Up @@ -436,7 +421,7 @@ type blockgenSpec struct {
res int64
}

func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec, blocksWithOutOfOrderChunks []blockgenSpec) (metas []*block.Meta) {
func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) (metas []*block.Meta) {
prepareDir := t.TempDir()

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
Expand All @@ -447,15 +432,6 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec, b
metas = append(metas, meta)
require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), nil))
}
for _, b := range blocksWithOutOfOrderChunks {
id, meta := createBlock(ctx, t, prepareDir, b)

err := putOutOfOrderIndex(filepath.Join(prepareDir, id.String()), b.mint, b.maxt)
require.NoError(t, err)

metas = append(metas, meta)
require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), nil))
}

return metas
}
Expand Down Expand Up @@ -747,103 +723,3 @@ func createBlockWithOptions(

return id, nil
}

var indexFilename = "index"

type indexWriterSeries struct {
labels labels.Labels
chunks []chunks.Meta // series file offset of chunks
}

type indexWriterSeriesSlice []*indexWriterSeries

// putOutOfOrderIndex updates the index in blockDir with an index containing an out-of-order chunk
// copied from https://github.com/prometheus/prometheus/blob/b1ed4a0a663d0c62526312311c7529471abbc565/tsdb/index/index_test.go#L346
func putOutOfOrderIndex(blockDir string, minTime int64, maxTime int64) error {

if minTime >= maxTime || minTime+4 >= maxTime {
return fmt.Errorf("minTime must be at least 4 less than maxTime to not create overlapping chunks")
}

lbls := []labels.Labels{
labels.FromStrings("lbl1", "1"),
}

// Sort labels as the index writer expects series in sorted order.
sort.Sort(labels.Slice(lbls))

symbols := map[string]struct{}{}
for _, lset := range lbls {
lset.Range(func(l labels.Label) {
symbols[l.Name] = struct{}{}
symbols[l.Value] = struct{}{}
})
}

var input indexWriterSeriesSlice

// Generate ChunkMetas for every label set.
for _, lset := range lbls {
var metas []chunks.Meta
// only need two chunks that are out-of-order
chk1 := chunks.Meta{
MinTime: maxTime - 2,
MaxTime: maxTime - 1,
Ref: chunks.ChunkRef(rand.Uint64()),
Chunk: chunkenc.NewXORChunk(),
}
metas = append(metas, chk1)
chk2 := chunks.Meta{
MinTime: minTime + 1,
MaxTime: minTime + 2,
Ref: chunks.ChunkRef(rand.Uint64()),
Chunk: chunkenc.NewXORChunk(),
}
metas = append(metas, chk2)

input = append(input, &indexWriterSeries{
labels: lset,
chunks: metas,
})
}

iw, err := index.NewWriter(context.Background(), filepath.Join(blockDir, indexFilename))
if err != nil {
return err
}

syms := []string{}
for s := range symbols {
syms = append(syms, s)
}
slices.Sort(syms)
for _, s := range syms {
if err := iw.AddSymbol(s); err != nil {
return err
}
}

// Population procedure as done by compaction.
var (
postings = index.NewMemPostings()
values = map[string]map[string]struct{}{}
)

for i, s := range input {
if err := iw.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...); err != nil {
return err
}

s.labels.Range(func(l labels.Label) {
valset, ok := values[l.Name]
if !ok {
valset = map[string]struct{}{}
values[l.Name] = valset
}
valset[l.Value] = struct{}{}
})
postings.Add(storage.SeriesRef(i), s.labels)
}

return iw.Close()
}
83 changes: 60 additions & 23 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2181,57 +2181,94 @@ func stopServiceFn(t *testing.T, serv services.Service) func() {
}

func TestMultitenantCompactor_OutOfOrderCompaction(t *testing.T) {
// Generate a single block with out of order chunks.
specs := []*block.SeriesSpec{
{
Labels: labels.FromStrings("case", "out_of_order"),
Chunks: []chunks.Meta{
must(chunks.ChunkFromSamples([]chunks.Sample{newSample(20, 20, nil, nil), newSample(21, 21, nil, nil)})),
must(chunks.ChunkFromSamples([]chunks.Sample{newSample(10, 10, nil, nil), newSample(11, 11, nil, nil)})),
// Extend block to cover 2h.
must(chunks.ChunkFromSamples([]chunks.Sample{newSample(0, 0, nil, nil), newSample(2*time.Hour.Milliseconds()-1, 0, nil, nil)})),
const user = "user"

var (
ctx = context.Background()
storageDir = t.TempDir()
fixtureDir = filepath.Join("fixtures", "test-ooo-compaction")
)

// Utility function originally used to generate a block with out of order chunks
// used by this test. The block has been generated commenting out the checks done
// by TSDB block Writer to prevent OOO chunks writing.
_ = func() {
specs := []*block.SeriesSpec{
{
Labels: labels.FromStrings("case", "out_of_order"),
Chunks: []chunks.Meta{
must(chunks.ChunkFromSamples([]chunks.Sample{newSample(20, 20, nil, nil), newSample(21, 21, nil, nil)})),
must(chunks.ChunkFromSamples([]chunks.Sample{newSample(10, 10, nil, nil), newSample(11, 11, nil, nil)})),
// Extend block to cover 2h.
must(chunks.ChunkFromSamples([]chunks.Sample{newSample(0, 0, nil, nil), newSample(2*time.Hour.Milliseconds()-1, 0, nil, nil)})),
},
},
},
}
}

const user = "user"
_, err := block.GenerateBlockFromSpec(fixtureDir, specs)
require.NoError(t, err)

storageDir := t.TempDir()
// We need two blocks to start compaction.
meta1, err := block.GenerateBlockFromSpec(filepath.Join(storageDir, user), specs)
require.NoError(t, err)
meta2, err := block.GenerateBlockFromSpec(filepath.Join(storageDir, user), specs)
require.NoError(t, err)
_, err = block.GenerateBlockFromSpec(fixtureDir, specs)
require.NoError(t, err)
}

bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)
userBkt := bucket.NewUserBucketClient(user, bkt, nil)

// Copy blocks from fixtures dir to the test bucket.
var metas []*block.Meta

entries, err := os.ReadDir(fixtureDir)
require.NoError(t, err)

for _, entry := range entries {
if !entry.IsDir() {
continue
}

blockDir := filepath.Join(fixtureDir, entry.Name())

blockID, err := ulid.Parse(entry.Name())
require.NoErrorf(t, err, "parsing block ID from directory name %q", entry.Name())

meta, err := block.ReadMetaFromDir(blockDir)
require.NoErrorf(t, err, "reading meta from block at &s", blockDir)

require.NoError(t, block.Upload(ctx, log.NewNopLogger(), userBkt, filepath.Join(fixtureDir, blockID.String()), meta))

metas = append(metas, meta)
}

// We expect 2 blocks have been copied.
require.Len(t, metas, 2)

cfg := prepareConfig(t)
c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bkt)

tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*block.Meta{meta1, meta2}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return(metas, nil)

// Start the compactor
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
require.NoError(t, services.StartAndAwaitRunning(ctx, c))

// Wait until a compaction run has been completed.
test.Poll(t, 10*time.Second, 1.0, func() interface{} {
return prom_testutil.ToFloat64(c.compactionRunsCompleted)
})

// Stop the compactor.
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
require.NoError(t, services.StopAndAwaitTerminated(ctx, c))

// Verify that compactor has found block with out of order chunks, and this block is now marked for no-compaction.
r := regexp.MustCompile("level=info component=compactor user=user msg=\"block has been marked for no compaction\" block=([0-9A-Z]+)")
matches := r.FindStringSubmatch(logs.String())
require.Len(t, matches, 2) // Entire string match + single group match.

skippedBlock := matches[1]
require.True(t, skippedBlock == meta1.ULID.String() || skippedBlock == meta2.ULID.String())
require.True(t, skippedBlock == metas[0].ULID.String() || skippedBlock == metas[1].ULID.String())

m := &block.NoCompactMark{}
require.NoError(t, block.ReadMarker(context.Background(), log.NewNopLogger(), objstore.WithNoopInstr(bkt), path.Join(user, skippedBlock), m))
require.NoError(t, block.ReadMarker(ctx, log.NewNopLogger(), objstore.WithNoopInstr(bkt), path.Join(user, skippedBlock), m))
require.Equal(t, skippedBlock, m.ID.String())
require.NotZero(t, m.NoCompactTime)
require.Equal(t, block.NoCompactReason(block.OutOfOrderChunksNoCompactReason), m.Reason)
Expand Down
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"ulid": "01HNWYKRP02T1EW2AXEZXG12QA",
"minTime": 0,
"maxTime": 7200000,
"stats": {},
"compaction": {
"level": 1,
"sources": [
"01HNWYKRP02T1EW2AXEZXG12QA"
]
},
"version": 1,
"out_of_order": false,
"thanos": {
"version": 1,
"labels": null,
"downsample": {
"resolution": 0
},
"source": ""
}
}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"ulid": "01HNWYKS6MZWPPV0Q3FNEK4GHZ",
"minTime": 0,
"maxTime": 7200000,
"stats": {},
"compaction": {
"level": 1,
"sources": [
"01HNWYKS6MZWPPV0Q3FNEK4GHZ"
]
},
"version": 1,
"out_of_order": false,
"thanos": {
"version": 1,
"labels": null,
"downsample": {
"resolution": 0
},
"source": ""
}
}
3 changes: 3 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ var NonParallelFuncs = []string{
"sort",
"time",
"vector",

// The following function may be parallelized using a strategy similar to avg().
"histogram_avg",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added task: #7307

}

// FuncsWithDefaultTimeArg is the list of functions that extract date information from a variadic list of params,
Expand Down
Loading
Loading