Skip to content

Commit

Permalink
Merge pull request #1154 from dnwe/topic-response-errors
Browse files Browse the repository at this point in the history
Satisfy the error interface in create responses
  • Loading branch information
bai committed Feb 25, 2019
2 parents a283804 + e581f69 commit 0cfd4e6
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 9 deletions.
4 changes: 2 additions & 2 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
}

if topicErr.Err != ErrNoError {
return topicErr.Err
return topicErr
}

return nil
Expand Down Expand Up @@ -358,7 +358,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
}

if topicErr.Err != ErrNoError {
return topicErr.Err
return topicErr
}

return nil
Expand Down
39 changes: 35 additions & 4 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"errors"
"strings"
"testing"
)

Expand Down Expand Up @@ -105,7 +106,7 @@ func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
}
}

func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) {
func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

Expand All @@ -118,16 +119,17 @@ func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) {

config := NewConfig()
config.Version = V0_11_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
if err != ErrInsufficientData {
err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
want := "insufficient permissions to create topic with reserved prefix"
if !strings.HasSuffix(err.Error(), want) {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -301,6 +303,35 @@ func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
}
}

func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
})

config := NewConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

err = admin.CreatePartitions("_internal_topic", 3, nil, false)
want := "insufficient permissions to create partition on topic with reserved prefix"
if !strings.HasSuffix(err.Error(), want) {
t.Fatal(err)
}
err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminDeleteRecords(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down
13 changes: 12 additions & 1 deletion create_partitions_response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package sarama

import "time"
import (
"fmt"
"time"
)

type CreatePartitionsResponse struct {
ThrottleTime time.Duration
Expand Down Expand Up @@ -69,6 +72,14 @@ type TopicPartitionError struct {
ErrMsg *string
}

func (t *TopicPartitionError) Error() string {
text := t.Err.Error()
if t.ErrMsg != nil {
text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
}
return text
}

func (t *TopicPartitionError) encode(pe packetEncoder) error {
pe.putInt16(int16(t.Err))

Expand Down
24 changes: 24 additions & 0 deletions create_partitions_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,27 @@ func TestCreatePartitionsResponse(t *testing.T) {
t.Errorf("Decoding error: expected %v but got %v", decodedresp, resp)
}
}

func TestTopicPartitionError(t *testing.T) {
// Assert that TopicPartitionError satisfies error interface
var err error = &TopicPartitionError{
Err: ErrTopicAuthorizationFailed,
}

got := err.Error()
want := ErrTopicAuthorizationFailed.Error()
if got != want {
t.Errorf("TopicPartitionError.Error() = %v; want %v", got, want)
}

msg := "reason why topic authorization failed"
err = &TopicPartitionError{
Err: ErrTopicAuthorizationFailed,
ErrMsg: &msg,
}
got = err.Error()
want = ErrTopicAuthorizationFailed.Error() + " - " + msg
if got != want {
t.Errorf("TopicPartitionError.Error() = %v; want %v", got, want)
}
}
13 changes: 12 additions & 1 deletion create_topics_response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package sarama

import "time"
import (
"fmt"
"time"
)

type CreateTopicsResponse struct {
Version int16
Expand Down Expand Up @@ -83,6 +86,14 @@ type TopicError struct {
ErrMsg *string
}

func (t *TopicError) Error() string {
text := t.Err.Error()
if t.ErrMsg != nil {
text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
}
return text
}

func (t *TopicError) encode(pe packetEncoder, version int16) error {
pe.putInt16(int16(t.Err))

Expand Down
24 changes: 24 additions & 0 deletions create_topics_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,27 @@ func TestCreateTopicsResponse(t *testing.T) {

testResponse(t, "version 2", resp, createTopicsResponseV2)
}

func TestTopicError(t *testing.T) {
// Assert that TopicError satisfies error interface
var err error = &TopicError{
Err: ErrTopicAuthorizationFailed,
}

got := err.Error()
want := ErrTopicAuthorizationFailed.Error()
if got != want {
t.Errorf("TopicError.Error() = %v; want %v", got, want)
}

msg := "reason why topic authorization failed"
err = &TopicError{
Err: ErrTopicAuthorizationFailed,
ErrMsg: &msg,
}
got = err.Error()
want = ErrTopicAuthorizationFailed.Error() + " - " + msg
if got != want {
t.Errorf("TopicError.Error() = %v; want %v", got, want)
}
}
21 changes: 20 additions & 1 deletion mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"fmt"
"strings"
)

// TestReporter has methods matching go's testing.T to avoid importing
Expand Down Expand Up @@ -620,10 +621,20 @@ func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {

func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
req := reqBody.(*CreateTopicsRequest)
res := &CreateTopicsResponse{}
res := &CreateTopicsResponse{
Version: req.Version,
}
res.TopicErrors = make(map[string]*TopicError)

for topic, _ := range req.TopicDetails {
if res.Version >= 1 && strings.HasPrefix(topic, "_") {
msg := "insufficient permissions to create topic with reserved prefix"
res.TopicErrors[topic] = &TopicError{
Err: ErrTopicAuthorizationFailed,
ErrMsg: &msg,
}
continue
}
res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
}
return res
Expand Down Expand Up @@ -662,6 +673,14 @@ func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
res.TopicPartitionErrors = make(map[string]*TopicPartitionError)

for topic, _ := range req.TopicPartitions {
if strings.HasPrefix(topic, "_") {
msg := "insufficient permissions to create partition on topic with reserved prefix"
res.TopicPartitionErrors[topic] = &TopicPartitionError{
Err: ErrTopicAuthorizationFailed,
ErrMsg: &msg,
}
continue
}
res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
}
return res
Expand Down

0 comments on commit 0cfd4e6

Please sign in to comment.