Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consumer): follow preferred broker #1936

Merged
merged 3 commits into from
May 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions acl_bindings.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sarama

//Resource holds information about acl resource type
// Resource holds information about acl resource type
type Resource struct {
ResourceType AclResourceType
ResourceName string
Expand Down Expand Up @@ -46,7 +46,7 @@ func (r *Resource) decode(pd packetDecoder, version int16) (err error) {
return nil
}

//Acl holds information about acl type
// Acl holds information about acl type
type Acl struct {
Principal string
Host string
Expand Down Expand Up @@ -93,7 +93,7 @@ func (a *Acl) decode(pd packetDecoder, version int16) (err error) {
return nil
}

//ResourceAcls is an acl resource type
// ResourceAcls is an acl resource type
type ResourceAcls struct {
Resource
Acls []*Acl
Expand Down
4 changes: 2 additions & 2 deletions acl_create_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sarama

//CreateAclsRequest is an acl creation request
// CreateAclsRequest is an acl creation request
type CreateAclsRequest struct {
Version int16
AclCreations []*AclCreation
Expand Down Expand Up @@ -60,7 +60,7 @@ func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
}
}

//AclCreation is a wrapper around Resource and Acl type
// AclCreation is a wrapper around Resource and Acl type
type AclCreation struct {
Resource
Acl
Expand Down
46 changes: 25 additions & 21 deletions acl_create_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@ var (
func TestCreateAclsRequestv0(t *testing.T) {
req := &CreateAclsRequest{
Version: 0,
AclCreations: []*AclCreation{{
Resource: Resource{
ResourceType: AclResourceGroup,
ResourceName: "group",
AclCreations: []*AclCreation{
{
Resource: Resource{
ResourceType: AclResourceGroup,
ResourceName: "group",
},
Acl: Acl{
Principal: "principal",
Host: "host",
Operation: AclOperationAll,
PermissionType: AclPermissionDeny,
},
},
Acl: Acl{
Principal: "principal",
Host: "host",
Operation: AclOperationAll,
PermissionType: AclPermissionDeny,
}},
},
}

Expand All @@ -47,18 +49,20 @@ func TestCreateAclsRequestv0(t *testing.T) {
func TestCreateAclsRequestv1(t *testing.T) {
req := &CreateAclsRequest{
Version: 1,
AclCreations: []*AclCreation{{
Resource: Resource{
ResourceType: AclResourceGroup,
ResourceName: "group",
ResourcePatternType: AclPatternLiteral,
AclCreations: []*AclCreation{
{
Resource: Resource{
ResourceType: AclResourceGroup,
ResourceName: "group",
ResourcePatternType: AclPatternLiteral,
},
Acl: Acl{
Principal: "principal",
Host: "host",
Operation: AclOperationAll,
PermissionType: AclPermissionDeny,
},
},
Acl: Acl{
Principal: "principal",
Host: "host",
Operation: AclOperationAll,
PermissionType: AclPermissionDeny,
}},
},
}

Expand Down
4 changes: 2 additions & 2 deletions acl_create_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sarama

import "time"

//CreateAclsResponse is a an acl response creation type
// CreateAclsResponse is a an acl response creation type
type CreateAclsResponse struct {
ThrottleTime time.Duration
AclCreationResponses []*AclCreationResponse
Expand Down Expand Up @@ -63,7 +63,7 @@ func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

//AclCreationResponse is an acl creation response type
// AclCreationResponse is an acl creation response type
type AclCreationResponse struct {
Err KError
ErrMsg *string
Expand Down
2 changes: 1 addition & 1 deletion acl_delete_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sarama

//DeleteAclsRequest is a delete acl request
// DeleteAclsRequest is a delete acl request
type DeleteAclsRequest struct {
Version int
Filters []*AclFilter
Expand Down
6 changes: 3 additions & 3 deletions acl_delete_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sarama

import "time"

//DeleteAclsResponse is a delete acl response
// DeleteAclsResponse is a delete acl response
type DeleteAclsResponse struct {
Version int16
ThrottleTime time.Duration
Expand Down Expand Up @@ -64,7 +64,7 @@ func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

//FilterResponse is a filter response type
// FilterResponse is a filter response type
type FilterResponse struct {
Err KError
ErrMsg *string
Expand Down Expand Up @@ -115,7 +115,7 @@ func (f *FilterResponse) decode(pd packetDecoder, version int16) (err error) {
return nil
}

//MatchingAcl is a matching acl type
// MatchingAcl is a matching acl type
type MatchingAcl struct {
Err KError
ErrMsg *string
Expand Down
32 changes: 15 additions & 17 deletions acl_delete_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,21 @@ import (
"time"
)

var (
deleteAclsResponse = []byte{
0, 0, 0, 100,
0, 0, 0, 1,
0, 0, // no error
255, 255, // no error message
0, 0, 0, 1, // 1 matching acl
0, 0, // no error
255, 255, // no error message
2, // resource type
0, 5, 't', 'o', 'p', 'i', 'c',
0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
0, 4, 'h', 'o', 's', 't',
4,
3,
}
)
var deleteAclsResponse = []byte{
0, 0, 0, 100,
0, 0, 0, 1,
0, 0, // no error
255, 255, // no error message
0, 0, 0, 1, // 1 matching acl
0, 0, // no error
255, 255, // no error message
2, // resource type
0, 5, 't', 'o', 'p', 'i', 'c',
0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
0, 4, 'h', 'o', 's', 't',
4,
3,
}

func TestDeleteAclsResponse(t *testing.T) {
resp := &DeleteAclsResponse{
Expand Down
2 changes: 1 addition & 1 deletion acl_describe_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sarama

//DescribeAclsRequest is a secribe acl request type
// DescribeAclsRequest is a secribe acl request type
type DescribeAclsRequest struct {
Version int
AclFilter
Expand Down
2 changes: 1 addition & 1 deletion acl_describe_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sarama

import "time"

//DescribeAclsResponse is a describe acl response type
// DescribeAclsResponse is a describe acl response type
type DescribeAclsResponse struct {
Version int16
ThrottleTime time.Duration
Expand Down
1 change: 0 additions & 1 deletion acl_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func (a *AclFilter) decode(pd packetDecoder, version int16) (err error) {

if a.Version == 1 {
pattern, err := pd.getInt8()

if err != nil {
return err
}
Expand Down
16 changes: 8 additions & 8 deletions acl_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ func (a *AclOperation) String() string {
return s
}

//MarshalText returns the text form of the AclOperation (name without prefix)
// MarshalText returns the text form of the AclOperation (name without prefix)
func (a *AclOperation) MarshalText() ([]byte, error) {
return []byte(a.String()), nil
}

//UnmarshalText takes a text reprentation of the operation and converts it to an AclOperation
// UnmarshalText takes a text reprentation of the operation and converts it to an AclOperation
func (a *AclOperation) UnmarshalText(text []byte) error {
normalized := strings.ToLower(string(text))
mapping := map[string]AclOperation{
Expand Down Expand Up @@ -109,12 +109,12 @@ func (a *AclPermissionType) String() string {
return s
}

//MarshalText returns the text form of the AclPermissionType (name without prefix)
// MarshalText returns the text form of the AclPermissionType (name without prefix)
func (a *AclPermissionType) MarshalText() ([]byte, error) {
return []byte(a.String()), nil
}

//UnmarshalText takes a text reprentation of the permission type and converts it to an AclPermissionType
// UnmarshalText takes a text reprentation of the permission type and converts it to an AclPermissionType
func (a *AclPermissionType) UnmarshalText(text []byte) error {
normalized := strings.ToLower(string(text))
mapping := map[string]AclPermissionType{
Expand Down Expand Up @@ -159,12 +159,12 @@ func (a *AclResourceType) String() string {
return s
}

//MarshalText returns the text form of the AclResourceType (name without prefix)
// MarshalText returns the text form of the AclResourceType (name without prefix)
func (a *AclResourceType) MarshalText() ([]byte, error) {
return []byte(a.String()), nil
}

//UnmarshalText takes a text reprentation of the resource type and converts it to an AclResourceType
// UnmarshalText takes a text reprentation of the resource type and converts it to an AclResourceType
func (a *AclResourceType) UnmarshalText(text []byte) error {
normalized := strings.ToLower(string(text))
mapping := map[string]AclResourceType{
Expand Down Expand Up @@ -209,12 +209,12 @@ func (a *AclResourcePatternType) String() string {
return s
}

//MarshalText returns the text form of the AclResourcePatternType (name without prefix)
// MarshalText returns the text form of the AclResourcePatternType (name without prefix)
func (a *AclResourcePatternType) MarshalText() ([]byte, error) {
return []byte(a.String()), nil
}

//UnmarshalText takes a text reprentation of the resource pattern type and converts it to an AclResourcePatternType
// UnmarshalText takes a text reprentation of the resource pattern type and converts it to an AclResourcePatternType
func (a *AclResourcePatternType) UnmarshalText(text []byte) error {
normalized := strings.ToLower(string(text))
mapping := map[string]AclResourcePatternType{
Expand Down
2 changes: 2 additions & 0 deletions acl_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestAclPermissionTypeTextMarshal(t *testing.T) {
}
}
}

func TestAclResourceTypeTextMarshal(t *testing.T) {
for i := AclResourceUnknown; i <= AclResourceTransactionalID; i++ {
text, err := i.MarshalText()
Expand All @@ -53,6 +54,7 @@ func TestAclResourceTypeTextMarshal(t *testing.T) {
}
}
}

func TestAclResourcePatternTypeTextMarshal(t *testing.T) {
for i := AclPatternUnknown; i <= AclPatternPrefixed; i++ {
text, err := i.MarshalText()
Expand Down
2 changes: 1 addition & 1 deletion add_offsets_to_txn_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sarama

//AddOffsetsToTxnRequest adds offsets to a transaction request
// AddOffsetsToTxnRequest adds offsets to a transaction request
type AddOffsetsToTxnRequest struct {
TransactionalID string
ProducerID int64
Expand Down
14 changes: 6 additions & 8 deletions add_offsets_to_txn_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package sarama

import "testing"

var (
addOffsetsToTxnRequest = []byte{
0, 3, 't', 'x', 'n',
0, 0, 0, 0, 0, 0, 31, 64,
0, 0,
0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd',
}
)
var addOffsetsToTxnRequest = []byte{
0, 3, 't', 'x', 'n',
0, 0, 0, 0, 0, 0, 31, 64,
0, 0,
0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd',
}

func TestAddOffsetsToTxnRequest(t *testing.T) {
req := &AddOffsetsToTxnRequest{
Expand Down
2 changes: 1 addition & 1 deletion add_offsets_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"time"
)

//AddOffsetsToTxnResponse is a response type for adding offsets to txns
// AddOffsetsToTxnResponse is a response type for adding offsets to txns
type AddOffsetsToTxnResponse struct {
ThrottleTime time.Duration
Err KError
Expand Down
10 changes: 4 additions & 6 deletions add_offsets_to_txn_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ import (
"time"
)

var (
addOffsetsToTxnResponse = []byte{
0, 0, 0, 100,
0, 47,
}
)
var addOffsetsToTxnResponse = []byte{
0, 0, 0, 100,
0, 47,
}

func TestAddOffsetsToTxnResponse(t *testing.T) {
resp := &AddOffsetsToTxnResponse{
Expand Down
2 changes: 1 addition & 1 deletion add_partitions_to_txn_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sarama

//AddPartitionsToTxnRequest is a add paartition request
// AddPartitionsToTxnRequest is a add paartition request
type AddPartitionsToTxnRequest struct {
TransactionalID string
ProducerID int64
Expand Down
18 changes: 8 additions & 10 deletions add_partitions_to_txn_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ package sarama

import "testing"

var (
addPartitionsToTxnRequest = []byte{
0, 3, 't', 'x', 'n',
0, 0, 0, 0, 0, 0, 31, 64, // ProducerID
0, 0, 0, 0, // ProducerEpoch
0, 1, // 1 topic
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 1, 0, 0, 0, 1,
}
)
var addPartitionsToTxnRequest = []byte{
0, 3, 't', 'x', 'n',
0, 0, 0, 0, 0, 0, 31, 64, // ProducerID
0, 0, 0, 0, // ProducerEpoch
0, 1, // 1 topic
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 1, 0, 0, 0, 1,
}

func TestAddPartitionsToTxnRequest(t *testing.T) {
req := &AddPartitionsToTxnRequest{
Expand Down
Loading