From 9fa523711c9837fffaa82884aef19a3dcb6c48ab Mon Sep 17 00:00:00 2001 From: haanhvu Date: Tue, 1 Nov 2022 18:29:34 +0700 Subject: [PATCH] Extract external labels from hashring configs in receive cmd and put it in multiTSDB Signed-off-by: haanhvu --- cmd/thanos/receive.go | 38 ++++++++++++++++++++++---------------- pkg/receive/config.go | 6 +++--- pkg/receive/config_test.go | 2 +- pkg/receive/multitsdb.go | 34 ++++++++++++++++++++++++++++------ 4 files changed, 54 insertions(+), 26 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 05c38507a77..ab1b8f5d4bb 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -203,6 +203,7 @@ func runReceive( bkt, conf.allowOutOfOrderUpload, hashFunc, + nil, ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) @@ -256,6 +257,15 @@ func runReceive( // hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change. hashringChangedChan := make(chan struct{}, 1) + level.Debug(logger).Log("msg", "setting up hashring") + { + if hashringConfigs, err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion); err != nil { + return err + } + + dbs.hashringConfigs = hashringConfigs + } + if enableIngestion { // uploadC signals when new blocks should be uploaded. uploadC := make(chan struct{}, 1) @@ -270,13 +280,6 @@ func runReceive( } } - level.Debug(logger).Log("msg", "setting up hashring") - { - if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion); err != nil { - return err - } - } - level.Debug(logger).Log("msg", "setting up HTTP server") { srv := httpserver.New(logger, reg, comp, httpProbe, @@ -434,23 +437,24 @@ func setupHashring(g *run.Group, webHandler *receive.Handler, statusProber prober.Probe, enableIngestion bool, -) error { +) []receive.HashringConfig, error { // Note: the hashring configuration watcher // is the sender and thus closes the chan. // In the single-node case, which has no configuration // watcher, we close the chan ourselves. updates := make(chan receive.Hashring, 1) algorithm := receive.HashringAlgorithm(conf.hashringsAlgorithm) + var hashringConfigs []receive.HashringConfig // The Hashrings config file path is given initializing config watcher. if conf.hashringsFilePath != "" { cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, conf.hashringsFilePath, *conf.refreshInterval) if err != nil { - return errors.Wrap(err, "failed to initialize config watcher") + return nil, errors.Wrap(err, "failed to initialize config watcher") } // Check the hashring configuration on before running the watcher. - if err := cw.ValidateConfig(); err != nil { + if hcs, err := cw.ValidateConfig(); err != nil { cw.Stop() close(updates) return errors.Wrap(err, "failed to validate hashring configuration file") @@ -459,10 +463,12 @@ func setupHashring(g *run.Group, ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { level.Info(logger).Log("msg", "the hashring initialized with config watcher.") - return receive.HashringFromConfigWatcher(ctx, algorithm, conf.replicationFactor, updates, cw) + return nil, receive.HashringFromConfigWatcher(ctx, algorithm, conf.replicationFactor, updates, cw) }, func(error) { cancel() }) + + hashringConfigs = hcs } else { var ( ring receive.Hashring @@ -473,7 +479,7 @@ func setupHashring(g *run.Group, ring, err = receive.HashringFromConfig(algorithm, conf.replicationFactor, conf.hashringsFileContent) if err != nil { close(updates) - return errors.Wrap(err, "failed to validate hashring configuration file") + return nil, errors.Wrap(err, "failed to validate hashring configuration file") } level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.") } else { @@ -486,7 +492,7 @@ func setupHashring(g *run.Group, defer close(updates) updates <- ring <-cancel - return nil + return hashringConfigs, nil }, func(error) { close(cancel) }) @@ -503,7 +509,7 @@ func setupHashring(g *run.Group, select { case h, ok := <-updates: if !ok { - return nil + return hashringConfigs, nil } webHandler.Hashring(h) // If ingestion is enabled, send a signal to TSDB to flush. @@ -514,14 +520,14 @@ func setupHashring(g *run.Group, statusProber.Ready() } case <-cancel: - return nil + return hashringConfigs, nil } } }, func(err error) { close(cancel) }, ) - return nil + return hashringConfigs, nil } // startTSDBAndUpload starts the multi-TSDB and sets up the rungroup to flush the TSDB and reload on hashring change. diff --git a/pkg/receive/config.go b/pkg/receive/config.go index 4ec4fd8cbe2..afb65657383 100644 --- a/pkg/receive/config.go +++ b/pkg/receive/config.go @@ -185,9 +185,9 @@ func (cw *ConfigWatcher) C() <-chan []HashringConfig { } // ValidateConfig returns an error if the configuration that's being watched is not valid. -func (cw *ConfigWatcher) ValidateConfig() error { - _, _, err := loadConfig(cw.logger, cw.path) - return err +func (cw *ConfigWatcher) ValidateConfig() ([]HashringConfig, error) { + config, _, err := loadConfig(cw.logger, cw.path) + return config, err } // Stop shuts down the config watcher. diff --git a/pkg/receive/config_test.go b/pkg/receive/config_test.go index d67de827d9b..56b99fe0725 100644 --- a/pkg/receive/config_test.go +++ b/pkg/receive/config_test.go @@ -65,7 +65,7 @@ func TestValidateConfig(t *testing.T) { testutil.Ok(t, err) defer cw.Stop() - if err := cw.ValidateConfig(); err != nil && !errors.Is(err, tc.err) { + if _, err := cw.ValidateConfig(); err != nil && !errors.Is(err, tc.err) { t.Errorf("case %q: got unexpected error: %v", tc.name, err) } }) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 2b6a9d89253..b17bad8227a 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -10,6 +10,7 @@ import ( "path" "path/filepath" "sort" + "strings" "sync" "time" @@ -17,6 +18,7 @@ import ( "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -57,6 +59,7 @@ type MultiTSDB struct { tenants map[string]*tenant allowOutOfOrderUpload bool hashFunc metadata.HashFunc + hashringConfigs []HashringConfig } // NewMultiTSDB creates new MultiTSDB. @@ -71,6 +74,7 @@ func NewMultiTSDB( bucket objstore.Bucket, allowOutOfOrderUpload bool, hashFunc metadata.HashFunc, + hashringConfigs []HashringConfig ) *MultiTSDB { if l == nil { l = log.NewNopLogger() @@ -88,6 +92,7 @@ func NewMultiTSDB( bucket: bucket, allowOutOfOrderUpload: allowOutOfOrderUpload, hashFunc: hashFunc, + hashringConfigs: hashringConfigs, } } @@ -139,7 +144,6 @@ type tenant struct { storeTSDB *store.TSDBStore exemplarsTSDB *exemplars.TSDB ship *shipper.Shipper - externalLabels HashringConfig.ExternalLabels mtx *sync.RWMutex } @@ -182,13 +186,12 @@ func (t *tenant) shipper() *shipper.Shipper { return t.ship } -func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, externalLabels HashringConfig.ExternalLabels) { +func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) { t.readyS.Set(tenantTSDB) t.mtx.Lock() t.storeTSDB = storeTSDB t.ship = ship t.exemplarsTSDB = exemplarsTSDB - t.externalLabels = HashringConfig.ExternalLabels t.mtx.Unlock() } @@ -496,10 +499,29 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant reg = NewUnRegisterer(reg) lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID)) - tenantExternalLabels := thanos.parseFlagLabels(tenant.externalLabels) - for _, tenantInstance := range t.tenants { - lset = labelpb.ExtendSortedLabels(t.labels, tenantExternalLabels) + + hcLoop: + for hc := range t.hashringConfigs { + for t := range hc.Tenants { + if t == tenantID { + var elset labels.Labels + for el := range hc.ExternalLabels { + parts := strings.SplitN(l, "=", 2) + if len(parts) != 2 { + return nil, errors.Errorf("unrecognized label %q", el) + } + if !model.LabelName.IsValid(model.LabelName(parts[0])) { + return nil, errors.Errorf("unsupported format for label %s", el) + } + elset = append(elset, labels.Label{Name: parts[0], Value: parts[1]}) + } + sort.Sort(elset) + lset = labelpb.ExtendSortedLabels(lset, elset) + break hcLoop + } + } } + dataDir := t.defaultTenantDataDir(tenantID) level.Info(logger).Log("msg", "opening TSDB")