From 1aac8e5424614233887c5600e8ab4258c89c6b80 Mon Sep 17 00:00:00 2001
From: Dominic Evans
Date: Thu, 6 May 2021 00:44:27 +0100
Subject: [PATCH 1/3] fix(consumer): follow preferred broker

Historically (before protocol version 11), if we attempted to consume
from a follower we would get a NotLeaderForPartition response and would
move our consumer to the new leader. However, since v11 the Kafka broker
treats us just like any other follower and permits us to consume from
any replica, and it is up to us to monitor metadata to determine when
the leadership has changed.

Modify the handleResponses func to check the topic-partition leadership
against the current broker (in the absence of a preferredReadReplica)
and trigger a re-creation of the consumer for that partition.

Contributes-to: #1927
---
 consumer.go | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/consumer.go b/consumer.go
index 9bd8d1820..61c9bc57a 100644
--- a/consumer.go
+++ b/consumer.go
@@ -839,10 +839,12 @@ func (bc *brokerConsumer) handleResponses() {
 		child.responseResult = nil

 		if result == nil {
-			if child.preferredReadReplica >= 0 && bc.broker.ID() != child.preferredReadReplica {
-				// not an error but needs redispatching to consume from prefered replica
-				child.trigger <- none{}
-				delete(bc.subscriptions, child)
+			if preferredBroker, err := child.preferredBroker(); err == nil {
+				if bc.broker.ID() != preferredBroker.ID() {
+					// not an error but needs redispatching to consume from prefered replica
+					child.trigger <- none{}
+					delete(bc.subscriptions, child)
+				}
 			}
 			continue
 		}

From 6798d09adc99d2b771287214493c2f692ee26601 Mon Sep 17 00:00:00 2001
From: Dominic Evans
Date: Fri, 30 Apr 2021 12:31:58 +0100
Subject: [PATCH 2/3] chore: use gofumpt to tidy up formatting

gofumpt is a stricter but compatible gofmt:
https://github.com/mvdan/gofumpt

Run it across the repo just to tidy things up a little more.
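The bulk of the gofumpt diff below consists of two mechanical rewrites: comment
text gains a space after the // marker, and parenthesised groups containing a
single declaration are collapsed, e.g.

-var (
-	apiVersionRequest []byte
-)
+var apiVersionRequest []byte

Returning briefly to the consumer fix in PATCH 1/3: the child.preferredBroker()
helper that the new code calls is not shown in that hunk. A minimal standalone
sketch of the behaviour it needs — use the read replica the broker told us
about when one has been received, otherwise fall back to the partition leader —
could look like the following. The package, interface, accessor and field names
here are illustrative assumptions rather than the exact sarama internals.

// Sketch only: approximates the preferredBroker() fallback relied on by the
// consumer fix above; not the actual sarama implementation.
package sketch

type Broker struct{ id int32 }

func (b *Broker) ID() int32 { return b.id }

// metadata is whatever gives the consumer access to current cluster metadata
// (assumed lookups by broker ID and by topic/partition leader).
type metadata interface {
	Broker(brokerID int32) (*Broker, error)
	Leader(topic string, partition int32) (*Broker, error)
}

type partitionConsumer struct {
	client               metadata
	topic                string
	partition            int32
	preferredReadReplica int32 // assumed -1 until the broker sends a preferred replica
}

func (child *partitionConsumer) preferredBroker() (*Broker, error) {
	if child.preferredReadReplica >= 0 {
		// The broker asked us to fetch from a specific (follower) replica.
		if broker, err := child.client.Broker(child.preferredReadReplica); err == nil {
			return broker, nil
		}
	}
	// No usable preferred read replica: fall back to the partition leader,
	// which is also how the consumer notices when leadership moves.
	return child.client.Leader(child.topic, child.partition)
}

With a helper shaped like this, handleResponses only needs to compare the
current broker's ID against preferredBroker().ID() and redispatch the child
when they differ, which is exactly what the hunk above does.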
--- acl_bindings.go | 6 +- acl_create_request.go | 4 +- acl_create_request_test.go | 46 ++-- acl_create_response.go | 4 +- acl_delete_request.go | 2 +- acl_delete_response.go | 6 +- acl_delete_response_test.go | 32 ++- acl_describe_request.go | 2 +- acl_describe_response.go | 2 +- acl_filter.go | 1 - acl_types.go | 16 +- acl_types_test.go | 2 + add_offsets_to_txn_request.go | 2 +- add_offsets_to_txn_request_test.go | 14 +- add_offsets_to_txn_response.go | 2 +- add_offsets_to_txn_response_test.go | 10 +- add_partitions_to_txn_request.go | 2 +- add_partitions_to_txn_request_test.go | 18 +- add_partitions_to_txn_response.go | 4 +- add_partitions_to_txn_response_test.go | 18 +- admin.go | 6 +- admin_test.go | 8 +- alter_configs_request.go | 4 +- alter_configs_request_test.go | 6 +- alter_configs_response.go | 4 +- alter_configs_response_test.go | 8 +- ...r_partition_reassignments_response_test.go | 2 +- api_versions_request.go | 5 +- api_versions_request_test.go | 4 +- api_versions_response.go | 4 +- api_versions_response_test.go | 16 +- async_producer.go | 1 - async_producer_test.go | 10 +- balance_strategy.go | 1 + balance_strategy_test.go | 1 + broker.go | 92 ++++---- broker_test.go | 100 +++++--- client.go | 4 +- config_test.go | 217 ++++++++++++------ consumer.go | 5 +- consumer_group.go | 2 +- consumer_group_members.go | 4 +- consumer_metadata_request.go | 2 +- consumer_metadata_request_test.go | 6 +- consumer_metadata_response.go | 2 +- consumer_metadata_response_test.go | 6 +- control_record.go | 8 +- crc32_field.go | 1 + describe_configs_request.go | 1 - describe_configs_request_test.go | 6 +- describe_configs_response.go | 2 +- describe_configs_response_test.go | 42 ++-- end_txn_request_test.go | 14 +- end_txn_response_test.go | 10 +- errors.go | 4 +- examples/consumergroup/main.go | 1 - examples/http_server/http_server.go | 7 +- examples/sasl_scram_client/main.go | 1 - examples/sasl_scram_client/scram_client.go | 6 +- fetch_request_test.go | 12 +- fetch_response_test.go | 18 +- functional_consumer_group_test.go | 2 + functional_producer_test.go | 4 + functional_test.go | 2 +- gssapi_kerberos.go | 1 - heartbeat_request_test.go | 12 +- heartbeat_response_test.go | 7 +- leave_group_request_test.go | 10 +- list_groups_request.go | 3 +- list_partition_reassignments_request_test.go | 18 +- list_partition_reassignments_response_test.go | 28 ++- message.go | 12 +- message_test.go | 24 +- metadata_response_test.go | 18 +- mockkerberos.go | 6 +- mockresponses.go | 12 +- mocks/consumer.go | 2 +- offset_commit_request_test.go | 18 +- offset_commit_response_test.go | 7 +- offset_fetch_request_test.go | 28 ++- offset_fetch_response_test.go | 28 +-- offset_request_test.go | 18 +- offset_response_test.go | 61 +---- produce_request_test.go | 9 +- produce_response_test.go | 3 +- real_decoder.go | 23 +- record.go | 4 +- response_header.go | 6 +- response_header_test.go | 6 +- sasl_authenticate_request_test.go | 8 +- sasl_authenticate_response_test.go | 12 +- sasl_handshake_request_test.go | 8 +- sasl_handshake_response_test.go | 12 +- sticky_assignor_user_data.go | 4 +- txn_offset_commit_request_test.go | 26 +-- txn_offset_commit_response_test.go | 18 +- utils.go | 2 +- zstd.go | 6 +- 98 files changed, 690 insertions(+), 624 deletions(-) diff --git a/acl_bindings.go b/acl_bindings.go index 50b689d1d..13440be67 100644 --- a/acl_bindings.go +++ b/acl_bindings.go @@ -1,6 +1,6 @@ package sarama -//Resource holds information about acl resource type +// Resource holds information about acl resource type 
type Resource struct { ResourceType AclResourceType ResourceName string @@ -46,7 +46,7 @@ func (r *Resource) decode(pd packetDecoder, version int16) (err error) { return nil } -//Acl holds information about acl type +// Acl holds information about acl type type Acl struct { Principal string Host string @@ -93,7 +93,7 @@ func (a *Acl) decode(pd packetDecoder, version int16) (err error) { return nil } -//ResourceAcls is an acl resource type +// ResourceAcls is an acl resource type type ResourceAcls struct { Resource Acls []*Acl diff --git a/acl_create_request.go b/acl_create_request.go index 6d8a70e1a..449102f74 100644 --- a/acl_create_request.go +++ b/acl_create_request.go @@ -1,6 +1,6 @@ package sarama -//CreateAclsRequest is an acl creation request +// CreateAclsRequest is an acl creation request type CreateAclsRequest struct { Version int16 AclCreations []*AclCreation @@ -60,7 +60,7 @@ func (c *CreateAclsRequest) requiredVersion() KafkaVersion { } } -//AclCreation is a wrapper around Resource and Acl type +// AclCreation is a wrapper around Resource and Acl type type AclCreation struct { Resource Acl diff --git a/acl_create_request_test.go b/acl_create_request_test.go index 8dd2a5f3e..1c636d01f 100644 --- a/acl_create_request_test.go +++ b/acl_create_request_test.go @@ -27,17 +27,19 @@ var ( func TestCreateAclsRequestv0(t *testing.T) { req := &CreateAclsRequest{ Version: 0, - AclCreations: []*AclCreation{{ - Resource: Resource{ - ResourceType: AclResourceGroup, - ResourceName: "group", + AclCreations: []*AclCreation{ + { + Resource: Resource{ + ResourceType: AclResourceGroup, + ResourceName: "group", + }, + Acl: Acl{ + Principal: "principal", + Host: "host", + Operation: AclOperationAll, + PermissionType: AclPermissionDeny, + }, }, - Acl: Acl{ - Principal: "principal", - Host: "host", - Operation: AclOperationAll, - PermissionType: AclPermissionDeny, - }}, }, } @@ -47,18 +49,20 @@ func TestCreateAclsRequestv0(t *testing.T) { func TestCreateAclsRequestv1(t *testing.T) { req := &CreateAclsRequest{ Version: 1, - AclCreations: []*AclCreation{{ - Resource: Resource{ - ResourceType: AclResourceGroup, - ResourceName: "group", - ResourcePatternType: AclPatternLiteral, + AclCreations: []*AclCreation{ + { + Resource: Resource{ + ResourceType: AclResourceGroup, + ResourceName: "group", + ResourcePatternType: AclPatternLiteral, + }, + Acl: Acl{ + Principal: "principal", + Host: "host", + Operation: AclOperationAll, + PermissionType: AclPermissionDeny, + }, }, - Acl: Acl{ - Principal: "principal", - Host: "host", - Operation: AclOperationAll, - PermissionType: AclPermissionDeny, - }}, }, } diff --git a/acl_create_response.go b/acl_create_response.go index 14b1b9e13..21d6c340c 100644 --- a/acl_create_response.go +++ b/acl_create_response.go @@ -2,7 +2,7 @@ package sarama import "time" -//CreateAclsResponse is a an acl response creation type +// CreateAclsResponse is a an acl response creation type type CreateAclsResponse struct { ThrottleTime time.Duration AclCreationResponses []*AclCreationResponse @@ -63,7 +63,7 @@ func (c *CreateAclsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } -//AclCreationResponse is an acl creation response type +// AclCreationResponse is an acl creation response type type AclCreationResponse struct { Err KError ErrMsg *string diff --git a/acl_delete_request.go b/acl_delete_request.go index 6efb44303..5e5c03bc2 100644 --- a/acl_delete_request.go +++ b/acl_delete_request.go @@ -1,6 +1,6 @@ package sarama -//DeleteAclsRequest is a delete acl request +// 
DeleteAclsRequest is a delete acl request type DeleteAclsRequest struct { Version int Filters []*AclFilter diff --git a/acl_delete_response.go b/acl_delete_response.go index cb6308826..cd33749d5 100644 --- a/acl_delete_response.go +++ b/acl_delete_response.go @@ -2,7 +2,7 @@ package sarama import "time" -//DeleteAclsResponse is a delete acl response +// DeleteAclsResponse is a delete acl response type DeleteAclsResponse struct { Version int16 ThrottleTime time.Duration @@ -64,7 +64,7 @@ func (d *DeleteAclsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } -//FilterResponse is a filter response type +// FilterResponse is a filter response type type FilterResponse struct { Err KError ErrMsg *string @@ -115,7 +115,7 @@ func (f *FilterResponse) decode(pd packetDecoder, version int16) (err error) { return nil } -//MatchingAcl is a matching acl type +// MatchingAcl is a matching acl type type MatchingAcl struct { Err KError ErrMsg *string diff --git a/acl_delete_response_test.go b/acl_delete_response_test.go index 0d9dea684..eb57d68a5 100644 --- a/acl_delete_response_test.go +++ b/acl_delete_response_test.go @@ -5,23 +5,21 @@ import ( "time" ) -var ( - deleteAclsResponse = []byte{ - 0, 0, 0, 100, - 0, 0, 0, 1, - 0, 0, // no error - 255, 255, // no error message - 0, 0, 0, 1, // 1 matching acl - 0, 0, // no error - 255, 255, // no error message - 2, // resource type - 0, 5, 't', 'o', 'p', 'i', 'c', - 0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l', - 0, 4, 'h', 'o', 's', 't', - 4, - 3, - } -) +var deleteAclsResponse = []byte{ + 0, 0, 0, 100, + 0, 0, 0, 1, + 0, 0, // no error + 255, 255, // no error message + 0, 0, 0, 1, // 1 matching acl + 0, 0, // no error + 255, 255, // no error message + 2, // resource type + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l', + 0, 4, 'h', 'o', 's', 't', + 4, + 3, +} func TestDeleteAclsResponse(t *testing.T) { resp := &DeleteAclsResponse{ diff --git a/acl_describe_request.go b/acl_describe_request.go index 29841a5ce..e0fe9023a 100644 --- a/acl_describe_request.go +++ b/acl_describe_request.go @@ -1,6 +1,6 @@ package sarama -//DescribeAclsRequest is a secribe acl request type +// DescribeAclsRequest is a secribe acl request type type DescribeAclsRequest struct { Version int AclFilter diff --git a/acl_describe_response.go b/acl_describe_response.go index c43408b24..3255fd485 100644 --- a/acl_describe_response.go +++ b/acl_describe_response.go @@ -2,7 +2,7 @@ package sarama import "time" -//DescribeAclsResponse is a describe acl response type +// DescribeAclsResponse is a describe acl response type type DescribeAclsResponse struct { Version int16 ThrottleTime time.Duration diff --git a/acl_filter.go b/acl_filter.go index fad555875..b380161aa 100644 --- a/acl_filter.go +++ b/acl_filter.go @@ -46,7 +46,6 @@ func (a *AclFilter) decode(pd packetDecoder, version int16) (err error) { if a.Version == 1 { pattern, err := pd.getInt8() - if err != nil { return err } diff --git a/acl_types.go b/acl_types.go index f3e6e79f4..16a21fc3f 100644 --- a/acl_types.go +++ b/acl_types.go @@ -55,12 +55,12 @@ func (a *AclOperation) String() string { return s } -//MarshalText returns the text form of the AclOperation (name without prefix) +// MarshalText returns the text form of the AclOperation (name without prefix) func (a *AclOperation) MarshalText() ([]byte, error) { return []byte(a.String()), nil } -//UnmarshalText takes a text reprentation of the operation and converts it to an AclOperation +// UnmarshalText takes a text reprentation of 
the operation and converts it to an AclOperation func (a *AclOperation) UnmarshalText(text []byte) error { normalized := strings.ToLower(string(text)) mapping := map[string]AclOperation{ @@ -109,12 +109,12 @@ func (a *AclPermissionType) String() string { return s } -//MarshalText returns the text form of the AclPermissionType (name without prefix) +// MarshalText returns the text form of the AclPermissionType (name without prefix) func (a *AclPermissionType) MarshalText() ([]byte, error) { return []byte(a.String()), nil } -//UnmarshalText takes a text reprentation of the permission type and converts it to an AclPermissionType +// UnmarshalText takes a text reprentation of the permission type and converts it to an AclPermissionType func (a *AclPermissionType) UnmarshalText(text []byte) error { normalized := strings.ToLower(string(text)) mapping := map[string]AclPermissionType{ @@ -159,12 +159,12 @@ func (a *AclResourceType) String() string { return s } -//MarshalText returns the text form of the AclResourceType (name without prefix) +// MarshalText returns the text form of the AclResourceType (name without prefix) func (a *AclResourceType) MarshalText() ([]byte, error) { return []byte(a.String()), nil } -//UnmarshalText takes a text reprentation of the resource type and converts it to an AclResourceType +// UnmarshalText takes a text reprentation of the resource type and converts it to an AclResourceType func (a *AclResourceType) UnmarshalText(text []byte) error { normalized := strings.ToLower(string(text)) mapping := map[string]AclResourceType{ @@ -209,12 +209,12 @@ func (a *AclResourcePatternType) String() string { return s } -//MarshalText returns the text form of the AclResourcePatternType (name without prefix) +// MarshalText returns the text form of the AclResourcePatternType (name without prefix) func (a *AclResourcePatternType) MarshalText() ([]byte, error) { return []byte(a.String()), nil } -//UnmarshalText takes a text reprentation of the resource pattern type and converts it to an AclResourcePatternType +// UnmarshalText takes a text reprentation of the resource pattern type and converts it to an AclResourcePatternType func (a *AclResourcePatternType) UnmarshalText(text []byte) error { normalized := strings.ToLower(string(text)) mapping := map[string]AclResourcePatternType{ diff --git a/acl_types_test.go b/acl_types_test.go index 295b7e2bd..0b5247a14 100644 --- a/acl_types_test.go +++ b/acl_types_test.go @@ -37,6 +37,7 @@ func TestAclPermissionTypeTextMarshal(t *testing.T) { } } } + func TestAclResourceTypeTextMarshal(t *testing.T) { for i := AclResourceUnknown; i <= AclResourceTransactionalID; i++ { text, err := i.MarshalText() @@ -53,6 +54,7 @@ func TestAclResourceTypeTextMarshal(t *testing.T) { } } } + func TestAclResourcePatternTypeTextMarshal(t *testing.T) { for i := AclPatternUnknown; i <= AclPatternPrefixed; i++ { text, err := i.MarshalText() diff --git a/add_offsets_to_txn_request.go b/add_offsets_to_txn_request.go index 95586f9a1..a96af9341 100644 --- a/add_offsets_to_txn_request.go +++ b/add_offsets_to_txn_request.go @@ -1,6 +1,6 @@ package sarama -//AddOffsetsToTxnRequest adds offsets to a transaction request +// AddOffsetsToTxnRequest adds offsets to a transaction request type AddOffsetsToTxnRequest struct { TransactionalID string ProducerID int64 diff --git a/add_offsets_to_txn_request_test.go b/add_offsets_to_txn_request_test.go index e96b3d33f..471d085cd 100644 --- a/add_offsets_to_txn_request_test.go +++ b/add_offsets_to_txn_request_test.go @@ -2,14 +2,12 @@ 
package sarama import "testing" -var ( - addOffsetsToTxnRequest = []byte{ - 0, 3, 't', 'x', 'n', - 0, 0, 0, 0, 0, 0, 31, 64, - 0, 0, - 0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd', - } -) +var addOffsetsToTxnRequest = []byte{ + 0, 3, 't', 'x', 'n', + 0, 0, 0, 0, 0, 0, 31, 64, + 0, 0, + 0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd', +} func TestAddOffsetsToTxnRequest(t *testing.T) { req := &AddOffsetsToTxnRequest{ diff --git a/add_offsets_to_txn_response.go b/add_offsets_to_txn_response.go index bdb184419..bb61973d1 100644 --- a/add_offsets_to_txn_response.go +++ b/add_offsets_to_txn_response.go @@ -4,7 +4,7 @@ import ( "time" ) -//AddOffsetsToTxnResponse is a response type for adding offsets to txns +// AddOffsetsToTxnResponse is a response type for adding offsets to txns type AddOffsetsToTxnResponse struct { ThrottleTime time.Duration Err KError diff --git a/add_offsets_to_txn_response_test.go b/add_offsets_to_txn_response_test.go index 4504966fe..d1730cee4 100644 --- a/add_offsets_to_txn_response_test.go +++ b/add_offsets_to_txn_response_test.go @@ -5,12 +5,10 @@ import ( "time" ) -var ( - addOffsetsToTxnResponse = []byte{ - 0, 0, 0, 100, - 0, 47, - } -) +var addOffsetsToTxnResponse = []byte{ + 0, 0, 0, 100, + 0, 47, +} func TestAddOffsetsToTxnResponse(t *testing.T) { resp := &AddOffsetsToTxnResponse{ diff --git a/add_partitions_to_txn_request.go b/add_partitions_to_txn_request.go index 6289f4514..57ecf6488 100644 --- a/add_partitions_to_txn_request.go +++ b/add_partitions_to_txn_request.go @@ -1,6 +1,6 @@ package sarama -//AddPartitionsToTxnRequest is a add paartition request +// AddPartitionsToTxnRequest is a add paartition request type AddPartitionsToTxnRequest struct { TransactionalID string ProducerID int64 diff --git a/add_partitions_to_txn_request_test.go b/add_partitions_to_txn_request_test.go index f8dfb27d0..f60a88695 100644 --- a/add_partitions_to_txn_request_test.go +++ b/add_partitions_to_txn_request_test.go @@ -2,16 +2,14 @@ package sarama import "testing" -var ( - addPartitionsToTxnRequest = []byte{ - 0, 3, 't', 'x', 'n', - 0, 0, 0, 0, 0, 0, 31, 64, // ProducerID - 0, 0, 0, 0, // ProducerEpoch - 0, 1, // 1 topic - 0, 5, 't', 'o', 'p', 'i', 'c', - 0, 0, 0, 1, 0, 0, 0, 1, - } -) +var addPartitionsToTxnRequest = []byte{ + 0, 3, 't', 'x', 'n', + 0, 0, 0, 0, 0, 0, 31, 64, // ProducerID + 0, 0, 0, 0, // ProducerEpoch + 0, 1, // 1 topic + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 0, 0, 1, 0, 0, 0, 1, +} func TestAddPartitionsToTxnRequest(t *testing.T) { req := &AddPartitionsToTxnRequest{ diff --git a/add_partitions_to_txn_response.go b/add_partitions_to_txn_response.go index 73b73b07f..098956507 100644 --- a/add_partitions_to_txn_response.go +++ b/add_partitions_to_txn_response.go @@ -4,7 +4,7 @@ import ( "time" ) -//AddPartitionsToTxnResponse is a partition errors to transaction type +// AddPartitionsToTxnResponse is a partition errors to transaction type type AddPartitionsToTxnResponse struct { ThrottleTime time.Duration Errors map[string][]*PartitionError @@ -87,7 +87,7 @@ func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } -//PartitionError is a partition error type +// PartitionError is a partition error type type PartitionError struct { Partition int32 Err KError diff --git a/add_partitions_to_txn_response_test.go b/add_partitions_to_txn_response_test.go index 9cf93e821..b3635e58d 100644 --- a/add_partitions_to_txn_response_test.go +++ b/add_partitions_to_txn_response_test.go @@ -5,16 +5,14 @@ import ( "time" ) -var ( - addPartitionsToTxnResponse = 
[]byte{ - 0, 0, 0, 100, - 0, 0, 0, 1, - 0, 5, 't', 'o', 'p', 'i', 'c', - 0, 0, 0, 1, // 1 partition error - 0, 0, 0, 2, // partition 2 - 0, 48, // error - } -) +var addPartitionsToTxnResponse = []byte{ + 0, 0, 0, 100, + 0, 0, 0, 1, + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 0, 0, 1, // 1 partition error + 0, 0, 0, 2, // partition 2 + 0, 48, // error +} func TestAddPartitionsToTxnResponse(t *testing.T) { resp := &AddPartitionsToTxnResponse{ diff --git a/admin.go b/admin.go index 618981f5e..abe18b19f 100644 --- a/admin.go +++ b/admin.go @@ -134,7 +134,7 @@ func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) { // NewClusterAdminFromClient creates a new ClusterAdmin using the given client. // Note that underlying client will also be closed on admin's Close() call. func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) { - //make sure we can retrieve the controller + // make sure we can retrieve the controller _, err := client.Controller() if err != nil { return nil, err @@ -592,8 +592,8 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i if len(errs) > 0 { return ErrDeleteRecords{MultiError{&errs}} } - //todo since we are dealing with couple of partitions it would be good if we return slice of errors - //for each partition instead of one error + // todo since we are dealing with couple of partitions it would be good if we return slice of errors + // for each partition instead of one error return nil } diff --git a/admin_test.go b/admin_test.go index 754003f68..4ab91c05a 100644 --- a/admin_test.go +++ b/admin_test.go @@ -357,7 +357,7 @@ func TestClusterAdminAlterPartitionReassignments(t *testing.T) { t.Fatal(err) } - var topicAssignment = make([][]int32, 0, 3) + topicAssignment := make([][]int32, 0, 3) err = admin.AlterPartitionReassignments("my_topic", topicAssignment) if err != nil { @@ -395,7 +395,7 @@ func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) { t.Fatal(err) } - var topicAssignment = make([][]int32, 0, 3) + topicAssignment := make([][]int32, 0, 3) err = admin.AlterPartitionReassignments("my_topic", topicAssignment) @@ -479,7 +479,7 @@ func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) { t.Fatal(err) } - var partitions = make([]int32, 0) + partitions := make([]int32, 0) _, err = admin.ListPartitionReassignments("my_topic", partitions) @@ -649,7 +649,7 @@ func TestClusterAdminDescribeConfig(t *testing.T) { "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), }) - var tests = []struct { + tests := []struct { saramaVersion KafkaVersion requestVersion int16 includeSynonyms bool diff --git a/alter_configs_request.go b/alter_configs_request.go index c88bb604a..8b94b1f3f 100644 --- a/alter_configs_request.go +++ b/alter_configs_request.go @@ -1,12 +1,12 @@ package sarama -//AlterConfigsRequest is an alter config request type +// AlterConfigsRequest is an alter config request type type AlterConfigsRequest struct { Resources []*AlterConfigsResource ValidateOnly bool } -//AlterConfigsResource is an alter config resource type +// AlterConfigsResource is an alter config resource type type AlterConfigsResource struct { Type ConfigResourceType Name string diff --git a/alter_configs_request_test.go b/alter_configs_request_test.go index 73c8a622d..e612244c6 100644 --- a/alter_configs_request_test.go +++ b/alter_configs_request_test.go @@ -12,7 +12,7 @@ var ( 0, 0, 0, 1, // 1 config 2, // a topic 0, 3, 'f', 'o', 'o', // topic name: foo - 0, 0, 0, 1, //1 config name + 0, 0, 0, 1, 
// 1 config name 0, 10, // 10 chars 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, @@ -24,14 +24,14 @@ var ( 0, 0, 0, 2, // 2 config 2, // a topic 0, 3, 'f', 'o', 'o', // topic name: foo - 0, 0, 0, 1, //1 config name + 0, 0, 0, 1, // 1 config name 0, 10, // 10 chars 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, '1', '0', '0', '0', 2, // a topic 0, 3, 'b', 'a', 'r', // topic name: foo - 0, 0, 0, 1, //2 config + 0, 0, 0, 1, // 2 config 0, 12, // 12 chars 'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's', 0, 4, diff --git a/alter_configs_response.go b/alter_configs_response.go index 3266f9274..89065ab11 100644 --- a/alter_configs_response.go +++ b/alter_configs_response.go @@ -2,13 +2,13 @@ package sarama import "time" -//AlterConfigsResponse is a response type for alter config +// AlterConfigsResponse is a response type for alter config type AlterConfigsResponse struct { ThrottleTime time.Duration Resources []*AlterConfigsResourceResponse } -//AlterConfigsResourceResponse is a response type for alter config resource +// AlterConfigsResourceResponse is a response type for alter config resource type AlterConfigsResourceResponse struct { ErrorCode int16 ErrorMsg string diff --git a/alter_configs_response_test.go b/alter_configs_response_test.go index 9f44dd95b..62cd5991f 100644 --- a/alter_configs_response_test.go +++ b/alter_configs_response_test.go @@ -6,15 +6,15 @@ import ( var ( alterResponseEmpty = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 0, // no configs } alterResponsePopulated = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 1, // response - 0, 0, //errorcode - 0, 0, //string + 0, 0, // errorcode + 0, 0, // string 2, // topic 0, 3, 'f', 'o', 'o', } diff --git a/alter_partition_reassignments_response_test.go b/alter_partition_reassignments_response_test.go index 25f1fd792..8c39ef6ce 100644 --- a/alter_partition_reassignments_response_test.go +++ b/alter_partition_reassignments_response_test.go @@ -19,7 +19,7 @@ var ( 6, 116, 111, 112, 105, 99, // topic name "topic" 2, // partition array length 1 0, 0, 0, 1, // partitionId - 0, 3, //kerror + 0, 3, // kerror 7, 101, 114, 114, 111, 114, 50, // error string "error2" 0, 0, 0, // empty tagged fields } diff --git a/api_versions_request.go b/api_versions_request.go index d67c5e1e5..bee92c0e7 100644 --- a/api_versions_request.go +++ b/api_versions_request.go @@ -1,8 +1,7 @@ package sarama -//ApiVersionsRequest ... -type ApiVersionsRequest struct { -} +// ApiVersionsRequest ... 
+type ApiVersionsRequest struct{} func (a *ApiVersionsRequest) encode(pe packetEncoder) error { return nil diff --git a/api_versions_request_test.go b/api_versions_request_test.go index 4bb2fdac3..5fd21496a 100644 --- a/api_versions_request_test.go +++ b/api_versions_request_test.go @@ -2,9 +2,7 @@ package sarama import "testing" -var ( - apiVersionRequest []byte -) +var apiVersionRequest []byte func TestApiVersionsRequest(t *testing.T) { request := new(ApiVersionsRequest) diff --git a/api_versions_response.go b/api_versions_response.go index d09e8d9e1..0e72e3926 100644 --- a/api_versions_response.go +++ b/api_versions_response.go @@ -1,6 +1,6 @@ package sarama -//ApiVersionsResponseBlock is an api version response block type +// ApiVersionsResponseBlock is an api version response block type type ApiVersionsResponseBlock struct { ApiKey int16 MinVersion int16 @@ -32,7 +32,7 @@ func (b *ApiVersionsResponseBlock) decode(pd packetDecoder) error { return nil } -//ApiVersionsResponse is an api version response type +// ApiVersionsResponse is an api version response type type ApiVersionsResponse struct { Err KError ApiVersions []*ApiVersionsResponseBlock diff --git a/api_versions_response_test.go b/api_versions_response_test.go index 58b065834..e07bf785a 100644 --- a/api_versions_response_test.go +++ b/api_versions_response_test.go @@ -2,15 +2,13 @@ package sarama import "testing" -var ( - apiVersionResponse = []byte{ - 0x00, 0x00, - 0x00, 0x00, 0x00, 0x01, - 0x00, 0x03, - 0x00, 0x02, - 0x00, 0x01, - } -) +var apiVersionResponse = []byte{ + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x03, + 0x00, 0x02, + 0x00, 0x01, +} func TestApiVersionsResponse(t *testing.T) { response := new(ApiVersionsResponse) diff --git a/async_producer.go b/async_producer.go index 209fd2d34..5911f7b58 100644 --- a/async_producer.go +++ b/async_producer.go @@ -449,7 +449,6 @@ func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error { } return }) - if err != nil { return err } diff --git a/async_producer_test.go b/async_producer_test.go index 855d6ae2a..e571aa068 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -961,7 +961,7 @@ func TestAsyncProducerIdempotentGoldenPath(t *testing.T) { } func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { - //Logger = log.New(os.Stderr, "", log.LstdFlags) + // Logger = log.New(os.Stderr, "", log.LstdFlags) tests := []struct { name string failAfterWrite bool @@ -1028,9 +1028,9 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { batchFirstSeq := int(batch.FirstSequence) batchSize := len(batch.Records) - if lastSequenceWrittenToDisk == batchFirstSeq-1 { //in sequence append - if lastBatchFirstSeq == batchFirstSeq { //is a batch retry - if lastBatchSize == batchSize { //good retry + if lastSequenceWrittenToDisk == batchFirstSeq-1 { // in sequence append + if lastBatchFirstSeq == batchFirstSeq { // is a batch retry + if lastBatchSize == batchSize { // good retry // mock write to disk lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1 return prodSuccessResponse @@ -1060,7 +1060,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { // mock write to disk lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1 return prodSuccessResponse - } //out of sequence / bad retried batch + } // out of sequence / bad retried batch if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize { t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, 
batchSize) } else if lastSequenceWrittenToDisk+1 != batchFirstSeq { diff --git a/balance_strategy.go b/balance_strategy.go index 8f7634f94..9855bf443 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -138,6 +138,7 @@ func (p balanceStrategySortable) Len() int { return len(p.memberIDs) } func (p balanceStrategySortable) Swap(i, j int) { p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i] } + func (p balanceStrategySortable) Less(i, j int) bool { return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j]) } diff --git a/balance_strategy_test.go b/balance_strategy_test.go index f5ab2fd6f..adae80caa 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -1967,6 +1967,7 @@ func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testi verifyPlanIsBalancedAndSticky(t, s, members, plan3, err) verifyFullyBalanced(t, plan3) } + func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) { s := &stickyBalanceStrategy{} diff --git a/broker.go b/broker.go index 0b3ea969c..664b74f54 100644 --- a/broker.go +++ b/broker.go @@ -223,7 +223,7 @@ func (b *Broker) Connected() (bool, error) { return b.conn != nil, b.connErr } -//Close closes the broker resources +// Close closes the broker resources func (b *Broker) Close() error { b.lock.Lock() defer b.lock.Unlock() @@ -276,12 +276,11 @@ func (b *Broker) Rack() string { return *b.rack } -//GetMetadata send a metadata request and returns a metadata response or error +// GetMetadata send a metadata request and returns a metadata response or error func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) { response := new(MetadataResponse) err := b.sendAndReceive(request, response) - if err != nil { return nil, err } @@ -289,12 +288,11 @@ func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error return response, nil } -//GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error +// GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) { response := new(ConsumerMetadataResponse) err := b.sendAndReceive(request, response) - if err != nil { return nil, err } @@ -302,12 +300,11 @@ func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*Consume return response, nil } -//FindCoordinator sends a find coordinate request and returns a response or error +// FindCoordinator sends a find coordinate request and returns a response or error func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) { response := new(FindCoordinatorResponse) err := b.sendAndReceive(request, response) - if err != nil { return nil, err } @@ -315,12 +312,11 @@ func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordina return response, nil } -//GetAvailableOffsets return an offset response or error +// GetAvailableOffsets return an offset response or error func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) { response := new(OffsetResponse) err := b.sendAndReceive(request, response) - if err != nil { return nil, err } @@ -328,7 +324,7 @@ func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, e return response, nil } -//Produce returns a produce response or error +// Produce 
returns a produce response or error func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) { var ( response *ProduceResponse @@ -349,7 +345,7 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) { return response, nil } -//Fetch returns a FetchResponse or error +// Fetch returns a FetchResponse or error func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) { response := new(FetchResponse) @@ -361,7 +357,7 @@ func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) { return response, nil } -//CommitOffset return an Offset commit response or error +// CommitOffset return an Offset commit response or error func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) { response := new(OffsetCommitResponse) @@ -373,7 +369,7 @@ func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitRespon return response, nil } -//FetchOffset returns an offset fetch response or error +// FetchOffset returns an offset fetch response or error func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) { response := new(OffsetFetchResponse) response.Version = request.Version // needed to handle the two header versions @@ -386,7 +382,7 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, return response, nil } -//JoinGroup returns a join group response or error +// JoinGroup returns a join group response or error func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) { response := new(JoinGroupResponse) @@ -398,7 +394,7 @@ func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error return response, nil } -//SyncGroup returns a sync group response or error +// SyncGroup returns a sync group response or error func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) { response := new(SyncGroupResponse) @@ -410,7 +406,7 @@ func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error return response, nil } -//LeaveGroup return a leave group response or error +// LeaveGroup return a leave group response or error func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) { response := new(LeaveGroupResponse) @@ -422,7 +418,7 @@ func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, er return response, nil } -//Heartbeat returns a heartbeat response or error +// Heartbeat returns a heartbeat response or error func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) { response := new(HeartbeatResponse) @@ -434,7 +430,7 @@ func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error return response, nil } -//ListGroups return a list group response or error +// ListGroups return a list group response or error func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) { response := new(ListGroupsResponse) @@ -446,7 +442,7 @@ func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, er return response, nil } -//DescribeGroups return describe group response or error +// DescribeGroups return describe group response or error func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) { response := new(DescribeGroupsResponse) @@ -458,7 +454,7 @@ func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroups return response, nil } -//ApiVersions return api version 
response or error +// ApiVersions return api version response or error func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) { response := new(ApiVersionsResponse) @@ -470,7 +466,7 @@ func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, return response, nil } -//CreateTopics send a create topic request and returns create topic response +// CreateTopics send a create topic request and returns create topic response func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) { response := new(CreateTopicsResponse) @@ -482,7 +478,7 @@ func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsRespon return response, nil } -//DeleteTopics sends a delete topic request and returns delete topic response +// DeleteTopics sends a delete topic request and returns delete topic response func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) { response := new(DeleteTopicsResponse) @@ -494,8 +490,8 @@ func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsRespon return response, nil } -//CreatePartitions sends a create partition request and returns create -//partitions response or error +// CreatePartitions sends a create partition request and returns create +// partitions response or error func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) { response := new(CreatePartitionsResponse) @@ -507,8 +503,8 @@ func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePart return response, nil } -//AlterPartitionReassignments sends a alter partition reassignments request and -//returns alter partition reassignments response +// AlterPartitionReassignments sends a alter partition reassignments request and +// returns alter partition reassignments response func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) { response := new(AlterPartitionReassignmentsResponse) @@ -520,8 +516,8 @@ func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignment return response, nil } -//ListPartitionReassignments sends a list partition reassignments request and -//returns list partition reassignments response +// ListPartitionReassignments sends a list partition reassignments request and +// returns list partition reassignments response func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) { response := new(ListPartitionReassignmentsResponse) @@ -533,8 +529,8 @@ func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsR return response, nil } -//DeleteRecords send a request to delete records and return delete record -//response or error +// DeleteRecords send a request to delete records and return delete record +// response or error func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) { response := new(DeleteRecordsResponse) @@ -546,7 +542,7 @@ func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsRes return response, nil } -//DescribeAcls sends a describe acl request and returns a response or error +// DescribeAcls sends a describe acl request and returns a response or error func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) { response := new(DescribeAclsResponse) @@ -558,7 +554,7 @@ func (b *Broker) 
DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsRespon return response, nil } -//CreateAcls sends a create acl request and returns a response or error +// CreateAcls sends a create acl request and returns a response or error func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) { response := new(CreateAclsResponse) @@ -570,7 +566,7 @@ func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, er return response, nil } -//DeleteAcls sends a delete acl request and returns a response or error +// DeleteAcls sends a delete acl request and returns a response or error func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) { response := new(DeleteAclsResponse) @@ -582,7 +578,7 @@ func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, er return response, nil } -//InitProducerID sends an init producer request and returns a response or error +// InitProducerID sends an init producer request and returns a response or error func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) { response := new(InitProducerIDResponse) @@ -594,8 +590,8 @@ func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerID return response, nil } -//AddPartitionsToTxn send a request to add partition to txn and returns -//a response or error +// AddPartitionsToTxn send a request to add partition to txn and returns +// a response or error func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) { response := new(AddPartitionsToTxnResponse) @@ -607,8 +603,8 @@ func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPar return response, nil } -//AddOffsetsToTxn sends a request to add offsets to txn and returns a response -//or error +// AddOffsetsToTxn sends a request to add offsets to txn and returns a response +// or error func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) { response := new(AddOffsetsToTxnResponse) @@ -620,7 +616,7 @@ func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsTo return response, nil } -//EndTxn sends a request to end txn and returns a response or error +// EndTxn sends a request to end txn and returns a response or error func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) { response := new(EndTxnResponse) @@ -632,8 +628,8 @@ func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) { return response, nil } -//TxnOffsetCommit sends a request to commit transaction offsets and returns -//a response or error +// TxnOffsetCommit sends a request to commit transaction offsets and returns +// a response or error func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) { response := new(TxnOffsetCommitResponse) @@ -645,8 +641,8 @@ func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCom return response, nil } -//DescribeConfigs sends a request to describe config and returns a response or -//error +// DescribeConfigs sends a request to describe config and returns a response or +// error func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) { response := new(DescribeConfigsResponse) @@ -658,7 +654,7 @@ func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConf return response, nil } -//AlterConfigs sends a request to alter config 
and return a response or error +// AlterConfigs sends a request to alter config and return a response or error func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) { response := new(AlterConfigsResponse) @@ -670,7 +666,7 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon return response, nil } -//DeleteGroups sends a request to delete groups and returns a response or error +// DeleteGroups sends a request to delete groups and returns a response or error func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) { response := new(DeleteGroupsResponse) @@ -681,7 +677,7 @@ func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsRespon return response, nil } -//DescribeLogDirs sends a request to get the broker's log dir paths and sizes +// DescribeLogDirs sends a request to get the broker's log dir paths and sizes func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) { response := new(DescribeLogDirsResponse) @@ -876,7 +872,7 @@ func (b *Broker) responseReceiver() { continue } - var headerLength = getHeaderLength(response.headerVersion) + headerLength := getHeaderLength(response.headerVersion) header := make([]byte, headerLength) bytesReadHeader, err := b.readFull(header) @@ -1049,7 +1045,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error { // sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol func (b *Broker) sendAndReceiveV0SASLPlainAuth() error { length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password) - authBytes := make([]byte, length+4) //4 byte length header + auth data + authBytes := make([]byte, length+4) // 4 byte length header + auth data binary.BigEndian.PutUint32(authBytes, uint32(length)) copy(authBytes[4:], b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password) diff --git a/broker_test.go b/broker_test.go index 5f859c8f2..2fa40ceb4 100644 --- a/broker_test.go +++ b/broker_test.go @@ -239,7 +239,6 @@ func TestSASLOAuthBearer(t *testing.T) { } conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) - if err != nil { t.Fatal(err) } @@ -370,7 +369,6 @@ func TestSASLSCRAMSHAXXX(t *testing.T) { } conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) - if err != nil { t.Fatal(err) } @@ -480,7 +478,6 @@ func TestSASLPlainAuth(t *testing.T) { } conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) - if err != nil { t.Fatal(err) } @@ -658,7 +655,6 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) { } conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) - if err != nil { t.Fatal(err) } @@ -756,7 +752,8 @@ var brokerTestTable = []struct { response []byte runner func(*testing.T, *Broker) }{ - {V0_10_0_0, + { + V0_10_0_0, "MetadataRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -768,9 +765,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("Metadata request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "ConsumerMetadataRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -782,9 +781,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("Consumer Metadata request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "ProduceRequest 
(NoResponse)", []byte{}, func(t *testing.T, broker *Broker) { @@ -797,9 +798,11 @@ var brokerTestTable = []struct { if response != nil { t.Error("Produce request with NoResponse got a response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "ProduceRequest (WaitForLocal)", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -812,9 +815,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("Produce request without NoResponse got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "FetchRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -826,9 +831,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("Fetch request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "OffsetFetchRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -840,9 +847,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("OffsetFetch request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "OffsetCommitRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -854,9 +863,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("OffsetCommit request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "OffsetRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -868,9 +879,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("Offset request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "JoinGroupRequest", []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -882,9 +895,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("JoinGroup request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "SyncGroupRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -896,9 +911,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("SyncGroup request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "LeaveGroupRequest", []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -910,9 +927,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("LeaveGroup request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "HeartbeatRequest", []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -924,9 +943,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("Heartbeat request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "ListGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -938,9 +959,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("ListGroups request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "DescribeGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -952,9 +975,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("DescribeGroups request got no response!") } - }}, + }, + }, - {V0_10_0_0, + { + V0_10_0_0, "ApiVersionsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { @@ -966,9 +991,11 @@ var brokerTestTable = []struct { if response == nil { t.Error("ApiVersions request got no response!") } - }}, + }, + }, - {V1_1_0_0, + { + V1_1_0_0, "DeleteGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00}, func(t *testing.T, broker *Broker) { @@ -980,7 +1007,8 @@ var brokerTestTable = []struct { if response == nil { t.Error("DeleteGroups request got no response!") } - }}, + }, + }, } func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) { diff --git a/client.go b/client.go index 85180a413..c0918ba35 100644 --- a/client.go +++ b/client.go @@ -493,7 +493,6 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in } offset, err := client.getOffset(topic, partitionID, time) - if err != nil { if err := client.RefreshMetadata(topic); err != nil { return -1, err @@ -607,7 +606,7 @@ func (client *client) randomizeSeedBrokers(addrs []string) { } func (client *client) updateBroker(brokers []*Broker) { - var currentBroker = make(map[int32]*Broker, len(brokers)) + currentBroker := make(map[int32]*Broker, len(brokers)) for _, broker := range brokers { currentBroker[broker.ID()] = broker @@ -1053,7 +1052,6 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin request.CoordinatorType = CoordinatorGroup response, err := broker.FindCoordinator(request) - if err != nil { Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err) diff --git a/config_test.go b/config_test.go index 2e34ba3d1..6d2b6fef9 100644 --- a/config_test.go +++ b/config_test.go @@ -33,8 +33,7 @@ func TestEmptyClientIDConfigValidates(t *testing.T) { } } -type DummyTokenProvider struct { -} +type DummyTokenProvider struct{} func (t *DummyTokenProvider) Token() (*AccessToken, error) { return &AccessToken{Token: "access-token-string"}, nil @@ -51,50 +50,66 @@ func TestNetConfigValidates(t *testing.T) { func(cfg *Config) { cfg.Net.MaxOpenRequests = 0 }, - "Net.MaxOpenRequests must be > 0"}, - {"DialTimeout", + "Net.MaxOpenRequests must be > 0", + }, + { + "DialTimeout", func(cfg *Config) { cfg.Net.DialTimeout = 0 }, - "Net.DialTimeout must be > 0"}, - {"ReadTimeout", + "Net.DialTimeout must be > 0", + }, + { + "ReadTimeout", func(cfg *Config) { cfg.Net.ReadTimeout = 0 }, - "Net.ReadTimeout must be > 0"}, - {"WriteTimeout", + "Net.ReadTimeout must be > 0", + }, + { + "WriteTimeout", func(cfg *Config) { cfg.Net.WriteTimeout = 0 }, - "Net.WriteTimeout must be > 0"}, - {"SASL.User", + "Net.WriteTimeout must be > 0", + }, + { + "SASL.User", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.User = "" }, - "Net.SASL.User must not be empty when SASL is enabled"}, - {"SASL.Password", + "Net.SASL.User must not be empty when SASL is enabled", + }, + { + "SASL.Password", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.User = "user" cfg.Net.SASL.Password = "" }, - "Net.SASL.Password must not be empty when SASL is enabled"}, - {"SASL.Mechanism - Invalid mechanism type", + "Net.SASL.Password must not be empty when SASL is enabled", + }, + { + "SASL.Mechanism - Invalid mechanism type", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = "AnIncorrectSASLMechanism" cfg.Net.SASL.TokenProvider = &DummyTokenProvider{} }, - "The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`"}, - {"SASL.Mechanism.OAUTHBEARER - Missing token provider", + "The SASL mechanism configuration is invalid. 
Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`", + }, + { + "SASL.Mechanism.OAUTHBEARER - Missing token provider", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeOAuth cfg.Net.SASL.TokenProvider = nil }, - "An AccessTokenProvider instance must be provided to Net.SASL.TokenProvider"}, - {"SASL.Mechanism SCRAM-SHA-256 - Missing SCRAM client", + "An AccessTokenProvider instance must be provided to Net.SASL.TokenProvider", + }, + { + "SASL.Mechanism SCRAM-SHA-256 - Missing SCRAM client", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeSCRAMSHA256 @@ -102,8 +117,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.User = "user" cfg.Net.SASL.Password = "stong_password" }, - "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc"}, - {"SASL.Mechanism SCRAM-SHA-512 - Missing SCRAM client", + "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc", + }, + { + "SASL.Mechanism SCRAM-SHA-512 - Missing SCRAM client", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeSCRAMSHA512 @@ -111,8 +128,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.User = "user" cfg.Net.SASL.Password = "stong_password" }, - "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing password field", + "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing password field", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeGSSAPI @@ -123,8 +142,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf" }, "Net.SASL.GSSAPI.Password must not be empty when GSS-API " + - "mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing KeyTabPath field", + "mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing KeyTabPath field", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeGSSAPI @@ -135,8 +156,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf" }, "Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" + - " and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Missing username", + " and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Missing username", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeGSSAPI @@ -146,8 +169,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.Realm = "kafka" cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf" }, - "Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Missing ServiceName", + "Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Missing ServiceName", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.Mechanism = SASLTypeGSSAPI @@ -157,8 +182,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.Realm = "kafka" 
cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf" }, - "Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Missing AuthType", + "Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Missing AuthType", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.GSSAPI.ServiceName = "kafka" @@ -168,8 +195,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.Realm = "kafka" cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf" }, - "Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Missing KerberosConfigPath", + "Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Missing KerberosConfigPath", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.GSSAPI.ServiceName = "kafka" @@ -179,8 +208,10 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.Password = "sarama" cfg.Net.SASL.GSSAPI.Realm = "kafka" }, - "Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used"}, - {"SASL.Mechanism GSSAPI (Kerberos) - Missing Realm", + "Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used", + }, + { + "SASL.Mechanism GSSAPI (Kerberos) - Missing Realm", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.GSSAPI.ServiceName = "kafka" @@ -190,7 +221,8 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.GSSAPI.Password = "sarama" cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf" }, - "Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used"}, + "Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used", + }, } for i, test := range tests { @@ -213,17 +245,22 @@ func TestMetadataConfigValidates(t *testing.T) { func(cfg *Config) { cfg.Metadata.Retry.Max = -1 }, - "Metadata.Retry.Max must be >= 0"}, - {"Retry.Backoff", + "Metadata.Retry.Max must be >= 0", + }, + { + "Retry.Backoff", func(cfg *Config) { cfg.Metadata.Retry.Backoff = -1 }, - "Metadata.Retry.Backoff must be >= 0"}, - {"RefreshFrequency", + "Metadata.Retry.Backoff must be >= 0", + }, + { + "RefreshFrequency", func(cfg *Config) { cfg.Metadata.RefreshFrequency = -1 }, - "Metadata.RefreshFrequency must be >= 0"}, + "Metadata.RefreshFrequency must be >= 0", + }, } for i, test := range tests { @@ -241,11 +278,13 @@ func TestAdminConfigValidates(t *testing.T) { cfg func(*Config) // resorting to using a function as a param because of internal composite structs err string }{ - {"Timeout", + { + "Timeout", func(cfg *Config) { cfg.Admin.Timeout = 0 }, - "Admin.Timeout must be > 0"}, + "Admin.Timeout must be > 0", + }, } for i, test := range tests { @@ -268,84 +307,113 @@ func TestProducerConfigValidates(t *testing.T) { func(cfg *Config) { cfg.Producer.MaxMessageBytes = 0 }, - "Producer.MaxMessageBytes must be > 0"}, - {"RequiredAcks", + "Producer.MaxMessageBytes must be > 0", + }, + { + "RequiredAcks", func(cfg *Config) { cfg.Producer.RequiredAcks = -2 }, - "Producer.RequiredAcks must be >= -1"}, - {"Timeout", + "Producer.RequiredAcks must be >= -1", + }, + { + "Timeout", func(cfg *Config) { cfg.Producer.Timeout = 0 }, - "Producer.Timeout must be > 0"}, - {"Partitioner", + "Producer.Timeout must be > 0", + }, + { + "Partitioner", func(cfg *Config) { cfg.Producer.Partitioner = nil }, - 
"Producer.Partitioner must not be nil"}, - {"Flush.Bytes", + "Producer.Partitioner must not be nil", + }, + { + "Flush.Bytes", func(cfg *Config) { cfg.Producer.Flush.Bytes = -1 }, - "Producer.Flush.Bytes must be >= 0"}, - {"Flush.Messages", + "Producer.Flush.Bytes must be >= 0", + }, + { + "Flush.Messages", func(cfg *Config) { cfg.Producer.Flush.Messages = -1 }, - "Producer.Flush.Messages must be >= 0"}, - {"Flush.Frequency", + "Producer.Flush.Messages must be >= 0", + }, + { + "Flush.Frequency", func(cfg *Config) { cfg.Producer.Flush.Frequency = -1 }, - "Producer.Flush.Frequency must be >= 0"}, - {"Flush.MaxMessages", + "Producer.Flush.Frequency must be >= 0", + }, + { + "Flush.MaxMessages", func(cfg *Config) { cfg.Producer.Flush.MaxMessages = -1 }, - "Producer.Flush.MaxMessages must be >= 0"}, - {"Flush.MaxMessages with Producer.Flush.Messages", + "Producer.Flush.MaxMessages must be >= 0", + }, + { + "Flush.MaxMessages with Producer.Flush.Messages", func(cfg *Config) { cfg.Producer.Flush.MaxMessages = 1 cfg.Producer.Flush.Messages = 2 }, - "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"}, - {"Flush.Retry.Max", + "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set", + }, + { + "Flush.Retry.Max", func(cfg *Config) { cfg.Producer.Retry.Max = -1 }, - "Producer.Retry.Max must be >= 0"}, - {"Flush.Retry.Backoff", + "Producer.Retry.Max must be >= 0", + }, + { + "Flush.Retry.Backoff", func(cfg *Config) { cfg.Producer.Retry.Backoff = -1 }, - "Producer.Retry.Backoff must be >= 0"}, - {"Idempotent Version", + "Producer.Retry.Backoff must be >= 0", + }, + { + "Idempotent Version", func(cfg *Config) { cfg.Producer.Idempotent = true cfg.Version = V0_10_0_0 }, - "Idempotent producer requires Version >= V0_11_0_0"}, - {"Idempotent with Producer.Retry.Max", + "Idempotent producer requires Version >= V0_11_0_0", + }, + { + "Idempotent with Producer.Retry.Max", func(cfg *Config) { cfg.Version = V0_11_0_0 cfg.Producer.Idempotent = true cfg.Producer.Retry.Max = 0 }, - "Idempotent producer requires Producer.Retry.Max >= 1"}, - {"Idempotent with Producer.RequiredAcks", + "Idempotent producer requires Producer.Retry.Max >= 1", + }, + { + "Idempotent with Producer.RequiredAcks", func(cfg *Config) { cfg.Version = V0_11_0_0 cfg.Producer.Idempotent = true }, - "Idempotent producer requires Producer.RequiredAcks to be WaitForAll"}, - {"Idempotent with Net.MaxOpenRequests", + "Idempotent producer requires Producer.RequiredAcks to be WaitForAll", + }, + { + "Idempotent with Net.MaxOpenRequests", func(cfg *Config) { cfg.Version = V0_11_0_0 cfg.Producer.Idempotent = true cfg.Producer.RequiredAcks = WaitForAll }, - "Idempotent producer requires Net.MaxOpenRequests to be 1"}, + "Idempotent producer requires Net.MaxOpenRequests to be 1", + }, } for i, test := range tests { @@ -356,20 +424,23 @@ func TestProducerConfigValidates(t *testing.T) { } } } + func TestConsumerConfigValidates(t *testing.T) { tests := []struct { name string cfg func(*Config) err string }{ - {"ReadCommitted Version", + { + "ReadCommitted Version", func(cfg *Config) { cfg.Version = V0_10_0_0 cfg.Consumer.IsolationLevel = ReadCommitted }, "ReadCommitted requires Version >= V0_11_0_0", }, - {"Incorrect isolation level", + { + "Incorrect isolation level", func(cfg *Config) { cfg.Version = V0_11_0_0 cfg.Consumer.IsolationLevel = IsolationLevel(42) diff --git a/consumer.go b/consumer.go index 61c9bc57a..46c26134c 100644 --- a/consumer.go +++ b/consumer.go @@ -783,7 +783,7 @@ done: close(bc.newSubscriptions) } 
-//subscriptionConsumer ensures we will get nil right away if no new subscriptions is available +// subscriptionConsumer ensures we will get nil right away if no new subscriptions is available func (bc *brokerConsumer) subscriptionConsumer() { <-bc.wait // wait for our first piece of work @@ -798,7 +798,6 @@ func (bc *brokerConsumer) subscriptionConsumer() { } response, err := bc.fetchNewMessages() - if err != nil { Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err) bc.abort(err) @@ -832,7 +831,7 @@ func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsu } } -//handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed +// handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed func (bc *brokerConsumer) handleResponses() { for child := range bc.subscriptions { result := child.responseResult diff --git a/consumer_group.go b/consumer_group.go index fcc5792ea..80f625d80 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -429,7 +429,7 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { select { case <-c.closed: - //consumer is closed + // consumer is closed return default: } diff --git a/consumer_group_members.go b/consumer_group_members.go index 2d02cc386..21b11e944 100644 --- a/consumer_group_members.go +++ b/consumer_group_members.go @@ -1,6 +1,6 @@ package sarama -//ConsumerGroupMemberMetadata holds the metadata for consumer group +// ConsumerGroupMemberMetadata holds the metadata for consumer group type ConsumerGroupMemberMetadata struct { Version int16 Topics []string @@ -37,7 +37,7 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) { return nil } -//ConsumerGroupMemberAssignment holds the member assignment for a consume group +// ConsumerGroupMemberAssignment holds the member assignment for a consume group type ConsumerGroupMemberAssignment struct { Version int16 Topics map[string][]int32 diff --git a/consumer_metadata_request.go b/consumer_metadata_request.go index e5ebdaef5..5c18e048a 100644 --- a/consumer_metadata_request.go +++ b/consumer_metadata_request.go @@ -1,6 +1,6 @@ package sarama -//ConsumerMetadataRequest is used for metadata requests +// ConsumerMetadataRequest is used for metadata requests type ConsumerMetadataRequest struct { ConsumerGroup string } diff --git a/consumer_metadata_request_test.go b/consumer_metadata_request_test.go index 24e5f0a43..3f6bcdb57 100644 --- a/consumer_metadata_request_test.go +++ b/consumer_metadata_request_test.go @@ -6,10 +6,12 @@ import ( var ( consumerMetadataRequestEmpty = []byte{ - 0x00, 0x00} + 0x00, 0x00, + } consumerMetadataRequestString = []byte{ - 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r'} + 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', + } ) func TestConsumerMetadataRequest(t *testing.T) { diff --git a/consumer_metadata_response.go b/consumer_metadata_response.go index 1b5d00d22..7fe0cf971 100644 --- a/consumer_metadata_response.go +++ b/consumer_metadata_response.go @@ -5,7 +5,7 @@ import ( "strconv" ) -//ConsumerMetadataResponse holds the response for a consumer group meta data requests +// ConsumerMetadataResponse holds the response for a consumer group meta data requests type ConsumerMetadataResponse struct { Err KError Coordinator *Broker diff --git a/consumer_metadata_response_test.go b/consumer_metadata_response_test.go index 8482f6ff1..a7d74b55b 100644 --- 
a/consumer_metadata_response_test.go +++ b/consumer_metadata_response_test.go @@ -7,13 +7,15 @@ var ( 0x00, 0x0E, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } consumerMetadataResponseSuccess = []byte{ 0x00, 0x00, 0x00, 0x00, 0x00, 0xAB, 0x00, 0x03, 'f', 'o', 'o', - 0x00, 0x00, 0xCC, 0xDD} + 0x00, 0x00, 0xCC, 0xDD, + } ) func TestConsumerMetadataResponseError(t *testing.T) { diff --git a/control_record.go b/control_record.go index b9197e4b1..244a82136 100644 --- a/control_record.go +++ b/control_record.go @@ -1,14 +1,14 @@ package sarama -//ControlRecordType ... +// ControlRecordType ... type ControlRecordType int const ( - //ControlRecordAbort is a control record for abort + // ControlRecordAbort is a control record for abort ControlRecordAbort ControlRecordType = iota - //ControlRecordCommit is a control record for commit + // ControlRecordCommit is a control record for commit ControlRecordCommit - //ControlRecordUnknown is a control record of unknown type + // ControlRecordUnknown is a control record of unknown type ControlRecordUnknown ) diff --git a/crc32_field.go b/crc32_field.go index 38189a3cd..32236e50f 100644 --- a/crc32_field.go +++ b/crc32_field.go @@ -72,6 +72,7 @@ func (c *crc32Field) check(curOffset int, buf []byte) error { return nil } + func (c *crc32Field) crc(curOffset int, buf []byte) (uint32, error) { var tab *crc32.Table switch c.polynomial { diff --git a/describe_configs_request.go b/describe_configs_request.go index d0c735280..4c3488031 100644 --- a/describe_configs_request.go +++ b/describe_configs_request.go @@ -61,7 +61,6 @@ func (r *DescribeConfigsRequest) decode(pd packetDecoder, version int16) (err er r.Resources[i].Name = name confLength, err := pd.getArrayLength() - if err != nil { return err } diff --git a/describe_configs_request_test.go b/describe_configs_request_test.go index ef9d3ff80..a1148f401 100644 --- a/describe_configs_request_test.go +++ b/describe_configs_request_test.go @@ -11,7 +11,7 @@ var ( 0, 0, 0, 1, // 1 config 2, // a topic 0, 3, 'f', 'o', 'o', // topic name: foo - 0, 0, 0, 1, //1 config name + 0, 0, 0, 1, // 1 config name 0, 10, // 10 chars 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', } @@ -20,7 +20,7 @@ var ( 0, 0, 0, 2, // 2 configs 2, // a topic 0, 3, 'f', 'o', 'o', // topic name: foo - 0, 0, 0, 2, //2 config name + 0, 0, 0, 2, // 2 config name 0, 10, // 10 chars 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 12, // 12 chars @@ -44,7 +44,7 @@ var ( 2, // a topic 0, 3, 'f', 'o', 'o', // topic name: foo 255, 255, 255, 255, // no configs - 1, //synoms + 1, // synoms } ) diff --git a/describe_configs_response.go b/describe_configs_response.go index 063ae9112..928f5a52a 100644 --- a/describe_configs_response.go +++ b/describe_configs_response.go @@ -224,7 +224,7 @@ func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) { return nil } -//https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration +// https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) { if version == 0 { r.Source = SourceUnknown diff --git a/describe_configs_response_test.go b/describe_configs_response_test.go index c91a04220..ea8f28e57 100644 --- a/describe_configs_response_test.go +++ b/describe_configs_response_test.go @@ -6,18 +6,18 @@ import ( var ( describeConfigsResponseEmpty = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 0, // no 
configs } describeConfigsResponsePopulatedv0 = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 1, // response - 0, 0, //errorcode - 0, 0, //string + 0, 0, // errorcode + 0, 0, // string 2, // topic 0, 3, 'f', 'o', 'o', - 0, 0, 0, 1, //configs + 0, 0, 0, 1, // configs 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, '1', '0', '0', '0', 0, // ReadOnly @@ -26,13 +26,13 @@ var ( } describeConfigsResponseWithDefaultv0 = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 1, // response - 0, 0, //errorcode - 0, 0, //string + 0, 0, // errorcode + 0, 0, // string 2, // topic 0, 3, 'f', 'o', 'o', - 0, 0, 0, 1, //configs + 0, 0, 0, 1, // configs 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, '1', '0', '0', '0', 0, // ReadOnly @@ -41,13 +41,13 @@ var ( } describeConfigsResponsePopulatedv1 = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 1, // response - 0, 0, //errorcode - 0, 0, //string + 0, 0, // errorcode + 0, 0, // string 2, // topic 0, 3, 'f', 'o', 'o', - 0, 0, 0, 1, //configs + 0, 0, 0, 1, // configs 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, '1', '0', '0', '0', 0, // ReadOnly @@ -57,13 +57,13 @@ var ( } describeConfigsResponseWithSynonymv1 = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 1, // response - 0, 0, //errorcode - 0, 0, //string + 0, 0, // errorcode + 0, 0, // string 2, // topic 0, 3, 'f', 'o', 'o', - 0, 0, 0, 1, //configs + 0, 0, 0, 1, // configs 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, '1', '0', '0', '0', 0, // ReadOnly @@ -76,13 +76,13 @@ var ( } describeConfigsResponseWithDefaultv1 = []byte{ - 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // throttle 0, 0, 0, 1, // response - 0, 0, //errorcode - 0, 0, //string + 0, 0, // errorcode + 0, 0, // string 2, // topic 0, 3, 'f', 'o', 'o', - 0, 0, 0, 1, //configs + 0, 0, 0, 1, // configs 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', 0, 4, '1', '0', '0', '0', 0, // ReadOnly diff --git a/end_txn_request_test.go b/end_txn_request_test.go index 20e404eb8..6f5d4480e 100644 --- a/end_txn_request_test.go +++ b/end_txn_request_test.go @@ -2,14 +2,12 @@ package sarama import "testing" -var ( - endTxnRequest = []byte{ - 0, 3, 't', 'x', 'n', - 0, 0, 0, 0, 0, 0, 31, 64, - 0, 1, - 1, - } -) +var endTxnRequest = []byte{ + 0, 3, 't', 'x', 'n', + 0, 0, 0, 0, 0, 0, 31, 64, + 0, 1, + 1, +} func TestEndTxnRequest(t *testing.T) { req := &EndTxnRequest{ diff --git a/end_txn_response_test.go b/end_txn_response_test.go index 41d730418..d7ae1c988 100644 --- a/end_txn_response_test.go +++ b/end_txn_response_test.go @@ -5,12 +5,10 @@ import ( "time" ) -var ( - endTxnResponse = []byte{ - 0, 0, 0, 100, - 0, 49, - } -) +var endTxnResponse = []byte{ + 0, 0, 0, 100, + 0, 49, +} func TestEndTxnResponse(t *testing.T) { resp := &EndTxnResponse{ diff --git a/errors.go b/errors.go index a5b6edbb5..0fca0a30e 100644 --- a/errors.go +++ b/errors.go @@ -93,7 +93,7 @@ type MultiError struct { } func (mErr MultiError) Error() string { - var errString = "" + errString := "" for _, err := range *mErr.Errors { errString += err.Error() + "," } @@ -101,7 +101,7 @@ func (mErr MultiError) Error() string { } func (mErr MultiError) PrettyError() string { - var errString = "" + errString := "" for _, err := range *mErr.Errors { errString += err.Error() + "\n" } diff --git a/examples/consumergroup/main.go b/examples/consumergroup/main.go index 9a8b7cd8d..a7db175cd 100644 --- a/examples/consumergroup/main.go +++ 
b/examples/consumergroup/main.go @@ -150,7 +150,6 @@ func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - // NOTE: // Do not move the code below to a goroutine. // The `ConsumeClaim` itself is called within a goroutine, see: diff --git a/examples/http_server/http_server.go b/examples/http_server/http_server.go index b6d83c5dc..b4fb32060 100644 --- a/examples/http_server/http_server.go +++ b/examples/http_server/http_server.go @@ -1,8 +1,6 @@ package main import ( - "github.com/Shopify/sarama" - "crypto/tls" "crypto/x509" "encoding/json" @@ -14,6 +12,8 @@ import ( "os" "strings" "time" + + "github.com/Shopify/sarama" ) var ( @@ -163,7 +163,6 @@ func (ale *accessLogEntry) Encode() ([]byte, error) { } func (s *Server) withAccessLog(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { started := time.Now() @@ -189,7 +188,6 @@ func (s *Server) withAccessLog(next http.Handler) http.Handler { } func newDataCollector(brokerList []string) sarama.SyncProducer { - // For the data collector, we are looking for strong consistency semantics. // Because we don't change the flush settings, sarama will try to produce messages // as fast as possible to keep latency low. @@ -217,7 +215,6 @@ func newDataCollector(brokerList []string) sarama.SyncProducer { } func newAccessLogProducer(brokerList []string) sarama.AsyncProducer { - // For the access log, we are looking for AP semantics, with high throughput. // By creating batches of compressed messages, we reduce network I/O at a cost of more latency. config := sarama.NewConfig() diff --git a/examples/sasl_scram_client/main.go b/examples/sasl_scram_client/main.go index 884949b7d..353a8a274 100644 --- a/examples/sasl_scram_client/main.go +++ b/examples/sasl_scram_client/main.go @@ -166,5 +166,4 @@ func main() { _ = syncProducer.Close() } logger.Println("Bye now !") - } diff --git a/examples/sasl_scram_client/scram_client.go b/examples/sasl_scram_client/scram_client.go index 551f0fefd..1cabaa38e 100644 --- a/examples/sasl_scram_client/scram_client.go +++ b/examples/sasl_scram_client/scram_client.go @@ -7,8 +7,10 @@ import ( "github.com/xdg/scram" ) -var SHA256 scram.HashGeneratorFcn = sha256.New -var SHA512 scram.HashGeneratorFcn = sha512.New +var ( + SHA256 scram.HashGeneratorFcn = sha256.New + SHA512 scram.HashGeneratorFcn = sha512.New +) type XDGSCRAMClient struct { *scram.Client diff --git a/fetch_request_test.go b/fetch_request_test.go index 0903caffa..0b807b9f8 100644 --- a/fetch_request_test.go +++ b/fetch_request_test.go @@ -5,18 +5,21 @@ import "testing" var ( fetchRequestNoBlocks = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } fetchRequestWithProperties = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, 0xEF, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } fetchRequestOneBlock = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x05, 't', 'o', 'p', 'i', 'c', 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56} + 0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56, + } fetchRequestOneBlockV4 = []byte{ 
0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, @@ -25,7 +28,8 @@ var ( 0x00, 0x00, 0x00, 0x01, 0x00, 0x05, 't', 'o', 'p', 'i', 'c', 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56} + 0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56, + } fetchRequestOneBlockV11 = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, diff --git a/fetch_response_test.go b/fetch_response_test.go index 99ad769c6..45ad8013b 100644 --- a/fetch_response_test.go +++ b/fetch_response_test.go @@ -7,7 +7,8 @@ import ( var ( emptyFetchResponse = []byte{ - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } oneMessageFetchResponse = []byte{ 0x00, 0x00, 0x00, 0x01, @@ -25,7 +26,8 @@ var ( 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, - 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, + } overflowMessageFetchResponse = []byte{ 0x00, 0x00, 0x00, 0x01, @@ -48,7 +50,8 @@ var ( 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0xFF, // overflow bytes - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + } oneRecordFetchResponse = []byte{ 0x00, 0x00, 0x00, 0x00, // ThrottleTime @@ -84,7 +87,8 @@ var ( 0x06, 0x05, 0x06, 0x07, 0x02, 0x06, 0x08, 0x09, 0x0A, - 0x04, 0x0B, 0x0C} + 0x04, 0x0B, 0x0C, + } partialFetchResponse = []byte{ 0x00, 0x00, 0x00, 0x00, // ThrottleTime @@ -136,7 +140,8 @@ var ( 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, - 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, + } preferredReplicaFetchResponseV11 = []byte{ 0x00, 0x00, 0x00, 0x00, // ThrottleTime @@ -161,7 +166,8 @@ var ( 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, - 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, + } ) func TestEmptyFetchResponse(t *testing.T) { diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go index 8a25f9b52..5fb063bf5 100644 --- a/functional_consumer_group_test.go +++ b/functional_consumer_group_test.go @@ -335,11 +335,13 @@ func (m *testFuncConsumerGroupMember) Setup(s ConsumerGroupSession) error { atomic.StoreInt32(&m.state, 2) return nil } + func (m *testFuncConsumerGroupMember) Cleanup(s ConsumerGroupSession) error { // enter post-cleanup state atomic.StoreInt32(&m.state, 3) return nil } + func (m *testFuncConsumerGroupMember) ConsumeClaim(s ConsumerGroupSession, c ConsumerGroupClaim) error { atomic.AddInt32(&m.handlers, 1) defer atomic.AddInt32(&m.handlers, -1) diff --git a/functional_producer_test.go b/functional_producer_test.go index 7c1925b06..74f9682a2 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -432,15 +432,19 @@ func validateMetrics(t *testing.T, client Client) { func BenchmarkProducerSmall(b *testing.B) { benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 128))) } + func BenchmarkProducerMedium(b *testing.B) { benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 1024))) } + func BenchmarkProducerLarge(b *testing.B) { benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 8192))) } + func BenchmarkProducerSmallSinglePartition(b *testing.B) { benchmarkProducer(b, nil, "test.1", ByteEncoder(make([]byte, 128))) } + func BenchmarkProducerMediumSnappy(b *testing.B) { conf := NewTestConfig() conf.Producer.Compression = CompressionSnappy diff --git a/functional_test.go b/functional_test.go index 134e55e4e..7f10a9eea 100644 --- a/functional_test.go +++ 
b/functional_test.go @@ -355,7 +355,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err) } defer res.Body.Close() - jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644) + jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0o644) if err != nil { return fmt.Errorf("failed opening the uncomitted msg jar: %w", err) } diff --git a/gssapi_kerberos.go b/gssapi_kerberos.go index ea72ea521..b2c199000 100644 --- a/gssapi_kerberos.go +++ b/gssapi_kerberos.go @@ -215,7 +215,6 @@ func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error { spn := fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host) ticket, encKey, err := kerberosClient.GetServiceTicket(spn) - if err != nil { Logger.Printf("Error getting Kerberos service ticket : %s", err) return err diff --git a/heartbeat_request_test.go b/heartbeat_request_test.go index 981c377f9..2653f82c7 100644 --- a/heartbeat_request_test.go +++ b/heartbeat_request_test.go @@ -2,13 +2,11 @@ package sarama import "testing" -var ( - basicHeartbeatRequest = []byte{ - 0, 3, 'f', 'o', 'o', // Group ID - 0x00, 0x01, 0x02, 0x03, // Generatiuon ID - 0, 3, 'b', 'a', 'z', // Member ID - } -) +var basicHeartbeatRequest = []byte{ + 0, 3, 'f', 'o', 'o', // Group ID + 0x00, 0x01, 0x02, 0x03, // Generatiuon ID + 0, 3, 'b', 'a', 'z', // Member ID +} func TestHeartbeatRequest(t *testing.T) { request := new(HeartbeatRequest) diff --git a/heartbeat_response_test.go b/heartbeat_response_test.go index 4b29416de..e60146bdc 100644 --- a/heartbeat_response_test.go +++ b/heartbeat_response_test.go @@ -2,10 +2,9 @@ package sarama import "testing" -var ( - heartbeatResponseNoError = []byte{ - 0x00, 0x00} -) +var heartbeatResponseNoError = []byte{ + 0x00, 0x00, +} func TestHeartbeatResponse(t *testing.T) { response := new(HeartbeatResponse) diff --git a/leave_group_request_test.go b/leave_group_request_test.go index 04527e47c..b674e48b2 100644 --- a/leave_group_request_test.go +++ b/leave_group_request_test.go @@ -2,12 +2,10 @@ package sarama import "testing" -var ( - basicLeaveGroupRequest = []byte{ - 0, 3, 'f', 'o', 'o', - 0, 3, 'b', 'a', 'r', - } -) +var basicLeaveGroupRequest = []byte{ + 0, 3, 'f', 'o', 'o', + 0, 3, 'b', 'a', 'r', +} func TestLeaveGroupRequest(t *testing.T) { request := new(LeaveGroupRequest) diff --git a/list_groups_request.go b/list_groups_request.go index ed44cc27e..4553b2d2e 100644 --- a/list_groups_request.go +++ b/list_groups_request.go @@ -1,7 +1,6 @@ package sarama -type ListGroupsRequest struct { -} +type ListGroupsRequest struct{} func (r *ListGroupsRequest) encode(pe packetEncoder) error { return nil diff --git a/list_partition_reassignments_request_test.go b/list_partition_reassignments_request_test.go index b47480ece..e50b289b9 100644 --- a/list_partition_reassignments_request_test.go +++ b/list_partition_reassignments_request_test.go @@ -2,16 +2,14 @@ package sarama import "testing" -var ( - listPartitionReassignmentsRequestOneBlock = []byte{ - 0, 0, 39, 16, // timeout 10000 - 2, // 2-1=1 block - 6, 116, 111, 112, 105, 99, // topic name "topic" as compact string - 2, // 2-1=1 partitions - 0, 0, 0, 0, // partitionId - 0, 0, // empty tagged fields - } -) +var listPartitionReassignmentsRequestOneBlock = []byte{ + 0, 0, 39, 16, // timeout 10000 + 2, // 2-1=1 block + 6, 116, 111, 112, 105, 99, // topic name "topic" as compact string + 2, // 2-1=1 partitions + 0, 0, 0, 0, // partitionId + 0, 0, // 
empty tagged fields +} func TestListPartitionReassignmentRequest(t *testing.T) { var request *ListPartitionReassignmentsRequest = &ListPartitionReassignmentsRequest{ diff --git a/list_partition_reassignments_response_test.go b/list_partition_reassignments_response_test.go index 7b27b064e..71532423d 100644 --- a/list_partition_reassignments_response_test.go +++ b/list_partition_reassignments_response_test.go @@ -2,21 +2,19 @@ package sarama import "testing" -var ( - listPartitionReassignmentsResponse = []byte{ - 0, 0, 39, 16, // ThrottleTimeMs 10000 - 0, 0, // errorcode - 0, // null string - 2, // block array length 1 - 6, 116, 111, 112, 105, 99, // topic name "topic" - 2, // partition array length 1 - 0, 0, 0, 1, // partitionId - 3, 0, 0, 3, 232, 0, 0, 3, 233, // replicas [1000, 1001] - 3, 0, 0, 3, 234, 0, 0, 3, 235, // addingReplicas [1002, 1003] - 3, 0, 0, 3, 236, 0, 0, 3, 237, // addingReplicas [1004, 1005] - 0, 0, 0, // empty tagged fields - } -) +var listPartitionReassignmentsResponse = []byte{ + 0, 0, 39, 16, // ThrottleTimeMs 10000 + 0, 0, // errorcode + 0, // null string + 2, // block array length 1 + 6, 116, 111, 112, 105, 99, // topic name "topic" + 2, // partition array length 1 + 0, 0, 0, 1, // partitionId + 3, 0, 0, 3, 232, 0, 0, 3, 233, // replicas [1000, 1001] + 3, 0, 0, 3, 234, 0, 0, 3, 235, // addingReplicas [1002, 1003] + 3, 0, 0, 3, 236, 0, 0, 3, 237, // addingReplicas [1004, 1005] + 0, 0, 0, // empty tagged fields +} func TestListPartitionReassignmentResponse(t *testing.T) { var response *ListPartitionReassignmentsResponse = &ListPartitionReassignmentsResponse{ diff --git a/message.go b/message.go index 40b3ac9d1..fd0d1d90b 100644 --- a/message.go +++ b/message.go @@ -6,15 +6,15 @@ import ( ) const ( - //CompressionNone no compression + // CompressionNone no compression CompressionNone CompressionCodec = iota - //CompressionGZIP compression using GZIP + // CompressionGZIP compression using GZIP CompressionGZIP - //CompressionSnappy compression using snappy + // CompressionSnappy compression using snappy CompressionSnappy - //CompressionLZ4 compression using LZ4 + // CompressionLZ4 compression using LZ4 CompressionLZ4 - //CompressionZSTD compression using ZSTD + // CompressionZSTD compression using ZSTD CompressionZSTD // The lowest 3 bits contain the compression codec used for the message @@ -42,7 +42,7 @@ func (cc CompressionCodec) String() string { }[int(cc)] } -//Message is a kafka message type +// Message is a kafka message type type Message struct { Codec CompressionCodec // codec used to compress the message contents CompressionLevel int // compression level diff --git a/message_test.go b/message_test.go index aa103d43f..a6c7cff2a 100644 --- a/message_test.go +++ b/message_test.go @@ -11,7 +11,8 @@ var ( 0x00, // magic version byte 0x00, // attribute flags 0xFF, 0xFF, 0xFF, 0xFF, // key - 0xFF, 0xFF, 0xFF, 0xFF} // value + 0xFF, 0xFF, 0xFF, 0xFF, + } // value emptyV1Message = []byte{ 204, 47, 121, 217, // CRC @@ -19,17 +20,19 @@ var ( 0x00, // attribute flags 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp 0xFF, 0xFF, 0xFF, 0xFF, // key - 0xFF, 0xFF, 0xFF, 0xFF} // value + 0xFF, 0xFF, 0xFF, 0xFF, + } // value emptyV2Message = []byte{ 167, 236, 104, 3, // CRC 0x02, // magic version byte 0x00, // attribute flags 0xFF, 0xFF, 0xFF, 0xFF, // key - 0xFF, 0xFF, 0xFF, 0xFF} // value + 0xFF, 0xFF, 0xFF, 0xFF, + } // value emptyGzipMessage = []byte{ - 132, 99, 80, 148, //CRC + 132, 99, 80, 148, // CRC 0x00, // magic version byte 0x01, // attribute flags 0xFF, 
0xFF, 0xFF, 0xFF, // key @@ -37,7 +40,8 @@ var ( 0x00, 0x00, 0x00, 0x17, 0x1f, 0x8b, 0x08, - 0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0} + 0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, + } emptyLZ4Message = []byte{ 132, 219, 238, 101, // CRC @@ -64,7 +68,7 @@ var ( } emptyBulkSnappyMessage = []byte{ - 180, 47, 53, 209, //CRC + 180, 47, 53, 209, // CRC 0x00, // magic version byte 0x02, // attribute flags 0xFF, 0xFF, 0xFF, 0xFF, // key @@ -72,17 +76,19 @@ var ( 130, 83, 78, 65, 80, 80, 89, 0, // SNAPPY magic 0, 0, 0, 1, // min version 0, 0, 0, 1, // default version - 0, 0, 0, 22, 52, 0, 0, 25, 1, 16, 14, 227, 138, 104, 118, 25, 15, 13, 1, 8, 1, 0, 0, 62, 26, 0} + 0, 0, 0, 22, 52, 0, 0, 25, 1, 16, 14, 227, 138, 104, 118, 25, 15, 13, 1, 8, 1, 0, 0, 62, 26, 0, + } emptyBulkGzipMessage = []byte{ - 139, 160, 63, 141, //CRC + 139, 160, 63, 141, // CRC 0x00, // magic version byte 0x01, // attribute flags 0xFF, 0xFF, 0xFF, 0xFF, // key 0x00, 0x00, 0x00, 0x27, // len 0x1f, 0x8b, // Gzip Magic 0x08, // deflate compressed - 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0} + 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0, + } emptyBulkLZ4Message = []byte{ 246, 12, 188, 129, // CRC diff --git a/metadata_response_test.go b/metadata_response_test.go index 04a4ce7fc..3970a2f31 100644 --- a/metadata_response_test.go +++ b/metadata_response_test.go @@ -5,7 +5,8 @@ import "testing" var ( emptyMetadataResponseV0 = []byte{ 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } brokersNoTopicsMetadataResponseV0 = []byte{ 0x00, 0x00, 0x00, 0x02, @@ -18,7 +19,8 @@ var ( 0x00, 0x0a, 'g', 'o', 'o', 'g', 'l', 'e', '.', 'c', 'o', 'm', 0x00, 0x00, 0x01, 0x11, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } topicsNoBrokersMetadataResponseV0 = []byte{ 0x00, 0x00, 0x00, 0x00, @@ -35,7 +37,8 @@ var ( 0x00, 0x00, 0x00, 0x03, 'b', 'a', 'r', - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } brokersNoTopicsMetadataResponseV1 = []byte{ 0x00, 0x00, 0x00, 0x02, @@ -52,7 +55,8 @@ var ( 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } topicsNoBrokersMetadataResponseV1 = []byte{ 0x00, 0x00, 0x00, 0x00, @@ -74,14 +78,16 @@ var ( 0x00, 0x00, 0x00, 0x03, 'b', 'a', 'r', 0x01, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } noBrokersNoTopicsWithThrottleTimeAndClusterIDV3 = []byte{ 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 'c', 'l', 'u', 's', 't', 'e', 'r', 'I', 'd', 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } noBrokersOneTopicWithOfflineReplicasV5 = []byte{ 0x00, 0x00, 0x00, 0x05, diff --git a/mockkerberos.go b/mockkerberos.go index beb00e5b5..a43607e1c 100644 --- a/mockkerberos.go +++ b/mockkerberos.go @@ -27,7 +27,7 @@ func (h *KafkaGSSAPIHandler) MockKafkaGSSAPI(buffer []byte) []byte { return []byte{0x00, 0x00, 0x00, 0x01, 0xAD} } - var pack = gssapi.WrapToken{ + pack := gssapi.WrapToken{ Flags: KRB5_USER_AUTH, EC: 12, RRC: 0, @@ -108,8 +108,9 @@ func (c *MockKerberosClient) GetServiceTicket(spn string) (messages.Ticket, type func (c *MockKerberosClient) Domain() string { return "EXAMPLE.COM" } + func (c *MockKerberosClient) CName() types.PrincipalName { - var p = types.PrincipalName{ + p := types.PrincipalName{ NameType: KRB5_USER_AUTH, NameString: []string{ "kafka", @@ -118,6 
+119,7 @@ func (c *MockKerberosClient) CName() types.PrincipalName { } return p } + func (c *MockKerberosClient) Destroy() { // Do nothing. } diff --git a/mockresponses.go b/mockresponses.go index 3df1ee0a7..6654ed07c 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -805,7 +805,8 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWith Configs: configEntries, }) case TopicResource: - maxMessageBytes := &ConfigEntry{Name: "max.message.bytes", + maxMessageBytes := &ConfigEntry{ + Name: "max.message.bytes", Value: "1000000", ReadOnly: false, Default: !includeSource, @@ -822,7 +823,8 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWith }, } } - retentionMs := &ConfigEntry{Name: "retention.ms", + retentionMs := &ConfigEntry{ + Name: "retention.ms", Value: "5000", ReadOnly: false, Default: false, @@ -836,7 +838,8 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWith }, } } - password := &ConfigEntry{Name: "password", + password := &ConfigEntry{ + Name: "password", Value: "12345", ReadOnly: false, Default: false, @@ -891,7 +894,8 @@ func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHea res := &AlterConfigsResponse{} for _, r := range req.Resources { - res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name, + res.Resources = append(res.Resources, &AlterConfigsResourceResponse{ + Name: r.Name, Type: r.Type, ErrorMsg: "", }) diff --git a/mocks/consumer.go b/mocks/consumer.go index 451eb08d0..baec88b51 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -231,7 +231,7 @@ func (pc *PartitionConsumer) Close() error { go func() { defer wg.Done() - var errs = make(sarama.ConsumerErrors, 0) + errs := make(sarama.ConsumerErrors, 0) for err := range pc.errors { errs = append(errs, err) } diff --git a/offset_commit_request_test.go b/offset_commit_request_test.go index efb3d33f1..06bbd4036 100644 --- a/offset_commit_request_test.go +++ b/offset_commit_request_test.go @@ -8,20 +8,23 @@ import ( var ( offsetCommitRequestNoBlocksV0 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } offsetCommitRequestNoBlocksV1 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', 0x00, 0x00, 0x11, 0x22, 0x00, 0x04, 'c', 'o', 'n', 's', - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } offsetCommitRequestNoBlocksV2 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', 0x00, 0x00, 0x11, 0x22, 0x00, 0x04, 'c', 'o', 'n', 's', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x44, 0x33, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } offsetCommitRequestOneBlockV0 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', @@ -30,7 +33,8 @@ var ( 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x52, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, - 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'} + 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a', + } offsetCommitRequestOneBlockV1 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', @@ -42,7 +46,8 @@ var ( 0x00, 0x00, 0x52, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, - 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'} + 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a', + } offsetCommitRequestOneBlockV2 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', @@ -54,7 +59,8 @@ var ( 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x52, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, - 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'} + 
0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a', + } ) func TestOffsetCommitRequestV0(t *testing.T) { diff --git a/offset_commit_response_test.go b/offset_commit_response_test.go index 3c85713c7..f35ca54ec 100644 --- a/offset_commit_response_test.go +++ b/offset_commit_response_test.go @@ -5,10 +5,9 @@ import ( "testing" ) -var ( - emptyOffsetCommitResponse = []byte{ - 0x00, 0x00, 0x00, 0x00} -) +var emptyOffsetCommitResponse = []byte{ + 0x00, 0x00, 0x00, 0x00, +} func TestEmptyOffsetCommitResponse(t *testing.T) { response := OffsetCommitResponse{} diff --git a/offset_fetch_request_test.go b/offset_fetch_request_test.go index a5270dbeb..d4497a8b8 100644 --- a/offset_fetch_request_test.go +++ b/offset_fetch_request_test.go @@ -8,42 +8,50 @@ import ( var ( offsetFetchRequestNoGroupNoPartitions = []byte{ 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } offsetFetchRequestNoPartitionsV6 = []byte{ - 0x05, 'b', 'l', 'a', 'h', 0x01, 0x00} + 0x05, 'b', 'l', 'a', 'h', 0x01, 0x00, + } offsetFetchRequestNoPartitionsV7 = []byte{ - 0x05, 'b', 'l', 'a', 'h', 0x01, 0x01, 0x00} + 0x05, 'b', 'l', 'a', 'h', 0x01, 0x01, 0x00, + } offsetFetchRequestNoPartitions = []byte{ 0x00, 0x04, 'b', 'l', 'a', 'h', - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } offsetFetchRequestOnePartition = []byte{ 0x00, 0x04, 'b', 'l', 'a', 'h', 0x00, 0x00, 0x00, 0x01, 0x00, 0x0D, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't', 0x00, 0x00, 0x00, 0x01, - 0x4F, 0x4F, 0x4F, 0x4F} + 0x4F, 0x4F, 0x4F, 0x4F, + } offsetFetchRequestOnePartitionV6 = []byte{ 0x05, 'b', 'l', 'a', 'h', 0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't', 0x02, 0x4F, 0x4F, 0x4F, 0x4F, - 0x00, 0x00} + 0x00, 0x00, + } offsetFetchRequestOnePartitionV7 = []byte{ 0x05, 'b', 'l', 'a', 'h', 0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't', 0x02, 0x4F, 0x4F, 0x4F, 0x4F, - 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, + } offsetFetchRequestAllPartitions = []byte{ 0x00, 0x04, 'b', 'l', 'a', 'h', - 0xff, 0xff, 0xff, 0xff} + 0xff, 0xff, 0xff, 0xff, + } ) func TestOffsetFetchRequestNoPartitions(t *testing.T) { @@ -88,7 +96,7 @@ func TestOffsetFetchRequest(t *testing.T) { testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartition) } - { //v6 + { // v6 version := 6 request := new(OffsetFetchRequest) request.Version = int16(version) @@ -97,7 +105,7 @@ func TestOffsetFetchRequest(t *testing.T) { testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartitionV6) } - { //v7 + { // v7 version := 7 request := new(OffsetFetchRequest) request.Version = int16(version) diff --git a/offset_fetch_response_test.go b/offset_fetch_response_test.go index b1a6c3543..d70894ab2 100644 --- a/offset_fetch_response_test.go +++ b/offset_fetch_response_test.go @@ -7,20 +7,19 @@ import ( var ( emptyOffsetFetchResponse = []byte{ - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } emptyOffsetFetchResponseV2 = []byte{ 0x00, 0x00, 0x00, 0x00, - 0x00, 0x2A} + 0x00, 0x2A, + } emptyOffsetFetchResponseV3 = []byte{ 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x2A} - - emptyOffsetFetchResponseV6 = []byte{ - 0x00, 0x00, 0x00, 0x09, - 0x01, 0x00, 0x2A, 0x00} + 0x00, 0x2A, + } ) func TestEmptyOffsetFetchResponse(t *testing.T) { @@ -36,11 +35,6 @@ func TestEmptyOffsetFetchResponse(t *testing.T) { responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9} testResponse(t, fmt.Sprintf("empty v%d", 
version), &responseV3, emptyOffsetFetchResponseV3) } - - for version := 6; version <= 7; version++ { - responseV6 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9} - testResponse(t, fmt.Sprintf("empty v%d", version), &responseV6, emptyOffsetFetchResponseV6) - } } func TestNormalOffsetFetchResponse(t *testing.T) { @@ -67,10 +61,8 @@ func TestNormalOffsetFetchResponse(t *testing.T) { testResponse(t, fmt.Sprintf("Normal v%d", version), &responseV3, nil) } - for version := 5; version <= 7; version++ { - res := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9} - res.AddBlock("t", 0, &OffsetFetchResponseBlock{Offset: 10, LeaderEpoch: 100, Metadata: "md", Err: ErrRequestTimedOut}) - res.Blocks["m"] = nil - testResponse(t, fmt.Sprintf("normal V%d", version), &res, nil) - } + responseV5 := OffsetFetchResponse{Version: 5, Err: ErrInvalidRequest, ThrottleTimeMs: 9} + responseV5.AddBlock("t", 0, &OffsetFetchResponseBlock{Offset: 10, LeaderEpoch: 100, Metadata: "md", Err: ErrRequestTimedOut}) + responseV5.Blocks["m"] = nil + testResponse(t, "normal V5", &responseV5, nil) } diff --git a/offset_request_test.go b/offset_request_test.go index 0e6951a00..8ea533a6b 100644 --- a/offset_request_test.go +++ b/offset_request_test.go @@ -5,12 +5,14 @@ import "testing" var ( offsetRequestNoBlocksV1 = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } offsetRequestNoBlocksV2 = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, - 0x00} + 0x00, + } offsetRequestOneBlock = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, @@ -19,7 +21,8 @@ var ( 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x02} + 0x00, 0x00, 0x00, 0x02, + } offsetRequestOneBlockV1 = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, @@ -27,7 +30,8 @@ var ( 0x00, 0x03, 'b', 'a', 'r', 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01} + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, + } offsetRequestOneBlockReadCommittedV2 = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, @@ -35,11 +39,13 @@ var ( 0x00, 0x03, 'b', 'a', 'r', 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01} + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, + } offsetRequestReplicaID = []byte{ 0x00, 0x00, 0x00, 0x2a, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } ) func TestOffsetRequest(t *testing.T) { diff --git a/offset_response_test.go b/offset_response_test.go index a711a9864..683c11e43 100644 --- a/offset_response_test.go +++ b/offset_response_test.go @@ -1,12 +1,11 @@ package sarama -import ( - "testing" -) +import "testing" var ( emptyOffsetResponse = []byte{ - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } normalOffsetResponse = []byte{ 0x00, 0x00, 0x00, 0x02, @@ -20,7 +19,8 @@ var ( 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06} + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, + } normalOffsetResponseV1 = []byte{ 0x00, 0x00, 0x00, 0x02, @@ -33,21 +33,8 @@ var ( 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x01, 0x58, 0x1A, 0xE6, 0x48, 0x86, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06} - - normalOffsetResponseV2 = []byte{ - 0x00, 0x00, 0x00, 0x09, - 0x00, 0x00, 0x00, 0x02, - - 0x00, 0x01, 'a', - 0x00, 0x00, 0x00, 0x00, - - 0x00, 0x01, 'z', - 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x02, - 0x00, 0x00, - 0x00, 0x00, 0x01, 0x58, 0x1A, 
0xE6, 0x48, 0x86, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06} + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, + } ) func TestEmptyOffsetResponse(t *testing.T) { @@ -125,37 +112,3 @@ func TestNormalOffsetResponseV1(t *testing.T) { t.Fatal("Decoding produced invalid offsets for topic z partition 2.") } } - -func TestNormalOffsetResponseV2(t *testing.T) { - response := OffsetResponse{} - - testVersionDecodable(t, "normal", &response, normalOffsetResponseV2, 2) // response should not change - - if response.ThrottleTimeMs != 9 { - t.Fatal("Decoding produced", response.ThrottleTimeMs, "throttle milliseconds where there were nine milliseconds.") - } - - if len(response.Blocks) != 2 { - t.Fatal("Decoding produced", len(response.Blocks), "topics where there were two.") - } - - if len(response.Blocks["a"]) != 0 { - t.Fatal("Decoding produced", len(response.Blocks["a"]), "partitions for topic 'a' where there were none.") - } - - if len(response.Blocks["z"]) != 1 { - t.Fatal("Decoding produced", len(response.Blocks["z"]), "partitions for topic 'z' where there was one.") - } - - if response.Blocks["z"][2].Err != ErrNoError { - t.Fatal("Decoding produced invalid error for topic z partition 2.") - } - - if response.Blocks["z"][2].Timestamp != 1477920049286 { - t.Fatal("Decoding produced invalid timestamp for topic z partition 2.", response.Blocks["z"][2].Timestamp) - } - - if response.Blocks["z"][2].Offset != 6 { - t.Fatal("Decoding produced invalid offsets for topic z partition 2.") - } -} diff --git a/produce_request_test.go b/produce_request_test.go index b9896eb61..e8684ffd2 100644 --- a/produce_request_test.go +++ b/produce_request_test.go @@ -9,12 +9,14 @@ var ( produceRequestEmpty = []byte{ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } produceRequestHeader = []byte{ 0x01, 0x23, 0x00, 0x00, 0x04, 0x44, - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } produceRequestOneMessage = []byte{ 0x01, 0x23, @@ -32,7 +34,8 @@ var ( 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, - 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, + } produceRequestOneRecord = []byte{ 0xFF, 0xFF, // Transaction ID diff --git a/produce_response_test.go b/produce_response_test.go index 0dfac7947..584652221 100644 --- a/produce_response_test.go +++ b/produce_response_test.go @@ -8,7 +8,8 @@ import ( var ( produceResponseNoBlocksV0 = []byte{ - 0x00, 0x00, 0x00, 0x00} + 0x00, 0x00, 0x00, 0x00, + } produceResponseManyBlocksVersions = map[int][]byte{ 0: { diff --git a/real_decoder.go b/real_decoder.go index bffec2f39..2482c6377 100644 --- a/real_decoder.go +++ b/real_decoder.go @@ -5,13 +5,15 @@ import ( "math" ) -var errInvalidArrayLength = PacketDecodingError{"invalid array length"} -var errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"} -var errInvalidStringLength = PacketDecodingError{"invalid string length"} -var errVarintOverflow = PacketDecodingError{"varint overflow"} -var errUVarintOverflow = PacketDecodingError{"uvarint overflow"} -var errInvalidBool = PacketDecodingError{"invalid bool"} -var errUnsupportedTaggedFields = PacketDecodingError{"non-empty tagged fields are not supported yet"} +var ( + errInvalidArrayLength = PacketDecodingError{"invalid array length"} + errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"} + errInvalidStringLength = PacketDecodingError{"invalid string length"} + errVarintOverflow = PacketDecodingError{"varint overflow"} + errUVarintOverflow = PacketDecodingError{"uvarint 
overflow"} + errInvalidBool = PacketDecodingError{"invalid bool"} + errUnsupportedTaggedFields = PacketDecodingError{"non-empty tagged fields are not supported yet"} +) type realDecoder struct { raw []byte @@ -176,7 +178,7 @@ func (rd *realDecoder) getCompactBytes() ([]byte, error) { return nil, err } - var length = int(n - 1) + length := int(n - 1) return rd.getRawBytes(length) } @@ -227,7 +229,7 @@ func (rd *realDecoder) getCompactString() (string, error) { return "", err } - var length = int(n - 1) + length := int(n - 1) tmpStr := string(rd.raw[rd.off : rd.off+length]) rd.off += length @@ -236,12 +238,11 @@ func (rd *realDecoder) getCompactString() (string, error) { func (rd *realDecoder) getCompactNullableString() (*string, error) { n, err := rd.getUVarint() - if err != nil { return nil, err } - var length = int(n - 1) + length := int(n - 1) if length < 0 { return nil, err diff --git a/record.go b/record.go index cdccfe322..a3fe8c061 100644 --- a/record.go +++ b/record.go @@ -11,7 +11,7 @@ const ( maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1 ) -//RecordHeader stores key and value for a record header +// RecordHeader stores key and value for a record header type RecordHeader struct { Key []byte Value []byte @@ -35,7 +35,7 @@ func (h *RecordHeader) decode(pd packetDecoder) (err error) { return nil } -//Record is kafka record type +// Record is kafka record type type Record struct { Headers []*RecordHeader diff --git a/response_header.go b/response_header.go index 5dffb75be..fbcef0bfb 100644 --- a/response_header.go +++ b/response_header.go @@ -2,8 +2,10 @@ package sarama import "fmt" -const responseLengthSize = 4 -const correlationIDSize = 4 +const ( + responseLengthSize = 4 + correlationIDSize = 4 +) type responseHeader struct { length int32 diff --git a/response_header_test.go b/response_header_test.go index 31c35ae6a..c7c68eae7 100644 --- a/response_header_test.go +++ b/response_header_test.go @@ -5,11 +5,13 @@ import "testing" var ( responseHeaderBytesV0 = []byte{ 0x00, 0x00, 0x0f, 0x00, - 0x0a, 0xbb, 0xcc, 0xff} + 0x0a, 0xbb, 0xcc, 0xff, + } responseHeaderBytesV1 = []byte{ 0x00, 0x00, 0x0f, 0x00, - 0x0a, 0xbb, 0xcc, 0xff, 0x00} + 0x0a, 0xbb, 0xcc, 0xff, 0x00, + } ) func TestResponseHeaderV0(t *testing.T) { diff --git a/sasl_authenticate_request_test.go b/sasl_authenticate_request_test.go index fc059e077..bf75004d2 100644 --- a/sasl_authenticate_request_test.go +++ b/sasl_authenticate_request_test.go @@ -2,11 +2,9 @@ package sarama import "testing" -var ( - saslAuthenticateRequest = []byte{ - 0, 0, 0, 3, 'f', 'o', 'o', - } -) +var saslAuthenticateRequest = []byte{ + 0, 0, 0, 3, 'f', 'o', 'o', +} func TestSaslAuthenticateRequest(t *testing.T) { request := new(SaslAuthenticateRequest) diff --git a/sasl_authenticate_response_test.go b/sasl_authenticate_response_test.go index c555cfbfa..048dade19 100644 --- a/sasl_authenticate_response_test.go +++ b/sasl_authenticate_response_test.go @@ -2,13 +2,11 @@ package sarama import "testing" -var ( - saslAuthenticatResponseErr = []byte{ - 0, 58, - 0, 3, 'e', 'r', 'r', - 0, 0, 0, 3, 'm', 's', 'g', - } -) +var saslAuthenticatResponseErr = []byte{ + 0, 58, + 0, 3, 'e', 'r', 'r', + 0, 0, 0, 3, 'm', 's', 'g', +} func TestSaslAuthenticateResponse(t *testing.T) { response := new(SaslAuthenticateResponse) diff --git a/sasl_handshake_request_test.go b/sasl_handshake_request_test.go index 34f954c14..e100ad5b9 100644 --- a/sasl_handshake_request_test.go +++ b/sasl_handshake_request_test.go @@ -2,11 +2,9 @@ package sarama 
import "testing" -var ( - baseSaslRequest = []byte{ - 0, 3, 'f', 'o', 'o', // Mechanism - } -) +var baseSaslRequest = []byte{ + 0, 3, 'f', 'o', 'o', // Mechanism +} func TestSaslHandshakeRequest(t *testing.T) { request := new(SaslHandshakeRequest) diff --git a/sasl_handshake_response_test.go b/sasl_handshake_response_test.go index 0ebc27a1c..40441fd85 100644 --- a/sasl_handshake_response_test.go +++ b/sasl_handshake_response_test.go @@ -2,13 +2,11 @@ package sarama import "testing" -var ( - saslHandshakeResponse = []byte{ - 0x00, 0x00, - 0x00, 0x00, 0x00, 0x01, - 0x00, 0x03, 'f', 'o', 'o', - } -) +var saslHandshakeResponse = []byte{ + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x03, 'f', 'o', 'o', +} func TestSaslHandshakeResponse(t *testing.T) { response := new(SaslHandshakeResponse) diff --git a/sticky_assignor_user_data.go b/sticky_assignor_user_data.go index bb0c82c34..161233fc3 100644 --- a/sticky_assignor_user_data.go +++ b/sticky_assignor_user_data.go @@ -11,7 +11,7 @@ type StickyAssignorUserData interface { generation() int } -//StickyAssignorUserDataV0 holds topic partition information for an assignment +// StickyAssignorUserDataV0 holds topic partition information for an assignment type StickyAssignorUserDataV0 struct { Topics map[string][]int32 @@ -58,7 +58,7 @@ func (m *StickyAssignorUserDataV0) partitions() []topicPartitionAssignment { ret func (m *StickyAssignorUserDataV0) hasGeneration() bool { return false } func (m *StickyAssignorUserDataV0) generation() int { return defaultGeneration } -//StickyAssignorUserDataV1 holds topic partition information for an assignment +// StickyAssignorUserDataV1 holds topic partition information for an assignment type StickyAssignorUserDataV1 struct { Topics map[string][]int32 Generation int32 diff --git a/txn_offset_commit_request_test.go b/txn_offset_commit_request_test.go index f60d41723..714ec47eb 100644 --- a/txn_offset_commit_request_test.go +++ b/txn_offset_commit_request_test.go @@ -2,20 +2,18 @@ package sarama import "testing" -var ( - txnOffsetCommitRequest = []byte{ - 0, 3, 't', 'x', 'n', - 0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd', - 0, 0, 0, 0, 0, 0, 31, 64, // producer ID - 0, 1, // producer epoch - 0, 0, 0, 1, // 1 topic - 0, 5, 't', 'o', 'p', 'i', 'c', - 0, 0, 0, 1, // 1 partition - 0, 0, 0, 2, // partition no 2 - 0, 0, 0, 0, 0, 0, 0, 123, - 255, 255, // no meta data - } -) +var txnOffsetCommitRequest = []byte{ + 0, 3, 't', 'x', 'n', + 0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd', + 0, 0, 0, 0, 0, 0, 31, 64, // producer ID + 0, 1, // producer epoch + 0, 0, 0, 1, // 1 topic + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 0, 0, 1, // 1 partition + 0, 0, 0, 2, // partition no 2 + 0, 0, 0, 0, 0, 0, 0, 123, + 255, 255, // no meta data +} func TestTxnOffsetCommitRequest(t *testing.T) { req := &TxnOffsetCommitRequest{ diff --git a/txn_offset_commit_response_test.go b/txn_offset_commit_response_test.go index 2a66ad3b4..b4caa69b9 100644 --- a/txn_offset_commit_response_test.go +++ b/txn_offset_commit_response_test.go @@ -5,16 +5,14 @@ import ( "time" ) -var ( - txnOffsetCommitResponse = []byte{ - 0, 0, 0, 100, - 0, 0, 0, 1, // 1 topic - 0, 5, 't', 'o', 'p', 'i', 'c', - 0, 0, 0, 1, // 1 partition response - 0, 0, 0, 2, // partition number 2 - 0, 47, // err - } -) +var txnOffsetCommitResponse = []byte{ + 0, 0, 0, 100, + 0, 0, 0, 1, // 1 topic + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 0, 0, 1, // 1 partition response + 0, 0, 0, 2, // partition number 2 + 0, 47, // err +} func TestTxnOffsetCommitResponse(t *testing.T) { resp := &TxnOffsetCommitResponse{ 
diff --git a/utils.go b/utils.go index 76daaf8fe..1859d29c2 100644 --- a/utils.go +++ b/utils.go @@ -199,7 +199,7 @@ var ( DefaultVersion = V1_0_0_0 ) -//ParseKafkaVersion parses and returns kafka version or error from a string +// ParseKafkaVersion parses and returns kafka version or error from a string func ParseKafkaVersion(s string) (KafkaVersion, error) { if len(s) < 5 { return DefaultVersion, fmt.Errorf("invalid version `%s`", s) diff --git a/zstd.go b/zstd.go index c87241f49..e23bfc477 100644 --- a/zstd.go +++ b/zstd.go @@ -4,8 +4,10 @@ import ( "github.com/klauspost/compress/zstd" ) -var zstdDec, _ = zstd.NewReader(nil) -var zstdEnc, _ = zstd.NewWriter(nil, zstd.WithZeroFrames(true)) +var ( + zstdDec, _ = zstd.NewReader(nil) + zstdEnc, _ = zstd.NewWriter(nil, zstd.WithZeroFrames(true)) +) func zstdDecompress(dst, src []byte) ([]byte, error) { return zstdDec.DecodeAll(src, dst) From 73be95ab236febdebecd7544fb85d6309dc74e4d Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Fri, 7 May 2021 12:59:16 +0100 Subject: [PATCH 3/3] feat(test): add a unittest for leadership tracking TestConsumeMessagesTrackLeader ensures that in the event that leadership of a topicPartition changes and no preferredReadReplica is specified, the consumer connects back to the new leader to resume consumption and doesn't continue consuming from the follower. See https://github.com/Shopify/sarama/issues/1927 --- consumer_test.go | 122 ++++++++++++++++++++++++++++++++++++++++++++++- logger_test.go | 33 +++++++++++++ mockbroker.go | 9 +++- 3 files changed, 162 insertions(+), 2 deletions(-) create mode 100644 logger_test.go diff --git a/consumer_test.go b/consumer_test.go index 4c8aee187..61eb53d1c 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -893,6 +893,125 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) { leader.Close() } +// TestConsumeMessagesTrackLeader ensures that in the event that leadership of +// a topicPartition changes and no preferredReadReplica is specified, the +// consumer connects back to the new leader to resume consumption and doesn't +// continue consuming from the follower. +// +// See https://github.com/Shopify/sarama/issues/1927 +func TestConsumeMessagesTrackLeader(t *testing.T) { + prevLogger := Logger + defer func() { Logger = prevLogger }() + Logger = &testLogger{t} + + cfg := NewConfig() + cfg.ClientID = t.Name() + cfg.Metadata.RefreshFrequency = time.Millisecond * 50 + cfg.Net.MaxOpenRequests = 1 + cfg.Version = V2_1_0_0 + + leader1 := NewMockBroker(t, 1) + leader2 := NewMockBroker(t, 2) + + mockMetadataResponse1 := NewMockMetadataResponse(t). + SetBroker(leader1.Addr(), leader1.BrokerID()). + SetBroker(leader2.Addr(), leader2.BrokerID()). + SetLeader("my_topic", 0, leader1.BrokerID()) + mockMetadataResponse2 := NewMockMetadataResponse(t). + SetBroker(leader1.Addr(), leader1.BrokerID()). + SetBroker(leader2.Addr(), leader2.BrokerID()). + SetLeader("my_topic", 0, leader2.BrokerID()) + mockMetadataResponse3 := NewMockMetadataResponse(t). + SetBroker(leader1.Addr(), leader1.BrokerID()). + SetBroker(leader2.Addr(), leader2.BrokerID()). + SetLeader("my_topic", 0, leader1.BrokerID()) + + leader1.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": mockMetadataResponse1, + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(1). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockFetchResponse(t, 1). + SetVersion(10). + SetMessage("my_topic", 0, 1, testMsg). 
+ SetMessage("my_topic", 0, 2, testMsg), + }) + + client, err := NewClient([]string{leader1.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + + consumer, err := NewConsumerFromClient(client) + if err != nil { + t.Fatal(err) + } + + pConsumer, err := consumer.ConsumePartition("my_topic", 0, 1) + if err != nil { + t.Fatal(err) + } + + assertMessageOffset(t, <-pConsumer.Messages(), 1) + assertMessageOffset(t, <-pConsumer.Messages(), 2) + + fetchEmptyResponse := &FetchResponse{Version: 10} + fetchEmptyResponse.AddError("my_topic", 0, ErrNoError) + leader1.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": mockMetadataResponse2, + "FetchRequest": NewMockWrapper(fetchEmptyResponse), + }) + leader2.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": mockMetadataResponse2, + "FetchRequest": NewMockFetchResponse(t, 1). + SetVersion(10). + SetMessage("my_topic", 0, 3, testMsg). + SetMessage("my_topic", 0, 4, testMsg), + }) + + // wait for client to be aware that leadership has changed + for { + b, _ := client.Leader("my_topic", 0) + if b.ID() == int32(2) { + break + } + time.Sleep(time.Millisecond * 50) + } + + assertMessageOffset(t, <-pConsumer.Messages(), 3) + assertMessageOffset(t, <-pConsumer.Messages(), 4) + + leader1.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": mockMetadataResponse3, + "FetchRequest": NewMockFetchResponse(t, 1). + SetVersion(10). + SetMessage("my_topic", 0, 5, testMsg). + SetMessage("my_topic", 0, 6, testMsg), + }) + leader2.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": mockMetadataResponse3, + "FetchRequest": NewMockWrapper(fetchEmptyResponse), + }) + + // wait for client to be aware that leadership has changed back again + for { + b, _ := client.Leader("my_topic", 0) + if b.ID() == int32(1) { + break + } + time.Sleep(time.Millisecond * 50) + } + + assertMessageOffset(t, <-pConsumer.Messages(), 5) + assertMessageOffset(t, <-pConsumer.Messages(), 6) + + safeClose(t, pConsumer) + safeClose(t, consumer) + leader1.Close() + leader2.Close() +} + // It is fine if offsets of fetched messages are not sequential (although // strictly increasing!). func TestConsumerNonSequentialOffsets(t *testing.T) { @@ -1523,8 +1642,9 @@ func TestExcludeUncommitted(t *testing.T) { } func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) { + t.Helper() if msg.Offset != expectedOffset { - t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset) + t.Fatalf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset) } } diff --git a/logger_test.go b/logger_test.go new file mode 100644 index 000000000..ceca40d2c --- /dev/null +++ b/logger_test.go @@ -0,0 +1,33 @@ +package sarama + +import "testing" + +// testLogger implements the StdLogger interface and records the text in the +// logs of the given T passed from Test functions. +// and records the text in the error log. +// +// nolint +type testLogger struct { + t *testing.T +} + +func (l *testLogger) Print(v ...interface{}) { + if l.t != nil { + l.t.Helper() + l.t.Log(v...) + } +} + +func (l *testLogger) Printf(format string, v ...interface{}) { + if l.t != nil { + l.t.Helper() + l.t.Logf(format, v...) + } +} + +func (l *testLogger) Println(v ...interface{}) { + if l.t != nil { + l.t.Helper() + l.t.Log(v...) 
+ } +} diff --git a/mockbroker.go b/mockbroker.go index 0c99314e5..c2654d12e 100644 --- a/mockbroker.go +++ b/mockbroker.go @@ -218,6 +218,8 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W defer func() { _ = conn.Close() }() + s := spew.NewDefaultConfig() + s.MaxDepth = 1 Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx) var err error @@ -264,7 +266,12 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req)) continue } - Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res) + Logger.Printf( + "*** mockbroker/%d/%d: replied to %T with %T\n-> %s\n-> %s", + b.brokerID, idx, req.body, res, + s.Sprintf("%#v", req.body), + s.Sprintf("%#v", res), + ) encodedRes, err := encode(res, nil) if err != nil {
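
For anyone wiring the new testLogger into other tests, the pattern is the same save/restore swap that TestConsumeMessagesTrackLeader uses above. A minimal sketch follows; the test name and body are illustrative only and are not part of this patch:

    func TestWithCapturedSaramaLogs(t *testing.T) {
        // Route sarama's package-level Logger into this test's output,
        // restoring the previous logger when the test finishes.
        prevLogger := Logger
        defer func() { Logger = prevLogger }()
        Logger = &testLogger{t}

        broker := NewMockBroker(t, 1)
        defer broker.Close()

        // ... exercise client/consumer behaviour against the mock broker;
        // every Logger.Print/Printf/Println call from sarama (including the
        // MockBroker request/response logging added in this patch) now lands
        // in this test's own log via t.Log/t.Logf.
    }

Because testLogger calls t.Helper() before logging, the captured lines are attributed to the test itself rather than to the logger wrapper, which keeps failure output easy to read.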