diff --git a/CHANGELOG.md b/CHANGELOG.md index b7fbf7fd38..03ce32ea63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#3421](https://github.com/thanos-io/thanos/pull/3421) Tools: Added `thanos tools bucket rewrite` command allowing to delete series from given block. - [#3509](https://github.com/thanos-io/thanos/pull/3509) Store: Added touch series limit - [#3388](https://github.com/thanos-io/thanos/pull/3378) Tools: Bucket replicator now can specify block IDs to copy. +- [#3121](https://github.com/thanos-io/thanos/pull/3121) Receive: Added `--receive.hashrings` alternative to `receive.hashrings-file` flag (lower priority). Content of JSON file that contains the hashring configuration. ### Fixed @@ -111,8 +112,8 @@ Highlights: ### Added -- [#3114](https://github.com/thanos-io/thanos/pull/3114) Query Frontend: Added support for Memacached cache. - - **breaking** Renamed flag `log_queries_longer_than` to `log-queries-longer-than`. +- [#3114](https://github.com/thanos-io/thanos/pull/3114) Query Frontend: Added support for Memcached cache. + - **breaking** Renamed flag `log_queries_longer_than` to `log-queries-longer-than`. - [#3166](https://github.com/thanos-io/thanos/pull/3166) UIs: Added UI for passing a `storeMatch[]` parameter to queries. - [#3181](https://github.com/thanos-io/thanos/pull/3181) Logging: Added debug level logging for responses between 300-399 - [#3133](https://github.com/thanos-io/thanos/pull/3133) Query: Allowed passing a `storeMatch[]` to Labels APIs; Time range metadata based store filtering is supported on Labels APIs. @@ -134,7 +135,7 @@ Highlights: - [#3022](https://github.com/thanos-io/thanos/pull/3022) \*: Thanos images are now build with Go 1.15. - [#3205](https://github.com/thanos-io/thanos/pull/3205) \*: Updated TSDB to ~2.21 -## [v0.15.0](https://github.com/thanos-io/thanos/releases) - 2020.09.07 +## [v0.15.0](https://github.com/thanos-io/thanos/releases/v0.15.0) - 2020.09.07 Highlights: diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 5204f17986..b90779a008 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -19,8 +19,10 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/component" @@ -63,8 +65,8 @@ func registerReceive(app *extkingpin.App) { retention := extkingpin.ModelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention.").Default("15d")) - hashringsFile := cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration."). - PlaceHolder("").String() + hashringsFilePath := cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration. A watcher is initialized to watch changes and update the hashring dynamically.").PlaceHolder("").String() + hashringsFileContent := cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("").String() refreshInterval := extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)"). Default("5m")) @@ -105,14 +107,6 @@ func registerReceive(app *extkingpin.App) { return errors.New("no external labels configured for receive, uniquely identifying external labels must be configured (ideally with `receive_` prefix); see https://thanos.io/tip/thanos/storage.md#external-labels for details.") } - var cw *receive.ConfigWatcher - if *hashringsFile != "" { - cw, err = receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, *hashringsFile, *refreshInterval) - if err != nil { - return err - } - } - tsdbOpts := &tsdb.Options{ MinBlockDuration: int64(time.Duration(*tsdbMinBlockDuration) / time.Millisecond), MaxBlockDuration: int64(time.Duration(*tsdbMaxBlockDuration) / time.Millisecond), @@ -158,7 +152,9 @@ func registerReceive(app *extkingpin.App) { tsdbOpts, *ignoreBlockSize, lset, - cw, + *hashringsFilePath, + *hashringsFileContent, + refreshInterval, *localEndpoint, *tenantHeader, *defaultTenantID, @@ -197,7 +193,9 @@ func runReceive( tsdbOpts *tsdb.Options, ignoreBlockSize bool, lset labels.Labels, - cw *receive.ConfigWatcher, + hashringsFilePath string, + hashringsFileContent string, + refreshInterval *model.Duration, endpoint string, tenantHeader string, defaultTenantID string, @@ -373,7 +371,13 @@ func runReceive( // watcher, we close the chan ourselves. updates := make(chan receive.Hashring, 1) - if cw != nil { + // The Hashrings config file path is given initializing config watcher. + if hashringsFilePath != "" { + cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, hashringsFilePath, *refreshInterval) + if err != nil { + return errors.Wrap(err, "failed to initialize config watcher") + } + // Check the hashring configuration on before running the watcher. if err := cw.ValidateConfig(); err != nil { cw.Stop() @@ -383,15 +387,30 @@ func runReceive( ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - return receive.HashringFromConfig(ctx, updates, cw) + level.Info(logger).Log("msg", "the hashring initialized with config watcher.") + return receive.HashringFromConfigWatcher(ctx, updates, cw) }, func(error) { cancel() }) } else { + var ring receive.Hashring + // The Hashrings config file content given initialize configuration from content. + if len(hashringsFileContent) > 0 { + ring, err = receive.HashringFromConfig(hashringsFileContent) + if err != nil { + close(updates) + return errors.Wrap(err, "failed to validate hashring configuration file") + } + level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.") + } else { + level.Info(logger).Log("msg", "the hashring file is not specified use single node hashring.") + ring = receive.SingleNodeHashring(endpoint) + } + cancel := make(chan struct{}) g.Add(func() error { defer close(updates) - updates <- receive.SingleNodeHashring(endpoint) + updates <- ring <-cancel return nil }, func(error) { diff --git a/docs/components/receive.md b/docs/components/receive.md index 3225bab829..d937879984 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -149,7 +149,13 @@ Flags: storage. 0d - disables this retention. --receive.hashrings-file= Path to file that contains the hashring - configuration. + configuration. A watcher is initialized to + watch changes and update the hashring + dynamically. + --receive.hashrings= + Alternative to 'receive.hashrings-file' flag + (lower priority). Content of file that contains + the hashring configuration. --receive.hashrings-file-refresh-interval=5m Refresh interval to re-read the hashring configuration file. (used as a fallback) diff --git a/pkg/extflag/pathorcontent.go b/pkg/extflag/pathorcontent.go index 189c7af9de..5fed5a93a0 100644 --- a/pkg/extflag/pathorcontent.go +++ b/pkg/extflag/pathorcontent.go @@ -44,21 +44,21 @@ func RegisterPathOrContent(cmd FlagClause, flagName string, help string, require } } -// Content returns content of the file. Flag that specifies path has priority. +// Content returns the content of the file when given or directly the content that has passed to the flag. +// Flag that specifies path has priority. // It returns error if the content is empty and required flag is set to true. func (p *PathOrContent) Content() ([]byte, error) { - contentFlagName := p.flagName fileFlagName := fmt.Sprintf("%s-file", p.flagName) if len(*p.path) > 0 && len(*p.content) > 0 { - return nil, errors.Errorf("both %s and %s flags set.", fileFlagName, contentFlagName) + return nil, errors.Errorf("both %s and %s flags set.", fileFlagName, p.flagName) } var content []byte if len(*p.path) > 0 { c, err := ioutil.ReadFile(*p.path) if err != nil { - return nil, errors.Wrapf(err, "loading YAML file %s for %s", *p.path, fileFlagName) + return nil, errors.Wrapf(err, "loading file %s for %s", *p.path, fileFlagName) } content = c } else { @@ -66,7 +66,7 @@ func (p *PathOrContent) Content() ([]byte, error) { } if len(content) == 0 && p.required { - return nil, errors.Errorf("flag %s or %s is required for running this command and content cannot be empty.", fileFlagName, contentFlagName) + return nil, errors.Errorf("flag %s or %s is required for running this command and content cannot be empty.", fileFlagName, p.flagName) } return content, nil diff --git a/pkg/receive/config.go b/pkg/receive/config.go index 1eb198cace..272c3c5321 100644 --- a/pkg/receive/config.go +++ b/pkg/receive/config.go @@ -176,35 +176,42 @@ func (cw *ConfigWatcher) C() <-chan []HashringConfig { // ValidateConfig returns an error if the configuration that's being watched is not valid. func (cw *ConfigWatcher) ValidateConfig() error { - _, _, err := cw.loadConfig() + _, _, err := loadConfig(cw.logger, cw.path) return err } -// loadConfig loads raw configuration content and returns a configuration. -func (cw *ConfigWatcher) loadConfig() ([]HashringConfig, float64, error) { - cfgContent, err := cw.readFile() - if err != nil { - return nil, 0, errors.Wrap(err, "failed to read configuration file") - } +// Stop shuts down the config watcher. +func (cw *ConfigWatcher) Stop() { + level.Debug(cw.logger).Log("msg", "stopping hashring configuration watcher...", "path", cw.path) - config, err := cw.parseConfig(cfgContent) - if err != nil { - return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err) - } + done := make(chan struct{}) + defer close(done) - // If hashring is empty, return an error. - if len(config) == 0 { - return nil, 0, errors.Wrapf(errEmptyConfigurationFile, "failed to load configuration file, path: %s", cw.path) + // Closing the watcher will deadlock unless all events and errors are drained. + go func() { + for { + select { + case <-cw.watcher.Errors: + case <-cw.watcher.Events: + // Drain all events and errors. + case <-done: + return + } + } + }() + if err := cw.watcher.Close(); err != nil { + level.Error(cw.logger).Log("msg", "error closing file watcher", "path", cw.path, "err", err) } - return config, hashAsMetricValue(cfgContent), nil + close(cw.ch) + level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped") } // refresh reads the configured file and sends the hashring configuration on the channel. func (cw *ConfigWatcher) refresh(ctx context.Context) { cw.refreshCounter.Inc() - config, cfgHash, err := cw.loadConfig() + config, cfgHash, err := loadConfig(cw.logger, cw.path) if err != nil { cw.errorCounter.Inc() level.Error(cw.logger).Log("msg", "failed to load configuration file", "err", err, "path", cw.path) @@ -238,42 +245,35 @@ func (cw *ConfigWatcher) refresh(ctx context.Context) { } } -// Stop shuts down the config watcher. -func (cw *ConfigWatcher) Stop() { - level.Debug(cw.logger).Log("msg", "stopping hashring configuration watcher...", "path", cw.path) +// loadConfig loads raw configuration content and returns a configuration. +func loadConfig(logger log.Logger, path string) ([]HashringConfig, float64, error) { + cfgContent, err := readFile(logger, path) + if err != nil { + return nil, 0, errors.Wrap(err, "failed to read configuration file") + } - done := make(chan struct{}) - defer close(done) + config, err := parseConfig(cfgContent) + if err != nil { + return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err) + } - // Closing the watcher will deadlock unless all events and errors are drained. - go func() { - for { - select { - case <-cw.watcher.Errors: - case <-cw.watcher.Events: - // Drain all events and errors. - case <-done: - return - } - } - }() - if err := cw.watcher.Close(); err != nil { - level.Error(cw.logger).Log("msg", "error closing file watcher", "path", cw.path, "err", err) + // If hashring is empty, return an error. + if len(config) == 0 { + return nil, 0, errors.Wrapf(errEmptyConfigurationFile, "failed to load configuration file, path: %s", path) } - close(cw.ch) - level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped") + return config, hashAsMetricValue(cfgContent), nil } // readFile reads the configuration file and returns content of configuration file. -func (cw *ConfigWatcher) readFile() ([]byte, error) { - fd, err := os.Open(cw.path) +func readFile(logger log.Logger, path string) ([]byte, error) { + fd, err := os.Open(path) if err != nil { return nil, err } defer func() { if err := fd.Close(); err != nil { - level.Error(cw.logger).Log("msg", "failed to close file", "err", err, "path", cw.path) + level.Error(logger).Log("msg", "failed to close file", "err", err, "path", path) } }() @@ -281,7 +281,7 @@ func (cw *ConfigWatcher) readFile() ([]byte, error) { } // parseConfig parses the raw configuration content and returns a HashringConfig. -func (cw *ConfigWatcher) parseConfig(content []byte) ([]HashringConfig, error) { +func parseConfig(content []byte) ([]HashringConfig, error) { var config []HashringConfig err := json.Unmarshal(content, &config) return config, err diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index ef6c94390a..cb5ceb71a5 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -11,6 +11,7 @@ import ( "github.com/cespare/xxhash" "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -158,14 +159,14 @@ func newMultiHashring(cfg []HashringConfig) Hashring { return m } -// HashringFromConfig creates multi-tenant hashrings from a +// HashringFromConfigWatcher creates multi-tenant hashrings from a // hashring configuration file watcher. // The configuration file is watched for updates. // Hashrings are returned on the updates channel. // Which hashring to use for a tenant is determined // by the tenants field of the hashring configuration. // The updates chan is closed before exiting. -func HashringFromConfig(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error { +func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error { defer close(updates) go cw.Run(ctx) @@ -181,3 +182,18 @@ func HashringFromConfig(ctx context.Context, updates chan<- Hashring, cw *Config } } } + +// HashringFromConfig loads raw configuration content and returns a Hashring if the given configuration is not valid. +func HashringFromConfig(content string) (Hashring, error) { + config, err := parseConfig([]byte(content)) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse configuration") + } + + // If hashring is empty, return an error. + if len(config) == 0 { + return nil, errors.Wrapf(err, "failed to load configuration") + } + + return newMultiHashring(config), err +} diff --git a/scripts/cfggen/main.go b/scripts/cfggen/main.go index 055a0d1c65..5570234c8f 100644 --- a/scripts/cfggen/main.go +++ b/scripts/cfggen/main.go @@ -15,6 +15,9 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "gopkg.in/alecthomas/kingpin.v2" + "gopkg.in/yaml.v2" + "github.com/thanos-io/thanos/pkg/alert" "github.com/thanos-io/thanos/pkg/cacheutil" http_util "github.com/thanos-io/thanos/pkg/http" @@ -34,8 +37,6 @@ import ( "github.com/thanos-io/thanos/pkg/tracing/jaeger" "github.com/thanos-io/thanos/pkg/tracing/lightstep" "github.com/thanos-io/thanos/pkg/tracing/stackdriver" - kingpin "gopkg.in/alecthomas/kingpin.v2" - yaml "gopkg.in/yaml.v2" ) var ( diff --git a/scripts/quickstart.sh b/scripts/quickstart.sh index e03b377cf1..e99ab8fdd9 100755 --- a/scripts/quickstart.sh +++ b/scripts/quickstart.sh @@ -184,10 +184,6 @@ sleep 0.5 if [ -n "${REMOTE_WRITE_ENABLED}" ]; then - cat <<-EOF >./data/hashring.json - [{"endpoints":["127.0.0.1:10907","127.0.0.1:11907","127.0.0.1:12907"]}] - EOF - for i in $(seq 0 1 2); do ${THANOS_EXECUTABLE} receive \ --debug.name receive${i} \ @@ -203,7 +199,7 @@ if [ -n "${REMOTE_WRITE_ENABLED}" ]; then --label "receive_replica=\"${i}\"" \ --label 'receive="true"' \ --receive.local-endpoint 127.0.0.1:1${i}907 \ - --receive.hashrings-file ./data/hashring.json \ + --receive.hashrings '[{"endpoints":["127.0.0.1:10907","127.0.0.1:11907","127.0.0.1:12907"]}]' \ --remote-write.address 0.0.0.0:1${i}908 \ ${OBJSTORECFG} & diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 4e32414ac1..8f692883aa 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -201,6 +201,51 @@ func NewReceiver(sharedDir string, networkName string, name string, replicationF return nil, errors.Wrapf(err, "generate hashring file: %v", hashring) } + receiver := NewService( + fmt.Sprintf("receive-%v", name), + DefaultImage(), + // TODO(bwplotka): BuildArgs should be interface. + e2e.NewCommand("receive", e2e.BuildArgs(map[string]string{ + "--debug.name": fmt.Sprintf("receive-%v", name), + "--grpc-address": ":9091", + "--grpc-grace-period": "0s", + "--http-address": ":8080", + "--remote-write.address": ":8081", + "--label": fmt.Sprintf(`receive="%s"`, name), + "--tsdb.path": filepath.Join(container, "data"), + "--log.level": logLevel, + "--receive.replication-factor": strconv.Itoa(replicationFactor), + "--receive.local-endpoint": localEndpoint, + "--receive.hashrings": string(b), + })...), + e2e.NewHTTPReadinessProbe(8080, "/-/ready", 200, 200), + 8080, + 9091, + 8081, + ) + receiver.SetUser(strconv.Itoa(os.Getuid())) + receiver.SetBackoff(defaultBackoffConfig) + + return receiver, nil +} + +func NewReceiverWithConfigWatcher(sharedDir string, networkName string, name string, replicationFactor int, hashring ...receive.HashringConfig) (*Service, error) { + localEndpoint := NewService(fmt.Sprintf("receive-%v", name), "", e2e.NewCommand("", ""), nil, 8080, 9091, 8081).GRPCNetworkEndpointFor(networkName) + if len(hashring) == 0 { + hashring = []receive.HashringConfig{{Endpoints: []string{localEndpoint}}} + } + + dir := filepath.Join(sharedDir, "data", "receive", name) + dataDir := filepath.Join(dir, "data") + container := filepath.Join(e2e.ContainerSharedDir, "data", "receive", name) + if err := os.MkdirAll(dataDir, 0777); err != nil { + return nil, errors.Wrap(err, "create receive dir") + } + b, err := json.Marshal(hashring) + if err != nil { + return nil, errors.Wrapf(err, "generate hashring file: %v", hashring) + } + if err := ioutil.WriteFile(filepath.Join(dir, "hashrings.json"), b, 0666); err != nil { return nil, errors.Wrap(err, "creating receive config") } diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index b9e3aaf502..a92b5baf2b 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -138,6 +138,82 @@ func TestReceive(t *testing.T) { }) }) + t.Run("hashring with config watcher", func(t *testing.T) { + t.Parallel() + + s, err := e2e.NewScenario("e2e_test_receive_hashring") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, s)) + + r1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1) + testutil.Ok(t, err) + r2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 1) + testutil.Ok(t, err) + r3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 1) + testutil.Ok(t, err) + + h := receive.HashringConfig{ + Endpoints: []string{ + r1.GRPCNetworkEndpointFor(s.NetworkName()), + r2.GRPCNetworkEndpointFor(s.NetworkName()), + r3.GRPCNetworkEndpointFor(s.NetworkName()), + }, + } + + // Recreate again, but with hashring config. + // TODO(kakkoyun): Update config file and wait config watcher to reconcile hashring. + r1, err = e2ethanos.NewReceiverWithConfigWatcher(s.SharedDir(), s.NetworkName(), "1", 1, h) + testutil.Ok(t, err) + r2, err = e2ethanos.NewReceiverWithConfigWatcher(s.SharedDir(), s.NetworkName(), "2", 1, h) + testutil.Ok(t, err) + r3, err = e2ethanos.NewReceiverWithConfigWatcher(s.SharedDir(), s.NetworkName(), "3", 1, h) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(r1, r2, r3)) + + prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom2", 0, e2ethanos.RemoteWriteEndpoint(r2.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + prom3, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "3", defaultPromConfig("prom3", 0, e2ethanos.RemoteWriteEndpoint(r3.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) + + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, "", "") + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) + + queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "job": "myself", + "prometheus": "prom1", + "receive": "2", + "replica": "0", + "tenant_id": "default-tenant", + }, + { + "job": "myself", + "prometheus": "prom2", + "receive": "1", + "replica": "0", + "tenant_id": "default-tenant", + }, + { + "job": "myself", + "prometheus": "prom3", + "receive": "2", + "replica": "0", + "tenant_id": "default-tenant", + }, + }) + }) + t.Run("replication", func(t *testing.T) { t.Parallel()