Skip to content

Commit

Permalink
rpk: avoid listing offsets for no topics, fix group seek
Browse files Browse the repository at this point in the history
For rpk group seek, if we seeked in a group that had no topics, we would
list *all* topics (as is the api for list) and then commit a group to
the beginning of *all* topics.

We now only seek if there are topics to seek, otherwise we print the
standard output that no offset was changed.

We also fix this potential seek problem in rpk group describe, but
realistically all groups have topics assigned.
  • Loading branch information
twmb committed Jun 7, 2022
1 parent 0defc31 commit bc6726d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 22 deletions.
7 changes: 5 additions & 2 deletions src/go/rpk/pkg/cli/cmd/group/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ information about the members.
out.Die("unable to fetch offsets for any group")
}

listed, err := adm.ListEndOffsets(ctx, described.AssignedPartitions().Topics()...)
out.HandleShardError("ListOffsets", err)
var listed kadm.ListedOffsets
if topics := described.AssignedPartitions().Topics(); len(topics) > 0 {
listed, err = adm.ListEndOffsets(ctx, topics...)
out.HandleShardError("ListOffsets", err)
}

printDescribed(
described,
Expand Down
43 changes: 24 additions & 19 deletions src/go/rpk/pkg/cli/cmd/group/seek.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,28 +220,33 @@ func seek(
tps[topic] = map[int32]struct{}{} // ensure exists
}
topics := tps.Topics()

var listed kadm.ListedOffsets
var err error
switch to {
case "start":
listed, err = adm.ListStartOffsets(context.Background(), topics...)
case "end":
listed, err = adm.ListEndOffsets(context.Background(), topics...)
default:
var milli int64
milli, err = strconv.ParseInt(to, 10, 64)
out.MaybeDie(err, "unable to parse millisecond %q: %v", to, err)
switch len(to) {
case 10: // e.g. "1622505600"; sec to milli
milli *= 1000
case 13: // e.g. "1622505600000", already in milli
case 19: // e.g. "1622505600000000000"; nano to milli
milli /= 1e6
if len(topics) > 0 {
var err error
switch to {
case "start":
listed, err = adm.ListStartOffsets(context.Background(), topics...)
case "end":
listed, err = adm.ListEndOffsets(context.Background(), topics...)
default:
out.Die("--to timestamp %q is not a second, nor a millisecond, nor a nanosecond", to)
var milli int64
milli, err = strconv.ParseInt(to, 10, 64)
out.MaybeDie(err, "unable to parse millisecond %q: %v", to, err)
switch len(to) {
case 10: // e.g. "1622505600"; sec to milli
milli *= 1000
case 13: // e.g. "1622505600000", already in milli
case 19: // e.g. "1622505600000000000"; nano to milli
milli /= 1e6
default:
out.Die("--to timestamp %q is not a second, nor a millisecond, nor a nanosecond", to)
}
listed, err = adm.ListOffsetsAfterMilli(context.Background(), milli, topics...)
}
if err == nil { // ListOffsets can return ShardErrors, but we want to be entirely successful
err = listed.Error()
}
listed, err = adm.ListOffsetsAfterMilli(context.Background(), milli, topics...)
out.MaybeDie(err, "unable to list all offsets successfully: %v", err)
}
if err == nil { // ListOffsets can return ShardErrors, but we want to be entirely successful
err = listed.Error()
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/cli/cmd/topic/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ information.
Run: func(cmd *cobra.Command, topics []string) {
// The purpose of the regex flag really is for users to
// know what topics they will delete when using regex.
// We forbit deleting internal topics (redpanda
// We forbid deleting internal topics (redpanda
// actually does not expose these currently), so we
// make -r and -i incompatible.
if internal && re {
Expand Down

0 comments on commit bc6726d

Please sign in to comment.