Skip to content

Commit

Permalink
Tools: Added remove flag on bucket mark command to remove deletion, n…
Browse files Browse the repository at this point in the history
…o-downsample or no-compact markers on the blocks.

Signed-off-by: maheshbaliga <mahesh.baliga@infracloud.io>
  • Loading branch information
maheshbaliga committed Dec 21, 2022
1 parent e85bc1f commit 112fc5f
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5889](https://github.com/thanos-io/thanos/pull/5889) Query Frontend: Support sharding vertical sharding `label_replace` and `label_join` functions.
- [#5819](https://github.com/thanos-io/thanos/pull/5819) Store: Add a few objectives for Store's data touched/fetched amount and sizes. They are: 50, 95, and 99 quantiles.
- [#5940](https://github.com/thanos-io/thanos/pull/5940) Objstore: Support for authenticating to Swift using application credentials.
- [#5977](https://github.com/thanos-io/thanos/pull/5977) Tools: Added remove flag on bucket mark command to remove deletion, no-downsample or no-compact markers on the block.

### Changed

Expand Down
22 changes: 17 additions & 5 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,10 @@ type bucketRetentionConfig struct {
}

type bucketMarkBlockConfig struct {
details string
marker string
blockIDs []string
details string
marker string
blockIDs []string
removeMarker bool
}

func (tbc *bucketVerifyConfig) registerBucketVerifyFlag(cmd extkingpin.FlagClause) *bucketVerifyConfig {
Expand Down Expand Up @@ -239,8 +240,8 @@ func (tbc *bucketDownsampleConfig) registerBucketDownsampleFlag(cmd extkingpin.F
func (tbc *bucketMarkBlockConfig) registerBucketMarkBlockFlag(cmd extkingpin.FlagClause) *bucketMarkBlockConfig {
cmd.Flag("id", "ID (ULID) of the blocks to be marked for deletion (repeated flag)").Required().StringsVar(&tbc.blockIDs)
cmd.Flag("marker", "Marker to be put.").Required().EnumVar(&tbc.marker, metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename, metadata.NoDownsampleMarkFilename)
cmd.Flag("details", "Human readable details to be put into marker.").Required().StringVar(&tbc.details)

cmd.Flag("details", "Human readable details to be put into marker.").StringVar(&tbc.details)
cmd.Flag("remove", "Remove the marker.").Default("false").BoolVar(&tbc.removeMarker)
return tbc
}

Expand Down Expand Up @@ -1047,9 +1048,20 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.P
ids = append(ids, u)
}

if !tbc.removeMarker && tbc.details == "" {
return errors.Errorf("required flag --details not provided")
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
g.Add(func() error {
for _, id := range ids {
if tbc.removeMarker {
err := block.RemoveMark(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{}), tbc.marker)
if err != nil {
return errors.Wrapf(err, "remove mark %v for %v", id, tbc.marker)
}
continue
}
switch tbc.marker {
case metadata.DeletionMarkFilename:
if err := block.MarkForDeletion(ctx, logger, bkt, id, tbc.details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil {
Expand Down
7 changes: 4 additions & 3 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Subcommands:
tools bucket cleanup [<flags>]
Cleans up all blocks marked for deletion.
tools bucket mark --id=ID --marker=MARKER --details=DETAILS
tools bucket mark --id=ID --marker=MARKER [<flags>]
Mark block for deletion or no-compact in a safe way. NOTE: If the compactor
is currently running compacting same block, this operation would be
potentially a noop.
Expand Down Expand Up @@ -161,7 +161,7 @@ Subcommands:
tools bucket cleanup [<flags>]
Cleans up all blocks marked for deletion.
tools bucket mark --id=ID --marker=MARKER --details=DETAILS
tools bucket mark --id=ID --marker=MARKER [<flags>]
Mark block for deletion or no-compact in a safe way. NOTE: If the compactor
is currently running compacting same block, this operation would be
potentially a noop.
Expand Down Expand Up @@ -681,7 +681,7 @@ prefix: ""
```

```$ mdox-exec="thanos tools bucket mark --help"
usage: thanos tools bucket mark --id=ID --marker=MARKER --details=DETAILS
usage: thanos tools bucket mark --id=ID --marker=MARKER [<flags>]
Mark block for deletion or no-compact in a safe way. NOTE: If the compactor is
currently running compacting same block, this operation would be potentially a
Expand All @@ -705,6 +705,7 @@ Flags:
Path to YAML file that contains object
store configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--remove Remove the marker.
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
Expand Down
19 changes: 19 additions & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,3 +433,22 @@ func MarkForNoDownsample(ctx context.Context, logger log.Logger, bkt objstore.Bu
level.Info(logger).Log("msg", "block has been marked for no downsample", "block", id)
return nil
}

// RemoveMark removes the file which marked the block for deletion, no-downsample or no-compact.
func RemoveMark(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, removeMark prometheus.Counter, markedFilename string) error {
markedFile := path.Join(id.String(), markedFilename)
markedFileExists, err := bkt.Exists(ctx, markedFile)
if err != nil {
return errors.Wrapf(err, "check if %s file exists in bucket", markedFile)
}
if !markedFileExists {
level.Warn(logger).Log("msg", "requested to remove the mark, but file does not exist", "err", errors.Errorf("file %s does not exist in bucket", markedFile))
return nil
}
if err := bkt.Delete(ctx, markedFile); err != nil {
return errors.Wrapf(err, "delete file %s from bucket", markedFile)
}
removeMark.Inc()
level.Info(logger).Log("msg", "mark has been removed from the block", "block", id)
return nil
}
141 changes: 141 additions & 0 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,3 +593,144 @@ func (eb errBucket) Upload(ctx context.Context, name string, r io.Reader) error
}
return nil
}

func TestRemoveMarkForDeletion(t *testing.T) {
defer custom.TolerantVerifyLeak(t)
ctx := context.Background()
tmpDir := t.TempDir()
for _, testcases := range []struct {
name string
preDelete func(t testing.TB, id ulid.ULID, bkt objstore.Bucket)
blocksUnmarked int
}{
{
name: "unmarked block for deletion",
preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
deletionMark, err := json.Marshal(metadata.DeletionMark{
ID: id,
DeletionTime: time.Now().Unix(),
Version: metadata.DeletionMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.DeletionMarkFilename), bytes.NewReader(deletionMark)))
},
blocksUnmarked: 1,
},
{
name: "block not marked for deletion, message logged and metric not incremented",
preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {},
blocksUnmarked: 0,
},
} {
t.Run(testcases.name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "cluster-eu1", Value: "service-1"}},
{{Name: "cluster-eu1", Value: "service-2"}},
{{Name: "cluster-eu1", Value: "service-3"}},
{{Name: "cluster-us1", Value: "service-1"}},
{{Name: "cluster-us1", Value: "service-2"}},
}, 100, 0, 1000, labels.Labels{{Name: "region-1", Value: "eu-west"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)
testcases.preDelete(t, id, bkt)
counter := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
err = RemoveMark(ctx, log.NewNopLogger(), bkt, id, counter, metadata.DeletionMarkFilename)
testutil.Ok(t, err)
testutil.Equals(t, float64(testcases.blocksUnmarked), promtest.ToFloat64(counter))
})
}
}

func TestRemoveMarkForNoCompact(t *testing.T) {
defer custom.TolerantVerifyLeak(t)
ctx := context.Background()
tmpDir := t.TempDir()
for _, testCases := range []struct {
name string
preDelete func(t testing.TB, id ulid.ULID, bkt objstore.Bucket)
blocksUnmarked int
}{
{
name: "unmarked block for no-compact",
preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
m, err := json.Marshal(metadata.NoCompactMark{
ID: id,
NoCompactTime: time.Now().Unix(),
Version: metadata.NoCompactMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoCompactMarkFilename), bytes.NewReader(m)))
},
blocksUnmarked: 1,
},
{
name: "block not marked for no-compact, message logged and metric not incremented",
preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {},
blocksUnmarked: 0,
},
} {
t.Run(testCases.name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "cluster-eu1", Value: "service-1"}},
{{Name: "cluster-eu1", Value: "service-2"}},
{{Name: "cluster-eu1", Value: "service-3"}},
{{Name: "cluster-us1", Value: "service-1"}},
{{Name: "cluster-us1", Value: "service-2"}},
}, 100, 0, 1000, labels.Labels{{Name: "region-1", Value: "eu-west"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)
testCases.preDelete(t, id, bkt)
counter := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
err = RemoveMark(ctx, log.NewNopLogger(), bkt, id, counter, metadata.NoCompactMarkFilename)
testutil.Ok(t, err)
testutil.Equals(t, float64(testCases.blocksUnmarked), promtest.ToFloat64(counter))
})
}
}

func TestRemoveMmarkForNoDownsample(t *testing.T) {
defer custom.TolerantVerifyLeak(t)
ctx := context.Background()
tmpDir := t.TempDir()
for _, testCases := range []struct {
name string
preDelete func(t testing.TB, id ulid.ULID, bkt objstore.Bucket)
blocksUnmarked int
}{
{
name: "unmarked block for no-downsample",
preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
m, err := json.Marshal(metadata.NoDownsampleMark{
ID: id,
NoDownsampleTime: time.Now().Unix(),
Version: metadata.NoDownsampleMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoDownsampleMarkFilename), bytes.NewReader(m)))
},
blocksUnmarked: 1,
},
{
name: "block not marked for no-downsample, message logged and metric not incremented",
preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {},
blocksUnmarked: 0,
},
} {
t.Run(testCases.name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "cluster-eu1", Value: "service-1"}},
{{Name: "cluster-eu1", Value: "service-2"}},
{{Name: "cluster-eu1", Value: "service-3"}},
{{Name: "cluster-us1", Value: "service-1"}},
{{Name: "cluster-us1", Value: "service-2"}},
}, 100, 0, 1000, labels.Labels{{Name: "region-1", Value: "eu-west"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)
testCases.preDelete(t, id, bkt)
counter := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
err = RemoveMark(ctx, log.NewNopLogger(), bkt, id, counter, metadata.NoDownsampleMarkFilename)
testutil.Ok(t, err)
testutil.Equals(t, float64(testCases.blocksUnmarked), promtest.ToFloat64(counter))
})
}
}

0 comments on commit 112fc5f

Please sign in to comment.