Skip to content

Commit

Permalink
feat: add migration support
Browse files Browse the repository at this point in the history
Adds support for topic mounting and unmounting using the newly added migration flow.
  • Loading branch information
gene-redpanda committed Aug 30, 2024
1 parent 42680cf commit 15abe7c
Show file tree
Hide file tree
Showing 6 changed files with 494 additions and 3 deletions.
3 changes: 2 additions & 1 deletion src/go/rpk/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.55.0
github.com/redpanda-data/common-go/rpadmin v0.1.4
github.com/redpanda-data/common-go/rpadmin v0.1.7-0.20240830024832-47d9e3d205de
github.com/rs/xid v1.5.0
github.com/safchain/ethtool v0.4.1
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1
Expand Down Expand Up @@ -119,6 +119,7 @@ require (
github.com/redpanda-data/common-go/net v0.1.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/sethgrid/pester v1.2.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tklauser/numcpus v0.8.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
Expand Down
5 changes: 3 additions & 2 deletions src/go/rpk/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/redpanda-data/common-go/net v0.1.0 h1:JnJioRJuL961r1QXiJQ1tW9+yEaJfu8FpXnUmvQbwNM=
github.com/redpanda-data/common-go/net v0.1.0/go.mod h1:iOdNkjxM7a1T8F3cYHTaKIPFCHzzp/ia6TN+Z+7Tt5w=
github.com/redpanda-data/common-go/rpadmin v0.1.4 h1:NkVvurQRmBT9r58UVezC0DhYjR3vzYkQnkFndJBb5xE=
github.com/redpanda-data/common-go/rpadmin v0.1.4/go.mod h1:I7umqhnMhIOSEnIA3fvLtdQU7QO/SbWGCwFfFDs3De4=
github.com/redpanda-data/common-go/rpadmin v0.1.7-0.20240830024832-47d9e3d205de h1:yQDTRnTW0XwoWKLwoEUZjWg7m31hvQw6ebj1oAfh2rw=
github.com/redpanda-data/common-go/rpadmin v0.1.7-0.20240830024832-47d9e3d205de/go.mod h1:I7umqhnMhIOSEnIA3fvLtdQU7QO/SbWGCwFfFDs3De4=
github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525 h1:vskZrV6q8W8flL0Ud23AJUYAd8ZgTadO45+loFnG2G0=
github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525/go.mod h1:3YqAM7pgS5vW/EH7naCjFqnAajSgi0f0CfMe1HGhLxQ=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
Expand Down Expand Up @@ -239,6 +239,7 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
146 changes: 146 additions & 0 deletions src/go/rpk/pkg/cli/topic/mount.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package topic

import (
"context"
"strings"
"time"

"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
)

// newMountCommand builds the "mount" topic subcommand. The command mounts a
// topic from Tiered Storage by creating an inbound migration and driving it
// through the prepare, execute and finish actions, waiting for each
// corresponding state before moving on.
//
// The single positional argument is the source topic in [namespace/]topic
// form (see nsTopic). Flags:
//   - --to: optional [namespace/]topic alias for the mounted topic.
//   - --location / -l: optional location forwarded verbatim in the request.
//   - --timeout: optional overall deadline applied to the polling context.
//   - --resume: continue an already-registered migration for the topic
//     instead of creating a new one.
func newMountCommand(fs afero.Fs, p *config.Params) *cobra.Command {
	var (
		to       string
		location string
		resume   bool
		timeout  time.Duration
	)

	cmd := &cobra.Command{
		Use:   "mount [TOPIC]",
		Short: "Mount a topic to the Redpanda Cluster from Tiered Storage",
		Long: `Mount a topic from Tiered Storage to the Redpanda cluster.
This command mounts a topic in the Redpanda Cluster using log segments stored
in Tiered Storage.
Requirements:
- Tiered storage must be enabled.
- The topic must have a minimum of three partitions.
- Log segments for the topic must be available in Tiered Storage.
- A topic with the same name must not already exist in the cluster.`,
		Example: `
Mount topic my-topic from Tiered Storage to the cluster in the my-namespace
rpk topic mount my-topic
Mount topic my-topic from Tiered Storage to the cluster in the my-namespace
with my-new-topic as the new topic name
rpk topic mount my-namespace/my-topic --to my-namespace/my-new-topic
`,
		Args: cobra.ExactArgs(1),
		Run: func(cmd *cobra.Command, from []string) {
			pf, err := p.LoadVirtualProfile(fs)
			out.MaybeDie(err, "rpk unable to load config: %v", err)
			config.CheckExitCloudAdmin(pf)
			adm, err := adminapi.NewClient(fs, pf)
			out.MaybeDie(err, "unable to initialize admin client: %v", err)

			// nsTopic returns (namespace, topic), in that order.
			n, t := nsTopic(from[0])
			if t == "" {
				out.Die("topic is required")
			}

			ctx := cmd.Context()
			if timeout > 0 {
				var cancel context.CancelFunc
				ctx, cancel = context.WithTimeout(ctx, timeout)
				defer cancel()
			}

			// Resuming must happen before (and instead of) requesting a new
			// migration; otherwise every resume would first try to register
			// a second migration for the same topic.
			if resume {
				migrations, err := adm.ListMigrations(ctx)
				out.MaybeDie(err, "unable to list migrations: %v", err)
				id, err := getMigrationIdByName(t, migrations)
				out.MaybeDie(err, "unable to resume migration: %v", err)

				ms, err := adm.GetMigration(ctx, id)
				out.MaybeDie(err, "unable to get migration state: %v", err)

				status, err := rpadmin.MigrationStatusFromString(ms.State)
				out.MaybeDie(err, "unable to parse migration status: %v", err)
				err = resumeMigrationByStatus(ctx, id, adm, status)
				out.MaybeDie(err, "unable to resume migration: %v", err)
				// Success: exit with status 0 rather than out.Die's failure.
				out.Exit("migration resumed and completed successfully")
			}

			topic := rpadmin.InboundTopic{
				SourceTopic: rpadmin.Topic{
					Topic:     t,
					Namespace: n,
				},
			}
			// With an empty --to, nsTopic yields an empty topic name and no
			// alias is set.
			if an, at := nsTopic(to); at != "" {
				topic.Alias = &rpadmin.Topic{
					Topic:     at,
					Namespace: an,
				}
			}
			if location != "" {
				topic.Location = location
			}

			migration, err := adm.AddInboundMigration(cmd.Context(), rpadmin.InboundMigration{
				MigrationType:  "inbound",
				Topics:         []rpadmin.InboundTopic{topic},
				ConsumerGroups: []string{}, // not implemented yet
			})
			out.MaybeDie(err, "unable to request topic migration: %v", err)

			// TODO add more words to tell the user current state
			err = checkMigrationActionAndAdvanceState(ctx, migration.ID, adm, rpadmin.MigrationActionPrepare, rpadmin.MigrationStatusPrepared, timeout)
			out.MaybeDie(err, "unable to prepare migration %v", err)

			err = checkMigrationActionAndAdvanceState(ctx, migration.ID, adm, rpadmin.MigrationActionExecute, rpadmin.MigrationStatusExecuted, timeout)
			out.MaybeDie(err, "unable to execute migration %v", err)

			err = checkMigrationActionAndAdvanceState(ctx, migration.ID, adm, rpadmin.MigrationActionFinish, rpadmin.MigrationStatusFinished, timeout)
			out.MaybeDie(err, "unable to finish migration %v", err)
		},
	}
	cmd.Flags().StringVar(&to, "to", "", "New namespace/topic name for the mounted topic (optional)")
	cmd.Flags().StringVarP(&location, "location", "l", "", "Location (optional)") // need more info on this
	cmd.Flags().DurationVar(&timeout, "timeout", 0, "Timeout for the migration to finish (optional)")
	cmd.Flags().BoolVar(&resume, "resume", false, "Resume allows resuming an in progress mount. Parameters must be the same as the original mount")
	return cmd
}

// nsTopic breaks a "<namespace>/<topicName>" string at its first slash and
// returns the two halves. When the input contains no slash at all, the whole
// input is treated as the topic name and the namespace defaults to "kafka".
// Any further slashes stay part of the topic name.
func nsTopic(nst string) (namespace string, topic string) {
	head, tail, hasSlash := strings.Cut(nst, "/")
	if !hasSlash {
		return "kafka", head
	}
	return head, tail
}
2 changes: 2 additions & 0 deletions src/go/rpk/pkg/cli/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command {
newListCommand(fs, p),
newTrimPrefixCommand(fs, p),
newProduceCommand(fs, p),
newMountCommand(fs, p),
newUnmountCommand(fs, p),
)
return cmd
}
192 changes: 192 additions & 0 deletions src/go/rpk/pkg/cli/topic/unmount.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package topic

import (
"context"
"errors"
"fmt"
"time"

"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
)

// newUnmountCommand builds the "unmount" topic subcommand. The command
// unmounts a topic by creating an outbound migration and driving it through
// the prepare, execute and finish actions, waiting for each corresponding
// state before moving on.
//
// The single positional argument is the topic in [namespace/]topic form (see
// nsTopic). Flags:
//   - --timeout: optional overall deadline applied to the polling context.
//   - --resume: continue an already-registered migration for the topic
//     instead of creating a new one.
func newUnmountCommand(fs afero.Fs, p *config.Params) *cobra.Command {
	var resume bool
	var timeout time.Duration

	cmd := &cobra.Command{
		Use:   "unmount [TOPIC]",
		Short: "Unmount a topic from the Redpanda Cluster",
		Long: `Unmount a topic from the Redpanda cluster and secure it in Tiered
Storage.
This command performs an operation that:
1. Rejects all writes to the topic
2. Flushes data to Tiered Storage
3. Removes the topic from the cluster
The unmount process ensures data safety and cluster integrity. Every
acknowledged message before the initiation of unmount is guaranteed to be
persisted in Tiered Storage, subject to Redpanda's existing data safety
guarantees.
Key Points:
- During unmounting, any attempted writes or reads will receive an
UNKNOWN_TOPIC_OR_PARTITION error.
- The unmount operation works independently of other topic configurations like
remote.delete=false.
- After unmounting, the topic can be remounted to this cluster or a different
cluster if the log segments are moved to that cluster's Tiered Storage.
`,
		Example: `Unmount topic 'my-topic' from the cluster in the 'my-namespace'
rpk topic unmount my-namespace/my-topic
`,
		Args: cobra.ExactArgs(1),
		Run: func(cmd *cobra.Command, topics []string) {
			pf, err := p.LoadVirtualProfile(fs)
			out.MaybeDie(err, "rpk unable to load config: %v", err)
			config.CheckExitCloudAdmin(pf)
			adm, err := adminapi.NewClient(fs, pf)
			out.MaybeDie(err, "unable to initialize admin client: %v", err)

			// nsTopic returns (namespace, topic), in that order.
			n, t := nsTopic(topics[0])
			if t == "" {
				out.Die("topic is required")
			}

			ctx := cmd.Context()
			if timeout > 0 {
				var cancel context.CancelFunc
				ctx, cancel = context.WithTimeout(ctx, timeout)
				defer cancel()
			}

			// Resuming must happen before (and instead of) requesting a new
			// migration; otherwise every resume would first try to register
			// a second migration for the same topic.
			if resume {
				migrations, err := adm.ListMigrations(ctx)
				out.MaybeDie(err, "unable to list migrations: %v", err)
				id, err := getMigrationIdByName(t, migrations)
				out.MaybeDie(err, "unable to resume migration: %v", err)

				ms, err := adm.GetMigration(ctx, id)
				out.MaybeDie(err, "unable to get migration state: %v", err)

				status, err := rpadmin.MigrationStatusFromString(ms.State)
				out.MaybeDie(err, "unable to parse migration status: %v", err)
				err = resumeMigrationByStatus(ctx, id, adm, status)
				out.MaybeDie(err, "unable to resume migration: %v", err)
				// Success: exit with status 0 rather than out.Die's failure.
				out.Exit("migration resumed and completed successfully")
			}

			resp, err := adm.AddOutboundMigration(cmd.Context(), rpadmin.OutboundMigration{
				MigrationType: "outbound",
				Topics: []rpadmin.Topic{
					{
						Topic:     t,
						Namespace: n,
					},
				},
				ConsumerGroups: []string{}, // not implemented yet
			})
			out.MaybeDie(err, "unable to request topic migration: %v", err)

			// TODO add more words to tell the user current state
			err = checkMigrationActionAndAdvanceState(ctx, resp.ID, adm, rpadmin.MigrationActionPrepare, rpadmin.MigrationStatusPrepared, timeout)
			out.MaybeDie(err, "unable to prepare migration %v", err)
			err = checkMigrationActionAndAdvanceState(ctx, resp.ID, adm, rpadmin.MigrationActionExecute, rpadmin.MigrationStatusExecuted, timeout)
			out.MaybeDie(err, "unable to execute migration %v", err)
			err = checkMigrationActionAndAdvanceState(ctx, resp.ID, adm, rpadmin.MigrationActionFinish, rpadmin.MigrationStatusFinished, timeout)
			out.MaybeDie(err, "unable to finish migration %v", err)
		},
	}
	cmd.Flags().DurationVar(&timeout, "timeout", 0, "Timeout for the unmount process to finish")
	cmd.Flags().BoolVar(&resume, "resume", false, "Resume allows resuming an in progress unmount. Parameters must be the same as the original unmount")
	return cmd
}

// migrationClient is the narrow slice of the admin client that the migration
// helpers in this file depend on: reading a migration's current state and
// triggering an action on it. Declaring it here lets the helpers accept any
// implementation of these two calls.
type migrationClient interface {
	// GetMigration fetches the current state of the migration with the
	// given id.
	GetMigration(ctx context.Context, id int) (rpadmin.MigrationState, error)
	// ExecuteMigration requests that the given action be performed on the
	// migration with the given id.
	ExecuteMigration(ctx context.Context, id int, action rpadmin.MigrationAction) error
}

// checkMigrationActionAndAdvanceState triggers doingAction on the migration
// identified by id and then blocks until the migration reports desiredStatus.
//
// The state is polled once immediately after the action is accepted and then
// every five seconds, so a migration that has already reached the desired
// state does not cost a full polling interval. Every observed state that is
// not the desired one is printed to stdout.
//
// The wait is bounded only by ctx: when ctx is done, an error wrapping
// ctx.Err() is returned. The timeout parameter is not read by this function;
// it is retained so existing callers keep compiling, and callers are expected
// to put any deadline on ctx.
//
// Errors from ExecuteMigration and GetMigration are wrapped and returned
// as-is; no retry is attempted.
func checkMigrationActionAndAdvanceState(ctx context.Context, id int, adm migrationClient, doingAction rpadmin.MigrationAction, desiredStatus rpadmin.MigrationStatus, timeout time.Duration) error {
	if err := adm.ExecuteMigration(ctx, id, doingAction); err != nil {
		return fmt.Errorf("unable to execute migration: %w", err)
	}

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		// Poll first, wait second: the first check happens right away
		// instead of after a full tick.
		m, err := adm.GetMigration(ctx, id)
		if err != nil {
			return fmt.Errorf("unable to get migration state: %w", err)
		}
		if m.State == desiredStatus.String() {
			return nil
		}
		fmt.Printf("Current migration state: %s\n", m.State)

		select {
		case <-ticker.C:
			// Time for the next poll.
		case <-ctx.Done():
			err := ctx.Err()
			switch {
			case errors.Is(err, context.DeadlineExceeded):
				return fmt.Errorf("operation timed out: %w", err)
			case errors.Is(err, context.Canceled):
				return fmt.Errorf("operation was canceled: %w", err)
			default:
				return fmt.Errorf("operation interrupted: %w", err)
			}
		}
	}
}

// getMigrationIdByName scans migrations in order and returns the ID of the
// first one that lists a topic whose Topic field equals name. Only the topic
// name is compared; the namespace is not consulted. If no migration matches,
// it returns 0 and an error naming the topic.
func getMigrationIdByName(name string, migrations []rpadmin.MigrationState) (int, error) {
	for _, state := range migrations {
		for _, candidate := range state.Migration.Topics {
			if candidate.Topic != name {
				continue
			}
			return state.ID, nil
		}
	}
	return 0, fmt.Errorf("migration not found for topic %s", name)
}

// resumeMigrationByStatus continues the migration identified by id from the
// given state, running every remaining step in order (prepare, execute,
// finish) and waiting for each step's target status before starting the next.
//
// Starting points:
//   - planned:  prepare, execute, finish
//   - prepared: execute, finish
//   - executed: finish
//
// It returns an error, rather than terminating the process, when a step
// fails, when the migration is already finished, or when state is not one of
// the statuses above. Callers decide how to report it.
func resumeMigrationByStatus(ctx context.Context, id int, adm migrationClient, state rpadmin.MigrationStatus) error {
	// The full pipeline, in execution order. Resuming means running a suffix
	// of this list.
	steps := []struct {
		verb   string
		action rpadmin.MigrationAction
		target rpadmin.MigrationStatus
	}{
		{"prepare", rpadmin.MigrationActionPrepare, rpadmin.MigrationStatusPrepared},
		{"execute", rpadmin.MigrationActionExecute, rpadmin.MigrationStatusExecuted},
		{"finish", rpadmin.MigrationActionFinish, rpadmin.MigrationStatusFinished},
	}

	var first int
	switch state {
	case rpadmin.MigrationStatusPlanned:
		first = 0
	case rpadmin.MigrationStatusPrepared:
		first = 1
	case rpadmin.MigrationStatusExecuted:
		first = 2
	case rpadmin.MigrationStatusFinished:
		return errors.New("migration is already finished")
	default:
		return errors.New("invalid migration status")
	}

	for _, s := range steps[first:] {
		if err := checkMigrationActionAndAdvanceState(ctx, id, adm, s.action, s.target, 0); err != nil {
			return fmt.Errorf("unable to %s migration: %w", s.verb, err)
		}
	}
	return nil
}
Loading

0 comments on commit 15abe7c

Please sign in to comment.