Skip to content

Commit

Permalink
Second approach: adding + handling changes for external labels
Browse files Browse the repository at this point in the history
Signed-off-by: haanhvu <haanh6594@gmail.com>
  • Loading branch information
haanhvu committed Nov 17, 2022
1 parent 80bdb6c commit 9ddbdbc
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 39 deletions.
91 changes: 64 additions & 27 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,23 +200,29 @@ func runReceive(
// In the single-node case, which has no configuration
// watcher, we close the chan ourselves.
updates := make(chan receive.Hashring, 1)
var (
dbs *receive.MultiTSDB
cf []receive.HashringConfig
cw *receive.ConfigWatcher
)

// The Hashrings config file path is given initializing config watcher.
if conf.hashringsFilePath != "" {
cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, conf.hashringsFilePath, *conf.refreshInterval)
var err error

// The Hashrings config file path is given initializing config watcher.
cw, err = receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, conf.hashringsFilePath, *conf.refreshInterval)
if err != nil {
return errors.Wrap(err, "failed to initialize config watcher")
}

// Check and load the hashrings configuration
hcs, err := cw.ValidateConfig()
cf, err := cw.ValidateConfig()
if err != nil {
cw.Stop()
close(updates)
return errors.Wrap(err, "failed to validate hashrings configuration file")
return errors.Wrap(err, "failed to validate hashring configuration file")
}

dbs := receive.NewMultiTSDB(
dbs = receive.NewMultiTSDB(
conf.dataDir,
logger,
reg,
Expand All @@ -226,21 +232,44 @@ func runReceive(
bkt,
conf.allowOutOfOrderUpload,
hashFunc,
hcs,
cf,
)
} else {
dbs := receive.NewMultiTSDB(
conf.dataDir,
logger,
reg,
tsdbOpts,
lset,
conf.tenantLabelName,
bkt,
conf.allowOutOfOrderUpload,
hashFunc,
nil,
)
if len(conf.hashringsFileContent) > 0 {
var err error

// The Hashrings config file content given initialize configuration from content.
cf, err = receive.ParseConfig([]byte(conf.hashringsFileContent))
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration content")
}
dbs = receive.NewMultiTSDB(
conf.dataDir,
logger,
reg,
tsdbOpts,
lset,
conf.tenantLabelName,
bkt,
conf.allowOutOfOrderUpload,
hashFunc,
cf,
)
} else {
dbs = receive.NewMultiTSDB(
conf.dataDir,
logger,
reg,
tsdbOpts,
lset,
conf.tenantLabelName,
bkt,
conf.allowOutOfOrderUpload,
hashFunc,
nil,
)
}
}

writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
Expand Down Expand Up @@ -289,8 +318,8 @@ func runReceive(
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completes.
// Start all components while we wait for TSDB to open but only set up
// hashring and mark ourselves as ready after it completes.

// hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change.
hashringChangedChan := make(chan struct{}, 1)
Expand All @@ -303,15 +332,15 @@ func runReceive(

level.Debug(logger).Log("msg", "setting up TSDB")
{
if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm)); err != nil {
if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm), cw); err != nil {
return err
}
}
}

level.Debug(logger).Log("msg", "setting up hashring")
{
if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion, updates, cw); err != nil {
if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion, updates, cw, cf); err != nil {
return err
}
}
Expand Down Expand Up @@ -473,8 +502,9 @@ func setupHashring(g *run.Group,
webHandler *receive.Handler,
statusProber prober.Probe,
enableIngestion bool,
updates chan struct{},
updates chan receive.Hashring,
cw *receive.ConfigWatcher,
cf []receive.HashringConfig,
) error {
algorithm := receive.HashringAlgorithm(conf.hashringsAlgorithm)

Expand All @@ -491,12 +521,12 @@ func setupHashring(g *run.Group,
ring receive.Hashring
err error
)
// The Hashrings config file content given initialize configuration from content.

if len(conf.hashringsFileContent) > 0 {
ring, err = receive.HashringFromConfig(algorithm, conf.replicationFactor, conf.hashringsFileContent)
ring, err = receive.HashringFromConfig(algorithm, conf.replicationFactor, cf)
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
return errors.Wrap(err, "failed to set up hashring from content")
}
level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.")
} else {
Expand Down Expand Up @@ -560,6 +590,7 @@ func startTSDBAndUpload(g *run.Group,
statusProber prober.Probe,
bkt objstore.Bucket,
hashringAlgorithm receive.HashringAlgorithm,
cw *receive.ConfigWatcher,
) error {

log.With(logger, "component", "storage")
Expand Down Expand Up @@ -625,6 +656,12 @@ func startTSDBAndUpload(g *run.Group,
if err := dbs.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
cf, err := cw.ValidateConfig()
if err != nil {
cw.Stop()
return errors.Wrap(err, "failed to validate hashring configuration file")
}
dbs.SetHashringConfig(cf)
if err := dbs.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func loadConfig(logger log.Logger, path string) ([]HashringConfig, float64, erro
return nil, 0, errors.Wrap(err, "failed to read configuration file")
}

config, err := parseConfig(cfgContent)
config, err := ParseConfig(cfgContent)
if err != nil {
return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err)
}
Expand Down Expand Up @@ -291,7 +291,7 @@ func readFile(logger log.Logger, path string) ([]byte, error) {
}

// parseConfig parses the raw configuration content and returns a HashringConfig.
func parseConfig(content []byte) ([]HashringConfig, error) {
func ParseConfig(content []byte) ([]HashringConfig, error) {
var config []HashringConfig
err := json.Unmarshal(content, &config)
return config, err
Expand Down
1 change: 1 addition & 0 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
nil,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(b, m.Close()) }()
handler.writer = NewWriter(logger, m)
Expand Down
15 changes: 5 additions & 10 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,17 +285,12 @@ func HashringFromConfigWatcher(ctx context.Context, algorithm HashringAlgorithm,
}
}

// HashringFromConfig loads raw configuration content and returns a Hashring if the given configuration is not valid.
func HashringFromConfig(algorithm HashringAlgorithm, replicationFactor uint64, content string) (Hashring, error) {
config, err := parseConfig([]byte(content))
if err != nil {
return nil, errors.Wrapf(err, "failed to parse configuration")
}

// If hashring is empty, return an error.
// HashringFromConfig loads hashring configuration and returns a MultiHashring if the given configuration is valid.
func HashringFromConfig(algorithm HashringAlgorithm, replicationFactor uint64, config []HashringConfig) (Hashring, error) {
// If hashring configuration is empty, return an error.
if len(config) == 0 {
return nil, errors.Wrapf(err, "failed to load configuration")
return nil, errors.New("failed to load configuration")
}

return newMultiHashring(algorithm, replicationFactor, config), err
return newMultiHashring(algorithm, replicationFactor, config), nil
}
4 changes: 4 additions & 0 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,10 @@ func (t *MultiTSDB) TenantAppendable(tenantID string) (Appendable, error) {
return tenant.readyStorage(), nil
}

func (t *MultiTSDB) SetHashringConfig(cfg []HashringConfig) {
t.hashringConfigs = cfg
}

// ErrNotReady is returned if the underlying storage is not ready yet.
var ErrNotReady = errors.New("TSDB not ready")

Expand Down
6 changes: 6 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -133,6 +134,7 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -414,6 +416,7 @@ func TestMultiTSDBPrune(t *testing.T) {
test.bucket,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -453,6 +456,7 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
objstore.NewInMemBucket(),
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -507,6 +511,7 @@ func TestMultiTSDBStats(t *testing.T) {
nil,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -559,6 +564,7 @@ func BenchmarkMultiTSDB(b *testing.B) {
nil,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(b, m.Close()) }()

Expand Down
2 changes: 2 additions & 0 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func TestWriter(t *testing.T) {
nil,
false,
metadata.NoneFunc,
nil,
)
t.Cleanup(func() { testutil.Ok(t, m.Close()) })

Expand Down Expand Up @@ -294,6 +295,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int) {
nil,
false,
metadata.NoneFunc,
nil,
)
b.Cleanup(func() { testutil.Ok(b, m.Close()) })

Expand Down

0 comments on commit 9ddbdbc

Please sign in to comment.