diff --git a/acl_bindings.go b/acl_bindings.go index 50b689d1d..13440be67 100644 --- a/acl_bindings.go +++ b/acl_bindings.go @@ -1,6 +1,6 @@ package sarama -//Resource holds information about acl resource type +// Resource holds information about acl resource type type Resource struct { ResourceType AclResourceType ResourceName string @@ -46,7 +46,7 @@ func (r *Resource) decode(pd packetDecoder, version int16) (err error) { return nil } -//Acl holds information about acl type +// Acl holds information about acl type type Acl struct { Principal string Host string @@ -93,7 +93,7 @@ func (a *Acl) decode(pd packetDecoder, version int16) (err error) { return nil } -//ResourceAcls is an acl resource type +// ResourceAcls is an acl resource type type ResourceAcls struct { Resource Acls []*Acl diff --git a/acl_create_request.go b/acl_create_request.go index 6d8a70e1a..449102f74 100644 --- a/acl_create_request.go +++ b/acl_create_request.go @@ -1,6 +1,6 @@ package sarama -//CreateAclsRequest is an acl creation request +// CreateAclsRequest is an acl creation request type CreateAclsRequest struct { Version int16 AclCreations []*AclCreation @@ -60,7 +60,7 @@ func (c *CreateAclsRequest) requiredVersion() KafkaVersion { } } -//AclCreation is a wrapper around Resource and Acl type +// AclCreation is a wrapper around Resource and Acl type type AclCreation struct { Resource Acl diff --git a/acl_create_request_test.go b/acl_create_request_test.go index 8dd2a5f3e..1c636d01f 100644 --- a/acl_create_request_test.go +++ b/acl_create_request_test.go @@ -27,17 +27,19 @@ var ( func TestCreateAclsRequestv0(t *testing.T) { req := &CreateAclsRequest{ Version: 0, - AclCreations: []*AclCreation{{ - Resource: Resource{ - ResourceType: AclResourceGroup, - ResourceName: "group", + AclCreations: []*AclCreation{ + { + Resource: Resource{ + ResourceType: AclResourceGroup, + ResourceName: "group", + }, + Acl: Acl{ + Principal: "principal", + Host: "host", + Operation: AclOperationAll, + PermissionType: AclPermissionDeny, + }, }, - Acl: Acl{ - Principal: "principal", - Host: "host", - Operation: AclOperationAll, - PermissionType: AclPermissionDeny, - }}, }, } @@ -47,18 +49,20 @@ func TestCreateAclsRequestv0(t *testing.T) { func TestCreateAclsRequestv1(t *testing.T) { req := &CreateAclsRequest{ Version: 1, - AclCreations: []*AclCreation{{ - Resource: Resource{ - ResourceType: AclResourceGroup, - ResourceName: "group", - ResourcePatternType: AclPatternLiteral, + AclCreations: []*AclCreation{ + { + Resource: Resource{ + ResourceType: AclResourceGroup, + ResourceName: "group", + ResourcePatternType: AclPatternLiteral, + }, + Acl: Acl{ + Principal: "principal", + Host: "host", + Operation: AclOperationAll, + PermissionType: AclPermissionDeny, + }, }, - Acl: Acl{ - Principal: "principal", - Host: "host", - Operation: AclOperationAll, - PermissionType: AclPermissionDeny, - }}, }, } diff --git a/acl_create_response.go b/acl_create_response.go index 14b1b9e13..21d6c340c 100644 --- a/acl_create_response.go +++ b/acl_create_response.go @@ -2,7 +2,7 @@ package sarama import "time" -//CreateAclsResponse is a an acl response creation type +// CreateAclsResponse is a an acl response creation type type CreateAclsResponse struct { ThrottleTime time.Duration AclCreationResponses []*AclCreationResponse @@ -63,7 +63,7 @@ func (c *CreateAclsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } -//AclCreationResponse is an acl creation response type +// AclCreationResponse is an acl creation response type type AclCreationResponse struct { Err KError ErrMsg *string diff --git a/acl_delete_request.go b/acl_delete_request.go index 6efb44303..5e5c03bc2 100644 --- a/acl_delete_request.go +++ b/acl_delete_request.go @@ -1,6 +1,6 @@ package sarama -//DeleteAclsRequest is a delete acl request +// DeleteAclsRequest is a delete acl request type DeleteAclsRequest struct { Version int Filters []*AclFilter diff --git a/acl_delete_response.go b/acl_delete_response.go index cb6308826..cd33749d5 100644 --- a/acl_delete_response.go +++ b/acl_delete_response.go @@ -2,7 +2,7 @@ package sarama import "time" -//DeleteAclsResponse is a delete acl response +// DeleteAclsResponse is a delete acl response type DeleteAclsResponse struct { Version int16 ThrottleTime time.Duration @@ -64,7 +64,7 @@ func (d *DeleteAclsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } -//FilterResponse is a filter response type +// FilterResponse is a filter response type type FilterResponse struct { Err KError ErrMsg *string @@ -115,7 +115,7 @@ func (f *FilterResponse) decode(pd packetDecoder, version int16) (err error) { return nil } -//MatchingAcl is a matching acl type +// MatchingAcl is a matching acl type type MatchingAcl struct { Err KError ErrMsg *string diff --git a/acl_delete_response_test.go b/acl_delete_response_test.go index 0d9dea684..eb57d68a5 100644 --- a/acl_delete_response_test.go +++ b/acl_delete_response_test.go @@ -5,23 +5,21 @@ import ( "time" ) -var ( - deleteAclsResponse = []byte{ - 0, 0, 0, 100, - 0, 0, 0, 1, - 0, 0, // no error - 255, 255, // no error message - 0, 0, 0, 1, // 1 matching acl - 0, 0, // no error - 255, 255, // no error message - 2, // resource type - 0, 5, 't', 'o', 'p', 'i', 'c', - 0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l', - 0, 4, 'h', 'o', 's', 't', - 4, - 3, - } -) +var deleteAclsResponse = []byte{ + 0, 0, 0, 100, + 0, 0, 0, 1, + 0, 0, // no error + 255, 255, // no error message + 0, 0, 0, 1, // 1 matching acl + 0, 0, // no error + 255, 255, // no error message + 2, // resource type + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l', + 0, 4, 'h', 'o', 's', 't', + 4, + 3, +} func TestDeleteAclsResponse(t *testing.T) { resp := &DeleteAclsResponse{ diff --git a/acl_describe_request.go b/acl_describe_request.go index 29841a5ce..e0fe9023a 100644 --- a/acl_describe_request.go +++ b/acl_describe_request.go @@ -1,6 +1,6 @@ package sarama -//DescribeAclsRequest is a secribe acl request type +// DescribeAclsRequest is a secribe acl request type type DescribeAclsRequest struct { Version int AclFilter diff --git a/acl_describe_response.go b/acl_describe_response.go index c43408b24..3255fd485 100644 --- a/acl_describe_response.go +++ b/acl_describe_response.go @@ -2,7 +2,7 @@ package sarama import "time" -//DescribeAclsResponse is a describe acl response type +// DescribeAclsResponse is a describe acl response type type DescribeAclsResponse struct { Version int16 ThrottleTime time.Duration diff --git a/acl_filter.go b/acl_filter.go index fad555875..b380161aa 100644 --- a/acl_filter.go +++ b/acl_filter.go @@ -46,7 +46,6 @@ func (a *AclFilter) decode(pd packetDecoder, version int16) (err error) { if a.Version == 1 { pattern, err := pd.getInt8() - if err != nil { return err } diff --git a/acl_types.go b/acl_types.go index f3e6e79f4..16a21fc3f 100644 --- a/acl_types.go +++ b/acl_types.go @@ -55,12 +55,12 @@ func (a *AclOperation) String() string { return s } -//MarshalText returns the text form of the AclOperation (name without prefix) +// MarshalText returns the text form of the AclOperation (name without prefix) func (a *AclOperation) MarshalText() ([]byte, error) { return []byte(a.String()), nil } -//UnmarshalText takes a text reprentation of the operation and converts it to an AclOperation +// UnmarshalText takes a text reprentation of the operation and converts it to an AclOperation func (a *AclOperation) UnmarshalText(text []byte) error { normalized := strings.ToLower(string(text)) mapping := map[string]AclOperation{ @@ -109,12 +109,12 @@ func (a *AclPermissionType) String() string { return s } -//MarshalText returns the text form of the AclPermissionType (name without prefix) +// MarshalText returns the text form of the AclPermissionType (name without prefix) func (a *AclPermissionType) MarshalText() ([]byte, error) { return []byte(a.String()), nil } -//UnmarshalText takes a text reprentation of the permission type and converts it to an AclPermissionType +// UnmarshalText takes a text reprentation of the permission type and converts it to an AclPermissionType func (a *AclPermissionType) UnmarshalText(text []byte) error { normalized := strings.ToLower(string(text)) mapping := map[string]AclPermissionType{ @@ -159,12 +159,12 @@ func (a *AclResourceType) String() string { return s } -//MarshalText returns the text form of the AclResourceType (name without prefix) +// MarshalText returns the text form of the AclResourceType (name without prefix) func (a *AclResourceType) MarshalText() ([]byte, error) { return []byte(a.String()), nil } -//UnmarshalText takes a text reprentation of the resource type and converts it to an AclResourceType +// UnmarshalText takes a text reprentation of the resource type and converts it to an AclResourceType func (a *AclResourceType) UnmarshalText(text []byte) error { normalized := strings.ToLower(string(text)) mapping := map[string]AclResourceType{ @@ -209,12 +209,12 @@ func (a *AclResourcePatternType) String() string { return s } -//MarshalText returns the text form of the AclResourcePatternType (name without prefix) +// MarshalText returns the text form of the AclResourcePatternType (name without prefix) func (a *AclResourcePatternType) MarshalText() ([]byte, error) { return []byte(a.String()), nil } -//UnmarshalText takes a text reprentation of the resource pattern type and converts it to an AclResourcePatternType +// UnmarshalText takes a text reprentation of the resource pattern type and converts it to an AclResourcePatternType func (a *AclResourcePatternType) UnmarshalText(text []byte) error { normalized := strings.ToLower(string(text)) mapping := map[string]AclResourcePatternType{ diff --git a/acl_types_test.go b/acl_types_test.go index 295b7e2bd..0b5247a14 100644 --- a/acl_types_test.go +++ b/acl_types_test.go @@ -37,6 +37,7 @@ func TestAclPermissionTypeTextMarshal(t *testing.T) { } } } + func TestAclResourceTypeTextMarshal(t *testing.T) { for i := AclResourceUnknown; i <= AclResourceTransactionalID; i++ { text, err := i.MarshalText() @@ -53,6 +54,7 @@ func TestAclResourceTypeTextMarshal(t *testing.T) { } } } + func TestAclResourcePatternTypeTextMarshal(t *testing.T) { for i := AclPatternUnknown; i <= AclPatternPrefixed; i++ { text, err := i.MarshalText() diff --git a/add_offsets_to_txn_request.go b/add_offsets_to_txn_request.go index 95586f9a1..a96af9341 100644 --- a/add_offsets_to_txn_request.go +++ b/add_offsets_to_txn_request.go @@ -1,6 +1,6 @@ package sarama -//AddOffsetsToTxnRequest adds offsets to a transaction request +// AddOffsetsToTxnRequest adds offsets to a transaction request type AddOffsetsToTxnRequest struct { TransactionalID string ProducerID int64 diff --git a/add_offsets_to_txn_request_test.go b/add_offsets_to_txn_request_test.go index e96b3d33f..471d085cd 100644 --- a/add_offsets_to_txn_request_test.go +++ b/add_offsets_to_txn_request_test.go @@ -2,14 +2,12 @@ package sarama import "testing" -var ( - addOffsetsToTxnRequest = []byte{ - 0, 3, 't', 'x', 'n', - 0, 0, 0, 0, 0, 0, 31, 64, - 0, 0, - 0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd', - } -) +var addOffsetsToTxnRequest = []byte{ + 0, 3, 't', 'x', 'n', + 0, 0, 0, 0, 0, 0, 31, 64, + 0, 0, + 0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd', +} func TestAddOffsetsToTxnRequest(t *testing.T) { req := &AddOffsetsToTxnRequest{ diff --git a/add_offsets_to_txn_response.go b/add_offsets_to_txn_response.go index bdb184419..bb61973d1 100644 --- a/add_offsets_to_txn_response.go +++ b/add_offsets_to_txn_response.go @@ -4,7 +4,7 @@ import ( "time" ) -//AddOffsetsToTxnResponse is a response type for adding offsets to txns +// AddOffsetsToTxnResponse is a response type for adding offsets to txns type AddOffsetsToTxnResponse struct { ThrottleTime time.Duration Err KError diff --git a/add_offsets_to_txn_response_test.go b/add_offsets_to_txn_response_test.go index 4504966fe..d1730cee4 100644 --- a/add_offsets_to_txn_response_test.go +++ b/add_offsets_to_txn_response_test.go @@ -5,12 +5,10 @@ import ( "time" ) -var ( - addOffsetsToTxnResponse = []byte{ - 0, 0, 0, 100, - 0, 47, - } -) +var addOffsetsToTxnResponse = []byte{ + 0, 0, 0, 100, + 0, 47, +} func TestAddOffsetsToTxnResponse(t *testing.T) { resp := &AddOffsetsToTxnResponse{ diff --git a/add_partitions_to_txn_request.go b/add_partitions_to_txn_request.go index 6289f4514..57ecf6488 100644 --- a/add_partitions_to_txn_request.go +++ b/add_partitions_to_txn_request.go @@ -1,6 +1,6 @@ package sarama -//AddPartitionsToTxnRequest is a add paartition request +// AddPartitionsToTxnRequest is a add paartition request type AddPartitionsToTxnRequest struct { TransactionalID string ProducerID int64 diff --git a/add_partitions_to_txn_request_test.go b/add_partitions_to_txn_request_test.go index f8dfb27d0..f60a88695 100644 --- a/add_partitions_to_txn_request_test.go +++ b/add_partitions_to_txn_request_test.go @@ -2,16 +2,14 @@ package sarama import "testing" -var ( - addPartitionsToTxnRequest = []byte{ - 0, 3, 't', 'x', 'n', - 0, 0, 0, 0, 0, 0, 31, 64, // ProducerID - 0, 0, 0, 0, // ProducerEpoch - 0, 1, // 1 topic - 0, 5, 't', 'o', 'p', 'i', 'c', - 0, 0, 0, 1, 0, 0, 0, 1, - } -) +var addPartitionsToTxnRequest = []byte{ + 0, 3, 't', 'x', 'n', + 0, 0, 0, 0, 0, 0, 31, 64, // ProducerID + 0, 0, 0, 0, // ProducerEpoch + 0, 1, // 1 topic + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 0, 0, 1, 0, 0, 0, 1, +} func TestAddPartitionsToTxnRequest(t *testing.T) { req := &AddPartitionsToTxnRequest{ diff --git a/add_partitions_to_txn_response.go b/add_partitions_to_txn_response.go index 73b73b07f..098956507 100644 --- a/add_partitions_to_txn_response.go +++ b/add_partitions_to_txn_response.go @@ -4,7 +4,7 @@ import ( "time" ) -//AddPartitionsToTxnResponse is a partition errors to transaction type +// AddPartitionsToTxnResponse is a partition errors to transaction type type AddPartitionsToTxnResponse struct { ThrottleTime time.Duration Errors map[string][]*PartitionError @@ -87,7 +87,7 @@ func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } -//PartitionError is a partition error type +// PartitionError is a partition error type type PartitionError struct { Partition int32 Err KError diff --git a/add_partitions_to_txn_response_test.go b/add_partitions_to_txn_response_test.go index 9cf93e821..b3635e58d 100644 --- a/add_partitions_to_txn_response_test.go +++ b/add_partitions_to_txn_response_test.go @@ -5,16 +5,14 @@ import ( "time" ) -var ( - addPartitionsToTxnResponse = []byte{ - 0, 0, 0, 100, - 0, 0, 0, 1, - 0, 5, 't', 'o', 'p', 'i', 'c', - 0, 0, 0, 1, // 1 partition error - 0, 0, 0, 2, // partition 2 - 0, 48, // error - } -) +var addPartitionsToTxnResponse = []byte{ + 0, 0, 0, 100, + 0, 0, 0, 1, + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 0, 0, 1, // 1 partition error + 0, 0, 0, 2, // partition 2 + 0, 48, // error +} func TestAddPartitionsToTxnResponse(t *testing.T) { resp := &AddPartitionsToTxnResponse{ diff --git a/admin.go b/admin.go index 618981f5e..abe18b19f 100644 --- a/admin.go +++ b/admin.go @@ -134,7 +134,7 @@ func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) { // NewClusterAdminFromClient creates a new ClusterAdmin using the given client. // Note that underlying client will also be closed on admin's Close() call. func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) { - //make sure we can retrieve the controller + // make sure we can retrieve the controller _, err := client.Controller() if err != nil { return nil, err @@ -592,8 +592,8 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i if len(errs) > 0 { return ErrDeleteRecords{MultiError{&errs}} } - //todo since we are dealing with couple of partitions it would be good if we return slice of errors - //for each partition instead of one error + // todo since we are dealing with couple of partitions it would be good if we return slice of errors + // for each partition instead of one error return nil } diff --git a/admin_test.go b/admin_test.go index 754003f68..4ab91c05a 100644 --- a/admin_test.go +++ b/admin_test.go @@ -357,7 +357,7 @@ func TestClusterAdminAlterPartitionReassignments(t *testing.T) { t.Fatal(err) } - var topicAssignment = make([][]int32, 0, 3) + topicAssignment := make([][]int32, 0, 3) err = admin.AlterPartitionReassignments("my_topic", topicAssignment) if err != nil { @@ -395,7 +395,7 @@ func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) { t.Fatal(err) } - var topicAssignment = make([][]int32, 0, 3) + topicAssignment := make([][]int32, 0, 3) err = admin.AlterPartitionReassignments("my_topic", topicAssignment) @@ -479,7 +479,7 @@ func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) { t.Fatal(err) } - var partitions = make([]int32, 0) + partitions := make([]int32, 0) _, err = admin.ListPartitionReassignments("my_topic", partitions) @@ -649,7 +649,7 @@ func TestClusterAdminDescribeConfig(t *testing.T) { "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), }) - var tests = []struct { + tests := []struct { saramaVersion KafkaVersion requestVersion int16 includeSynonyms bool diff --git a/alter_configs_request.go b/alter_configs_request.go index c88bb604a..8b94b1f3f 100644 --- a/alter_configs_request.go +++ b/alter_configs_request.go @@ -1,12 +1,12 @@ package sarama -//AlterConfigsRequest is an alter config request type +// AlterConfigsRequest is an alter config request type type AlterConfigsRequest struct { Resources []*AlterConfigsResource ValidateOnly bool } -//AlterConfigsResource is an alter config resource type +// AlterConfigsResource is an alter config resource type type AlterConfigsResource struct { Type ConfigResourceType Name string diff --git a/alter_configs_request_test.go b/alter_configs_request_test.go index 73c8a622d..e612244c6 100644 --- a/alter_configs_request_test.go +++ b/alter_configs_request_test.go @@ -12,7 +12,7 @@ var ( 0, 0, 0, 1, // 1 config 2, // a topic 0, 3, 'f', 'o', 'o', // topic name: foo - 0, 0, 0, 1, //1 config name + 0, 0, 0, 1, // 1 config name 0, 10, // 10 chars 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, @@ -24,14 +24,14 @@ var ( 0, 0, 0, 2, // 2 config 2, // a topic 0, 3, 'f', 'o', 'o', // topic name: foo - 0, 0, 0, 1, //1 config name + 0, 0, 0, 1, // 1 config name 0, 10, // 10 chars 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, '1', '0', '0', '0', 2, // a topic 0, 3, 'b', 'a', 'r', // topic name: foo - 0, 0, 0, 1, //2 config + 0, 0, 0, 1, // 2 config 0, 12, // 12 chars 'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's', 0, 4, diff --git a/alter_configs_response.go b/alter_configs_response.go index 3266f9274..89065ab11 100644 --- a/alter_configs_response.go +++ b/alter_configs_response.go @@ -2,13 +2,13 @@ package sarama import "time" -//AlterConfigsResponse is a response type for alter config +// AlterConfigsResponse is a response type for alter config type AlterConfigsResponse struct { ThrottleTime time.Duration Resources []*AlterConfigsResourceResponse } -//AlterConfigsResourceResponse is a response type for alter config resource +// AlterConfigsResourceResponse is a response type for alter config resource type AlterConfigsResourceResponse struct { ErrorCode int16 ErrorMsg string diff --git a/alter_configs_response_test.go b/alter_configs_response_test.go index 9f44dd95b..62cd5991f 100644 --- a/alter_configs_response_test.go +++ b/alter_configs_response_test.go @@ -6,15 +6,15 @@ import ( var ( alterResponseEmpty = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 0, // no configs } alterResponsePopulated = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 1, // response - 0, 0, //errorcode - 0, 0, //string + 0, 0, // errorcode + 0, 0, // string 2, // topic 0, 3, 'f', 'o', 'o', } diff --git a/alter_partition_reassignments_response_test.go b/alter_partition_reassignments_response_test.go index 25f1fd792..8c39ef6ce 100644 --- a/alter_partition_reassignments_response_test.go +++ b/alter_partition_reassignments_response_test.go @@ -19,7 +19,7 @@ var ( 6, 116, 111, 112, 105, 99, // topic name "topic" 2, // partition array length 1 0, 0, 0, 1, // partitionId - 0, 3, //kerror + 0, 3, // kerror 7, 101, 114, 114, 111, 114, 50, // error string "error2" 0, 0, 0, // empty tagged fields } diff --git a/api_versions_request.go b/api_versions_request.go index d67c5e1e5..bee92c0e7 100644 --- a/api_versions_request.go +++ b/api_versions_request.go @@ -1,8 +1,7 @@ package sarama -//ApiVersionsRequest ... -type ApiVersionsRequest struct { -} +// ApiVersionsRequest ... +type ApiVersionsRequest struct{} func (a *ApiVersionsRequest) encode(pe packetEncoder) error { return nil diff --git a/api_versions_request_test.go b/api_versions_request_test.go index 4bb2fdac3..5fd21496a 100644 --- a/api_versions_request_test.go +++ b/api_versions_request_test.go @@ -2,9 +2,7 @@ package sarama import "testing" -var ( - apiVersionRequest []byte -) +var apiVersionRequest []byte func TestApiVersionsRequest(t *testing.T) { request := new(ApiVersionsRequest) diff --git a/api_versions_response.go b/api_versions_response.go index d09e8d9e1..0e72e3926 100644 --- a/api_versions_response.go +++ b/api_versions_response.go @@ -1,6 +1,6 @@ package sarama -//ApiVersionsResponseBlock is an api version response block type +// ApiVersionsResponseBlock is an api version response block type type ApiVersionsResponseBlock struct { ApiKey int16 MinVersion int16 @@ -32,7 +32,7 @@ func (b *ApiVersionsResponseBlock) decode(pd packetDecoder) error { return nil } -//ApiVersionsResponse is an api version response type +// ApiVersionsResponse is an api version response type type ApiVersionsResponse struct { Err KError ApiVersions []*ApiVersionsResponseBlock diff --git a/api_versions_response_test.go b/api_versions_response_test.go index 58b065834..e07bf785a 100644 --- a/api_versions_response_test.go +++ b/api_versions_response_test.go @@ -2,15 +2,13 @@ package sarama import "testing" -var ( - apiVersionResponse = []byte{ - 0x00, 0x00, - 0x00, 0x00, 0x00, 0x01, - 0x00, 0x03, - 0x00, 0x02, - 0x00, 0x01, - } -) +var apiVersionResponse = []byte{ + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x03, + 0x00, 0x02, + 0x00, 0x01, +} func TestApiVersionsResponse(t *testing.T) { response := new(ApiVersionsResponse) diff --git a/async_producer.go b/async_producer.go index 209fd2d34..5911f7b58 100644 --- a/async_producer.go +++ b/async_producer.go @@ -449,7 +449,6 @@ func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error { } return }) - if err != nil { return err } diff --git a/async_producer_test.go b/async_producer_test.go index 855d6ae2a..e571aa068 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -961,7 +961,7 @@ func TestAsyncProducerIdempotentGoldenPath(t *testing.T) { } func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { - //Logger = log.New(os.Stderr, "", log.LstdFlags) + // Logger = log.New(os.Stderr, "", log.LstdFlags) tests := []struct { name string failAfterWrite bool @@ -1028,9 +1028,9 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { batchFirstSeq := int(batch.FirstSequence) batchSize := len(batch.Records) - if lastSequenceWrittenToDisk == batchFirstSeq-1 { //in sequence append - if lastBatchFirstSeq == batchFirstSeq { //is a batch retry - if lastBatchSize == batchSize { //good retry + if lastSequenceWrittenToDisk == batchFirstSeq-1 { // in sequence append + if lastBatchFirstSeq == batchFirstSeq { // is a batch retry + if lastBatchSize == batchSize { // good retry // mock write to disk lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1 return prodSuccessResponse @@ -1060,7 +1060,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { // mock write to disk lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1 return prodSuccessResponse - } //out of sequence / bad retried batch + } // out of sequence / bad retried batch if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize { t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize) } else if lastSequenceWrittenToDisk+1 != batchFirstSeq { diff --git a/balance_strategy.go b/balance_strategy.go index 8f7634f94..9855bf443 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -138,6 +138,7 @@ func (p balanceStrategySortable) Len() int { return len(p.memberIDs) } func (p balanceStrategySortable) Swap(i, j int) { p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i] } + func (p balanceStrategySortable) Less(i, j int) bool { return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j]) } diff --git a/balance_strategy_test.go b/balance_strategy_test.go index f5ab2fd6f..adae80caa 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -1967,6 +1967,7 @@ func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testi verifyPlanIsBalancedAndSticky(t, s, members, plan3, err) verifyFullyBalanced(t, plan3) } + func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) { s := &stickyBalanceStrategy{} diff --git a/broker.go b/broker.go index 0b3ea969c..664b74f54 100644 --- a/broker.go +++ b/broker.go @@ -223,7 +223,7 @@ func (b *Broker) Connected() (bool, error) { return b.conn != nil, b.connErr } -//Close closes the broker resources +// Close closes the broker resources func (b *Broker) Close() error { b.lock.Lock() defer b.lock.Unlock() @@ -276,12 +276,11 @@ func (b *Broker) Rack() string { return *b.rack } -//GetMetadata send a metadata request and returns a metadata response or error +// GetMetadata send a metadata request and returns a metadata response or error func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) { response := new(MetadataResponse) err := b.sendAndReceive(request, response) - if err != nil { return nil, err } @@ -289,12 +288,11 @@ func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error return response, nil } -//GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error +// GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) { response := new(ConsumerMetadataResponse) err := b.sendAndReceive(request, response) - if err != nil { return nil, err } @@ -302,12 +300,11 @@ func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*Consume return response, nil } -//FindCoordinator sends a find coordinate request and returns a response or error +// FindCoordinator sends a find coordinate request and returns a response or error func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) { response := new(FindCoordinatorResponse) err := b.sendAndReceive(request, response) - if err != nil { return nil, err } @@ -315,12 +312,11 @@ func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordina return response, nil } -//GetAvailableOffsets return an offset response or error +// GetAvailableOffsets return an offset response or error func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) { response := new(OffsetResponse) err := b.sendAndReceive(request, response) - if err != nil { return nil, err } @@ -328,7 +324,7 @@ func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, e return response, nil } -//Produce returns a produce response or error +// Produce returns a produce response or error func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) { var ( response *ProduceResponse @@ -349,7 +345,7 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) { return response, nil } -//Fetch returns a FetchResponse or error +// Fetch returns a FetchResponse or error func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) { response := new(FetchResponse) @@ -361,7 +357,7 @@ func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) { return response, nil } -//CommitOffset return an Offset commit response or error +// CommitOffset return an Offset commit response or error func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) { response := new(OffsetCommitResponse) @@ -373,7 +369,7 @@ func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitRespon return response, nil } -//FetchOffset returns an offset fetch response or error +// FetchOffset returns an offset fetch response or error func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) { response := new(OffsetFetchResponse) response.Version = request.Version // needed to handle the two header versions @@ -386,7 +382,7 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, return response, nil } -//JoinGroup returns a join group response or error +// JoinGroup returns a join group response or error func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) { response := new(JoinGroupResponse) @@ -398,7 +394,7 @@ func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error return response, nil } -//SyncGroup returns a sync group response or error +// SyncGroup returns a sync group response or error func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) { response := new(SyncGroupResponse) @@ -410,7 +406,7 @@ func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error return response, nil } -//LeaveGroup return a leave group response or error +// LeaveGroup return a leave group response or error func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) { response := new(LeaveGroupResponse) @@ -422,7 +418,7 @@ func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, er return response, nil } -//Heartbeat returns a heartbeat response or error +// Heartbeat returns a heartbeat response or error func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) { response := new(HeartbeatResponse) @@ -434,7 +430,7 @@ func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error return response, nil } -//ListGroups return a list group response or error +// ListGroups return a list group response or error func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) { response := new(ListGroupsResponse) @@ -446,7 +442,7 @@ func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, er return response, nil } -//DescribeGroups return describe group response or error +// DescribeGroups return describe group response or error func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) { response := new(DescribeGroupsResponse) @@ -458,7 +454,7 @@ func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroups return response, nil } -//ApiVersions return api version response or error +// ApiVersions return api version response or error func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) { response := new(ApiVersionsResponse) @@ -470,7 +466,7 @@ func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, return response, nil } -//CreateTopics send a create topic request and returns create topic response +// CreateTopics send a create topic request and returns create topic response func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) { response := new(CreateTopicsResponse) @@ -482,7 +478,7 @@ func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsRespon return response, nil } -//DeleteTopics sends a delete topic request and returns delete topic response +// DeleteTopics sends a delete topic request and returns delete topic response func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) { response := new(DeleteTopicsResponse) @@ -494,8 +490,8 @@ func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsRespon return response, nil } -//CreatePartitions sends a create partition request and returns create -//partitions response or error +// CreatePartitions sends a create partition request and returns create +// partitions response or error func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) { response := new(CreatePartitionsResponse) @@ -507,8 +503,8 @@ func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePart return response, nil } -//AlterPartitionReassignments sends a alter partition reassignments request and -//returns alter partition reassignments response +// AlterPartitionReassignments sends a alter partition reassignments request and +// returns alter partition reassignments response func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) { response := new(AlterPartitionReassignmentsResponse) @@ -520,8 +516,8 @@ func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignment return response, nil } -//ListPartitionReassignments sends a list partition reassignments request and -//returns list partition reassignments response +// ListPartitionReassignments sends a list partition reassignments request and +// returns list partition reassignments response func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) { response := new(ListPartitionReassignmentsResponse) @@ -533,8 +529,8 @@ func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsR return response, nil } -//DeleteRecords send a request to delete records and return delete record -//response or error +// DeleteRecords send a request to delete records and return delete record +// response or error func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) { response := new(DeleteRecordsResponse) @@ -546,7 +542,7 @@ func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsRes return response, nil } -//DescribeAcls sends a describe acl request and returns a response or error +// DescribeAcls sends a describe acl request and returns a response or error func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) { response := new(DescribeAclsResponse) @@ -558,7 +554,7 @@ func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsRespon return response, nil } -//CreateAcls sends a create acl request and returns a response or error +// CreateAcls sends a create acl request and returns a response or error func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) { response := new(CreateAclsResponse) @@ -570,7 +566,7 @@ func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, er return response, nil } -//DeleteAcls sends a delete acl request and returns a response or error +// DeleteAcls sends a delete acl request and returns a response or error func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) { response := new(DeleteAclsResponse) @@ -582,7 +578,7 @@ func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, er return response, nil } -//InitProducerID sends an init producer request and returns a response or error +// InitProducerID sends an init producer request and returns a response or error func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) { response := new(InitProducerIDResponse) @@ -594,8 +590,8 @@ func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerID return response, nil } -//AddPartitionsToTxn send a request to add partition to txn and returns -//a response or error +// AddPartitionsToTxn send a request to add partition to txn and returns +// a response or error func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) { response := new(AddPartitionsToTxnResponse) @@ -607,8 +603,8 @@ func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPar return response, nil } -//AddOffsetsToTxn sends a request to add offsets to txn and returns a response -//or error +// AddOffsetsToTxn sends a request to add offsets to txn and returns a response +// or error func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) { response := new(AddOffsetsToTxnResponse) @@ -620,7 +616,7 @@ func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsTo return response, nil } -//EndTxn sends a request to end txn and returns a response or error +// EndTxn sends a request to end txn and returns a response or error func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) { response := new(EndTxnResponse) @@ -632,8 +628,8 @@ func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) { return response, nil } -//TxnOffsetCommit sends a request to commit transaction offsets and returns -//a response or error +// TxnOffsetCommit sends a request to commit transaction offsets and returns +// a response or error func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) { response := new(TxnOffsetCommitResponse) @@ -645,8 +641,8 @@ func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCom return response, nil } -//DescribeConfigs sends a request to describe config and returns a response or -//error +// DescribeConfigs sends a request to describe config and returns a response or +// error func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) { response := new(DescribeConfigsResponse) @@ -658,7 +654,7 @@ func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConf return response, nil } -//AlterConfigs sends a request to alter config and return a response or error +// AlterConfigs sends a request to alter config and return a response or error func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) { response := new(AlterConfigsResponse) @@ -670,7 +666,7 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon return response, nil } -//DeleteGroups sends a request to delete groups and returns a response or error +// DeleteGroups sends a request to delete groups and returns a response or error func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) { response := new(DeleteGroupsResponse) @@ -681,7 +677,7 @@ func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsRespon return response, nil } -//DescribeLogDirs sends a request to get the broker's log dir paths and sizes +// DescribeLogDirs sends a request to get the broker's log dir paths and sizes func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) { response := new(DescribeLogDirsResponse) @@ -876,7 +872,7 @@ func (b *Broker) responseReceiver() { continue } - var headerLength = getHeaderLength(response.headerVersion) + headerLength := getHeaderLength(response.headerVersion) header := make([]byte, headerLength) bytesReadHeader, err := b.readFull(header) @@ -1049,7 +1045,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error { // sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol func (b *Broker) sendAndReceiveV0SASLPlainAuth() error { length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password) - authBytes := make([]byte, length+4) //4 byte length header + auth data + authBytes := make([]byte, length+4) // 4 byte length header + auth data binary.BigEndian.PutUint32(authBytes, uint32(length)) copy(authBytes[4:], b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password) diff --git a/broker_test.go b/broker_test.go index 5f859c8f2..2fa40ceb4 100644 --- a/broker_test.go +++ b/broker_test.go @@ -239,7 +239,6 @@ func TestSASLOAuthBearer(t *testing.T) { } conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) - if err != nil { t.Fatal(err) } @@ -370,7 +369,6 @@ func TestSASLSCRAMSHAXXX(t *testing.T) { } conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) - if err != nil { t.Fatal(err) } @@ -480,7 +478,6 @@ func TestSASLPlainAuth(t *testing.T) { } conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) - if err != nil { t.Fatal(err) } @@ -658,7 +655,6 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) { } conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) - if err != nil { t.Fatal(err) } @@ -756,7 +752,8 @@ var brokerTestTable = []struct { response []byte runner func(*testing.T, *Broker) }{ - {V0_10_0_0, + { + V0_10_0_0, "MetadataRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -768,9 +765,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("Metadata request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "ConsumerMetadataRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -782,9 +781,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("Consumer Metadata request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "ProduceRequest (NoResponse)", []byte{}, func(t *testing.T, broker *Broker) { @@ -797,9 +798,11 @@ var brokerTestTable = []struct { if response != nil { t.Error("Produce request with NoResponse got a response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "ProduceRequest (WaitForLocal)", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -812,9 +815,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("Produce request without NoResponse got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "FetchRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -826,9 +831,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("Fetch request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "OffsetFetchRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -840,9 +847,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("OffsetFetch request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "OffsetCommitRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -854,9 +863,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("OffsetCommit request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "OffsetRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -868,9 +879,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("Offset request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "JoinGroupRequest", []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -882,9 +895,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("JoinGroup request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "SyncGroupRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -896,9 +911,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("SyncGroup request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "LeaveGroupRequest", []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -910,9 +927,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("LeaveGroup request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "HeartbeatRequest", []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -924,9 +943,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("Heartbeat request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "ListGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -938,9 +959,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("ListGroups request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "DescribeGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -952,9 +975,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("DescribeGroups request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "ApiVersionsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -966,9 +991,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("ApiVersions request got no response!") } - }}, + }, + }, - {V1_1_0_0, + { + V1_1_0_0, "DeleteGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -980,7 +1007,8 @@ var brokerTestTable = []struct { if response == nil { t.Error("DeleteGroups request got no response!") } - }}, + }, + }, } func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) { diff --git a/client.go b/client.go index 85180a413..c0918ba35 100644 --- a/client.go +++ b/client.go @@ -493,7 +493,6 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in } offset, err := client.getOffset(topic, partitionID, time) - if err != nil { if err := client.RefreshMetadata(topic); err != nil { return -1, err @@ -607,7 +606,7 @@ func (client *client) randomizeSeedBrokers(addrs []string) { } func (client *client) updateBroker(brokers []*Broker) { - var currentBroker = make(map[int32]*Broker, len(brokers)) + currentBroker := make(map[int32]*Broker, len(brokers)) for _, broker := range brokers { currentBroker[broker.ID()] = broker @@ -1053,7 +1052,6 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin request.CoordinatorType = CoordinatorGroup response, err := broker.FindCoordinator(request) - if err != nil { Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err) diff --git a/config_test.go b/config_test.go index 2e34ba3d1..6d2b6fef9 100644 --- a/config_test.go +++ b/config_test.go @@ -33,8 +33,7 @@ func TestEmptyClientIDConfigValidates(t *testing.T) { } } -type DummyTokenProvider struct { -} +type DummyTokenProvider struct{} func (t *DummyTokenProvider) Token() (*AccessToken, error) { return &AccessToken{Token: "access-token-string"}, nil @@ -51,50 +50,66 @@ func TestNetConfigValidates(t *testing.T) { func(cfg *Config) { cfg.Net.MaxOpenRequests = 0 }, - "Net.MaxOpenRequests must be > 0"}, - {"DialTimeout", + "Net.MaxOpenRequests must be > 0", + }, + { + "DialTimeout", func(cfg *Config) { cfg.Net.DialTimeout = 0 }, - "Net.DialTimeout must be > 0"}, - {"ReadTimeout", + "Net.DialTimeout must be > 0", + }, + { + "ReadTimeout", func(cfg *Config) { cfg.Net.ReadTimeout = 0 }, - "Net.ReadTimeout must be > 0"}, - {"WriteTimeout", + "Net.ReadTimeout must be > 0", + }, + { + "WriteTimeout", func(cfg *Config) { cfg.Net.WriteTimeout = 0 }, - "Net.WriteTimeout must be > 0"}, - {"SASL.User", + "Net.WriteTimeout must be > 0", + }, + { + "SASL.User", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.User = "" }, - "Net.SASL.User must not be empty when SASL is enabled"}, - {"SASL.Password", + "Net.SASL.User must not be empty when SASL is enabled", + }, + { + "SASL.Password", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.User = "user" cfg.Net.SASL.Password = "" }, - "Net.SASL.Password must not be empty when SASL is enabled"}, - {"SASL.Mechanism - Invalid mechanism type", + "Net.SASL.Password must not be empty when SASL is enabled", + }, + { + "SASL.Mechanism - Invalid mechanism type", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = "AnIncorrectSASLMechanism" cfg.Net.SASL.TokenProvider = &DummyTokenProvider{} }, - "The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`"}, - {"SASL.Mechanism.OAUTHBEARER - Missing token provider", + "The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`", + }, + { + "SASL.Mechanism.OAUTHBEARER - Missing token provider", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeOAuth cfg.Net.SASL.TokenProvider = nil }, - "An AccessTokenProvider instance must be provided to Net.SASL.TokenProvider"}, - {"SASL.Mechanism SCRAM-SHA-256 - Missing SCRAM client", + "An AccessTokenProvider instance must be provided to Net.SASL.TokenProvider", + }, + { + "SASL.Mechanism SCRAM-SHA-256 - Missing SCRAM client", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeSCRAMSHA256 @@ -102,8 +117,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.User = "user" cfg.Net.SASL.Password = "stong_password" }, - "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc"}, - {"SASL.Mechanism SCRAM-SHA-512 - Missing SCRAM client", + "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc", + }, + { + "SASL.Mechanism SCRAM-SHA-512 - Missing SCRAM client", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeSCRAMSHA512 @@ -111,8 +128,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.User = "user" cfg.Net.SASL.Password = "stong_password" }, - "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing password field", + "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing password field", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeGSSAPI @@ -123,8 +142,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf" }, "Net.SASL.GSSAPI.Password must not be empty when GSS-API " + - "mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing KeyTabPath field", + "mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing KeyTabPath field", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeGSSAPI @@ -135,8 +156,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf" }, "Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" + - " and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Missing username", + " and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Missing username", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeGSSAPI @@ -146,8 +169,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.Realm = "kafka" cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf" }, - "Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Missing ServiceName", + "Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Missing ServiceName", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeGSSAPI @@ -157,8 +182,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.Realm = "kafka" cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf" }, - "Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Missing AuthType", + "Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Missing AuthType", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.GSSAPI.ServiceName = "kafka" @@ -168,8 +195,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.Realm = "kafka" cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf" }, - "Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Missing KerberosConfigPath", + "Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Missing KerberosConfigPath", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.GSSAPI.ServiceName = "kafka" @@ -179,8 +208,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.Password = "sarama" cfg.Net.SASL.GSSAPI.Realm = "kafka" }, - "Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Missing Realm", + "Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Missing Realm", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.GSSAPI.ServiceName = "kafka" @@ -190,7 +221,8 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.Password = "sarama" cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf" }, - "Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used"}, + "Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used", + }, } for i, test := range tests { @@ -213,17 +245,22 @@ func TestMetadataConfigValidates(t *testing.T) { func(cfg *Config) { cfg.Metadata.Retry.Max = -1 }, - "Metadata.Retry.Max must be >= 0"}, - {"Retry.Backoff", + "Metadata.Retry.Max must be >= 0", + }, + { + "Retry.Backoff", func(cfg *Config) { cfg.Metadata.Retry.Backoff = -1 }, - "Metadata.Retry.Backoff must be >= 0"}, - {"RefreshFrequency", + "Metadata.Retry.Backoff must be >= 0", + }, + { + "RefreshFrequency", func(cfg *Config) { cfg.Metadata.RefreshFrequency = -1 }, - "Metadata.RefreshFrequency must be >= 0"}, + "Metadata.RefreshFrequency must be >= 0", + }, } for i, test := range tests { @@ -241,11 +278,13 @@ func TestAdminConfigValidates(t *testing.T) { cfg func(*Config) // resorting to using a function as a param because of internal composite structs err string }{ - {"Timeout", + { + "Timeout", func(cfg *Config) { cfg.Admin.Timeout = 0 }, - "Admin.Timeout must be > 0"}, + "Admin.Timeout must be > 0", + }, } for i, test := range tests { @@ -268,84 +307,113 @@ func TestProducerConfigValidates(t *testing.T) { func(cfg *Config) { cfg.Producer.MaxMessageBytes = 0 }, - "Producer.MaxMessageBytes must be > 0"}, - {"RequiredAcks", + "Producer.MaxMessageBytes must be > 0", + }, + { + "RequiredAcks", func(cfg *Config) { cfg.Producer.RequiredAcks = -2 }, - "Producer.RequiredAcks must be >= -1"}, - {"Timeout", + "Producer.RequiredAcks must be >= -1", + }, + { + "Timeout", func(cfg *Config) { cfg.Producer.Timeout = 0 }, - "Producer.Timeout must be > 0"}, - {"Partitioner", + "Producer.Timeout must be > 0", + }, + { + "Partitioner", func(cfg *Config) { cfg.Producer.Partitioner = nil }, - "Producer.Partitioner must not be nil"}, - {"Flush.Bytes", + "Producer.Partitioner must not be nil", + }, + { + "Flush.Bytes", func(cfg *Config) { cfg.Producer.Flush.Bytes = -1 }, - "Producer.Flush.Bytes must be >= 0"}, - {"Flush.Messages", + "Producer.Flush.Bytes must be >= 0", + }, + { + "Flush.Messages", func(cfg *Config) { cfg.Producer.Flush.Messages = -1 }, - "Producer.Flush.Messages must be >= 0"}, - {"Flush.Frequency", + "Producer.Flush.Messages must be >= 0", + }, + { + "Flush.Frequency", func(cfg *Config) { cfg.Producer.Flush.Frequency = -1 }, - "Producer.Flush.Frequency must be >= 0"}, - {"Flush.MaxMessages", + "Producer.Flush.Frequency must be >= 0", + }, + { + "Flush.MaxMessages", func(cfg *Config) { cfg.Producer.Flush.MaxMessages = -1 }, - "Producer.Flush.MaxMessages must be >= 0"}, - {"Flush.MaxMessages with Producer.Flush.Messages", + "Producer.Flush.MaxMessages must be >= 0", + }, + { + "Flush.MaxMessages with Producer.Flush.Messages", func(cfg *Config) { cfg.Producer.Flush.MaxMessages = 1 cfg.Producer.Flush.Messages = 2 }, - "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"}, - {"Flush.Retry.Max", + "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set", + }, + { + "Flush.Retry.Max", func(cfg *Config) { cfg.Producer.Retry.Max = -1 }, - "Producer.Retry.Max must be >= 0"}, - {"Flush.Retry.Backoff", + "Producer.Retry.Max must be >= 0", + }, + { + "Flush.Retry.Backoff", func(cfg *Config) { cfg.Producer.Retry.Backoff = -1 }, - "Producer.Retry.Backoff must be >= 0"}, - {"Idempotent Version", + "Producer.Retry.Backoff must be >= 0", + }, + { + "Idempotent Version", func(cfg *Config) { cfg.Producer.Idempotent = true cfg.Version = V0_10_0_0 }, - "Idempotent producer requires Version >= V0_11_0_0"}, - {"Idempotent with Producer.Retry.Max", + "Idempotent producer requires Version >= V0_11_0_0", + }, + { + "Idempotent with Producer.Retry.Max", func(cfg *Config) { cfg.Version = V0_11_0_0 cfg.Producer.Idempotent = true cfg.Producer.Retry.Max = 0 }, - "Idempotent producer requires Producer.Retry.Max >= 1"}, - {"Idempotent with Producer.RequiredAcks", + "Idempotent producer requires Producer.Retry.Max >= 1", + }, + { + "Idempotent with Producer.RequiredAcks", func(cfg *Config) { cfg.Version = V0_11_0_0 cfg.Producer.Idempotent = true }, - "Idempotent producer requires Producer.RequiredAcks to be WaitForAll"}, - {"Idempotent with Net.MaxOpenRequests", + "Idempotent producer requires Producer.RequiredAcks to be WaitForAll", + }, + { + "Idempotent with Net.MaxOpenRequests", func(cfg *Config) { cfg.Version = V0_11_0_0 cfg.Producer.Idempotent = true cfg.Producer.RequiredAcks = WaitForAll }, - "Idempotent producer requires Net.MaxOpenRequests to be 1"}, + "Idempotent producer requires Net.MaxOpenRequests to be 1", + }, } for i, test := range tests { @@ -356,20 +424,23 @@ func TestProducerConfigValidates(t *testing.T) { } } } + func TestConsumerConfigValidates(t *testing.T) { tests := []struct { name string cfg func(*Config) err string }{ - {"ReadCommitted Version", + { + "ReadCommitted Version", func(cfg *Config) { cfg.Version = V0_10_0_0 cfg.Consumer.IsolationLevel = ReadCommitted }, "ReadCommitted requires Version >= V0_11_0_0", }, - {"Incorrect isolation level", + { + "Incorrect isolation level", func(cfg *Config) { cfg.Version = V0_11_0_0 cfg.Consumer.IsolationLevel = IsolationLevel(42) diff --git a/consumer.go b/consumer.go index 9bd8d1820..46c26134c 100644 --- a/consumer.go +++ b/consumer.go @@ -783,7 +783,7 @@ done: close(bc.newSubscriptions) } -//subscriptionConsumer ensures we will get nil right away if no new subscriptions is available +// subscriptionConsumer ensures we will get nil right away if no new subscriptions is available func (bc *brokerConsumer) subscriptionConsumer() { <-bc.wait // wait for our first piece of work @@ -798,7 +798,6 @@ func (bc *brokerConsumer) subscriptionConsumer() { } response, err := bc.fetchNewMessages() - if err != nil { Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err) bc.abort(err) @@ -832,17 +831,19 @@ func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsu } } -//handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed +// handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed func (bc *brokerConsumer) handleResponses() { for child := range bc.subscriptions { result := child.responseResult child.responseResult = nil if result == nil { - if child.preferredReadReplica >= 0 && bc.broker.ID() != child.preferredReadReplica { - // not an error but needs redispatching to consume from prefered replica - child.trigger <- none{} - delete(bc.subscriptions, child) + if preferredBroker, err := child.preferredBroker(); err == nil { + if bc.broker.ID() != preferredBroker.ID() { + // not an error but needs redispatching to consume from prefered replica + child.trigger <- none{} + delete(bc.subscriptions, child) + } } continue } diff --git a/consumer_group.go b/consumer_group.go index fcc5792ea..80f625d80 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -429,7 +429,7 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { select { case <-c.closed: - //consumer is closed + // consumer is closed return default: } diff --git a/consumer_group_members.go b/consumer_group_members.go index 2d02cc386..21b11e944 100644 --- a/consumer_group_members.go +++ b/consumer_group_members.go @@ -1,6 +1,6 @@ package sarama -//ConsumerGroupMemberMetadata holds the metadata for consumer group +// ConsumerGroupMemberMetadata holds the metadata for consumer group type ConsumerGroupMemberMetadata struct { Version int16 Topics []string @@ -37,7 +37,7 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) { return nil } -//ConsumerGroupMemberAssignment holds the member assignment for a consume group +// ConsumerGroupMemberAssignment holds the member assignment for a consume group type ConsumerGroupMemberAssignment struct { Version int16 Topics map[string][]int32 diff --git a/consumer_metadata_request.go b/consumer_metadata_request.go index e5ebdaef5..5c18e048a 100644 --- a/consumer_metadata_request.go +++ b/consumer_metadata_request.go @@ -1,6 +1,6 @@ package sarama -//ConsumerMetadataRequest is used for metadata requests +// ConsumerMetadataRequest is used for metadata requests type ConsumerMetadataRequest struct { ConsumerGroup string } diff --git a/consumer_metadata_request_test.go b/consumer_metadata_request_test.go index 24e5f0a43..3f6bcdb57 100644 --- a/consumer_metadata_request_test.go +++ b/consumer_metadata_request_test.go @@ -6,10 +6,12 @@ import ( var ( consumerMetadataRequestEmpty = []byte{ - 0x00, 0x00} + 0x00, 0x00, + } consumerMetadataRequestString = []byte{ - 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r'} + 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', + } ) func TestConsumerMetadataRequest(t *testing.T) { diff --git a/consumer_metadata_response.go b/consumer_metadata_response.go index 1b5d00d22..7fe0cf971 100644 --- a/consumer_metadata_response.go +++ b/consumer_metadata_response.go @@ -5,7 +5,7 @@ import ( "strconv" ) -//ConsumerMetadataResponse holds the response for a consumer group meta data requests +// ConsumerMetadataResponse holds the response for a consumer group meta data requests type ConsumerMetadataResponse struct { Err KError Coordinator *Broker diff --git a/consumer_metadata_response_test.go b/consumer_metadata_response_test.go index 8482f6ff1..a7d74b55b 100644 --- a/consumer_metadata_response_test.go +++ b/consumer_metadata_response_test.go @@ -7,13 +7,15 @@ var ( 0x00, 0x0E, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } consumerMetadataResponseSuccess = []byte{ 0x00, 0x00, 0x00, 0x00, 0x00, 0xAB, 0x00, 0x03, 'f', 'o', 'o', - 0x00, 0x00, 0xCC, 0xDD} + 0x00, 0x00, 0xCC, 0xDD, + } ) func TestConsumerMetadataResponseError(t *testing.T) { diff --git a/consumer_test.go b/consumer_test.go index 4c8aee187..61eb53d1c 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -893,6 +893,125 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) { leader.Close() } +// TestConsumeMessagesTrackLeader ensures that in the event that leadership of +// a topicPartition changes and no preferredReadReplica is specified, the +// consumer connects back to the new leader to resume consumption and doesn't +// continue consuming from the follower. +// +// See https://github.com/Shopify/sarama/issues/1927 +func TestConsumeMessagesTrackLeader(t *testing.T) { + prevLogger := Logger + defer func() { Logger = prevLogger }() + Logger = &testLogger{t} + + cfg := NewConfig() + cfg.ClientID = t.Name() + cfg.Metadata.RefreshFrequency = time.Millisecond * 50 + cfg.Net.MaxOpenRequests = 1 + cfg.Version = V2_1_0_0 + + leader1 := NewMockBroker(t, 1) + leader2 := NewMockBroker(t, 2) + + mockMetadataResponse1 := NewMockMetadataResponse(t). + SetBroker(leader1.Addr(), leader1.BrokerID()). + SetBroker(leader2.Addr(), leader2.BrokerID()). + SetLeader("my_topic", 0, leader1.BrokerID()) + mockMetadataResponse2 := NewMockMetadataResponse(t). + SetBroker(leader1.Addr(), leader1.BrokerID()). + SetBroker(leader2.Addr(), leader2.BrokerID()). + SetLeader("my_topic", 0, leader2.BrokerID()) + mockMetadataResponse3 := NewMockMetadataResponse(t). + SetBroker(leader1.Addr(), leader1.BrokerID()). + SetBroker(leader2.Addr(), leader2.BrokerID()). + SetLeader("my_topic", 0, leader1.BrokerID()) + + leader1.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": mockMetadataResponse1, + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(1). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockFetchResponse(t, 1). + SetVersion(10). + SetMessage("my_topic", 0, 1, testMsg). + SetMessage("my_topic", 0, 2, testMsg), + }) + + client, err := NewClient([]string{leader1.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + + consumer, err := NewConsumerFromClient(client) + if err != nil { + t.Fatal(err) + } + + pConsumer, err := consumer.ConsumePartition("my_topic", 0, 1) + if err != nil { + t.Fatal(err) + } + + assertMessageOffset(t, <-pConsumer.Messages(), 1) + assertMessageOffset(t, <-pConsumer.Messages(), 2) + + fetchEmptyResponse := &FetchResponse{Version: 10} + fetchEmptyResponse.AddError("my_topic", 0, ErrNoError) + leader1.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": mockMetadataResponse2, + "FetchRequest": NewMockWrapper(fetchEmptyResponse), + }) + leader2.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": mockMetadataResponse2, + "FetchRequest": NewMockFetchResponse(t, 1). + SetVersion(10). + SetMessage("my_topic", 0, 3, testMsg). + SetMessage("my_topic", 0, 4, testMsg), + }) + + // wait for client to be aware that leadership has changed + for { + b, _ := client.Leader("my_topic", 0) + if b.ID() == int32(2) { + break + } + time.Sleep(time.Millisecond * 50) + } + + assertMessageOffset(t, <-pConsumer.Messages(), 3) + assertMessageOffset(t, <-pConsumer.Messages(), 4) + + leader1.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": mockMetadataResponse3, + "FetchRequest": NewMockFetchResponse(t, 1). + SetVersion(10). + SetMessage("my_topic", 0, 5, testMsg). + SetMessage("my_topic", 0, 6, testMsg), + }) + leader2.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": mockMetadataResponse3, + "FetchRequest": NewMockWrapper(fetchEmptyResponse), + }) + + // wait for client to be aware that leadership has changed back again + for { + b, _ := client.Leader("my_topic", 0) + if b.ID() == int32(1) { + break + } + time.Sleep(time.Millisecond * 50) + } + + assertMessageOffset(t, <-pConsumer.Messages(), 5) + assertMessageOffset(t, <-pConsumer.Messages(), 6) + + safeClose(t, pConsumer) + safeClose(t, consumer) + leader1.Close() + leader2.Close() +} + // It is fine if offsets of fetched messages are not sequential (although // strictly increasing!). func TestConsumerNonSequentialOffsets(t *testing.T) { @@ -1523,8 +1642,9 @@ func TestExcludeUncommitted(t *testing.T) { } func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) { + t.Helper() if msg.Offset != expectedOffset { - t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset) + t.Fatalf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset) } } diff --git a/control_record.go b/control_record.go index b9197e4b1..244a82136 100644 --- a/control_record.go +++ b/control_record.go @@ -1,14 +1,14 @@ package sarama -//ControlRecordType ... +// ControlRecordType ... type ControlRecordType int const ( - //ControlRecordAbort is a control record for abort + // ControlRecordAbort is a control record for abort ControlRecordAbort ControlRecordType = iota - //ControlRecordCommit is a control record for commit + // ControlRecordCommit is a control record for commit ControlRecordCommit - //ControlRecordUnknown is a control record of unknown type + // ControlRecordUnknown is a control record of unknown type ControlRecordUnknown ) diff --git a/crc32_field.go b/crc32_field.go index 38189a3cd..32236e50f 100644 --- a/crc32_field.go +++ b/crc32_field.go @@ -72,6 +72,7 @@ func (c *crc32Field) check(curOffset int, buf []byte) error { return nil } + func (c *crc32Field) crc(curOffset int, buf []byte) (uint32, error) { var tab *crc32.Table switch c.polynomial { diff --git a/describe_configs_request.go b/describe_configs_request.go index d0c735280..4c3488031 100644 --- a/describe_configs_request.go +++ b/describe_configs_request.go @@ -61,7 +61,6 @@ func (r *DescribeConfigsRequest) decode(pd packetDecoder, version int16) (err er r.Resources[i].Name = name confLength, err := pd.getArrayLength() - if err != nil { return err } diff --git a/describe_configs_request_test.go b/describe_configs_request_test.go index ef9d3ff80..a1148f401 100644 --- a/describe_configs_request_test.go +++ b/describe_configs_request_test.go @@ -11,7 +11,7 @@ var ( 0, 0, 0, 1, // 1 config 2, // a topic 0, 3, 'f', 'o', 'o', // topic name: foo - 0, 0, 0, 1, //1 config name + 0, 0, 0, 1, // 1 config name 0, 10, // 10 chars 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', } @@ -20,7 +20,7 @@ var ( 0, 0, 0, 2, // 2 configs 2, // a topic 0, 3, 'f', 'o', 'o', // topic name: foo - 0, 0, 0, 2, //2 config name + 0, 0, 0, 2, // 2 config name 0, 10, // 10 chars 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 12, // 12 chars @@ -44,7 +44,7 @@ var ( 2, // a topic 0, 3, 'f', 'o', 'o', // topic name: foo 255, 255, 255, 255, // no configs - 1, //synoms + 1, // synoms } ) diff --git a/describe_configs_response.go b/describe_configs_response.go index 063ae9112..928f5a52a 100644 --- a/describe_configs_response.go +++ b/describe_configs_response.go @@ -224,7 +224,7 @@ func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) { return nil } -//https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration +// https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) { if version == 0 { r.Source = SourceUnknown diff --git a/describe_configs_response_test.go b/describe_configs_response_test.go index c91a04220..ea8f28e57 100644 --- a/describe_configs_response_test.go +++ b/describe_configs_response_test.go @@ -6,18 +6,18 @@ import ( var ( describeConfigsResponseEmpty = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 0, // no configs } describeConfigsResponsePopulatedv0 = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 1, // response - 0, 0, //errorcode - 0, 0, //string + 0, 0, // errorcode + 0, 0, // string 2, // topic 0, 3, 'f', 'o', 'o', - 0, 0, 0, 1, //configs + 0, 0, 0, 1, // configs 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, '1', '0', '0', '0', 0, // ReadOnly @@ -26,13 +26,13 @@ var ( } describeConfigsResponseWithDefaultv0 = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 1, // response - 0, 0, //errorcode - 0, 0, //string + 0, 0, // errorcode + 0, 0, // string 2, // topic 0, 3, 'f', 'o', 'o', - 0, 0, 0, 1, //configs + 0, 0, 0, 1, // configs 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, '1', '0', '0', '0', 0, // ReadOnly @@ -41,13 +41,13 @@ var ( } describeConfigsResponsePopulatedv1 = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 1, // response - 0, 0, //errorcode - 0, 0, //string + 0, 0, // errorcode + 0, 0, // string 2, // topic 0, 3, 'f', 'o', 'o', - 0, 0, 0, 1, //configs + 0, 0, 0, 1, // configs 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, '1', '0', '0', '0', 0, // ReadOnly @@ -57,13 +57,13 @@ var ( } describeConfigsResponseWithSynonymv1 = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 1, // response - 0, 0, //errorcode - 0, 0, //string + 0, 0, // errorcode + 0, 0, // string 2, // topic 0, 3, 'f', 'o', 'o', - 0, 0, 0, 1, //configs + 0, 0, 0, 1, // configs 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, '1', '0', '0', '0', 0, // ReadOnly @@ -76,13 +76,13 @@ var ( } describeConfigsResponseWithDefaultv1 = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 1, // response - 0, 0, //errorcode - 0, 0, //string + 0, 0, // errorcode + 0, 0, // string 2, // topic 0, 3, 'f', 'o', 'o', - 0, 0, 0, 1, //configs + 0, 0, 0, 1, // configs 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, '1', '0', '0', '0', 0, // ReadOnly diff --git a/end_txn_request_test.go b/end_txn_request_test.go index 20e404eb8..6f5d4480e 100644 --- a/end_txn_request_test.go +++ b/end_txn_request_test.go @@ -2,14 +2,12 @@ package sarama import "testing" -var ( - endTxnRequest = []byte{ - 0, 3, 't', 'x', 'n', - 0, 0, 0, 0, 0, 0, 31, 64, - 0, 1, - 1, - } -) +var endTxnRequest = []byte{ + 0, 3, 't', 'x', 'n', + 0, 0, 0, 0, 0, 0, 31, 64, + 0, 1, + 1, +} func TestEndTxnRequest(t *testing.T) { req := &EndTxnRequest{ diff --git a/end_txn_response_test.go b/end_txn_response_test.go index 41d730418..d7ae1c988 100644 --- a/end_txn_response_test.go +++ b/end_txn_response_test.go @@ -5,12 +5,10 @@ import ( "time" ) -var ( - endTxnResponse = []byte{ - 0, 0, 0, 100, - 0, 49, - } -) +var endTxnResponse = []byte{ + 0, 0, 0, 100, + 0, 49, +} func TestEndTxnResponse(t *testing.T) { resp := &EndTxnResponse{ diff --git a/errors.go b/errors.go index a5b6edbb5..0fca0a30e 100644 --- a/errors.go +++ b/errors.go @@ -93,7 +93,7 @@ type MultiError struct { } func (mErr MultiError) Error() string { - var errString = "" + errString := "" for _, err := range *mErr.Errors { errString += err.Error() + "," } @@ -101,7 +101,7 @@ func (mErr MultiError) Error() string { } func (mErr MultiError) PrettyError() string { - var errString = "" + errString := "" for _, err := range *mErr.Errors { errString += err.Error() + "\n" } diff --git a/examples/consumergroup/main.go b/examples/consumergroup/main.go index 9a8b7cd8d..a7db175cd 100644 --- a/examples/consumergroup/main.go +++ b/examples/consumergroup/main.go @@ -150,7 +150,6 @@ func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - // NOTE: // Do not move the code below to a goroutine. // The `ConsumeClaim` itself is called within a goroutine, see: diff --git a/examples/http_server/http_server.go b/examples/http_server/http_server.go index b6d83c5dc..b4fb32060 100644 --- a/examples/http_server/http_server.go +++ b/examples/http_server/http_server.go @@ -1,8 +1,6 @@ package main import ( - "github.com/Shopify/sarama" - "crypto/tls" "crypto/x509" "encoding/json" @@ -14,6 +12,8 @@ import ( "os" "strings" "time" + + "github.com/Shopify/sarama" ) var ( @@ -163,7 +163,6 @@ func (ale *accessLogEntry) Encode() ([]byte, error) { } func (s *Server) withAccessLog(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { started := time.Now() @@ -189,7 +188,6 @@ func (s *Server) withAccessLog(next http.Handler) http.Handler { } func newDataCollector(brokerList []string) sarama.SyncProducer { - // For the data collector, we are looking for strong consistency semantics. // Because we don't change the flush settings, sarama will try to produce messages // as fast as possible to keep latency low. @@ -217,7 +215,6 @@ func newDataCollector(brokerList []string) sarama.SyncProducer { } func newAccessLogProducer(brokerList []string) sarama.AsyncProducer { - // For the access log, we are looking for AP semantics, with high throughput. // By creating batches of compressed messages, we reduce network I/O at a cost of more latency. config := sarama.NewConfig() diff --git a/examples/sasl_scram_client/main.go b/examples/sasl_scram_client/main.go index 884949b7d..353a8a274 100644 --- a/examples/sasl_scram_client/main.go +++ b/examples/sasl_scram_client/main.go @@ -166,5 +166,4 @@ func main() { _ = syncProducer.Close() } logger.Println("Bye now !") - } diff --git a/examples/sasl_scram_client/scram_client.go b/examples/sasl_scram_client/scram_client.go index 551f0fefd..1cabaa38e 100644 --- a/examples/sasl_scram_client/scram_client.go +++ b/examples/sasl_scram_client/scram_client.go @@ -7,8 +7,10 @@ import ( "github.com/xdg/scram" ) -var SHA256 scram.HashGeneratorFcn = sha256.New -var SHA512 scram.HashGeneratorFcn = sha512.New +var ( + SHA256 scram.HashGeneratorFcn = sha256.New + SHA512 scram.HashGeneratorFcn = sha512.New +) type XDGSCRAMClient struct { *scram.Client diff --git a/fetch_request_test.go b/fetch_request_test.go index 0903caffa..0b807b9f8 100644 --- a/fetch_request_test.go +++ b/fetch_request_test.go @@ -5,18 +5,21 @@ import "testing" var ( fetchRequestNoBlocks = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } fetchRequestWithProperties = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, 0xEF, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } fetchRequestOneBlock = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x05, 't', 'o', 'p', 'i', 'c', 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56} + 0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56, + } fetchRequestOneBlockV4 = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, @@ -25,7 +28,8 @@ var ( 0x00, 0x00, 0x00, 0x01, 0x00, 0x05, 't', 'o', 'p', 'i', 'c', 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56} + 0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56, + } fetchRequestOneBlockV11 = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, diff --git a/fetch_response_test.go b/fetch_response_test.go index 99ad769c6..45ad8013b 100644 --- a/fetch_response_test.go +++ b/fetch_response_test.go @@ -7,7 +7,8 @@ import ( var ( emptyFetchResponse = []byte{ - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } oneMessageFetchResponse = []byte{ 0x00, 0x00, 0x00, 0x01, @@ -25,7 +26,8 @@ var ( 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, - 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, + } overflowMessageFetchResponse = []byte{ 0x00, 0x00, 0x00, 0x01, @@ -48,7 +50,8 @@ var ( 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0xFF, // overflow bytes - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + } oneRecordFetchResponse = []byte{ 0x00, 0x00, 0x00, 0x00, // ThrottleTime @@ -84,7 +87,8 @@ var ( 0x06, 0x05, 0x06, 0x07, 0x02, 0x06, 0x08, 0x09, 0x0A, - 0x04, 0x0B, 0x0C} + 0x04, 0x0B, 0x0C, + } partialFetchResponse = []byte{ 0x00, 0x00, 0x00, 0x00, // ThrottleTime @@ -136,7 +140,8 @@ var ( 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, - 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, + } preferredReplicaFetchResponseV11 = []byte{ 0x00, 0x00, 0x00, 0x00, // ThrottleTime @@ -161,7 +166,8 @@ var ( 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, - 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, + } ) func TestEmptyFetchResponse(t *testing.T) { diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go index 8a25f9b52..5fb063bf5 100644 --- a/functional_consumer_group_test.go +++ b/functional_consumer_group_test.go @@ -335,11 +335,13 @@ func (m *testFuncConsumerGroupMember) Setup(s ConsumerGroupSession) error { atomic.StoreInt32(&m.state, 2) return nil } + func (m *testFuncConsumerGroupMember) Cleanup(s ConsumerGroupSession) error { // enter post-cleanup state atomic.StoreInt32(&m.state, 3) return nil } + func (m *testFuncConsumerGroupMember) ConsumeClaim(s ConsumerGroupSession, c ConsumerGroupClaim) error { atomic.AddInt32(&m.handlers, 1) defer atomic.AddInt32(&m.handlers, -1) diff --git a/functional_producer_test.go b/functional_producer_test.go index 7c1925b06..74f9682a2 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -432,15 +432,19 @@ func validateMetrics(t *testing.T, client Client) { func BenchmarkProducerSmall(b *testing.B) { benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 128))) } + func BenchmarkProducerMedium(b *testing.B) { benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 1024))) } + func BenchmarkProducerLarge(b *testing.B) { benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 8192))) } + func BenchmarkProducerSmallSinglePartition(b *testing.B) { benchmarkProducer(b, nil, "test.1", ByteEncoder(make([]byte, 128))) } + func BenchmarkProducerMediumSnappy(b *testing.B) { conf := NewTestConfig() conf.Producer.Compression = CompressionSnappy diff --git a/functional_test.go b/functional_test.go index 134e55e4e..7f10a9eea 100644 --- a/functional_test.go +++ b/functional_test.go @@ -355,7 +355,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err) } defer res.Body.Close() - jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644) + jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0o644) if err != nil { return fmt.Errorf("failed opening the uncomitted msg jar: %w", err) } diff --git a/gssapi_kerberos.go b/gssapi_kerberos.go index ea72ea521..b2c199000 100644 --- a/gssapi_kerberos.go +++ b/gssapi_kerberos.go @@ -215,7 +215,6 @@ func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error { spn := fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host) ticket, encKey, err := kerberosClient.GetServiceTicket(spn) - if err != nil { Logger.Printf("Error getting Kerberos service ticket : %s", err) return err diff --git a/heartbeat_request_test.go b/heartbeat_request_test.go index 981c377f9..2653f82c7 100644 --- a/heartbeat_request_test.go +++ b/heartbeat_request_test.go @@ -2,13 +2,11 @@ package sarama import "testing" -var ( - basicHeartbeatRequest = []byte{ - 0, 3, 'f', 'o', 'o', // Group ID - 0x00, 0x01, 0x02, 0x03, // Generatiuon ID - 0, 3, 'b', 'a', 'z', // Member ID - } -) +var basicHeartbeatRequest = []byte{ + 0, 3, 'f', 'o', 'o', // Group ID + 0x00, 0x01, 0x02, 0x03, // Generatiuon ID + 0, 3, 'b', 'a', 'z', // Member ID +} func TestHeartbeatRequest(t *testing.T) { request := new(HeartbeatRequest) diff --git a/heartbeat_response_test.go b/heartbeat_response_test.go index 4b29416de..e60146bdc 100644 --- a/heartbeat_response_test.go +++ b/heartbeat_response_test.go @@ -2,10 +2,9 @@ package sarama import "testing" -var ( - heartbeatResponseNoError = []byte{ - 0x00, 0x00} -) +var heartbeatResponseNoError = []byte{ + 0x00, 0x00, +} func TestHeartbeatResponse(t *testing.T) { response := new(HeartbeatResponse) diff --git a/leave_group_request_test.go b/leave_group_request_test.go index 04527e47c..b674e48b2 100644 --- a/leave_group_request_test.go +++ b/leave_group_request_test.go @@ -2,12 +2,10 @@ package sarama import "testing" -var ( - basicLeaveGroupRequest = []byte{ - 0, 3, 'f', 'o', 'o', - 0, 3, 'b', 'a', 'r', - } -) +var basicLeaveGroupRequest = []byte{ + 0, 3, 'f', 'o', 'o', + 0, 3, 'b', 'a', 'r', +} func TestLeaveGroupRequest(t *testing.T) { request := new(LeaveGroupRequest) diff --git a/list_groups_request.go b/list_groups_request.go index ed44cc27e..4553b2d2e 100644 --- a/list_groups_request.go +++ b/list_groups_request.go @@ -1,7 +1,6 @@ package sarama -type ListGroupsRequest struct { -} +type ListGroupsRequest struct{} func (r *ListGroupsRequest) encode(pe packetEncoder) error { return nil diff --git a/list_partition_reassignments_request_test.go b/list_partition_reassignments_request_test.go index b47480ece..e50b289b9 100644 --- a/list_partition_reassignments_request_test.go +++ b/list_partition_reassignments_request_test.go @@ -2,16 +2,14 @@ package sarama import "testing" -var ( - listPartitionReassignmentsRequestOneBlock = []byte{ - 0, 0, 39, 16, // timeout 10000 - 2, // 2-1=1 block - 6, 116, 111, 112, 105, 99, // topic name "topic" as compact string - 2, // 2-1=1 partitions - 0, 0, 0, 0, // partitionId - 0, 0, // empty tagged fields - } -) +var listPartitionReassignmentsRequestOneBlock = []byte{ + 0, 0, 39, 16, // timeout 10000 + 2, // 2-1=1 block + 6, 116, 111, 112, 105, 99, // topic name "topic" as compact string + 2, // 2-1=1 partitions + 0, 0, 0, 0, // partitionId + 0, 0, // empty tagged fields +} func TestListPartitionReassignmentRequest(t *testing.T) { var request *ListPartitionReassignmentsRequest = &ListPartitionReassignmentsRequest{ diff --git a/list_partition_reassignments_response_test.go b/list_partition_reassignments_response_test.go index 7b27b064e..71532423d 100644 --- a/list_partition_reassignments_response_test.go +++ b/list_partition_reassignments_response_test.go @@ -2,21 +2,19 @@ package sarama import "testing" -var ( - listPartitionReassignmentsResponse = []byte{ - 0, 0, 39, 16, // ThrottleTimeMs 10000 - 0, 0, // errorcode - 0, // null string - 2, // block array length 1 - 6, 116, 111, 112, 105, 99, // topic name "topic" - 2, // partition array length 1 - 0, 0, 0, 1, // partitionId - 3, 0, 0, 3, 232, 0, 0, 3, 233, // replicas [1000, 1001] - 3, 0, 0, 3, 234, 0, 0, 3, 235, // addingReplicas [1002, 1003] - 3, 0, 0, 3, 236, 0, 0, 3, 237, // addingReplicas [1004, 1005] - 0, 0, 0, // empty tagged fields - } -) +var listPartitionReassignmentsResponse = []byte{ + 0, 0, 39, 16, // ThrottleTimeMs 10000 + 0, 0, // errorcode + 0, // null string + 2, // block array length 1 + 6, 116, 111, 112, 105, 99, // topic name "topic" + 2, // partition array length 1 + 0, 0, 0, 1, // partitionId + 3, 0, 0, 3, 232, 0, 0, 3, 233, // replicas [1000, 1001] + 3, 0, 0, 3, 234, 0, 0, 3, 235, // addingReplicas [1002, 1003] + 3, 0, 0, 3, 236, 0, 0, 3, 237, // addingReplicas [1004, 1005] + 0, 0, 0, // empty tagged fields +} func TestListPartitionReassignmentResponse(t *testing.T) { var response *ListPartitionReassignmentsResponse = &ListPartitionReassignmentsResponse{ diff --git a/logger_test.go b/logger_test.go new file mode 100644 index 000000000..ceca40d2c --- /dev/null +++ b/logger_test.go @@ -0,0 +1,33 @@ +package sarama + +import "testing" + +// testLogger implements the StdLogger interface and records the text in the +// logs of the given T passed from Test functions. +// and records the text in the error log. +// +// nolint +type testLogger struct { + t *testing.T +} + +func (l *testLogger) Print(v ...interface{}) { + if l.t != nil { + l.t.Helper() + l.t.Log(v...) + } +} + +func (l *testLogger) Printf(format string, v ...interface{}) { + if l.t != nil { + l.t.Helper() + l.t.Logf(format, v...) + } +} + +func (l *testLogger) Println(v ...interface{}) { + if l.t != nil { + l.t.Helper() + l.t.Log(v...) + } +} diff --git a/message.go b/message.go index 40b3ac9d1..fd0d1d90b 100644 --- a/message.go +++ b/message.go @@ -6,15 +6,15 @@ import ( ) const ( - //CompressionNone no compression + // CompressionNone no compression CompressionNone CompressionCodec = iota - //CompressionGZIP compression using GZIP + // CompressionGZIP compression using GZIP CompressionGZIP - //CompressionSnappy compression using snappy + // CompressionSnappy compression using snappy CompressionSnappy - //CompressionLZ4 compression using LZ4 + // CompressionLZ4 compression using LZ4 CompressionLZ4 - //CompressionZSTD compression using ZSTD + // CompressionZSTD compression using ZSTD CompressionZSTD // The lowest 3 bits contain the compression codec used for the message @@ -42,7 +42,7 @@ func (cc CompressionCodec) String() string { }[int(cc)] } -//Message is a kafka message type +// Message is a kafka message type type Message struct { Codec CompressionCodec // codec used to compress the message contents CompressionLevel int // compression level diff --git a/message_test.go b/message_test.go index aa103d43f..a6c7cff2a 100644 --- a/message_test.go +++ b/message_test.go @@ -11,7 +11,8 @@ var ( 0x00, // magic version byte 0x00, // attribute flags 0xFF, 0xFF, 0xFF, 0xFF, // key - 0xFF, 0xFF, 0xFF, 0xFF} // value + 0xFF, 0xFF, 0xFF, 0xFF, + } // value emptyV1Message = []byte{ 204, 47, 121, 217, // CRC @@ -19,17 +20,19 @@ var ( 0x00, // attribute flags 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp 0xFF, 0xFF, 0xFF, 0xFF, // key - 0xFF, 0xFF, 0xFF, 0xFF} // value + 0xFF, 0xFF, 0xFF, 0xFF, + } // value emptyV2Message = []byte{ 167, 236, 104, 3, // CRC 0x02, // magic version byte 0x00, // attribute flags 0xFF, 0xFF, 0xFF, 0xFF, // key - 0xFF, 0xFF, 0xFF, 0xFF} // value + 0xFF, 0xFF, 0xFF, 0xFF, + } // value emptyGzipMessage = []byte{ - 132, 99, 80, 148, //CRC + 132, 99, 80, 148, // CRC 0x00, // magic version byte 0x01, // attribute flags 0xFF, 0xFF, 0xFF, 0xFF, // key @@ -37,7 +40,8 @@ var ( 0x00, 0x00, 0x00, 0x17, 0x1f, 0x8b, 0x08, - 0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0} + 0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, + } emptyLZ4Message = []byte{ 132, 219, 238, 101, // CRC @@ -64,7 +68,7 @@ var ( } emptyBulkSnappyMessage = []byte{ - 180, 47, 53, 209, //CRC + 180, 47, 53, 209, // CRC 0x00, // magic version byte 0x02, // attribute flags 0xFF, 0xFF, 0xFF, 0xFF, // key @@ -72,17 +76,19 @@ var ( 130, 83, 78, 65, 80, 80, 89, 0, // SNAPPY magic 0, 0, 0, 1, // min version 0, 0, 0, 1, // default version - 0, 0, 0, 22, 52, 0, 0, 25, 1, 16, 14, 227, 138, 104, 118, 25, 15, 13, 1, 8, 1, 0, 0, 62, 26, 0} + 0, 0, 0, 22, 52, 0, 0, 25, 1, 16, 14, 227, 138, 104, 118, 25, 15, 13, 1, 8, 1, 0, 0, 62, 26, 0, + } emptyBulkGzipMessage = []byte{ - 139, 160, 63, 141, //CRC + 139, 160, 63, 141, // CRC 0x00, // magic version byte 0x01, // attribute flags 0xFF, 0xFF, 0xFF, 0xFF, // key 0x00, 0x00, 0x00, 0x27, // len 0x1f, 0x8b, // Gzip Magic 0x08, // deflate compressed - 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0} + 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0, + } emptyBulkLZ4Message = []byte{ 246, 12, 188, 129, // CRC diff --git a/metadata_response_test.go b/metadata_response_test.go index 04a4ce7fc..3970a2f31 100644 --- a/metadata_response_test.go +++ b/metadata_response_test.go @@ -5,7 +5,8 @@ import "testing" var ( emptyMetadataResponseV0 = []byte{ 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } brokersNoTopicsMetadataResponseV0 = []byte{ 0x00, 0x00, 0x00, 0x02, @@ -18,7 +19,8 @@ var ( 0x00, 0x0a, 'g', 'o', 'o', 'g', 'l', 'e', '.', 'c', 'o', 'm', 0x00, 0x00, 0x01, 0x11, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } topicsNoBrokersMetadataResponseV0 = []byte{ 0x00, 0x00, 0x00, 0x00, @@ -35,7 +37,8 @@ var ( 0x00, 0x00, 0x00, 0x03, 'b', 'a', 'r', - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } brokersNoTopicsMetadataResponseV1 = []byte{ 0x00, 0x00, 0x00, 0x02, @@ -52,7 +55,8 @@ var ( 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } topicsNoBrokersMetadataResponseV1 = []byte{ 0x00, 0x00, 0x00, 0x00, @@ -74,14 +78,16 @@ var ( 0x00, 0x00, 0x00, 0x03, 'b', 'a', 'r', 0x01, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } noBrokersNoTopicsWithThrottleTimeAndClusterIDV3 = []byte{ 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 'c', 'l', 'u', 's', 't', 'e', 'r', 'I', 'd', 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } noBrokersOneTopicWithOfflineReplicasV5 = []byte{ 0x00, 0x00, 0x00, 0x05, diff --git a/mockbroker.go b/mockbroker.go index 0c99314e5..c2654d12e 100644 --- a/mockbroker.go +++ b/mockbroker.go @@ -218,6 +218,8 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W defer func() { _ = conn.Close() }() + s := spew.NewDefaultConfig() + s.MaxDepth = 1 Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx) var err error @@ -264,7 +266,12 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req)) continue } - Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res) + Logger.Printf( + "*** mockbroker/%d/%d: replied to %T with %T\n-> %s\n-> %s", + b.brokerID, idx, req.body, res, + s.Sprintf("%#v", req.body), + s.Sprintf("%#v", res), + ) encodedRes, err := encode(res, nil) if err != nil { diff --git a/mockkerberos.go b/mockkerberos.go index beb00e5b5..a43607e1c 100644 --- a/mockkerberos.go +++ b/mockkerberos.go @@ -27,7 +27,7 @@ func (h *KafkaGSSAPIHandler) MockKafkaGSSAPI(buffer []byte) []byte { return []byte{0x00, 0x00, 0x00, 0x01, 0xAD} } - var pack = gssapi.WrapToken{ + pack := gssapi.WrapToken{ Flags: KRB5_USER_AUTH, EC: 12, RRC: 0, @@ -108,8 +108,9 @@ func (c *MockKerberosClient) GetServiceTicket(spn string) (messages.Ticket, type func (c *MockKerberosClient) Domain() string { return "EXAMPLE.COM" } + func (c *MockKerberosClient) CName() types.PrincipalName { - var p = types.PrincipalName{ + p := types.PrincipalName{ NameType: KRB5_USER_AUTH, NameString: []string{ "kafka", @@ -118,6 +119,7 @@ func (c *MockKerberosClient) CName() types.PrincipalName { } return p } + func (c *MockKerberosClient) Destroy() { // Do nothing. } diff --git a/mockresponses.go b/mockresponses.go index 3df1ee0a7..6654ed07c 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -805,7 +805,8 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWith Configs: configEntries, }) case TopicResource: - maxMessageBytes := &ConfigEntry{Name: "max.message.bytes", + maxMessageBytes := &ConfigEntry{ + Name: "max.message.bytes", Value: "1000000", ReadOnly: false, Default: !includeSource, @@ -822,7 +823,8 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWith }, } } - retentionMs := &ConfigEntry{Name: "retention.ms", + retentionMs := &ConfigEntry{ + Name: "retention.ms", Value: "5000", ReadOnly: false, Default: false, @@ -836,7 +838,8 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWith }, } } - password := &ConfigEntry{Name: "password", + password := &ConfigEntry{ + Name: "password", Value: "12345", ReadOnly: false, Default: false, @@ -891,7 +894,8 @@ func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHea res := &AlterConfigsResponse{} for _, r := range req.Resources { - res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name, + res.Resources = append(res.Resources, &AlterConfigsResourceResponse{ + Name: r.Name, Type: r.Type, ErrorMsg: "", }) diff --git a/mocks/consumer.go b/mocks/consumer.go index 451eb08d0..baec88b51 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -231,7 +231,7 @@ func (pc *PartitionConsumer) Close() error { go func() { defer wg.Done() - var errs = make(sarama.ConsumerErrors, 0) + errs := make(sarama.ConsumerErrors, 0) for err := range pc.errors { errs = append(errs, err) } diff --git a/offset_commit_request_test.go b/offset_commit_request_test.go index efb3d33f1..06bbd4036 100644 --- a/offset_commit_request_test.go +++ b/offset_commit_request_test.go @@ -8,20 +8,23 @@ import ( var ( offsetCommitRequestNoBlocksV0 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } offsetCommitRequestNoBlocksV1 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', 0x00, 0x00, 0x11, 0x22, 0x00, 0x04, 'c', 'o', 'n', 's', - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } offsetCommitRequestNoBlocksV2 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', 0x00, 0x00, 0x11, 0x22, 0x00, 0x04, 'c', 'o', 'n', 's', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x44, 0x33, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } offsetCommitRequestOneBlockV0 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', @@ -30,7 +33,8 @@ var ( 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x52, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, - 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'} + 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a', + } offsetCommitRequestOneBlockV1 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', @@ -42,7 +46,8 @@ var ( 0x00, 0x00, 0x52, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, - 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'} + 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a', + } offsetCommitRequestOneBlockV2 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', @@ -54,7 +59,8 @@ var ( 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x52, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, - 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'} + 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a', + } ) func TestOffsetCommitRequestV0(t *testing.T) { diff --git a/offset_commit_response_test.go b/offset_commit_response_test.go index 3c85713c7..f35ca54ec 100644 --- a/offset_commit_response_test.go +++ b/offset_commit_response_test.go @@ -5,10 +5,9 @@ import ( "testing" ) -var ( - emptyOffsetCommitResponse = []byte{ - 0x00, 0x00, 0x00, 0x00} -) +var emptyOffsetCommitResponse = []byte{ + 0x00, 0x00, 0x00, 0x00, +} func TestEmptyOffsetCommitResponse(t *testing.T) { response := OffsetCommitResponse{} diff --git a/offset_fetch_request_test.go b/offset_fetch_request_test.go index a5270dbeb..d4497a8b8 100644 --- a/offset_fetch_request_test.go +++ b/offset_fetch_request_test.go @@ -8,42 +8,50 @@ import ( var ( offsetFetchRequestNoGroupNoPartitions = []byte{ 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } offsetFetchRequestNoPartitionsV6 = []byte{ - 0x05, 'b', 'l', 'a', 'h', 0x01, 0x00} + 0x05, 'b', 'l', 'a', 'h', 0x01, 0x00, + } offsetFetchRequestNoPartitionsV7 = []byte{ - 0x05, 'b', 'l', 'a', 'h', 0x01, 0x01, 0x00} + 0x05, 'b', 'l', 'a', 'h', 0x01, 0x01, 0x00, + } offsetFetchRequestNoPartitions = []byte{ 0x00, 0x04, 'b', 'l', 'a', 'h', - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } offsetFetchRequestOnePartition = []byte{ 0x00, 0x04, 'b', 'l', 'a', 'h', 0x00, 0x00, 0x00, 0x01, 0x00, 0x0D, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't', 0x00, 0x00, 0x00, 0x01, - 0x4F, 0x4F, 0x4F, 0x4F} + 0x4F, 0x4F, 0x4F, 0x4F, + } offsetFetchRequestOnePartitionV6 = []byte{ 0x05, 'b', 'l', 'a', 'h', 0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't', 0x02, 0x4F, 0x4F, 0x4F, 0x4F, - 0x00, 0x00} + 0x00, 0x00, + } offsetFetchRequestOnePartitionV7 = []byte{ 0x05, 'b', 'l', 'a', 'h', 0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't', 0x02, 0x4F, 0x4F, 0x4F, 0x4F, - 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, + } offsetFetchRequestAllPartitions = []byte{ 0x00, 0x04, 'b', 'l', 'a', 'h', - 0xff, 0xff, 0xff, 0xff} + 0xff, 0xff, 0xff, 0xff, + } ) func TestOffsetFetchRequestNoPartitions(t *testing.T) { @@ -88,7 +96,7 @@ func TestOffsetFetchRequest(t *testing.T) { testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartition) } - { //v6 + { // v6 version := 6 request := new(OffsetFetchRequest) request.Version = int16(version) @@ -97,7 +105,7 @@ func TestOffsetFetchRequest(t *testing.T) { testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartitionV6) } - { //v7 + { // v7 version := 7 request := new(OffsetFetchRequest) request.Version = int16(version) diff --git a/offset_fetch_response_test.go b/offset_fetch_response_test.go index b1a6c3543..d70894ab2 100644 --- a/offset_fetch_response_test.go +++ b/offset_fetch_response_test.go @@ -7,20 +7,19 @@ import ( var ( emptyOffsetFetchResponse = []byte{ - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } emptyOffsetFetchResponseV2 = []byte{ 0x00, 0x00, 0x00, 0x00, - 0x00, 0x2A} + 0x00, 0x2A, + } emptyOffsetFetchResponseV3 = []byte{ 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x2A} - - emptyOffsetFetchResponseV6 = []byte{ - 0x00, 0x00, 0x00, 0x09, - 0x01, 0x00, 0x2A, 0x00} + 0x00, 0x2A, + } ) func TestEmptyOffsetFetchResponse(t *testing.T) { @@ -36,11 +35,6 @@ func TestEmptyOffsetFetchResponse(t *testing.T) { responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9} testResponse(t, fmt.Sprintf("empty v%d", version), &responseV3, emptyOffsetFetchResponseV3) } - - for version := 6; version <= 7; version++ { - responseV6 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9} - testResponse(t, fmt.Sprintf("empty v%d", version), &responseV6, emptyOffsetFetchResponseV6) - } } func TestNormalOffsetFetchResponse(t *testing.T) { @@ -67,10 +61,8 @@ func TestNormalOffsetFetchResponse(t *testing.T) { testResponse(t, fmt.Sprintf("Normal v%d", version), &responseV3, nil) } - for version := 5; version <= 7; version++ { - res := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9} - res.AddBlock("t", 0, &OffsetFetchResponseBlock{Offset: 10, LeaderEpoch: 100, Metadata: "md", Err: ErrRequestTimedOut}) - res.Blocks["m"] = nil - testResponse(t, fmt.Sprintf("normal V%d", version), &res, nil) - } + responseV5 := OffsetFetchResponse{Version: 5, Err: ErrInvalidRequest, ThrottleTimeMs: 9} + responseV5.AddBlock("t", 0, &OffsetFetchResponseBlock{Offset: 10, LeaderEpoch: 100, Metadata: "md", Err: ErrRequestTimedOut}) + responseV5.Blocks["m"] = nil + testResponse(t, "normal V5", &responseV5, nil) } diff --git a/offset_request_test.go b/offset_request_test.go index 0e6951a00..8ea533a6b 100644 --- a/offset_request_test.go +++ b/offset_request_test.go @@ -5,12 +5,14 @@ import "testing" var ( offsetRequestNoBlocksV1 = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } offsetRequestNoBlocksV2 = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, - 0x00} + 0x00, + } offsetRequestOneBlock = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, @@ -19,7 +21,8 @@ var ( 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x02} + 0x00, 0x00, 0x00, 0x02, + } offsetRequestOneBlockV1 = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, @@ -27,7 +30,8 @@ var ( 0x00, 0x03, 'b', 'a', 'r', 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01} + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, + } offsetRequestOneBlockReadCommittedV2 = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, @@ -35,11 +39,13 @@ var ( 0x00, 0x03, 'b', 'a', 'r', 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01} + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, + } offsetRequestReplicaID = []byte{ 0x00, 0x00, 0x00, 0x2a, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } ) func TestOffsetRequest(t *testing.T) { diff --git a/offset_response_test.go b/offset_response_test.go index a711a9864..683c11e43 100644 --- a/offset_response_test.go +++ b/offset_response_test.go @@ -1,12 +1,11 @@ package sarama -import ( - "testing" -) +import "testing" var ( emptyOffsetResponse = []byte{ - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } normalOffsetResponse = []byte{ 0x00, 0x00, 0x00, 0x02, @@ -20,7 +19,8 @@ var ( 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06} + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, + } normalOffsetResponseV1 = []byte{ 0x00, 0x00, 0x00, 0x02, @@ -33,21 +33,8 @@ var ( 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x01, 0x58, 0x1A, 0xE6, 0x48, 0x86, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06} - - normalOffsetResponseV2 = []byte{ - 0x00, 0x00, 0x00, 0x09, - 0x00, 0x00, 0x00, 0x02, - - 0x00, 0x01, 'a', - 0x00, 0x00, 0x00, 0x00, - - 0x00, 0x01, 'z', - 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x02, - 0x00, 0x00, - 0x00, 0x00, 0x01, 0x58, 0x1A, 0xE6, 0x48, 0x86, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06} + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, + } ) func TestEmptyOffsetResponse(t *testing.T) { @@ -125,37 +112,3 @@ func TestNormalOffsetResponseV1(t *testing.T) { t.Fatal("Decoding produced invalid offsets for topic z partition 2.") } } - -func TestNormalOffsetResponseV2(t *testing.T) { - response := OffsetResponse{} - - testVersionDecodable(t, "normal", &response, normalOffsetResponseV2, 2) // response should not change - - if response.ThrottleTimeMs != 9 { - t.Fatal("Decoding produced", response.ThrottleTimeMs, "throttle milliseconds where there were nine milliseconds.") - } - - if len(response.Blocks) != 2 { - t.Fatal("Decoding produced", len(response.Blocks), "topics where there were two.") - } - - if len(response.Blocks["a"]) != 0 { - t.Fatal("Decoding produced", len(response.Blocks["a"]), "partitions for topic 'a' where there were none.") - } - - if len(response.Blocks["z"]) != 1 { - t.Fatal("Decoding produced", len(response.Blocks["z"]), "partitions for topic 'z' where there was one.") - } - - if response.Blocks["z"][2].Err != ErrNoError { - t.Fatal("Decoding produced invalid error for topic z partition 2.") - } - - if response.Blocks["z"][2].Timestamp != 1477920049286 { - t.Fatal("Decoding produced invalid timestamp for topic z partition 2.", response.Blocks["z"][2].Timestamp) - } - - if response.Blocks["z"][2].Offset != 6 { - t.Fatal("Decoding produced invalid offsets for topic z partition 2.") - } -} diff --git a/produce_request_test.go b/produce_request_test.go index b9896eb61..e8684ffd2 100644 --- a/produce_request_test.go +++ b/produce_request_test.go @@ -9,12 +9,14 @@ var ( produceRequestEmpty = []byte{ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } produceRequestHeader = []byte{ 0x01, 0x23, 0x00, 0x00, 0x04, 0x44, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } produceRequestOneMessage = []byte{ 0x01, 0x23, @@ -32,7 +34,8 @@ var ( 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, - 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, + } produceRequestOneRecord = []byte{ 0xFF, 0xFF, // Transaction ID diff --git a/produce_response_test.go b/produce_response_test.go index 0dfac7947..584652221 100644 --- a/produce_response_test.go +++ b/produce_response_test.go @@ -8,7 +8,8 @@ import ( var ( produceResponseNoBlocksV0 = []byte{ - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } produceResponseManyBlocksVersions = map[int][]byte{ 0: { diff --git a/real_decoder.go b/real_decoder.go index bffec2f39..2482c6377 100644 --- a/real_decoder.go +++ b/real_decoder.go @@ -5,13 +5,15 @@ import ( "math" ) -var errInvalidArrayLength = PacketDecodingError{"invalid array length"} -var errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"} -var errInvalidStringLength = PacketDecodingError{"invalid string length"} -var errVarintOverflow = PacketDecodingError{"varint overflow"} -var errUVarintOverflow = PacketDecodingError{"uvarint overflow"} -var errInvalidBool = PacketDecodingError{"invalid bool"} -var errUnsupportedTaggedFields = PacketDecodingError{"non-empty tagged fields are not supported yet"} +var ( + errInvalidArrayLength = PacketDecodingError{"invalid array length"} + errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"} + errInvalidStringLength = PacketDecodingError{"invalid string length"} + errVarintOverflow = PacketDecodingError{"varint overflow"} + errUVarintOverflow = PacketDecodingError{"uvarint overflow"} + errInvalidBool = PacketDecodingError{"invalid bool"} + errUnsupportedTaggedFields = PacketDecodingError{"non-empty tagged fields are not supported yet"} +) type realDecoder struct { raw []byte @@ -176,7 +178,7 @@ func (rd *realDecoder) getCompactBytes() ([]byte, error) { return nil, err } - var length = int(n - 1) + length := int(n - 1) return rd.getRawBytes(length) } @@ -227,7 +229,7 @@ func (rd *realDecoder) getCompactString() (string, error) { return "", err } - var length = int(n - 1) + length := int(n - 1) tmpStr := string(rd.raw[rd.off : rd.off+length]) rd.off += length @@ -236,12 +238,11 @@ func (rd *realDecoder) getCompactString() (string, error) { func (rd *realDecoder) getCompactNullableString() (*string, error) { n, err := rd.getUVarint() - if err != nil { return nil, err } - var length = int(n - 1) + length := int(n - 1) if length < 0 { return nil, err diff --git a/record.go b/record.go index cdccfe322..a3fe8c061 100644 --- a/record.go +++ b/record.go @@ -11,7 +11,7 @@ const ( maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1 ) -//RecordHeader stores key and value for a record header +// RecordHeader stores key and value for a record header type RecordHeader struct { Key []byte Value []byte @@ -35,7 +35,7 @@ func (h *RecordHeader) decode(pd packetDecoder) (err error) { return nil } -//Record is kafka record type +// Record is kafka record type type Record struct { Headers []*RecordHeader diff --git a/response_header.go b/response_header.go index 5dffb75be..fbcef0bfb 100644 --- a/response_header.go +++ b/response_header.go @@ -2,8 +2,10 @@ package sarama import "fmt" -const responseLengthSize = 4 -const correlationIDSize = 4 +const ( + responseLengthSize = 4 + correlationIDSize = 4 +) type responseHeader struct { length int32 diff --git a/response_header_test.go b/response_header_test.go index 31c35ae6a..c7c68eae7 100644 --- a/response_header_test.go +++ b/response_header_test.go @@ -5,11 +5,13 @@ import "testing" var ( responseHeaderBytesV0 = []byte{ 0x00, 0x00, 0x0f, 0x00, - 0x0a, 0xbb, 0xcc, 0xff} + 0x0a, 0xbb, 0xcc, 0xff, + } responseHeaderBytesV1 = []byte{ 0x00, 0x00, 0x0f, 0x00, - 0x0a, 0xbb, 0xcc, 0xff, 0x00} + 0x0a, 0xbb, 0xcc, 0xff, 0x00, + } ) func TestResponseHeaderV0(t *testing.T) { diff --git a/sasl_authenticate_request_test.go b/sasl_authenticate_request_test.go index fc059e077..bf75004d2 100644 --- a/sasl_authenticate_request_test.go +++ b/sasl_authenticate_request_test.go @@ -2,11 +2,9 @@ package sarama import "testing" -var ( - saslAuthenticateRequest = []byte{ - 0, 0, 0, 3, 'f', 'o', 'o', - } -) +var saslAuthenticateRequest = []byte{ + 0, 0, 0, 3, 'f', 'o', 'o', +} func TestSaslAuthenticateRequest(t *testing.T) { request := new(SaslAuthenticateRequest) diff --git a/sasl_authenticate_response_test.go b/sasl_authenticate_response_test.go index c555cfbfa..048dade19 100644 --- a/sasl_authenticate_response_test.go +++ b/sasl_authenticate_response_test.go @@ -2,13 +2,11 @@ package sarama import "testing" -var ( - saslAuthenticatResponseErr = []byte{ - 0, 58, - 0, 3, 'e', 'r', 'r', - 0, 0, 0, 3, 'm', 's', 'g', - } -) +var saslAuthenticatResponseErr = []byte{ + 0, 58, + 0, 3, 'e', 'r', 'r', + 0, 0, 0, 3, 'm', 's', 'g', +} func TestSaslAuthenticateResponse(t *testing.T) { response := new(SaslAuthenticateResponse) diff --git a/sasl_handshake_request_test.go b/sasl_handshake_request_test.go index 34f954c14..e100ad5b9 100644 --- a/sasl_handshake_request_test.go +++ b/sasl_handshake_request_test.go @@ -2,11 +2,9 @@ package sarama import "testing" -var ( - baseSaslRequest = []byte{ - 0, 3, 'f', 'o', 'o', // Mechanism - } -) +var baseSaslRequest = []byte{ + 0, 3, 'f', 'o', 'o', // Mechanism +} func TestSaslHandshakeRequest(t *testing.T) { request := new(SaslHandshakeRequest) diff --git a/sasl_handshake_response_test.go b/sasl_handshake_response_test.go index 0ebc27a1c..40441fd85 100644 --- a/sasl_handshake_response_test.go +++ b/sasl_handshake_response_test.go @@ -2,13 +2,11 @@ package sarama import "testing" -var ( - saslHandshakeResponse = []byte{ - 0x00, 0x00, - 0x00, 0x00, 0x00, 0x01, - 0x00, 0x03, 'f', 'o', 'o', - } -) +var saslHandshakeResponse = []byte{ + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x03, 'f', 'o', 'o', +} func TestSaslHandshakeResponse(t *testing.T) { response := new(SaslHandshakeResponse) diff --git a/sticky_assignor_user_data.go b/sticky_assignor_user_data.go index bb0c82c34..161233fc3 100644 --- a/sticky_assignor_user_data.go +++ b/sticky_assignor_user_data.go @@ -11,7 +11,7 @@ type StickyAssignorUserData interface { generation() int } -//StickyAssignorUserDataV0 holds topic partition information for an assignment +// StickyAssignorUserDataV0 holds topic partition information for an assignment type StickyAssignorUserDataV0 struct { Topics map[string][]int32 @@ -58,7 +58,7 @@ func (m *StickyAssignorUserDataV0) partitions() []topicPartitionAssignment { ret func (m *StickyAssignorUserDataV0) hasGeneration() bool { return false } func (m *StickyAssignorUserDataV0) generation() int { return defaultGeneration } -//StickyAssignorUserDataV1 holds topic partition information for an assignment +// StickyAssignorUserDataV1 holds topic partition information for an assignment type StickyAssignorUserDataV1 struct { Topics map[string][]int32 Generation int32 diff --git a/txn_offset_commit_request_test.go b/txn_offset_commit_request_test.go index f60d41723..714ec47eb 100644 --- a/txn_offset_commit_request_test.go +++ b/txn_offset_commit_request_test.go @@ -2,20 +2,18 @@ package sarama import "testing" -var ( - txnOffsetCommitRequest = []byte{ - 0, 3, 't', 'x', 'n', - 0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd', - 0, 0, 0, 0, 0, 0, 31, 64, // producer ID - 0, 1, // producer epoch - 0, 0, 0, 1, // 1 topic - 0, 5, 't', 'o', 'p', 'i', 'c', - 0, 0, 0, 1, // 1 partition - 0, 0, 0, 2, // partition no 2 - 0, 0, 0, 0, 0, 0, 0, 123, - 255, 255, // no meta data - } -) +var txnOffsetCommitRequest = []byte{ + 0, 3, 't', 'x', 'n', + 0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd', + 0, 0, 0, 0, 0, 0, 31, 64, // producer ID + 0, 1, // producer epoch + 0, 0, 0, 1, // 1 topic + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 0, 0, 1, // 1 partition + 0, 0, 0, 2, // partition no 2 + 0, 0, 0, 0, 0, 0, 0, 123, + 255, 255, // no meta data +} func TestTxnOffsetCommitRequest(t *testing.T) { req := &TxnOffsetCommitRequest{ diff --git a/txn_offset_commit_response_test.go b/txn_offset_commit_response_test.go index 2a66ad3b4..b4caa69b9 100644 --- a/txn_offset_commit_response_test.go +++ b/txn_offset_commit_response_test.go @@ -5,16 +5,14 @@ import ( "time" ) -var ( - txnOffsetCommitResponse = []byte{ - 0, 0, 0, 100, - 0, 0, 0, 1, // 1 topic - 0, 5, 't', 'o', 'p', 'i', 'c', - 0, 0, 0, 1, // 1 partition response - 0, 0, 0, 2, // partition number 2 - 0, 47, // err - } -) +var txnOffsetCommitResponse = []byte{ + 0, 0, 0, 100, + 0, 0, 0, 1, // 1 topic + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 0, 0, 1, // 1 partition response + 0, 0, 0, 2, // partition number 2 + 0, 47, // err +} func TestTxnOffsetCommitResponse(t *testing.T) { resp := &TxnOffsetCommitResponse{ diff --git a/utils.go b/utils.go index 76daaf8fe..1859d29c2 100644 --- a/utils.go +++ b/utils.go @@ -199,7 +199,7 @@ var ( DefaultVersion = V1_0_0_0 ) -//ParseKafkaVersion parses and returns kafka version or error from a string +// ParseKafkaVersion parses and returns kafka version or error from a string func ParseKafkaVersion(s string) (KafkaVersion, error) { if len(s) < 5 { return DefaultVersion, fmt.Errorf("invalid version `%s`", s) diff --git a/zstd.go b/zstd.go index c87241f49..e23bfc477 100644 --- a/zstd.go +++ b/zstd.go @@ -4,8 +4,10 @@ import ( "github.com/klauspost/compress/zstd" ) -var zstdDec, _ = zstd.NewReader(nil) -var zstdEnc, _ = zstd.NewWriter(nil, zstd.WithZeroFrames(true)) +var ( + zstdDec, _ = zstd.NewReader(nil) + zstdEnc, _ = zstd.NewWriter(nil, zstd.WithZeroFrames(true)) +) func zstdDecompress(dst, src []byte) ([]byte, error) { return zstdDec.DecodeAll(src, dst)