diff --git a/src/go/rpk/go.mod b/src/go/rpk/go.mod index f10b4e7e82db..aa9f64ed52c3 100644 --- a/src/go/rpk/go.mod +++ b/src/go/rpk/go.mod @@ -37,7 +37,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.55.0 - github.com/redpanda-data/common-go/rpadmin v0.1.4 + github.com/redpanda-data/common-go/rpadmin v0.1.7-0.20240830024832-47d9e3d205de github.com/rs/xid v1.5.0 github.com/safchain/ethtool v0.4.1 github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 @@ -119,6 +119,7 @@ require ( github.com/redpanda-data/common-go/net v0.1.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/sethgrid/pester v1.2.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/tklauser/numcpus v0.8.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect go.opentelemetry.io/otel v1.28.0 // indirect diff --git a/src/go/rpk/go.sum b/src/go/rpk/go.sum index 75f8bceee579..74dad87564f2 100644 --- a/src/go/rpk/go.sum +++ b/src/go/rpk/go.sum @@ -208,8 +208,8 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/redpanda-data/common-go/net v0.1.0 h1:JnJioRJuL961r1QXiJQ1tW9+yEaJfu8FpXnUmvQbwNM= github.com/redpanda-data/common-go/net v0.1.0/go.mod h1:iOdNkjxM7a1T8F3cYHTaKIPFCHzzp/ia6TN+Z+7Tt5w= -github.com/redpanda-data/common-go/rpadmin v0.1.4 h1:NkVvurQRmBT9r58UVezC0DhYjR3vzYkQnkFndJBb5xE= -github.com/redpanda-data/common-go/rpadmin v0.1.4/go.mod h1:I7umqhnMhIOSEnIA3fvLtdQU7QO/SbWGCwFfFDs3De4= +github.com/redpanda-data/common-go/rpadmin v0.1.7-0.20240830024832-47d9e3d205de h1:yQDTRnTW0XwoWKLwoEUZjWg7m31hvQw6ebj1oAfh2rw= +github.com/redpanda-data/common-go/rpadmin v0.1.7-0.20240830024832-47d9e3d205de/go.mod h1:I7umqhnMhIOSEnIA3fvLtdQU7QO/SbWGCwFfFDs3De4= github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525 h1:vskZrV6q8W8flL0Ud23AJUYAd8ZgTadO45+loFnG2G0= github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525/go.mod h1:3YqAM7pgS5vW/EH7naCjFqnAajSgi0f0CfMe1HGhLxQ= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= @@ -239,6 +239,7 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/src/go/rpk/pkg/cli/topic/mount.go b/src/go/rpk/pkg/cli/topic/mount.go new file mode 100644 index 000000000000..cfbd45a05247 --- /dev/null +++ b/src/go/rpk/pkg/cli/topic/mount.go @@ -0,0 +1,146 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package topic + +import ( + "context" + "strings" + "time" + + "github.com/redpanda-data/common-go/rpadmin" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" +) + +func newMountCommand(fs afero.Fs, p *config.Params) *cobra.Command { + var ( + to string + location string + resume bool + timeout time.Duration + ) + + cmd := &cobra.Command{ + Use: "mount [TOPIC]", + Short: "Mount a topic to the Redpanda Cluster from Tiered Storage", + Long: `Mount a topic from Tiered Storage to the Redpanda cluster. + +This command mounts a topic in the Redpanda Cluster using log segments stored +in Tiered Storage. + +Requirements: +- Tiered storage must be enabled. +- The topic must have a minimum of three partitions. +- Log segments for the topic must be available in Tiered Storage. +- A topic with the same name must not already exist in the cluster.`, + Example: ` +Mounts topic my-typic from Tiered Storage to the cluster in the my-namespace + rpk topic mount my-topic + +Mount topic my-topic from Tiered Storage to the cluster in the my-namespace +with my-new-topic as the new topic name + rpk topic mount my-namespace/my-topic --to my-namespace/my-new-topic +`, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, from []string) { + pf, err := p.LoadVirtualProfile(fs) + out.MaybeDie(err, "rpk unable to load config: %v", err) + config.CheckExitCloudAdmin(pf) + adm, err := adminapi.NewClient(fs, pf) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + n, t := nsTopic(from[0]) + if t == "" { + out.Die("topic is required") + } + topic := rpadmin.InboundTopic{ + SourceTopic: rpadmin.Topic{ + Topic: n, + Namespace: t, + }, + } + an, at := nsTopic(to) + if at != "" { + topic.Alias = &rpadmin.Topic{ + Topic: an, + Namespace: at, + } + } + + if location != "" { + topic.Location = location + } + + migration, err := adm.AddInboundMigration(cmd.Context(), rpadmin.InboundMigration{ + MigrationType: "inbound", + Topics: []rpadmin.InboundTopic{topic}, + ConsumerGroups: []string{}, // not implemented yet + }) + out.MaybeDie(err, "unable to request topic migration: %v", err) + + ctx := cmd.Context() + if timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + if resume { + migrations, err := adm.ListMigrations(ctx) + out.MaybeDie(err, "unable to list migrations: %v", err) + id, err := getMigrationIdByName(t, migrations) + if err != nil { + return + } + + ms, err := adm.GetMigration(ctx, id) + out.MaybeDie(err, "unable to get migration state: %v", err) + + status, err := rpadmin.MigrationStatusFromString(ms.State) + out.MaybeDie(err, "unable to parse migration status: %v", err) + err = resumeMigrationByStatus(ctx, id, adm, status) + out.MaybeDie(err, "unable to resume migration: %v", err) + out.Die("migration resumed and completed successfully") + } + + // TODO add more words to tell the user current state + err = checkMigrationActionAndAdvanceState(ctx, migration.ID, adm, rpadmin.MigrationActionPrepare, rpadmin.MigrationStatusPrepared, timeout) + out.MaybeDie(err, "unable to prepare migration %v", err) + + err = checkMigrationActionAndAdvanceState(ctx, migration.ID, adm, rpadmin.MigrationActionExecute, rpadmin.MigrationStatusExecuted, timeout) + out.MaybeDie(err, "unable to execute migration %v", err) + + err = checkMigrationActionAndAdvanceState(ctx, migration.ID, adm, rpadmin.MigrationActionFinish, rpadmin.MigrationStatusFinished, timeout) + out.MaybeDie(err, "unable to finish migration %v", err) + }, + } + cmd.Flags().StringVar(&to, "to", "", "New namespace/topic name for the mounted topic (optional)") + cmd.Flags().StringVarP(&location, "location", "l", "", "Location (optional)") // need more info on this + cmd.Flags().DurationVar(&timeout, "timeout", 0, "Timeout for the migration to finish (optional)") + cmd.Flags().BoolVar(&resume, "resume", false, "Resume allows resuming an in progress unmount. Parameters must be the same as the original unmount") + return cmd +} + +// nsTopic splits a topic string consisting of / and +// returns each component, if the namespace is not specified, returns 'kafka'. +func nsTopic(nst string) (namespace string, topic string) { + nsTopic := strings.SplitN(nst, "/", 2) + if len(nsTopic) == 1 { + namespace = "kafka" + topic = nsTopic[0] + } else { + namespace = nsTopic[0] + topic = nsTopic[1] + } + return namespace, topic +} diff --git a/src/go/rpk/pkg/cli/topic/topic.go b/src/go/rpk/pkg/cli/topic/topic.go index a3042a862f64..dfb91440b4df 100644 --- a/src/go/rpk/pkg/cli/topic/topic.go +++ b/src/go/rpk/pkg/cli/topic/topic.go @@ -32,6 +32,8 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command { newListCommand(fs, p), newTrimPrefixCommand(fs, p), newProduceCommand(fs, p), + newMountCommand(fs, p), + newUnmountCommand(fs, p), ) return cmd } diff --git a/src/go/rpk/pkg/cli/topic/unmount.go b/src/go/rpk/pkg/cli/topic/unmount.go new file mode 100644 index 000000000000..66f5cfabf458 --- /dev/null +++ b/src/go/rpk/pkg/cli/topic/unmount.go @@ -0,0 +1,192 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package topic + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/redpanda-data/common-go/rpadmin" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" +) + +func newUnmountCommand(fs afero.Fs, p *config.Params) *cobra.Command { + var resume bool + var timeout time.Duration + + cmd := &cobra.Command{ + Use: "unmount [TOPIC]", + Short: "Unmount a topic from the Redpanda Cluster", + Long: `Unmount a topic from the Redpanda cluster and secure it in Tiered +Storage. + +This command performs an operation that: +1. Rejects all writes to the topic +2. Flushes data to Tiered Storage +3. Removes the topic from the cluster + +The unmount process ensures data safety and cluster integrity. Every +acknowledged message before the initiation of unmount is guaranteed to be +persisted in Tiered Storage, subject to Redpanda's existing data safety +guarantees. + +Key Points: +- During unmounting, any attempted writes or reads will receive an + UNKNOWN_TOPIC_OR_PARTITION error. +- The unmount operation works independently of other topic configurations like + remote.delete=false. +- After unmounting, the topic can be remounted to this cluster or a different + cluster if the log segments are moved to that cluster's Tiered Storage. +`, + Example: `Unmount topic 'my-topic' from the cluster in the 'my-namespace' + rpk topic unmount my-namespace/my-topic +`, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, topics []string) { + pf, err := p.LoadVirtualProfile(fs) + out.MaybeDie(err, "rpk unable to load config: %v", err) + config.CheckExitCloudAdmin(pf) + adm, err := adminapi.NewClient(fs, pf) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + n, t := nsTopic(topics[0]) + if t == "" { + out.Die("topic is required") + } + + resp, err := adm.AddOutboundMigration(cmd.Context(), rpadmin.OutboundMigration{ + MigrationType: "outbound", + Topics: []rpadmin.Topic{ + { + Topic: t, + Namespace: n, + }, + }, + ConsumerGroups: []string{}, // not implemented yet + }) + out.MaybeDie(err, "unable to request topic migration: %v", err) + + ctx := cmd.Context() + if timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + if resume { + migrations, err := adm.ListMigrations(ctx) + out.MaybeDie(err, "unable to list migrations: %v", err) + id, err := getMigrationIdByName(t, migrations) + if err != nil { + return + } + + ms, err := adm.GetMigration(ctx, id) + out.MaybeDie(err, "unable to get migration state: %v", err) + + status, err := rpadmin.MigrationStatusFromString(ms.State) + out.MaybeDie(err, "unable to parse migration status: %v", err) + err = resumeMigrationByStatus(ctx, id, adm, status) + out.MaybeDie(err, "unable to resume migration: %v", err) + out.Die("migration resumed and completed successfully") + } + + // TODO add more words to tell the user current state + err = checkMigrationActionAndAdvanceState(ctx, resp.ID, adm, rpadmin.MigrationActionPrepare, rpadmin.MigrationStatusPrepared, timeout) + out.MaybeDie(err, "unable to prepare migration %v", err) + err = checkMigrationActionAndAdvanceState(ctx, resp.ID, adm, rpadmin.MigrationActionExecute, rpadmin.MigrationStatusExecuted, timeout) + out.MaybeDie(err, "unable to execute migration %v", err) + err = checkMigrationActionAndAdvanceState(ctx, resp.ID, adm, rpadmin.MigrationActionFinish, rpadmin.MigrationStatusFinished, timeout) + out.MaybeDie(err, "unable to finish migration %v", err) + }, + } + cmd.Flags().DurationVar(&timeout, "timeout", 0, "Timeout for the unmount process to finish") + cmd.Flags().BoolVar(&resume, "resume", false, "Resume allows resuming an in progress unmount. Parameters must be the same as the original unmount") + return cmd +} + +type migrationClient interface { + ExecuteMigration(ctx context.Context, id int, action rpadmin.MigrationAction) error + GetMigration(ctx context.Context, id int) (rpadmin.MigrationState, error) +} + +func checkMigrationActionAndAdvanceState(ctx context.Context, id int, adm migrationClient, doingAction rpadmin.MigrationAction, desiredStatus rpadmin.MigrationStatus, timeout time.Duration) error { + if err := adm.ExecuteMigration(ctx, id, doingAction); err != nil { + return fmt.Errorf("unable to execute migration: %w", err) + } + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + m, err := adm.GetMigration(ctx, id) + if err != nil { + return fmt.Errorf("unable to get migration state: %w", err) + } + if m.State == desiredStatus.String() { + return nil + } + fmt.Printf("Current migration state: %s\n", m.State) + case <-ctx.Done(): + switch err := ctx.Err(); { + case errors.Is(err, context.DeadlineExceeded): + return fmt.Errorf("operation timed out: %w", ctx.Err()) + case errors.Is(err, context.Canceled): + return fmt.Errorf("operation was canceled: %w", ctx.Err()) + default: + return fmt.Errorf("operation interrupted: %w", ctx.Err()) + } + } + } +} + +func getMigrationIdByName(name string, migrations []rpadmin.MigrationState) (int, error) { + for _, m := range migrations { + for _, t := range m.Migration.Topics { + if t.Topic == name { + return m.ID, nil + } + } + } + return 0, fmt.Errorf("migration not found for topic %s", name) +} + +func resumeMigrationByStatus(ctx context.Context, id int, adm migrationClient, state rpadmin.MigrationStatus) error { + switch state { + case rpadmin.MigrationStatusPlanned: + err := checkMigrationActionAndAdvanceState(ctx, id, adm, rpadmin.MigrationActionPrepare, rpadmin.MigrationStatusPrepared, 0) + out.MaybeDie(err, "unable to prepare migration %v", err) + err = checkMigrationActionAndAdvanceState(ctx, id, adm, rpadmin.MigrationActionExecute, rpadmin.MigrationStatusExecuted, 0) + out.MaybeDie(err, "unable to execute migration %v", err) + err = checkMigrationActionAndAdvanceState(ctx, id, adm, rpadmin.MigrationActionFinish, rpadmin.MigrationStatusFinished, 0) + out.MaybeDie(err, "unable to finish migration %v", err) + case rpadmin.MigrationStatusPrepared: + err := checkMigrationActionAndAdvanceState(ctx, id, adm, rpadmin.MigrationActionExecute, rpadmin.MigrationStatusExecuted, 0) + out.MaybeDie(err, "unable to execute migration %v", err) + err = checkMigrationActionAndAdvanceState(ctx, id, adm, rpadmin.MigrationActionFinish, rpadmin.MigrationStatusFinished, 0) + out.MaybeDie(err, "unable to finish migration %v", err) + case rpadmin.MigrationStatusExecuted: + err := checkMigrationActionAndAdvanceState(ctx, id, adm, rpadmin.MigrationActionFinish, rpadmin.MigrationStatusFinished, 0) + out.MaybeDie(err, "unable to finish migration %v", err) + case rpadmin.MigrationStatusFinished: + out.Die("migration is already finished") + default: + out.Die("invalid migration status") + } + return nil +} diff --git a/src/go/rpk/pkg/cli/topic/unmount_test.go b/src/go/rpk/pkg/cli/topic/unmount_test.go new file mode 100644 index 000000000000..7826bfa5a9d4 --- /dev/null +++ b/src/go/rpk/pkg/cli/topic/unmount_test.go @@ -0,0 +1,149 @@ +package topic + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/redpanda-data/common-go/rpadmin" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +// MockMigrationClient is a mock implementation of the migrationClient interface +type MockMigrationClient struct { + mock.Mock +} + +func (m *MockMigrationClient) ExecuteMigration(ctx context.Context, id int, action rpadmin.MigrationAction) error { + args := m.Called(ctx, id, action) + return args.Error(0) +} + +func (m *MockMigrationClient) GetMigration(ctx context.Context, id int) (rpadmin.MigrationState, error) { + args := m.Called(ctx, id) + return args.Get(0).(rpadmin.MigrationState), args.Error(1) +} + +func TestCheckMigrationActionAndAdvanceState(t *testing.T) { + tests := []struct { + name string + id int + action rpadmin.MigrationAction + desiredStatus rpadmin.MigrationStatus + timeout time.Duration + executionError error + getMigrations []rpadmin.MigrationState + getErrors []error + expectedError string + }{ + { + name: "Prepare Migration Success", + id: 1, + action: rpadmin.MigrationActionPrepare, + desiredStatus: rpadmin.MigrationStatusPrepared, + timeout: 11 * time.Second, + executionError: nil, + getMigrations: []rpadmin.MigrationState{{State: "preparing"}, {State: "prepared"}}, + getErrors: []error{nil, nil}, + expectedError: "", + }, + { + name: "Execute Migration Success", + id: 2, + action: rpadmin.MigrationActionExecute, + desiredStatus: rpadmin.MigrationStatusExecuted, + timeout: 11 * time.Second, + executionError: nil, + getMigrations: []rpadmin.MigrationState{{State: "executing"}, {State: "executed"}}, + getErrors: []error{nil, nil}, + expectedError: "", + }, + { + name: "Finish Migration Success", + id: 3, + action: rpadmin.MigrationActionFinish, + desiredStatus: rpadmin.MigrationStatusFinished, + timeout: 11 * time.Second, + executionError: nil, + getMigrations: []rpadmin.MigrationState{{State: "finishing"}, {State: "finished"}}, + getErrors: []error{nil, nil}, + expectedError: "", + }, + { + name: "Execution Error", + id: 4, + action: rpadmin.MigrationActionPrepare, + desiredStatus: rpadmin.MigrationStatusPrepared, + timeout: 10 * time.Second, + executionError: errors.New("execution failed"), + getMigrations: []rpadmin.MigrationState{}, + getErrors: []error{}, + expectedError: "unable to execute migration: execution failed", + }, + { + name: "Get Migration Error", + id: 5, + action: rpadmin.MigrationActionExecute, + desiredStatus: rpadmin.MigrationStatusExecuted, + timeout: 10 * time.Second, + executionError: nil, + getMigrations: []rpadmin.MigrationState{{State: "executing"}}, + getErrors: []error{errors.New("get migration failed")}, + expectedError: "unable to get migration state: get migration failed", + }, + { + name: "Timeout", + id: 6, + action: rpadmin.MigrationActionFinish, + desiredStatus: rpadmin.MigrationStatusFinished, + timeout: 16 * time.Second, + executionError: nil, + getMigrations: []rpadmin.MigrationState{{State: "finishing"}, {State: "finishing"}, {State: "finishing"}}, + getErrors: []error{nil, nil, nil}, + expectedError: "operation interrupted: context deadline exceeded", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a context with timeout + ctx, cancel := context.WithTimeout(context.Background(), tt.timeout) + defer cancel() // Ensure the context is canceled when we're done + + mockClient := new(MockMigrationClient) + + // Use a custom matcher for context + contextMatcher := mock.MatchedBy(func(c context.Context) bool { + // Check if the context has a deadline (timeout) + deadline, hasDeadline := c.Deadline() + // Check if the context can be canceled + select { + case <-c.Done(): + return false // Context is already done, which is not what we expect + default: + // Context is not done, which is what we expect + } + return hasDeadline && time.Until(deadline) <= tt.timeout + }) + + mockClient.On("ExecuteMigration", contextMatcher, tt.id, tt.action).Return(tt.executionError) + + for i := range tt.getMigrations { + mockClient.On("GetMigration", contextMatcher, tt.id).Return(tt.getMigrations[i], tt.getErrors[i]).Once() + } + + err := checkMigrationActionAndAdvanceState(ctx, tt.id, mockClient, tt.action, tt.desiredStatus, tt.timeout) + + if tt.expectedError != "" { + assert.Error(t, err) + assert.Contains(t, err.Error(), tt.expectedError) + } else { + assert.NoError(t, err) + } + + mockClient.AssertExpectations(t) + }) + } +}