Skip to content

Commit

Permalink
[CHORE] adding thanos upload-snapshot command
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas Takashi <nicolas.takashi@coralogix.com>
  • Loading branch information
nicolastakashi committed Nov 9, 2023
1 parent c74a050 commit 63d80db
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6773](https://github.com/thanos-io/thanos/pull/6773) Index Cache: Add `ttl` to control the ttl to store items in remote index caches like memcached and redis.
- [#6794](https://github.com/thanos-io/thanos/pull/6794) Query: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing custom dashboard queries to be incorrect due to the added label.
- [#6847](https://github.com/thanos-io/thanos/pull/6847) Store: Add `thanos_bucket_store_indexheader_download_duration_seconds` and `thanos_bucket_store_indexheader_load_duration_seconds` metrics for tracking latency of downloading and initializing the index-header.
- [#6884](https://github.com/thanos-io/thanos/pull/6884) Tools: Add upload-snapshot command to upload a snapshot blocks to object storage.

### Changed

Expand Down
122 changes: 122 additions & 0 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,15 @@ import (
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/httpconfig"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/replicate"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/ui"
"github.com/thanos-io/thanos/pkg/verifier"
Expand Down Expand Up @@ -158,6 +161,12 @@ type bucketMarkBlockConfig struct {
removeMarker bool
}

type bucketUploadBlocksConfig struct {
snapshotName string
path string
prometheus prometheusConfig
}

func (tbc *bucketVerifyConfig) registerBucketVerifyFlag(cmd extkingpin.FlagClause) *bucketVerifyConfig {
cmd.Flag("repair", "Attempt to repair blocks for which issues were detected").
Short('r').Default("false").BoolVar(&tbc.repair)
Expand Down Expand Up @@ -277,6 +286,31 @@ func (tbc *bucketRetentionConfig) registerBucketRetentionFlag(cmd extkingpin.Fla
return tbc
}

func (tbc *bucketUploadBlocksConfig) registerBucketUploadBlocksFlag(cmd extkingpin.FlagClause) *bucketUploadBlocksConfig {
cmd.Flag("path", "Path to the directory containing blocks to upload.").Default("./data").StringVar(&tbc.path)
cmd.Flag("snapshot-name", "Name of the snapshot to upload blocks to.").Required().StringVar(&tbc.snapshotName)

cmd.Flag("prometheus.url",
"URL at which to reach Prometheus's API. For better performance use local network.").
Default("http://localhost:9090").URLVar(&tbc.prometheus.url)
cmd.Flag("prometheus.ready_timeout",
"Maximum time to wait for the Prometheus instance to start up").
Default("10m").DurationVar(&tbc.prometheus.readyTimeout)
cmd.Flag("prometheus.get_config_interval",
"How often to get Prometheus config").
Default("30s").DurationVar(&tbc.prometheus.getConfigInterval)
cmd.Flag("prometheus.get_config_timeout",
"Timeout for getting Prometheus config").
Default("5s").DurationVar(&tbc.prometheus.getConfigTimeout)
tbc.prometheus.httpClient = extflag.RegisterPathOrContent(
cmd,
"prometheus.http-client",
"YAML file or string with http client configs. See Format details: https://thanos.io/tip/components/sidecar.md/#configuration.",
)

return tbc
}

func registerBucket(app extkingpin.AppClause) {
cmd := app.Command("bucket", "Bucket utility commands")

Expand All @@ -291,6 +325,7 @@ func registerBucket(app extkingpin.AppClause) {
registerBucketMarkBlock(cmd, objStoreConfig)
registerBucketRewrite(cmd, objStoreConfig)
registerBucketRetention(cmd, objStoreConfig)
registerBucketUploadSnapshotBlocks(cmd, objStoreConfig)
}

func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
Expand Down Expand Up @@ -1414,3 +1449,90 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
return nil
})
}

func registerBucketUploadSnapshotBlocks(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
cmd := app.Command("upload-snapshot", "Upload Snapshot push blocks from the provided path to the object storeage.")

tbc := &bucketUploadBlocksConfig{}
tbc.registerBucketUploadBlocksFlag(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
ctx := context.Background()
httpConfContentYaml, err := tbc.prometheus.httpClient.Content()
if err != nil {
return errors.Wrap(err, "getting http client config")
}
httpClientConfig, err := httpconfig.NewClientConfigFromYAML(httpConfContentYaml)
if err != nil {
return errors.Wrap(err, "parsing http config YAML")
}

httpClient, err := httpconfig.NewHTTPClient(*httpClientConfig, "thanos-tool")
if err != nil {
return errors.Wrap(err, "Improper http client config")
}

m := &promMetadata{
promURL: tbc.prometheus.url,
client: promclient.NewWithTracingClient(logger, httpClient, "thanos-tool"),
}

err = runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.UpdateLabels(ctx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
"err", err,
)
return err
}

level.Info(logger).Log(
"msg", "successfully loaded prometheus external labels",
"external_labels", m.Labels().String(),
)
return nil
})

if err != nil {
return errors.Wrap(err, "initial external labels query")
}

if len(m.Labels()) == 0 {
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, component.Upload.String())
if err != nil {
return err
}

bkt = objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

// Ensure we close up everything properly.
defer func() {
if err != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()

if err := promclient.IsWALDirAccessible(tbc.path); err != nil {
level.Error(logger).Log("err", err)
}

defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

s := shipper.New(logger, reg, filepath.Join(tbc.path, "snapshots", tbc.snapshotName), bkt, m.Labels, metadata.BucketUploadSource,
nil, false, metadata.HashFunc(""))

if _, err := s.Sync(ctx); err != nil {
level.Error(logger).Log("err", err)
}

return nil
})
}
62 changes: 62 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ Subcommands:
Retention applies retention policies on the given bucket. Please make sure
no compactor is running on the same bucket at the same time.
tools bucket upload-snapshot --snapshot-name=SNAPSHOT-NAME [<flags>]
Upload Snapshot push blocks from the provided path to the object storeage.
tools rules-check --rules=RULES
Check if the rule files are valid or not.
Expand Down Expand Up @@ -187,6 +190,8 @@ Subcommands:
Retention applies retention policies on the given bucket. Please make sure
no compactor is running on the same bucket at the same time.
tools bucket upload-snapshot --snapshot-name=SNAPSHOT-NAME [<flags>]
Upload Snapshot push blocks from the provided path to the object storeage.
```

Expand Down Expand Up @@ -841,6 +846,63 @@ Flags:
```

### Bucket Upload Snapshot

`tools bucket upload-snapshot` uploads a blocks created from a snapshot on the given bucket.

```$ mdox-exec="thanos tools bucket upload-snapshot --help"
usage: thanos tools bucket upload-snapshot --snapshot-name=SNAPSHOT-NAME [<flags>]
Upload Snapshot push blocks from the provided path to the object storeage.
-h, --help Show context-sensitive help (also try --help-long and
--help-man).
--log.format=logfmt Log format to use. Possible options: logfmt or json.
--log.level=info Log filtering level.
--objstore.config=<content>
Alternative to 'objstore.config-file' flag (mutually
exclusive). Content of YAML file that contains
object store configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--objstore.config-file=<file-path>
Path to YAML file that contains object
store configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--path="./data" Path to the directory containing blocks to upload.
--prometheus.get_config_interval=30s
How often to get Prometheus config
--prometheus.get_config_timeout=5s
Timeout for getting Prometheus config
--prometheus.http-client=<content>
Alternative to 'prometheus.http-client-file' flag
(mutually exclusive). Content of YAML file or string
with http client configs. See Format details:
https://thanos.io/tip/components/sidecar.md/#configuration.
--prometheus.http-client-file=<file-path>
Path to YAML file or string with http
client configs. See Format details:
https://thanos.io/tip/components/sidecar.md/#configuration.
--prometheus.ready_timeout=10m
Maximum time to wait for the Prometheus instance to
start up
--prometheus.url=http://localhost:9090
URL at which to reach Prometheus's API. For better
performance use local network.
--snapshot-name=SNAPSHOT-NAME
Name of the snapshot to upload blocks to.
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
with tracing configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--tracing.config-file=<file-path>
Path to YAML file with tracing
configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--version Show application version.
```

## Rules-check

The `tools rules-check` subcommand contains tools for validation of Prometheus rules.
Expand Down
1 change: 1 addition & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
RulerSource SourceType = "ruler"
BucketRepairSource SourceType = "bucket.repair"
BucketRewriteSource SourceType = "bucket.rewrite"
BucketUploadSource SourceType = "bucket.upload"
TestSource SourceType = "test"
)

Expand Down
1 change: 1 addition & 0 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ var (
Bucket = source{component: component{name: "bucket"}}
Cleanup = source{component: component{name: "cleanup"}}
Mark = source{component: component{name: "mark"}}
Upload = source{component: component{name: "upload"}}
Rewrite = source{component: component{name: "rewrite"}}
Retention = source{component: component{name: "retention"}}
Compact = source{component: component{name: "compact"}}
Expand Down
15 changes: 15 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,21 @@ func (c *Client) req2xx(ctx context.Context, u *url.URL, method string, headers
func IsWALDirAccessible(dir string) error {
const errMsg = "WAL dir is not accessible. Is this dir a TSDB directory? If yes it is shared with TSDB?"

f, err := os.Stat(filepath.Join(dir, "snapshots"))
if err != nil {
return errors.Wrap(err, errMsg)
}

if !f.IsDir() {
return errors.New(errMsg)
}

return nil
}

func IsSnapShotDirAccessible(dir string) error {
const errMsg = "Snapshot dir is not accessible. Is this dir a Snapshot directory? If yes it is shared with Snapshot?"

f, err := os.Stat(filepath.Join(dir, "wal"))
if err != nil {
return errors.Wrap(err, errMsg)
Expand Down

0 comments on commit 63d80db

Please sign in to comment.