Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it clear, explicit, and fail early on query with just external labels #1321

Merged
merged 1 commit into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,21 @@ func (p *PrometheusStore) putBuffer(b *[]byte) {

// Series returns all series for a requested time range and label matcher.
func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_SeriesServer) error {
ext := p.externalLabels()
externalLabels := p.externalLabels()

match, newMatchers, err := labelsMatches(ext, r.Matchers)
match, newMatchers, err := matchesExternalLabels(r.Matchers, externalLabels)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

if !match {
return nil
}

if len(newMatchers) == 0 {
return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
}

q := prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime}

// TODO(fabxc): import common definitions from prompb once we have a stable gRPC
Expand Down Expand Up @@ -166,7 +172,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
span.SetTag("series_count", len(resp.Results[0].Timeseries))

for _, e := range resp.Results[0].Timeseries {
lset := p.translateAndExtendLabels(e.Labels, ext)
lset := p.translateAndExtendLabels(e.Labels, externalLabels)

if len(e.Samples) == 0 {
// As found in https://github.com/improbable-eng/thanos/issues/381
Expand Down Expand Up @@ -286,8 +292,10 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prom
return &data, nil
}

func labelsMatches(lset labels.Labels, ms []storepb.LabelMatcher) (bool, []storepb.LabelMatcher, error) {
if len(lset) == 0 {
// matchesExternalLabels filters out external labels matching from matcher if exsits as the local storage does not have them.
// It also returns false if given matchers are not matching external labels.
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels) (bool, []storepb.LabelMatcher, error) {
if len(externalLabels) == 0 {
return true, ms, nil
}

Expand All @@ -299,7 +307,7 @@ func labelsMatches(lset labels.Labels, ms []storepb.LabelMatcher) (bool, []store
return false, nil, err
}

extValue := lset.Get(m.Name)
extValue := externalLabels.Get(m.Name)
if extValue == "" {
// Agnostic to external labels.
newMatcher = append(newMatcher, m)
Expand All @@ -312,6 +320,7 @@ func labelsMatches(lset labels.Labels, ms []storepb.LabelMatcher) (bool, []store
return false, nil, nil
}
}

return true, newMatcher, nil
}

Expand Down
63 changes: 40 additions & 23 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,36 +61,53 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
}, nil)
testutil.Ok(t, err)

// Query all three samples except for the first one. Since we round up queried data
// to seconds, we can test whether the extra sample gets stripped properly.
srv := newStoreSeriesServer(ctx)
{
// Query all three samples except for the first one. Since we round up queried data
// to seconds, we can test whether the extra sample gets stripped properly.
srv := newStoreSeriesServer(ctx)

err = proxy.Series(&storepb.SeriesRequest{
MinTime: baseT + 101,
MaxTime: baseT + 300,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
},
}, srv)
testutil.Ok(t, err)

err = proxy.Series(&storepb.SeriesRequest{
MinTime: baseT + 101,
MaxTime: baseT + 300,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
},
}, srv)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(srv.SeriesSet))

testutil.Equals(t, 1, len(srv.SeriesSet))
testutil.Equals(t, []storepb.Label{
{Name: "a", Value: "b"},
{Name: "region", Value: "eu-west"},
}, srv.SeriesSet[0].Labels)

testutil.Equals(t, []storepb.Label{
{Name: "a", Value: "b"},
{Name: "region", Value: "eu-west"},
}, srv.SeriesSet[0].Labels)
testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks))

testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks))
c := srv.SeriesSet[0].Chunks[0]
testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type)

c := srv.SeriesSet[0].Chunks[0]
testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type)
chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data)
testutil.Ok(t, err)

chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data)
testutil.Ok(t, err)
samples := expandChunk(chk.Iterator())
testutil.Equals(t, []sample{{baseT + 200, 2}, {baseT + 300, 3}}, samples)

samples := expandChunk(chk.Iterator())
testutil.Equals(t, []sample{{baseT + 200, 2}, {baseT + 300, 3}}, samples)
}
// Querying by external labels only.
{
srv := newStoreSeriesServer(ctx)

err = proxy.Series(&storepb.SeriesRequest{
MinTime: baseT + 101,
MaxTime: baseT + 300,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"},
},
}, srv)
testutil.NotOk(t, err)
testutil.Equals(t, "rpc error: code = InvalidArgument desc = no matchers specified (excluding external labels)", err.Error())
}
}

type sample struct {
Expand Down
8 changes: 6 additions & 2 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,18 @@ func (s ctxRespSender) send(r *storepb.SeriesResponse) {
// Series returns all series for a requested time range and label matcher. Requested series are taken from other
// stores and proxied to RPC client. NOTE: Resulted data are not trimmed exactly to min and max time range.
func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
match, newMatchers, err := labelsMatches(s.selectorLabels, r.Matchers)
match, newMatchers, err := matchesExternalLabels(r.Matchers, s.selectorLabels)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
if !match {
return nil
}

if len(newMatchers) == 0 {
return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
}

var (
g, gctx = errgroup.WithContext(srv.Context())

Expand Down Expand Up @@ -220,7 +224,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
for _, st := range s.stores() {
// We might be able to skip the store if its meta information indicates
// it cannot have series matching our query.
// NOTE: all matchers are validated in labelsMatches method so we explicitly ignore error.
// NOTE: all matchers are validated in matchesExternalLabels method so we explicitly ignore error.
spanStoreMathes, gctx := tracing.StartSpan(gctx, "store_matches")
ok, _ := storeMatches(st, r.MinTime, r.MaxTime, r.Matchers...)
spanStoreMathes.Finish()
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) {
&storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{{Name: "fed", Value: "a", Type: storepb.LabelMatcher_EQ}},
Matchers: []storepb.LabelMatcher{{Name: "any", Value: ".*", Type: storepb.LabelMatcher_RE}},
}, s,
))
testutil.Equals(t, 0, len(s.SeriesSet))
Expand Down
32 changes: 19 additions & 13 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,37 @@ import (
// It attaches the provided external labels to all results. It only responds with raw data
// and does not support downsampling.
type TSDBStore struct {
logger log.Logger
db *tsdb.DB
component component.SourceStoreAPI
labels labels.Labels
logger log.Logger
db *tsdb.DB
component component.SourceStoreAPI
externalLabels labels.Labels
}

// NewTSDBStore creates a new TSDBStore.
func NewTSDBStore(logger log.Logger, reg prometheus.Registerer, db *tsdb.DB, component component.SourceStoreAPI, externalLabels labels.Labels) *TSDBStore {
func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db *tsdb.DB, component component.SourceStoreAPI, externalLabels labels.Labels) *TSDBStore {
if logger == nil {
logger = log.NewNopLogger()
}
return &TSDBStore{
logger: logger,
db: db,
component: component,
labels: externalLabels,
logger: logger,
db: db,
component: component,
externalLabels: externalLabels,
}
}

// Info returns store information about the Prometheus instance.
func (s *TSDBStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
res := &storepb.InfoResponse{
Labels: make([]storepb.Label, 0, len(s.labels)),
Labels: make([]storepb.Label, 0, len(s.externalLabels)),
StoreType: s.component.ToProto(),
MinTime: 0,
MaxTime: math.MaxInt64,
}
if blocks := s.db.Blocks(); len(blocks) > 0 {
res.MinTime = blocks[0].Meta().MinTime
}
for _, l := range s.labels {
for _, l := range s.externalLabels {
res.Labels = append(res.Labels, storepb.Label{
Name: l.Name,
Value: l.Value,
Expand All @@ -73,13 +73,19 @@ func (s *TSDBStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.
// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
match, newMatchers, err := labelsMatches(s.labels, r.Matchers)
match, newMatchers, err := matchesExternalLabels(r.Matchers, s.externalLabels)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

if !match {
return nil
}

if len(newMatchers) == 0 {
return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
}

matchers, err := translateMatchers(newMatchers)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
Expand Down Expand Up @@ -113,7 +119,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
return status.Errorf(codes.Internal, "encode chunk: %s", err)
}

respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.labels)
respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.externalLabels)
respSeries.Chunks = append(respSeries.Chunks[:0], c...)

if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {
Expand Down