Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import Thanos protobuf definitions #3222

Merged
merged 7 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,12 @@ pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
pkg/ruler/rulespb/rules.pb.go: pkg/ruler/rulespb/rules.proto
pkg/ruler/ruler.pb.go: pkg/ruler/ruler.proto
pkg/scheduler/schedulerpb/scheduler.pb.go: pkg/scheduler/schedulerpb/scheduler.proto
pkg/storegateway/labelpb/types.pb.go: pkg/storegateway/labelpb/types.proto
pkg/storegateway/prompb/types.pb.go: pkg/storegateway/prompb/types.proto
pkg/storegateway/hintspb/hints.pb.go: pkg/storegateway/hintspb/hints.proto
pkg/storegateway/storegatewaypb/gateway.pb.go: pkg/storegateway/storegatewaypb/gateway.proto
pkg/storegateway/storepb/rpc.pb.go: pkg/storegateway/storepb/rpc.proto
pkg/storegateway/storepb/types.pb.go: pkg/storegateway/storepb/types.proto
pkg/alertmanager/alertmanagerpb/alertmanager.pb.go: pkg/alertmanager/alertmanagerpb/alertmanager.proto
pkg/alertmanager/alertspb/alerts.pb.go: pkg/alertmanager/alertspb/alerts.proto

Expand Down Expand Up @@ -236,9 +241,7 @@ protos: ## Generates protobuf files.
protos: $(PROTO_GOS)

%.pb.go:
@# The store-gateway RPC is based on Thanos which uses relative references to other protos, so we need
@# to configure all such relative paths.
protoc -I $(GOPATH)/src:./vendor/github.com/thanos-io/thanos/pkg:./vendor/github.com/gogo/protobuf:./vendor:./$(@D) --gogoslick_out=plugins=grpc,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,:./$(@D) ./$(patsubst %.pb.go,%.proto,$@)
protoc -I $(GOPATH)/src:./vendor/github.com/gogo/protobuf:./vendor:./$(@D):./pkg/storegateway/storepb --gogoslick_out=plugins=grpc,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,:./$(@D) ./$(patsubst %.pb.go,%.proto,$@)

lint-packaging-scripts: packaging/deb/control/postinst packaging/deb/control/prerm packaging/rpm/control/post packaging/rpm/control/preun
shellcheck $?
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ require (

require (
github.com/alecthomas/chroma v0.10.0
github.com/cespare/xxhash/v2 v2.1.2
github.com/google/go-cmp v0.5.9
github.com/google/go-github/v32 v32.1.0
github.com/google/uuid v1.3.0
Expand Down Expand Up @@ -99,7 +100,6 @@ require (
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
131 changes: 91 additions & 40 deletions integration/backward_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/grafana/e2e"
e2edb "github.com/grafana/e2e/db"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -58,14 +60,17 @@ func runBackwardCompatibilityTest(t *testing.T, previousImage string, oldFlagsMa
require.NoError(t, err)
defer s.Close()

flagTSDBPath := map[string]string{
"-blocks-storage.tsdb.dir": e2e.ContainerSharedDir + "/tsdb-shared",
}
const blockRangePeriod = 5 * time.Second

flags := mergeFlags(
BlocksStorageFlags(),
BlocksStorageS3Flags(),
flagTSDBPath,
map[string]string{
"-blocks-storage.tsdb.dir": e2e.ContainerSharedDir + "/tsdb-shared",
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
},
)

// Start dependencies.
Expand All @@ -83,16 +88,41 @@ func runBackwardCompatibilityTest(t *testing.T, previousImage string, oldFlagsMa
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512+1), "cortex_ring_tokens_total"))

// Push some series to Mimir.
now := time.Now()
series, expectedVector := generateSeries("series_1", now)
series1Timestamp := time.Now()
series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
series1, expectedVector1 := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"})
series2, expectedVector2 := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"})

c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
require.NoError(t, err)

res, err := c.Push(series)
res, err := c.Push(series1)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c.Push(series2)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total"))

// Push another series to further compact another block and delete the first block
// due to expired retention.
series3Timestamp := series2Timestamp.Add(blockRangePeriod * 2)
series3, expectedVector3 := generateSeries("series_3", series3Timestamp, prompb.Label{Name: "series_3", Value: "series_3"})

res, err = c.Push(series3)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_shipper_uploads_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(3), "cortex_ingester_memory_series_created_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total"))

// Stop ingester on old version
require.NoError(t, s.Stop(ingester))

Expand All @@ -103,16 +133,19 @@ func runBackwardCompatibilityTest(t *testing.T, previousImage string, oldFlagsMa
// The distributor should have 512 tokens for the ingester ring and 1 for the distributor ring
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512+1), "cortex_ring_tokens_total"))

checkQueries(t,
consul,
expectedVector,
previousImage,
flags,
oldFlagsMapper,
now,
s,
1,
)
checkQueries(t, consul, previousImage, flags, oldFlagsMapper, s, 1, instantQueryTest{
expr: "series_1",
time: series1Timestamp,
expectedVector: expectedVector1,
}, instantQueryTest{
expr: "series_2",
time: series2Timestamp,
expectedVector: expectedVector2,
}, instantQueryTest{
expr: "series_3",
time: series3Timestamp,
expectedVector: expectedVector3,
})
}

// Check for issues like https://github.com/cortexproject/cortex/issues/2356
Expand Down Expand Up @@ -156,44 +189,46 @@ func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previo
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

checkQueries(t, consul,
expectedVector,
previousImage,
flags,
oldFlagsMapper,
now,
s,
3,
)
checkQueries(t, consul, previousImage, flags, oldFlagsMapper, s, 3, instantQueryTest{
time: now,
expr: "series_1",
expectedVector: expectedVector,
})
}

func checkQueries(
t *testing.T,
consul *e2e.HTTPService,
expectedVector model.Vector,
previousImage string,
flags map[string]string,
oldFlagsMapper e2emimir.FlagMapper,
now time.Time,
s *e2e.Scenario,
numIngesters int,
instantQueries ...instantQueryTest,
) {
cases := map[string]struct {
queryFrontendOptions []e2emimir.Option
querierOptions []e2emimir.Option
storeGatewayOptions []e2emimir.Option
}{
"old query-frontend, new querier": {
"old query-frontend, new querier and store-gateway": {
queryFrontendOptions: []e2emimir.Option{
e2emimir.WithImage(previousImage),
e2emimir.WithFlagMapper(oldFlagsMapper),
},
},
"new query-frontend, old querier": {
"new query-frontend and store-gateway, old querier": {
querierOptions: []e2emimir.Option{
e2emimir.WithImage(previousImage),
e2emimir.WithFlagMapper(oldFlagsMapper),
},
},
"new query-frontend and querier, old store-gateway": {
storeGatewayOptions: []e2emimir.Option{
e2emimir.WithImage(previousImage),
e2emimir.WithFlagMapper(oldFlagsMapper),
},
},
}

for name, c := range cases {
Expand All @@ -205,34 +240,50 @@ func checkQueries(
require.NoError(t, s.Stop(queryFrontend))
}()

// Start querier.
// Start querier and store-gateway.
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), e2e.MergeFlagsWithoutRemovingEmpty(flags, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), c.querierOptions...)
storeGateway := e2emimir.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), flags, c.storeGatewayOptions...)

require.NoError(t, s.Start(querier))
require.NoError(t, s.Start(querier, storeGateway))
defer func() {
require.NoError(t, s.Stop(querier))
require.NoError(t, s.Stop(querier, storeGateway))
}()

// Wait until querier and query-frontend are ready, and the querier has updated the ring.
require.NoError(t, s.WaitReady(querier, queryFrontend))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(float64(numIngesters*512)), "cortex_ring_tokens_total"))
// Wait until querier, query-frontend and store-gateway are ready, and the querier has updated the ring.
require.NoError(t, s.WaitReady(querier, queryFrontend, storeGateway))
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(float64(numIngesters)), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "store-gateway-client"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

// Query the series.
for _, endpoint := range []string{queryFrontend.HTTPEndpoint(), querier.HTTPEndpoint()} {
c, err := e2emimir.NewClient("", endpoint, "", "", "user-1")
require.NoError(t, err)

result, err := c.Query("series_1", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
for _, query := range instantQueries {
t.Run(fmt.Sprintf("%s: %s", endpoint, query.expr), func(t *testing.T) {
result, err := c.Query(query.expr, query.time)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, query.expectedVector, result.(model.Vector))
})
}
}
})
}
}

// instantQueryTest describes one instant query to run against the query
// endpoints in checkQueries: the PromQL expression, the evaluation
// timestamp, and the vector the query is expected to return.
type instantQueryTest struct {
expr string // PromQL expression passed to the query endpoint
time time.Time // evaluation timestamp for the instant query
expectedVector model.Vector // expected query result, compared against the actual vector
}

type testingLogger interface{ Logf(string, ...interface{}) }

func previousImageVersionOverrides(t *testing.T) map[string]e2emimir.FlagMapper {
Expand Down
2 changes: 2 additions & 0 deletions integration/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ var (
return map[string]string{
"-blocks-storage.tsdb.block-ranges-period": "1m",
"-blocks-storage.bucket-store.bucket-index.enabled": "false",
"-blocks-storage.bucket-store.ignore-blocks-within": "0",
"-blocks-storage.bucket-store.sync-interval": "5s",
"-blocks-storage.tsdb.retention-period": "5m",
"-blocks-storage.tsdb.ship-interval": "1m",
"-blocks-storage.tsdb.head-compaction-interval": "1s",
"-querier.query-store-after": "0",
}
}

Expand Down
1 change: 0 additions & 1 deletion integration/ingester_sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func TestIngesterSharding(t *testing.T) {
// to a small enough value that they'll have been part of the ring for long enough by the time we attempt
// to query back the values we wrote to them. If they _haven't_ been part of the ring for long enough, the
// query would be sent to all ingesters and our test wouldn't really be testing anything.
flags["-querier.query-store-after"] = "0"
flags["-querier.query-ingesters-within"] = fmt.Sprintf("%ds", queryIngestersWithinSecs)
flags["-ingester.ring.heartbeat-period"] = "1s"

Expand Down
9 changes: 1 addition & 8 deletions integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,8 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
"-blocks-storage.bucket-store.index-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.bucket-store.index-cache.backend": testCfg.indexCacheBackend,
"-blocks-storage.bucket-store.ignore-blocks-within": "0",
"-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled),
"-store-gateway.tenant-shard-size": fmt.Sprintf("%d", testCfg.tenantShardSize),
"-querier.query-store-after": "0",
"-query-frontend.query-stats-enabled": "true",
"-query-frontend.parallelize-shardable-queries": strconv.FormatBool(testCfg.queryShardingEnabled),
})
Expand Down Expand Up @@ -355,7 +353,6 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.bucket-store.ignore-blocks-within": "0",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
"-blocks-storage.bucket-store.index-cache.backend": testCfg.indexCacheBackend,
"-blocks-storage.bucket-store.index-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
Expand All @@ -375,8 +372,6 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
"-compactor.ring.store": "consul",
"-compactor.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-compactor.cleanup-interval": "2s", // Update bucket index often.
// Querier.
"-querier.query-store-after": "0",
// Query-frontend.
"-query-frontend.parallelize-shardable-queries": strconv.FormatBool(testCfg.queryShardingEnabled),
})
Expand Down Expand Up @@ -778,12 +773,10 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) {
// blocks (less than 3*sync-interval age) as they could be unnoticed by the store-gateway and ingesters
// have them anyway. We turn down the sync-interval to speed up the test.
storeGateway := e2emimir.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.bucket-store.ignore-blocks-within": "0",
"-blocks-storage.bucket-store.sync-interval": "1s",
}))
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-blocks-storage.bucket-store.sync-interval": "1s",
"-querier.query-store-after": "0",
}))
require.NoError(t, s.StartAndWaitReady(querier, storeGateway))

Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"

"github.com/grafana/mimir/pkg/storage/series"
"github.com/grafana/mimir/pkg/storegateway/labelpb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
)

func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []storepb.LabelMatcher {
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"

"github.com/grafana/mimir/pkg/storegateway/labelpb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util"
)

Expand Down
Loading