From ec2cbc38140c80ff9b015d654c777a81610a6297 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 22 Aug 2018 10:58:12 +0100 Subject: [PATCH 1/2] Satisfy the error interface in create responses Contributes-to: Shopify/sarama#1153 Signed-off-by: Dominic Evans --- admin.go | 4 +- admin_test.go | 60 ++++++++++++++++++++++++++++++ create_partitions_response.go | 13 ++++++- create_partitions_response_test.go | 24 ++++++++++++ create_topics_response.go | 13 ++++++- create_topics_response_test.go | 24 ++++++++++++ mockresponses.go | 21 ++++++++++- 7 files changed, 154 insertions(+), 5 deletions(-) diff --git a/admin.go b/admin.go index c58c60ec6..f45f87fd8 100644 --- a/admin.go +++ b/admin.go @@ -166,7 +166,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO } if topicErr.Err != ErrNoError { - return topicErr.Err + return topicErr } return nil @@ -354,7 +354,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [ } if topicErr.Err != ErrNoError { - return topicErr.Err + return topicErr } return nil diff --git a/admin_test.go b/admin_test.go index 0c02ba893..4327670ff 100644 --- a/admin_test.go +++ b/admin_test.go @@ -2,6 +2,7 @@ package sarama import ( "errors" + "strings" "testing" ) @@ -105,6 +106,36 @@ func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) { } } +func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "CreateTopicsRequest": NewMockCreateTopicsResponse(t), + }) + + config := NewConfig() + config.Version = V0_11_0_0 + + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false) + want := "insufficient permissions to create topic with reserved prefix" + if !strings.HasSuffix(err.Error(), want) { + t.Fatal(err) + } + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -297,6 +328,35 @@ func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) { } } +func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), + }) + + config := NewConfig() + config.Version = V1_0_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + err = admin.CreatePartitions("_internal_topic", 3, nil, false) + want := "insufficient permissions to create partition on topic with reserved prefix" + if !strings.HasSuffix(err.Error(), want) { + t.Fatal(err) + } + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + func TestClusterAdminDeleteRecords(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() diff --git a/create_partitions_response.go b/create_partitions_response.go index abd621c64..bb18204a7 100644 --- a/create_partitions_response.go +++ b/create_partitions_response.go @@ -1,6 +1,9 @@ package sarama -import "time" +import ( + "fmt" + "time" +) type CreatePartitionsResponse struct { ThrottleTime time.Duration @@ -69,6 +72,14 @@ type TopicPartitionError struct { ErrMsg *string } +func (t *TopicPartitionError) Error() string { + text := t.Err.Error() + if t.ErrMsg != nil { + text = fmt.Sprintf("%s - %s", text, *t.ErrMsg) + } + return text +} + func (t *TopicPartitionError) encode(pe packetEncoder) error { pe.putInt16(int16(t.Err)) diff --git a/create_partitions_response_test.go b/create_partitions_response_test.go index 3219882ca..815aba162 100644 --- a/create_partitions_response_test.go +++ b/create_partitions_response_test.go @@ -50,3 +50,27 @@ func TestCreatePartitionsResponse(t *testing.T) { t.Errorf("Decoding error: expected %v but got %v", decodedresp, resp) } } + +func TestTopicPartitionError(t *testing.T) { + // Assert that TopicPartitionError satisfies error interface + var err error = &TopicPartitionError{ + Err: ErrTopicAuthorizationFailed, + } + + got := err.Error() + want := ErrTopicAuthorizationFailed.Error() + if got != want { + t.Errorf("TopicPartitionError.Error() = %v; want %v", got, want) + } + + msg := "reason why topic authorization failed" + err = &TopicPartitionError{ + Err: ErrTopicAuthorizationFailed, + ErrMsg: &msg, + } + got = err.Error() + want = ErrTopicAuthorizationFailed.Error() + " - " + msg + if got != want { + t.Errorf("TopicPartitionError.Error() = %v; want %v", got, want) + } +} diff --git a/create_topics_response.go b/create_topics_response.go index 66207e00c..a493e02ac 100644 --- a/create_topics_response.go +++ b/create_topics_response.go @@ -1,6 +1,9 @@ package sarama -import "time" +import ( + "fmt" + "time" +) type CreateTopicsResponse struct { Version int16 @@ -83,6 +86,14 @@ type TopicError struct { ErrMsg *string } +func (t *TopicError) Error() string { + text := t.Err.Error() + if t.ErrMsg != nil { + text = fmt.Sprintf("%s - %s", text, *t.ErrMsg) + } + return text +} + func (t *TopicError) encode(pe packetEncoder, version int16) error { pe.putInt16(int16(t.Err)) diff --git a/create_topics_response_test.go b/create_topics_response_test.go index 53790064f..30c1fb2a0 100644 --- a/create_topics_response_test.go +++ b/create_topics_response_test.go @@ -50,3 +50,27 @@ func TestCreateTopicsResponse(t *testing.T) { testResponse(t, "version 2", resp, createTopicsResponseV2) } + +func TestTopicError(t *testing.T) { + // Assert that TopicError satisfies error interface + var err error = &TopicError{ + Err: ErrTopicAuthorizationFailed, + } + + got := err.Error() + want := ErrTopicAuthorizationFailed.Error() + if got != want { + t.Errorf("TopicError.Error() = %v; want %v", got, want) + } + + msg := "reason why topic authorization failed" + err = &TopicError{ + Err: ErrTopicAuthorizationFailed, + ErrMsg: &msg, + } + got = err.Error() + want = ErrTopicAuthorizationFailed.Error() + " - " + msg + if got != want { + t.Errorf("TopicError.Error() = %v; want %v", got, want) + } +} diff --git a/mockresponses.go b/mockresponses.go index 61a99e104..063dcc9a0 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -2,6 +2,7 @@ package sarama import ( "fmt" + "strings" ) // TestReporter has methods matching go's testing.T to avoid importing @@ -612,10 +613,20 @@ func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse { func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*CreateTopicsRequest) - res := &CreateTopicsResponse{} + res := &CreateTopicsResponse{ + Version: req.Version, + } res.TopicErrors = make(map[string]*TopicError) for topic, _ := range req.TopicDetails { + if res.Version >= 1 && strings.HasPrefix(topic, "_") { + msg := "insufficient permissions to create topic with reserved prefix" + res.TopicErrors[topic] = &TopicError{ + Err: ErrTopicAuthorizationFailed, + ErrMsg: &msg, + } + continue + } res.TopicErrors[topic] = &TopicError{Err: ErrNoError} } return res @@ -654,6 +665,14 @@ func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder { res.TopicPartitionErrors = make(map[string]*TopicPartitionError) for topic, _ := range req.TopicPartitions { + if strings.HasPrefix(topic, "_") { + msg := "insufficient permissions to create partition on topic with reserved prefix" + res.TopicPartitionErrors[topic] = &TopicPartitionError{ + Err: ErrTopicAuthorizationFailed, + ErrMsg: &msg, + } + continue + } res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError} } return res From e581f69e63a6da948057a79619dc5548ca4e9ea4 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Mon, 11 Feb 2019 23:05:12 +0000 Subject: [PATCH 2/2] bug: remove invalid test I'm not sure what the purpose of `TestClusterAdminCreateTopicWithDiffVersion` originally was, but rather than testing desired behaviour, it seemed to be asserting that a previous deficiency in the mockbroker responses would return an ErrInsufficientData because a V1 request got a V0 response from the mockbroker. Since ec2cbc3 the mockbroker responses with a matching response version to the original request, so this test would no longer pass its assertions and should be removed. --- admin_test.go | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/admin_test.go b/admin_test.go index 4327670ff..66f497bc3 100644 --- a/admin_test.go +++ b/admin_test.go @@ -136,35 +136,6 @@ func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) { } } -func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) { - seedBroker := NewMockBroker(t, 1) - defer seedBroker.Close() - - seedBroker.SetHandlerByMap(map[string]MockResponse{ - "MetadataRequest": NewMockMetadataResponse(t). - SetController(seedBroker.BrokerID()). - SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), - "CreateTopicsRequest": NewMockCreateTopicsResponse(t), - }) - - config := NewConfig() - config.Version = V0_11_0_0 - admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) - if err != nil { - t.Fatal(err) - } - - err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false) - if err != ErrInsufficientData { - t.Fatal(err) - } - - err = admin.Close() - if err != nil { - t.Fatal(err) - } -} - func TestClusterAdminListTopics(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close()