Skip to content

Commit

Permalink
Merge pull request #5639 from r-vasquez/v22.1.x-rpk-topic-err-remap
Browse files Browse the repository at this point in the history
[v22.1.x] rpk topic error remap
  • Loading branch information
BenPope committed Jul 26, 2022
2 parents 3ac6e65 + ac8bb7c commit ce21d57
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 3 deletions.
18 changes: 17 additions & 1 deletion src/go/rpk/pkg/cli/cmd/topic/add_partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ package topic

import (
"context"
"errors"
"fmt"
"os"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/twmb/franz-go/pkg/kerr"
)

func NewAddPartitionsCommand(fs afero.Fs) *cobra.Command {
Expand All @@ -42,13 +46,25 @@ func NewAddPartitionsCommand(fs afero.Fs) *cobra.Command {
resps, err := adm.CreatePartitions(context.Background(), num, topics...)
out.MaybeDie(err, "create partitions request failed: %v", err)

var exit1 bool
defer func() {
if exit1 {
os.Exit(1)
}
}()

tw := out.NewTable("topic", "error")
defer tw.Flush()

for _, resp := range resps.Sorted() {
msg := "OK"
if e := resp.Err; e != nil {
msg = e.Error()
if errors.Is(e, kerr.InvalidPartitions) && num > 0 {
msg = fmt.Sprintf("INVALID_PARTITIONS: unable to add %d partitions due to hardware constraints", num)
} else {
msg = e.Error()
}
exit1 = true
}
tw.Print(resp.Topic, msg)
}
Expand Down
21 changes: 19 additions & 2 deletions src/go/rpk/pkg/cli/cmd/topic/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ package topic

import (
"context"
"errors"
"fmt"
"os"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka"
Expand Down Expand Up @@ -85,21 +88,35 @@ the cleanup.policy=compact config option set.
resp, err := req.RequestWith(context.Background(), cl)
out.MaybeDie(err, "unable to create topics %v: %v", topics, err)

var exit1 bool
defer func() {
if exit1 {
os.Exit(1)
}
}()

tw := out.NewTable("topic", "status")
defer tw.Flush()

for _, topic := range resp.Topics {
msg := "OK"
if err := kerr.ErrorForCode(topic.ErrorCode); err != nil {
msg = err.Error()
if errors.Is(err, kerr.InvalidPartitions) && partitions > 0 {
msg = fmt.Sprintf("INVALID_PARTITIONS: unable to create topic with %d partitions due to hardware constraints", partitions)
} else if errors.Is(err, kerr.InvalidReplicationFactor) && replicas%2 == 0 {
msg = "INVALID_REPLICATION_FACTOR: replication factor must be odd"
} else {
msg = err.Error()
}
exit1 = true
}
tw.Print(topic.Topic, msg)
}
},
}
cmd.Flags().StringArrayVarP(&configKVs, "topic-config", "c", nil, "key=value; Config parameters (repeatable; e.g. -c cleanup.policy=compact)")
cmd.Flags().Int32VarP(&partitions, "partitions", "p", 1, "Number of partitions to create per topic")
cmd.Flags().Int16VarP(&replicas, "replicas", "r", -1, "Replication factor; if -1, this will be the broker's default.replication.factor")
cmd.Flags().Int16VarP(&replicas, "replicas", "r", -1, "Replication factor (must be odd); if -1, this will be the broker's default.replication.factor")
cmd.Flags().BoolVarP(&dry, "dry", "d", false, "dry run: validate the topic creation request; do not create topics")

// Sept 2021
Expand Down
4 changes: 4 additions & 0 deletions tests/rptest/clients/rpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ def create_topic(self, topic, partitions=1, replicas=None, config=None):
self._check_stdout_success(output)
return output

def add_partitions(self, topic, partitions):
cmd = ["add-partitions", topic, "-n", str(partitions)]
return self._run_topic(cmd)

def _check_stdout_success(self, output):
"""
Helper for topic operations where rpk does not surface errors
Expand Down
7 changes: 7 additions & 0 deletions tests/rptest/tests/rpk_topic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ def test_create_topic_with_invalid_config(self, config_type):
lambda e: "INVALID_CONFIG" in str(e)):
out = self._rpk.create_topic("topic", config={config_type: "foo"})

@cluster(num_nodes=1)
def test_add_unfeasible_number_of_partitions(self):
with expect_exception(RpkException,
lambda e: "INVALID_REQUEST" in str(e)):
self._rpk.create_topic("topic")
out = self._rpk.add_partitions("topic", 2000000000000)

@cluster(num_nodes=4)
def test_produce(self):
topic = 'topic'
Expand Down

0 comments on commit ce21d57

Please sign in to comment.