diff --git a/CHANGELOG.md b/CHANGELOG.md index dedf12c5b68..ddfcf7994e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6167](https://github.com/thanos-io/thanos/pull/6195) Receive: add flag `tsdb.too-far-in-future.time-window` to prevent clock skewed samples to pollute TSDB head and block all valid incoming samples. - [#6273](https://github.com/thanos-io/thanos/pull/6273) Mixin: Allow specifying an instance name filter in dashboards - [#6163](https://github.com/thanos-io/thanos/pull/6163) Receiver: Add hidden flag `--receive-forward-max-backoff` to configure the max backoff for forwarding requests. +- [#5777](https://github.com/thanos-io/thanos/pull/5777) Receive: Allow specifying tenant-specific external labels in Router Ingestor. ### Fixed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 763b5cb4a3c..0a340a77f3a 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -281,7 +281,7 @@ func runReceive( level.Debug(logger).Log("msg", "setting up hashring") { - if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion); err != nil { + if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion, dbs); err != nil { return err } } @@ -451,12 +451,13 @@ func setupHashring(g *run.Group, webHandler *receive.Handler, statusProber prober.Probe, enableIngestion bool, + dbs *receive.MultiTSDB, ) error { // Note: the hashring configuration watcher // is the sender and thus closes the chan. // In the single-node case, which has no configuration // watcher, we close the chan ourselves. - updates := make(chan receive.Hashring, 1) + updates := make(chan []receive.HashringConfig, 1) algorithm := receive.HashringAlgorithm(conf.hashringsAlgorithm) // The Hashrings config file path is given initializing config watcher. @@ -475,33 +476,28 @@ func setupHashring(g *run.Group, ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - level.Info(logger).Log("msg", "the hashring initialized with config watcher.") - return receive.HashringFromConfigWatcher(ctx, algorithm, conf.replicationFactor, updates, cw) + return receive.ConfigFromWatcher(ctx, updates, cw) }, func(error) { cancel() }) } else { var ( - ring receive.Hashring - err error + cf []receive.HashringConfig + err error ) // The Hashrings config file content given initialize configuration from content. if len(conf.hashringsFileContent) > 0 { - ring, err = receive.HashringFromConfig(algorithm, conf.replicationFactor, conf.hashringsFileContent) + cf, err = receive.ParseConfig([]byte(conf.hashringsFileContent)) if err != nil { close(updates) - return errors.Wrap(err, "failed to validate hashring configuration file") + return errors.Wrap(err, "failed to validate hashring configuration content") } - level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.") - } else { - level.Info(logger).Log("msg", "the hashring file is not specified use single node hashring.") - ring = receive.SingleNodeHashring(conf.endpoint) } cancel := make(chan struct{}) g.Add(func() error { defer close(updates) - updates <- ring + updates <- cf <-cancel return nil }, func(error) { @@ -518,11 +514,27 @@ func setupHashring(g *run.Group, for { select { - case h, ok := <-updates: + case c, ok := <-updates: if !ok { return nil } - webHandler.Hashring(h) + + if c == nil { + webHandler.Hashring(receive.SingleNodeHashring(conf.endpoint)) + level.Info(logger).Log("msg", "Empty hashring config. Set up single node hashring.") + } else { + h, err := receive.NewMultiHashring(algorithm, conf.replicationFactor, c) + if err != nil { + return errors.Wrap(err, "unable to create new hashring from config") + } + webHandler.Hashring(h) + level.Info(logger).Log("msg", "Set up hashring for the given hashring config.") + } + + if err := dbs.SetHashringConfig(c); err != nil { + return errors.Wrap(err, "failed to set hashring config in MultiTSDB") + } + // If ingestion is enabled, send a signal to TSDB to flush. if enableIngestion { hashringChangedChan <- struct{}{} diff --git a/pkg/exemplars/tsdb.go b/pkg/exemplars/tsdb.go index 88c33d84bed..be824dd8f88 100644 --- a/pkg/exemplars/tsdb.go +++ b/pkg/exemplars/tsdb.go @@ -4,6 +4,8 @@ package exemplars import ( + "sync" + "github.com/gogo/status" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" @@ -16,8 +18,10 @@ import ( // TSDB allows fetching exemplars from a TSDB instance. type TSDB struct { - db storage.ExemplarQueryable + db storage.ExemplarQueryable + extLabels labels.Labels + mtx sync.RWMutex } // NewTSDB creates new exemplars.TSDB. @@ -28,9 +32,23 @@ func NewTSDB(db storage.ExemplarQueryable, extLabels labels.Labels) *TSDB { } } +func (t *TSDB) SetExtLabels(extLabels labels.Labels) { + t.mtx.Lock() + defer t.mtx.Unlock() + + t.extLabels = extLabels +} + +func (t *TSDB) getExtLabels() labels.Labels { + t.mtx.RLock() + defer t.mtx.RUnlock() + + return t.extLabels +} + // Exemplars returns all specified exemplars from a TSDB instance. func (t *TSDB) Exemplars(matchers [][]*labels.Matcher, start, end int64, s exemplarspb.Exemplars_ExemplarsServer) error { - match, selectors := selectorsMatchesExternalLabels(matchers, t.extLabels) + match, selectors := selectorsMatchesExternalLabels(matchers, t.getExtLabels()) if !match { return nil @@ -53,7 +71,7 @@ func (t *TSDB) Exemplars(matchers [][]*labels.Matcher, start, end int64, s exemp for _, e := range exemplars { exd := exemplarspb.ExemplarData{ SeriesLabels: labelpb.ZLabelSet{ - Labels: labelpb.ZLabelsFromPromLabels(labelpb.ExtendSortedLabels(e.SeriesLabels, t.extLabels)), + Labels: labelpb.ZLabelsFromPromLabels(labelpb.ExtendSortedLabels(e.SeriesLabels, t.getExtLabels())), }, Exemplars: exemplarspb.ExemplarsFromPromExemplars(e.Exemplars), } diff --git a/pkg/receive/config.go b/pkg/receive/config.go index 27c39a8d11c..a88942c16bf 100644 --- a/pkg/receive/config.go +++ b/pkg/receive/config.go @@ -40,10 +40,11 @@ const ( // HashringConfig represents the configuration for a hashring // a receive node knows about. type HashringConfig struct { - Hashring string `json:"hashring,omitempty"` - Tenants []string `json:"tenants,omitempty"` - Endpoints []string `json:"endpoints"` - Algorithm HashringAlgorithm `json:"algorithm,omitempty"` + Hashring string `json:"hashring,omitempty"` + Tenants []string `json:"tenants,omitempty"` + Endpoints []string `json:"endpoints"` + Algorithm HashringAlgorithm `json:"algorithm,omitempty"` + ExternalLabels map[string]string `json:"external_labels,omitempty"` } // ConfigWatcher is able to watch a file containing a hashring configuration @@ -255,6 +256,30 @@ func (cw *ConfigWatcher) refresh(ctx context.Context) { } } +func ConfigFromWatcher(ctx context.Context, updates chan<- []HashringConfig, cw *ConfigWatcher) error { + defer close(updates) + go cw.Run(ctx) + + for { + select { + case cfg, ok := <-cw.C(): + if !ok { + return errors.New("hashring config watcher stopped unexpectedly") + } + updates <- cfg + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// ParseConfig parses the raw configuration content and returns a HashringConfig. +func ParseConfig(content []byte) ([]HashringConfig, error) { + var config []HashringConfig + err := json.Unmarshal(content, &config) + return config, err +} + // loadConfig loads raw configuration content and returns a configuration. func loadConfig(logger log.Logger, path string) ([]HashringConfig, float64, error) { cfgContent, err := readFile(logger, path) @@ -262,7 +287,7 @@ func loadConfig(logger log.Logger, path string) ([]HashringConfig, float64, erro return nil, 0, errors.Wrap(err, "failed to read configuration file") } - config, err := parseConfig(cfgContent) + config, err := ParseConfig(cfgContent) if err != nil { return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err) } @@ -290,13 +315,6 @@ func readFile(logger log.Logger, path string) ([]byte, error) { return io.ReadAll(fd) } -// parseConfig parses the raw configuration content and returns a HashringConfig. -func parseConfig(content []byte) ([]HashringConfig, error) { - var config []HashringConfig - err := json.Unmarshal(content, &config) - return config, err -} - // hashAsMetricValue generates metric value from hash of data. func hashAsMetricValue(data []byte) float64 { sum := md5.Sum(data) diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index ac1a57a029c..735c858e77f 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -203,7 +203,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin hashringAlgo = AlgorithmHashmod } - hashring, err := newMultiHashring(hashringAlgo, replicationFactor, cfg) + hashring, err := NewMultiHashring(hashringAlgo, replicationFactor, cfg) if err != nil { return nil, nil, err } diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index df41cdf72f3..4742aa8c988 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -4,7 +4,6 @@ package receive import ( - "context" "fmt" "sort" "strconv" @@ -239,7 +238,7 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st // groups. // Which hashring to use for a tenant is determined // by the tenants field of the hashring configuration. -func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) (Hashring, error) { +func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) (Hashring, error) { m := &multiHashring{ cache: make(map[string]Hashring), } @@ -268,49 +267,6 @@ func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg return m, nil } -// HashringFromConfigWatcher creates multi-tenant hashrings from a -// hashring configuration file watcher. -// The configuration file is watched for updates. -// Hashrings are returned on the updates channel. -// Which hashring to use for a tenant is determined -// by the tenants field of the hashring configuration. -// The updates chan is closed before exiting. -func HashringFromConfigWatcher(ctx context.Context, algorithm HashringAlgorithm, replicationFactor uint64, updates chan<- Hashring, cw *ConfigWatcher) error { - defer close(updates) - go cw.Run(ctx) - - for { - select { - case cfg, ok := <-cw.C(): - if !ok { - return errors.New("hashring config watcher stopped unexpectedly") - } - h, err := newMultiHashring(algorithm, replicationFactor, cfg) - if err != nil { - return errors.Wrap(err, "unable to create new hashring from config") - } - updates <- h - case <-ctx.Done(): - return ctx.Err() - } - } -} - -// HashringFromConfig loads raw configuration content and returns a Hashring if the given configuration is not valid. -func HashringFromConfig(algorithm HashringAlgorithm, replicationFactor uint64, content string) (Hashring, error) { - config, err := parseConfig([]byte(content)) - if err != nil { - return nil, errors.Wrapf(err, "failed to parse configuration") - } - - // If hashring is empty, return an error. - if len(config) == 0 { - return nil, errors.Wrapf(err, "failed to load configuration") - } - - return newMultiHashring(algorithm, replicationFactor, config) -} - func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) { switch algorithm { case AlgorithmHashmod: diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index 550ce03bec7..3a72aac9ebd 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -136,7 +136,7 @@ func TestHashringGet(t *testing.T) { }, }, } { - hs, err := newMultiHashring(AlgorithmHashmod, 3, tc.cfg) + hs, err := NewMultiHashring(AlgorithmHashmod, 3, tc.cfg) require.NoError(t, err) h, err := hs.Get(tc.tenant, ts) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index d4588da2698..312bd06d730 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -10,6 +10,7 @@ import ( "path" "path/filepath" "sort" + "strings" "sync" "time" @@ -17,10 +18,12 @@ import ( "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "go.uber.org/atomic" + "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" "github.com/thanos-io/thanos/pkg/api/status" @@ -57,6 +60,7 @@ type MultiTSDB struct { tenants map[string]*tenant allowOutOfOrderUpload bool hashFunc metadata.HashFunc + hashringConfigs []HashringConfig } // NewMultiTSDB creates new MultiTSDB. @@ -563,7 +567,16 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg) reg = NewUnRegisterer(reg) - lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID)) + initialLset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID)) + + lset, err := t.extractTenantsLabels(tenantID, initialLset) + if err != nil { + t.mtx.Lock() + delete(t.tenants, tenantID) + t.mtx.Unlock() + return err + } + dataDir := t.defaultTenantDataDir(tenantID) level.Info(logger).Log("msg", "opening TSDB") @@ -647,6 +660,42 @@ func (t *MultiTSDB) TenantAppendable(tenantID string) (Appendable, error) { return tenant.readyStorage(), nil } +func (t *MultiTSDB) SetHashringConfig(cfg []HashringConfig) error { + t.hashringConfigs = cfg + + // If a tenant's already existed in MultiTSDB, update its label set + // from the latest []HashringConfig. + // In case one tenant appears in multiple hashring configs, + // only the label set from the first hashring config is applied. + // This is the same logic as startTSDB. + updatedTenants := make([]string, 0) + for _, hc := range t.hashringConfigs { + for _, tenantId := range hc.Tenants { + if slices.Contains(updatedTenants, tenantId) { + continue + } + if t.tenants[tenantId] != nil { + updatedTenants = append(updatedTenants, tenantId) + + lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantId)) + var err error + lset, err = extendLabels(lset, hc.ExternalLabels, t.logger) + if err != nil { + return errors.Wrap(err, "failed to extend external labels for tenant "+tenantId) + } + + if t.tenants[tenantId].ship != nil { + t.tenants[tenantId].ship.SetLabels(lset) + } + t.tenants[tenantId].storeTSDB.SetExtLset(lset) + t.tenants[tenantId].exemplarsTSDB.SetExtLabels(lset) + } + } + } + + return nil +} + // ErrNotReady is returned if the underlying storage is not ready yet. var ErrNotReady = errors.New("TSDB not ready") @@ -801,3 +850,60 @@ func (u *UnRegisterer) MustRegister(cs ...prometheus.Collector) { } } } + +// extractTenantsLabels extractes tenant's external labels from hashring configs. +// If one tenant appears in multiple hashring configs, +// only the external label set from the first hashring config is applied. +func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Labels) (labels.Labels, error) { + for _, hc := range t.hashringConfigs { + for _, tenant := range hc.Tenants { + if tenant != tenantID { + continue + } + + lset, err := extendLabels(initialLset, hc.ExternalLabels, t.logger) + if err != nil { + return nil, errors.Wrap(err, "failed to extend external labels for tenant "+tenantID) + } + return lset, nil + } + } + + return nil, nil +} + +// extendLabels extends external labels of the initial label set. +// If an external label shares same name with a label in the initial label set, +// use the label in the initial label set and inform user about it. +func extendLabels(labelSet labels.Labels, extend map[string]string, logger log.Logger) (labels.Labels, error) { + var extendLabels labels.Labels + for name, value := range extend { + if !model.LabelName.IsValid(model.LabelName(name)) { + return nil, errors.Errorf("unsupported format for label's name: %s", name) + } + extendLabels = append(extendLabels, labels.Label{Name: name, Value: value}) + } + + extendedLabelSet := make(labels.Labels, 0, len(labelSet)+len(extendLabels)) + for len(labelSet) > 0 && len(extendLabels) > 0 { + d := strings.Compare(labelSet[0].Name, extendLabels[0].Name) + if d == 0 { + extendedLabelSet = append(extendedLabelSet, labelSet[0]) + level.Info(logger).Log("msg", "Duplicate label found. Using initial label instead.", + "label's name", extendLabels[0].Name) + labelSet, extendLabels = labelSet[1:], extendLabels[1:] + } else if d < 0 { + extendedLabelSet = append(extendedLabelSet, labelSet[0]) + labelSet = labelSet[1:] + } else if d > 0 { + extendedLabelSet = append(extendedLabelSet, extendLabels[0]) + extendLabels = extendLabels[1:] + } + } + extendedLabelSet = append(extendedLabelSet, labelSet...) + extendedLabelSet = append(extendedLabelSet, extendLabels...) + + sort.Sort(extendedLabelSet) + + return extendedLabelSet, nil +} diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index 4ce51d7184b..8d8cdf66db1 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -5,10 +5,838 @@ package receive import ( "testing" + "time" + "github.com/go-kit/log" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb" + + "github.com/thanos-io/objstore" + + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/testutil/custom" ) func TestMain(m *testing.M) { custom.TolerantVerifyLeakMain(m) } + +func TestAddingExternalLabelsForTenants(t *testing.T) { + for _, tc := range []struct { + name string + cfg []HashringConfig + expectedExternalLabelSets []labels.Labels + }{ + { + name: "One tenant - No labels", + cfg: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1"}, + }, + }, + expectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("replica", "test", "tenant_id", "tenant1"), + }, + }, + { + name: "One tenant - One label", + cfg: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1"}, + ExternalLabels: map[string]string{ + "name1": "value1", + }, + }, + }, + expectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("name1", "value1", "replica", "test", "tenant_id", "tenant1"), + }, + }, + { + name: "One tenant - Multiple labels", + cfg: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1"}, + ExternalLabels: map[string]string{ + "name1": "value1", + "name2": "value2", + "name3": "value3", + }, + }, + }, + expectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant1"), + }, + }, + { + name: "Multiple tenants - No labels", + cfg: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + }, + }, + expectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("replica", "test", "tenant_id", "tenant3"), + }, + }, + { + name: "Multiple tenants - One label", + cfg: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + ExternalLabels: map[string]string{ + "name1": "value1", + }, + }, + }, + expectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("name1", "value1", "replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("name1", "value1", "replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("name1", "value1", "replica", "test", "tenant_id", "tenant3"), + }, + }, + { + name: "Multiple tenants - Multiple labels", + cfg: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + ExternalLabels: map[string]string{ + "name1": "value1", + "name2": "value2", + "name3": "value3", + }, + }, + }, + expectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant3"), + }, + }, + { + name: "Multiple hashrings - No repeated tenants", + cfg: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + ExternalLabels: map[string]string{ + "name1": "value1", + "name2": "value2", + "name3": "value3", + }, + }, + { + Endpoints: []string{"node2"}, + Tenants: []string{"tenant4", "tenant5", "tenant6"}, + ExternalLabels: map[string]string{ + "name4": "value4", + "name5": "value5", + "name6": "value6", + }, + }, + }, + expectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant3"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant4"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant5"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant6"), + }, + }, + { + name: "Multiple hashrings - One repeated tenant", + cfg: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + ExternalLabels: map[string]string{ + "name1": "value1", + "name2": "value2", + "name3": "value3", + }, + }, + { + Endpoints: []string{"node2"}, + Tenants: []string{"tenant4", "tenant5", "tenant1"}, + ExternalLabels: map[string]string{ + "name4": "value4", + "name5": "value5", + "name6": "value6", + }, + }, + }, + expectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant3"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant4"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant5"), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + m := initializeMultiTSDB(t.TempDir()) + + err := m.SetHashringConfig(tc.cfg) + require.NoError(t, err) + + for _, c := range tc.cfg { + for _, tenantId := range c.Tenants { + if m.tenants[tenantId] == nil { + err = appendSample(m, tenantId, time.Now()) + require.NoError(t, err) + } + } + } + + err = m.Open() + require.NoError(t, err) + + storeClients := m.TSDBLocalClients() + require.Equal(t, len(tc.expectedExternalLabelSets), len(storeClients)) + + setOfExpectedClientLabelSets, setOfActualClientLabelSets := setupSetsOfExpectedAndActualStoreClientLabelSets( + tc.expectedExternalLabelSets, storeClients) + + for _, cls := range setOfActualClientLabelSets { + require.Contains(t, setOfExpectedClientLabelSets, cls) + } + + err = m.Flush() + require.NoError(t, err) + + err = m.Close() + require.NoError(t, err) + }) + } +} + +func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { + initialConfig := []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + ExternalLabels: map[string]string{ + "name1": "value1", + "name2": "value2", + "name3": "value3", + }, + }, + } + initialExpectedExternalLabelSets := []labels.Labels{ + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant3"), + } + + changedConfig := []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3", "tenant4", "tenant5"}, + ExternalLabels: map[string]string{ + "name1": "value1", + "name2": "value2", + "name3": "value3", + }, + }, + } + changedExpectedExternalLabelSets := []labels.Labels{ + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant3"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant4"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant5"), + } + + t.Run("Adding tenants", func(t *testing.T) { + m := initializeMultiTSDB(t.TempDir()) + + err := m.SetHashringConfig(initialConfig) + require.NoError(t, err) + + for _, c := range initialConfig { + for _, tenantId := range c.Tenants { + if m.tenants[tenantId] == nil { + err = appendSample(m, tenantId, time.Now()) + require.NoError(t, err) + } + } + } + + err = m.Open() + require.NoError(t, err) + + initialStoreClients := m.TSDBLocalClients() + require.Equal(t, len(initialExpectedExternalLabelSets), len(initialStoreClients)) + + initialSetOfExpectedClientLabelSets, initialSetOfActualClientLabelSets := setupSetsOfExpectedAndActualStoreClientLabelSets( + initialExpectedExternalLabelSets, initialStoreClients) + + for _, cls := range initialSetOfActualClientLabelSets { + require.Contains(t, initialSetOfExpectedClientLabelSets, cls) + } + + err = m.SetHashringConfig(changedConfig) + require.NoError(t, err) + + for _, c := range changedConfig { + for _, tenantId := range c.Tenants { + if m.tenants[tenantId] == nil { + err = appendSample(m, tenantId, time.Now()) + require.NoError(t, err) + } + } + } + + err = m.Flush() + require.NoError(t, err) + + err = m.Open() + require.NoError(t, err) + + changedStoreClients := m.TSDBLocalClients() + require.Equal(t, len(changedExpectedExternalLabelSets), len(changedStoreClients)) + + changedSetOfExpectedClientLabelSets, changedSetOfActualClientLabelSets := setupSetsOfExpectedAndActualStoreClientLabelSets( + changedExpectedExternalLabelSets, changedStoreClients) + + for _, cls := range changedSetOfActualClientLabelSets { + require.Contains(t, changedSetOfExpectedClientLabelSets, cls) + } + + err = m.Flush() + require.NoError(t, err) + + err = m.Close() + require.NoError(t, err) + }) +} + +func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { + initialConfig := []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + ExternalLabels: map[string]string{ + "name1": "value1", + "name2": "value2", + "name3": "value3", + }, + }, + { + Endpoints: []string{"node2"}, + Tenants: []string{"tenant4", "tenant5", "tenant6"}, + ExternalLabels: map[string]string{ + "name4": "value4", + "name5": "value5", + "name6": "value6", + }, + }, + } + initialExpectedExternalLabelSets := []labels.Labels{ + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant3"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant4"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant5"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant6"), + } + + for _, tc := range []struct { + name string + changedConfig []HashringConfig + changedExpectedExternalLabelSets []labels.Labels + }{ + { + name: "Adding labels", + changedConfig: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + ExternalLabels: map[string]string{ + "name1": "value1", + "name2": "value2", + "name3": "value3", + "name4": "value4", + }, + }, + { + Endpoints: []string{"node2"}, + Tenants: []string{"tenant4", "tenant5", "tenant6"}, + ExternalLabels: map[string]string{ + "name4": "value4", + "name5": "value5", + "name6": "value6", + "name7": "value7", + }, + }, + }, + changedExpectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", "name4", "value4", + "replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", "name4", "value4", + "replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", "name4", "value4", + "replica", "test", "tenant_id", "tenant3"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", "name7", "value7", + "replica", "test", "tenant_id", "tenant4"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", "name7", "value7", + "replica", "test", "tenant_id", "tenant5"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", "name7", "value7", + "replica", "test", "tenant_id", "tenant6"), + }, + }, + { + name: "Deleting some labels", + changedConfig: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + ExternalLabels: map[string]string{ + "name1": "value1", + "name2": "value2", + }, + }, + { + Endpoints: []string{"node2"}, + Tenants: []string{"tenant4", "tenant5", "tenant6"}, + ExternalLabels: map[string]string{ + "name4": "value4", + "name5": "value5", + }, + }, + }, + changedExpectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("name1", "value1", "name2", "value2", + "replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("name1", "value1", "name2", "value2", + "replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("name1", "value1", "name2", "value2", + "replica", "test", "tenant_id", "tenant3"), + labels.FromStrings("name4", "value4", "name5", "value5", + "replica", "test", "tenant_id", "tenant4"), + labels.FromStrings("name4", "value4", "name5", "value5", + "replica", "test", "tenant_id", "tenant5"), + labels.FromStrings("name4", "value4", "name5", "value5", + "replica", "test", "tenant_id", "tenant6"), + }, + }, + { + name: "Deleting all labels", + changedConfig: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + }, + { + Endpoints: []string{"node2"}, + Tenants: []string{"tenant4", "tenant5", "tenant6"}, + }, + }, + changedExpectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("replica", "test", "tenant_id", "tenant3"), + labels.FromStrings("replica", "test", "tenant_id", "tenant4"), + labels.FromStrings("replica", "test", "tenant_id", "tenant5"), + labels.FromStrings("replica", "test", "tenant_id", "tenant6"), + }, + }, + { + name: "Changing values of some labels", + changedConfig: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + ExternalLabels: map[string]string{ + "name1": "value3", + "name2": "value2", + "name3": "value3", + }, + }, + { + Endpoints: []string{"node2"}, + Tenants: []string{"tenant4", "tenant5", "tenant6"}, + ExternalLabels: map[string]string{ + "name4": "value6", + "name5": "value5", + "name6": "value6", + }, + }, + }, + changedExpectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("name1", "value3", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("name1", "value3", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("name1", "value3", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant3"), + labels.FromStrings("name4", "value6", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant4"), + labels.FromStrings("name4", "value6", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant5"), + labels.FromStrings("name4", "value6", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant6"), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + m := initializeMultiTSDB(t.TempDir()) + + err := m.SetHashringConfig(initialConfig) + require.NoError(t, err) + + for _, c := range initialConfig { + for _, tenantId := range c.Tenants { + if m.tenants[tenantId] == nil { + err = appendSample(m, tenantId, time.Now()) + require.NoError(t, err) + } + } + } + + err = m.Open() + require.NoError(t, err) + + initialStoreClients := m.TSDBLocalClients() + require.Equal(t, len(initialExpectedExternalLabelSets), len(initialStoreClients)) + + initialSetOfExpectedClientLabelSets, initialSetOfActualClientLabelSets := setupSetsOfExpectedAndActualStoreClientLabelSets( + initialExpectedExternalLabelSets, initialStoreClients) + + for _, cls := range initialSetOfActualClientLabelSets { + require.Contains(t, initialSetOfExpectedClientLabelSets, cls) + } + + err = m.SetHashringConfig(tc.changedConfig) + require.NoError(t, err) + + err = m.Flush() + require.NoError(t, err) + + err = m.Open() + require.NoError(t, err) + + changedStoreClients := m.TSDBLocalClients() + require.Equal(t, len(tc.changedExpectedExternalLabelSets), len(changedStoreClients)) + + changedSetOfExpectedClientLabelSets, changedSetOfActualClientLabelSets := setupSetsOfExpectedAndActualStoreClientLabelSets( + tc.changedExpectedExternalLabelSets, changedStoreClients) + + for _, cls := range changedSetOfActualClientLabelSets { + require.Contains(t, changedSetOfExpectedClientLabelSets, cls) + } + + err = m.Flush() + require.NoError(t, err) + + err = m.Close() + require.NoError(t, err) + }) + } +} + +func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { + initialConfig := []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + ExternalLabels: map[string]string{ + "name1": "value1", + "name2": "value2", + "name3": "value3", + }, + }, + { + Endpoints: []string{"node2"}, + Tenants: []string{"tenant4", "tenant5", "tenant1"}, + ExternalLabels: map[string]string{ + "name4": "value4", + "name5": "value5", + "name6": "value6", + }, + }, + } + initialExpectedExternalLabelSets := []labels.Labels{ + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant3"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant4"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant5"), + } + + for _, tc := range []struct { + name string + changedConfig []HashringConfig + changedExpectedExternalLabelSets []labels.Labels + }{ + { + name: "Adding labels in first hashring that tenant appears", + changedConfig: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + ExternalLabels: map[string]string{ + "name1": "value1", + "name2": "value2", + "name3": "value3", + "name4": "value4", + }, + }, + { + Endpoints: []string{"node2"}, + Tenants: []string{"tenant4", "tenant5", "tenant1"}, + ExternalLabels: map[string]string{ + "name4": "value4", + "name5": "value5", + "name6": "value6", + }, + }, + }, + changedExpectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", "name4", "value4", + "replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", "name4", "value4", + "replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", "name4", "value4", + "replica", "test", "tenant_id", "tenant3"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant4"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", + "replica", "test", "tenant_id", "tenant5"), + }, + }, + { + name: "Adding labels in second hashring that tenant appears", + changedConfig: []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1", "tenant2", "tenant3"}, + ExternalLabels: map[string]string{ + "name1": "value1", + "name2": "value2", + "name3": "value3", + }, + }, + { + Endpoints: []string{"node2"}, + Tenants: []string{"tenant4", "tenant5", "tenant1"}, + ExternalLabels: map[string]string{ + "name4": "value4", + "name5": "value5", + "name6": "value6", + "name7": "value7", + }, + }, + }, + changedExpectedExternalLabelSets: []labels.Labels{ + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant1"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant2"), + labels.FromStrings("name1", "value1", "name2", "value2", "name3", "value3", + "replica", "test", "tenant_id", "tenant3"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", "name7", "value7", + "replica", "test", "tenant_id", "tenant4"), + labels.FromStrings("name4", "value4", "name5", "value5", "name6", "value6", "name7", "value7", + "replica", "test", "tenant_id", "tenant5"), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + m := initializeMultiTSDB(t.TempDir()) + + err := m.SetHashringConfig(initialConfig) + require.NoError(t, err) + + for _, c := range initialConfig { + for _, tenantId := range c.Tenants { + if m.tenants[tenantId] == nil { + err = appendSample(m, tenantId, time.Now()) + require.NoError(t, err) + } + } + } + + err = m.Open() + require.NoError(t, err) + + initialStoreClients := m.TSDBLocalClients() + require.Equal(t, len(initialExpectedExternalLabelSets), len(initialStoreClients)) + + initialSetOfExpectedClientLabelSets, initialSetOfActualClientLabelSets := setupSetsOfExpectedAndActualStoreClientLabelSets( + initialExpectedExternalLabelSets, initialStoreClients) + + for _, cls := range initialSetOfActualClientLabelSets { + require.Contains(t, initialSetOfExpectedClientLabelSets, cls) + } + + err = m.SetHashringConfig(tc.changedConfig) + require.NoError(t, err) + + err = m.Flush() + require.NoError(t, err) + + err = m.Open() + require.NoError(t, err) + + changedStoreClients := m.TSDBLocalClients() + require.Equal(t, len(tc.changedExpectedExternalLabelSets), len(changedStoreClients)) + + changedSetOfExpectedClientLabelSets, changedSetOfActualClientLabelSets := setupSetsOfExpectedAndActualStoreClientLabelSets( + tc.changedExpectedExternalLabelSets, changedStoreClients) + + for _, cls := range changedSetOfActualClientLabelSets { + require.Contains(t, changedSetOfExpectedClientLabelSets, cls) + } + + err = m.Flush() + require.NoError(t, err) + + err = m.Close() + require.NoError(t, err) + }) + } +} + +func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) { + cfg := []HashringConfig{ + { + Endpoints: []string{"node1"}, + Tenants: []string{"tenant1"}, + ExternalLabels: map[string]string{ + "replica": "0", + "tenant_id": "tenant2", + "name3": "value3", + }, + }, + } + expectedExternalLabelSets := []labels.Labels{ + labels.FromStrings("name3", "value3", "replica", "test", "tenant_id", "tenant1"), + } + + t.Run("Receiver's labels not overwritten by external labels", func(t *testing.T) { + m := initializeMultiTSDB(t.TempDir()) + + err := m.SetHashringConfig(cfg) + require.NoError(t, err) + + for _, c := range cfg { + for _, tenantId := range c.Tenants { + if m.tenants[tenantId] == nil { + err = appendSample(m, tenantId, time.Now()) + require.NoError(t, err) + } + } + } + + err = m.Open() + require.NoError(t, err) + + storeClients := m.TSDBLocalClients() + require.Equal(t, len(expectedExternalLabelSets), len(storeClients)) + + setOfExpectedClientLabelSets, setOfActualClientLabelSets := setupSetsOfExpectedAndActualStoreClientLabelSets( + expectedExternalLabelSets, storeClients) + + for _, cls := range setOfActualClientLabelSets { + require.Contains(t, setOfExpectedClientLabelSets, cls) + } + + err = m.Flush() + require.NoError(t, err) + + err = m.Close() + require.NoError(t, err) + }) +} + +func initializeMultiTSDB(dir string) *MultiTSDB { + var bucket objstore.Bucket + + m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(), + &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + }, + labels.FromStrings("replica", "test"), + "tenant_id", + bucket, + false, + metadata.NoneFunc, + ) + + return m +} + +// Set up expected set of label sets of store clients from expected external label sets of all tenants +// and set up actual set of label sets from actual store clients. +func setupSetsOfExpectedAndActualStoreClientLabelSets( + expectedExternalLabelSets []labels.Labels, actualStoreClients []store.Client) ([][]labels.Labels, [][]labels.Labels) { + setOfExpectedClientLabelSets := make([][]labels.Labels, len(expectedExternalLabelSets)) + setOfActualClientLabelSets := make([][]labels.Labels, len(actualStoreClients)) + + for i := 0; i < len(actualStoreClients); i++ { + testStore := store.TSDBStore{} + testStore.SetExtLset(expectedExternalLabelSets[i]) + + expectedClientLabelSets := labelpb.ZLabelSetsToPromLabelSets(testStore.LabelSet()...) + setOfExpectedClientLabelSets = append(setOfExpectedClientLabelSets, expectedClientLabelSets) + + actualClientLabelSets := actualStoreClients[i].LabelSets() + setOfActualClientLabelSets = append(setOfActualClientLabelSets, actualClientLabelSets) + } + + return setOfExpectedClientLabelSets, setOfActualClientLabelSets +} diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 67c5f8feaf7..2a2c04df2d8 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -13,6 +13,7 @@ import ( "path" "path/filepath" "sort" + "sync" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -77,12 +78,14 @@ type Shipper struct { dir string metrics *metrics bucket objstore.Bucket - labels func() labels.Labels source metadata.SourceType uploadCompacted bool allowOutOfOrderUploads bool hashFunc metadata.HashFunc + + labels func() labels.Labels + mtx sync.RWMutex } // New creates a new shipper that detects new TSDB blocks in dir and uploads them to @@ -119,6 +122,20 @@ func New( } } +func (s *Shipper) SetLabels(lbls labels.Labels) { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.labels = func() labels.Labels { return lbls } +} + +func (s *Shipper) getLabels() labels.Labels { + s.mtx.RLock() + defer s.mtx.RUnlock() + + return s.labels() +} + // Timestamps returns the minimum timestamp for which data is available and the highest timestamp // of blocks that were successfully uploaded. func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) { @@ -251,7 +268,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { meta.Uploaded = nil var ( - checker = newLazyOverlapChecker(s.logger, s.bucket, s.labels) + checker = newLazyOverlapChecker(s.logger, s.bucket, s.getLabels) uploadErrs int ) @@ -355,7 +372,7 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error { return errors.Wrap(err, "hard link block") } // Attach current labels and write a new meta file with Thanos extensions. - if lset := s.labels(); lset != nil { + if lset := s.getLabels(); lset != nil { meta.Thanos.Labels = lset.Map() } meta.Thanos.Source = s.source diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index bea8f04ac9a..5bbb469e98e 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -41,9 +41,11 @@ type TSDBStore struct { logger log.Logger db TSDBReader component component.StoreAPI - extLset labels.Labels buffers sync.Pool maxBytesPerFrame int + + extLset labels.Labels + mtx sync.RWMutex } func RegisterWritableStoreServer(storeSrv storepb.WriteableStoreServer) func(*grpc.Server) { @@ -77,6 +79,20 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI } } +func (s *TSDBStore) SetExtLset(extLset labels.Labels) { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.extLset = extLset +} + +func (s *TSDBStore) getExtLset() labels.Labels { + s.mtx.RLock() + defer s.mtx.RUnlock() + + return s.extLset +} + // Info returns store information about the Prometheus instance. func (s *TSDBStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.InfoResponse, error) { minTime, err := s.db.StartTime() @@ -85,7 +101,7 @@ func (s *TSDBStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.In } res := &storepb.InfoResponse{ - Labels: labelpb.ZLabelsFromPromLabels(s.extLset), + Labels: labelpb.ZLabelsFromPromLabels(s.getExtLset()), StoreType: s.component.ToProto(), MinTime: minTime, MaxTime: math.MaxInt64, @@ -103,7 +119,7 @@ func (s *TSDBStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.In } func (s *TSDBStore) LabelSet() []labelpb.ZLabelSet { - labels := labelpb.ZLabelsFromPromLabels(s.extLset) + labels := labelpb.ZLabelsFromPromLabels(s.getExtLset()) labelSets := []labelpb.ZLabelSet{} if len(labels) > 0 { labelSets = append(labelSets, labelpb.ZLabelSet{ @@ -153,7 +169,7 @@ type CloseDelegator interface { // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - match, matchers, err := matchesExternalLabels(r.Matchers, s.extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -270,7 +286,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - match, matchers, err := matchesExternalLabels(r.Matchers, s.extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -291,7 +307,7 @@ func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest } if len(res) > 0 { - for _, lbl := range s.extLset { + for _, lbl := range s.getExtLset() { res = append(res, lbl.Name) } sort.Strings(res) @@ -317,7 +333,7 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque return nil, status.Error(codes.InvalidArgument, "label name parameter cannot be empty") } - match, matchers, err := matchesExternalLabels(r.Matchers, s.extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -326,7 +342,7 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque return &storepb.LabelValuesResponse{Values: nil}, nil } - if v := s.extLset.Get(r.Label); v != "" { + if v := s.getExtLset().Get(r.Label); v != "" { return &storepb.LabelValuesResponse{Values: []string{v}}, nil }