diff --git a/generate/main.go b/generate/main.go index 68de5d294e20..7fb0d349bb91 100644 --- a/generate/main.go +++ b/generate/main.go @@ -442,7 +442,7 @@ func main() { l.Write("switch key {") l.Write("default: return nil") for _, key2struct := range name2structs { - l.Write("case %d: return new(%s)", key2struct.Key, key2struct.Name) + l.Write("case %d: return NewPtr%s()", key2struct.Key, key2struct.Name) } l.Write("}") l.Write("}") @@ -453,7 +453,7 @@ func main() { l.Write("switch key {") l.Write("default: return nil") for _, key2struct := range name2structs { - l.Write("case %d: return new(%s)", key2struct.Key, strings.TrimSuffix(key2struct.Name, "Request")+"Response") + l.Write("case %d: return NewPtr%s()", key2struct.Key, strings.TrimSuffix(key2struct.Name, "Request")+"Response") } l.Write("}") l.Write("}") diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index de7f8b2a2aa5..e82c81a05b07 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -201,16 +201,11 @@ func NewClient(opts ...Opt) (*Client, error) { return cl, nil } -func connTimeoutBuilder(defaultTimeout time.Duration) func(kmsg.Request) (time.Duration, time.Duration) { +func connTimeoutBuilder(def time.Duration) func(kmsg.Request) (time.Duration, time.Duration) { var joinMu sync.Mutex var lastRebalanceTimeout time.Duration return func(req kmsg.Request) (read, write time.Duration) { - // We use a default of 15s for all write timeouts. Since we - // build requests in memory and flush in one go, we expect - // the process of writing to the connection to be quick. - // 15s is mighty generous. - const def = 15 * time.Second millis := func(m int32) time.Duration { return time.Duration(m) * time.Millisecond } switch t := req.(type) { default: diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index cd378f70f65a..4b8a3df888b2 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -172,6 +172,10 @@ func (cfg *cfg) validate() error { {v: int64(cfg.maxBrokerWriteBytes), allowed: int64(cfg.maxRecordBatchBytes), badcmp: i64lt, fmt: "max broker write bytes %v is erroneously less than max record batch bytes %v"}, {v: int64(cfg.maxBrokerReadBytes), allowed: int64(cfg.maxBytes), badcmp: i64lt, fmt: "max broker read bytes %v is erroneously less than max fetch bytes %v"}, + // 1s <= conn timeout overhead <= 15m + {name: "conn timeout max overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(15 * time.Minute), badcmp: i64gt, durs: true}, + {name: "conn timeout min overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(time.Second), badcmp: i64lt, durs: true}, + // 10ms <= metadata <= 1hr {name: "metadata max age", v: int64(cfg.metadataMaxAge), allowed: int64(time.Hour), badcmp: i64gt, durs: true}, {name: "metadata min age", v: int64(cfg.metadataMinAge), allowed: int64(10 * time.Millisecond), badcmp: i64lt, durs: true}, @@ -217,6 +221,8 @@ func defaultCfg() cfg { id: &defaultID, dialFn: (&net.Dialer{Timeout: 10 * time.Second}).DialContext, + connTimeoutOverhead: 20 * time.Second, + softwareName: "kgo", softwareVersion: "0.1.0", @@ -326,9 +332,9 @@ func WithLogger(l Logger) Opt { } // ConnTimeoutOverhead uses the given time as overhead while deadlining -// requests, overriding the default overhead of 5s. +// requests, overriding the default overhead of 20s. // -// For most requests, the overhead will simply be the timeout. However, for any +// For most requests, the overhead will simply be this timeout. However, for any // request with a TimeoutMillis field, the overhead is added on top of the // request's TimeoutMillis. This ensures that we give Kafka enough time to // actually process the request given the timeout, while still having a diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 2a9e564b8cf7..7453c7219d81 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -700,6 +700,15 @@ func (c *consumer) stopSession() listOrEpochLoads { // At this point, all fetches, lists, and loads are dead. + c.cl.sinksAndSourcesMu.Lock() + for _, sns := range c.cl.sinksAndSources { + sns.source.session.reset() + } + c.cl.sinksAndSourcesMu.Unlock() + + // At this point, if we begin fetching anew, then the sources will not + // be using stale sessions. + c.sourcesReadyMu.Lock() defer c.sourcesReadyMu.Unlock() for _, ready := range c.sourcesReadyForDraining { @@ -981,6 +990,9 @@ func (cl *Client) listOffsetsForBrokerLoad(ctx context.Context, broker *broker, } offset := rPartition.Offset + loadPart.relative + if len(rPartition.OldStyleOffsets) > 0 { // if we have any, we used list offsets v0 + offset = rPartition.OldStyleOffsets[0] + loadPart.relative + } if loadPart.at >= 0 { offset = loadPart.at + loadPart.relative // we obey exact requests, even if they end up past the end } @@ -1095,6 +1107,7 @@ func (o offsetLoadMap) buildListReq(isolationLevel int8) *kmsg.ListOffsetsReques Partition: partition, CurrentLeaderEpoch: offset.currentEpoch, // KIP-320 Timestamp: offset.at, + MaxNumOffsets: 1, }) } req.Topics = append(req.Topics, kmsg.ListOffsetsRequestTopic{ diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index e20455c31af3..2f4c9a0141c7 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -433,6 +433,7 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) { case <-requested: fetched = true case <-ctx.Done(): + s.session.reset() req.usedOffsets.finishUsingAll() return } @@ -449,6 +450,7 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) { case <-after.C: case <-ctx.Done(): } + s.session.reset() req.usedOffsets.finishUsingAll() return } @@ -478,6 +480,7 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) { case <-handled: case <-ctx.Done(): req.usedOffsets.finishUsingAll() + s.session.reset() return } @@ -519,21 +522,24 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) { // If the epoch was zero, the broker did not even // establish a session for us (and thus is maxed on // sessions). We stop trying. + s.cl.cfg.logger.Log(LogLevelInfo, "session failed with SessionIDNotFound while trying to establish a session; broker likely maxed on sessions; continuing on without using sessions") s.session.kill() - s.cl.cfg.logger.Log(LogLevelInfo, - "session failed with SessionIDNotFound while trying to establish a session; broker likely maxed on sessions; continuing on without using sessions") - } - fallthrough - case kerr.InvalidFetchSessionEpoch: - if s.session.id != -1 { // if -1, the session was killed just above - s.cl.cfg.logger.Log(LogLevelInfo, "resetting fetch session", "err", err) + } else { + s.cl.cfg.logger.Log(LogLevelInfo, "received SessionIDNotFound from our in use session, our session was likely evicted; resetting session") s.session.reset() } req.usedOffsets.finishUsingAll() return + case kerr.InvalidFetchSessionEpoch: + s.cl.cfg.logger.Log(LogLevelInfo, "resetting fetch session", "err", err) + s.session.reset() + req.usedOffsets.finishUsingAll() + return } - s.session.bumpEpoch(resp.SessionID) + if resp.SessionID > 0 { + s.session.bumpEpoch(resp.SessionID) + } // If we moved any partitions to preferred replicas, we reset the // session. We do this after bumping the epoch just to ensure that we diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index 3b6390b60399..a23666fda865 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -33201,127 +33201,127 @@ func RequestForKey(key int16) Request { default: return nil case 0: - return new(ProduceRequest) + return NewPtrProduceRequest() case 1: - return new(FetchRequest) + return NewPtrFetchRequest() case 2: - return new(ListOffsetsRequest) + return NewPtrListOffsetsRequest() case 3: - return new(MetadataRequest) + return NewPtrMetadataRequest() case 4: - return new(LeaderAndISRRequest) + return NewPtrLeaderAndISRRequest() case 5: - return new(StopReplicaRequest) + return NewPtrStopReplicaRequest() case 6: - return new(UpdateMetadataRequest) + return NewPtrUpdateMetadataRequest() case 7: - return new(ControlledShutdownRequest) + return NewPtrControlledShutdownRequest() case 8: - return new(OffsetCommitRequest) + return NewPtrOffsetCommitRequest() case 9: - return new(OffsetFetchRequest) + return NewPtrOffsetFetchRequest() case 10: - return new(FindCoordinatorRequest) + return NewPtrFindCoordinatorRequest() case 11: - return new(JoinGroupRequest) + return NewPtrJoinGroupRequest() case 12: - return new(HeartbeatRequest) + return NewPtrHeartbeatRequest() case 13: - return new(LeaveGroupRequest) + return NewPtrLeaveGroupRequest() case 14: - return new(SyncGroupRequest) + return NewPtrSyncGroupRequest() case 15: - return new(DescribeGroupsRequest) + return NewPtrDescribeGroupsRequest() case 16: - return new(ListGroupsRequest) + return NewPtrListGroupsRequest() case 17: - return new(SASLHandshakeRequest) + return NewPtrSASLHandshakeRequest() case 18: - return new(ApiVersionsRequest) + return NewPtrApiVersionsRequest() case 19: - return new(CreateTopicsRequest) + return NewPtrCreateTopicsRequest() case 20: - return new(DeleteTopicsRequest) + return NewPtrDeleteTopicsRequest() case 21: - return new(DeleteRecordsRequest) + return NewPtrDeleteRecordsRequest() case 22: - return new(InitProducerIDRequest) + return NewPtrInitProducerIDRequest() case 23: - return new(OffsetForLeaderEpochRequest) + return NewPtrOffsetForLeaderEpochRequest() case 24: - return new(AddPartitionsToTxnRequest) + return NewPtrAddPartitionsToTxnRequest() case 25: - return new(AddOffsetsToTxnRequest) + return NewPtrAddOffsetsToTxnRequest() case 26: - return new(EndTxnRequest) + return NewPtrEndTxnRequest() case 27: - return new(WriteTxnMarkersRequest) + return NewPtrWriteTxnMarkersRequest() case 28: - return new(TxnOffsetCommitRequest) + return NewPtrTxnOffsetCommitRequest() case 29: - return new(DescribeACLsRequest) + return NewPtrDescribeACLsRequest() case 30: - return new(CreateACLsRequest) + return NewPtrCreateACLsRequest() case 31: - return new(DeleteACLsRequest) + return NewPtrDeleteACLsRequest() case 32: - return new(DescribeConfigsRequest) + return NewPtrDescribeConfigsRequest() case 33: - return new(AlterConfigsRequest) + return NewPtrAlterConfigsRequest() case 34: - return new(AlterReplicaLogDirsRequest) + return NewPtrAlterReplicaLogDirsRequest() case 35: - return new(DescribeLogDirsRequest) + return NewPtrDescribeLogDirsRequest() case 36: - return new(SASLAuthenticateRequest) + return NewPtrSASLAuthenticateRequest() case 37: - return new(CreatePartitionsRequest) + return NewPtrCreatePartitionsRequest() case 38: - return new(CreateDelegationTokenRequest) + return NewPtrCreateDelegationTokenRequest() case 39: - return new(RenewDelegationTokenRequest) + return NewPtrRenewDelegationTokenRequest() case 40: - return new(ExpireDelegationTokenRequest) + return NewPtrExpireDelegationTokenRequest() case 41: - return new(DescribeDelegationTokenRequest) + return NewPtrDescribeDelegationTokenRequest() case 42: - return new(DeleteGroupsRequest) + return NewPtrDeleteGroupsRequest() case 43: - return new(ElectLeadersRequest) + return NewPtrElectLeadersRequest() case 44: - return new(IncrementalAlterConfigsRequest) + return NewPtrIncrementalAlterConfigsRequest() case 45: - return new(AlterPartitionAssignmentsRequest) + return NewPtrAlterPartitionAssignmentsRequest() case 46: - return new(ListPartitionReassignmentsRequest) + return NewPtrListPartitionReassignmentsRequest() case 47: - return new(OffsetDeleteRequest) + return NewPtrOffsetDeleteRequest() case 48: - return new(DescribeClientQuotasRequest) + return NewPtrDescribeClientQuotasRequest() case 49: - return new(AlterClientQuotasRequest) + return NewPtrAlterClientQuotasRequest() case 50: - return new(DescribeUserSCRAMCredentialsRequest) + return NewPtrDescribeUserSCRAMCredentialsRequest() case 51: - return new(AlterUserSCRAMCredentialsRequest) + return NewPtrAlterUserSCRAMCredentialsRequest() case 52: - return new(VoteRequest) + return NewPtrVoteRequest() case 53: - return new(BeginQuorumEpochRequest) + return NewPtrBeginQuorumEpochRequest() case 54: - return new(EndQuorumEpochRequest) + return NewPtrEndQuorumEpochRequest() case 55: - return new(DescribeQuorumRequest) + return NewPtrDescribeQuorumRequest() case 56: - return new(AlterISRRequest) + return NewPtrAlterISRRequest() case 57: - return new(UpdateFeaturesRequest) + return NewPtrUpdateFeaturesRequest() case 58: - return new(EnvelopeRequest) + return NewPtrEnvelopeRequest() case 59: - return new(FetchSnapshotRequest) + return NewPtrFetchSnapshotRequest() case 60: - return new(DescribeClusterRequest) + return NewPtrDescribeClusterRequest() } } @@ -33332,127 +33332,127 @@ func ResponseForKey(key int16) Response { default: return nil case 0: - return new(ProduceResponse) + return NewPtrProduceResponse() case 1: - return new(FetchResponse) + return NewPtrFetchResponse() case 2: - return new(ListOffsetsResponse) + return NewPtrListOffsetsResponse() case 3: - return new(MetadataResponse) + return NewPtrMetadataResponse() case 4: - return new(LeaderAndISRResponse) + return NewPtrLeaderAndISRResponse() case 5: - return new(StopReplicaResponse) + return NewPtrStopReplicaResponse() case 6: - return new(UpdateMetadataResponse) + return NewPtrUpdateMetadataResponse() case 7: - return new(ControlledShutdownResponse) + return NewPtrControlledShutdownResponse() case 8: - return new(OffsetCommitResponse) + return NewPtrOffsetCommitResponse() case 9: - return new(OffsetFetchResponse) + return NewPtrOffsetFetchResponse() case 10: - return new(FindCoordinatorResponse) + return NewPtrFindCoordinatorResponse() case 11: - return new(JoinGroupResponse) + return NewPtrJoinGroupResponse() case 12: - return new(HeartbeatResponse) + return NewPtrHeartbeatResponse() case 13: - return new(LeaveGroupResponse) + return NewPtrLeaveGroupResponse() case 14: - return new(SyncGroupResponse) + return NewPtrSyncGroupResponse() case 15: - return new(DescribeGroupsResponse) + return NewPtrDescribeGroupsResponse() case 16: - return new(ListGroupsResponse) + return NewPtrListGroupsResponse() case 17: - return new(SASLHandshakeResponse) + return NewPtrSASLHandshakeResponse() case 18: - return new(ApiVersionsResponse) + return NewPtrApiVersionsResponse() case 19: - return new(CreateTopicsResponse) + return NewPtrCreateTopicsResponse() case 20: - return new(DeleteTopicsResponse) + return NewPtrDeleteTopicsResponse() case 21: - return new(DeleteRecordsResponse) + return NewPtrDeleteRecordsResponse() case 22: - return new(InitProducerIDResponse) + return NewPtrInitProducerIDResponse() case 23: - return new(OffsetForLeaderEpochResponse) + return NewPtrOffsetForLeaderEpochResponse() case 24: - return new(AddPartitionsToTxnResponse) + return NewPtrAddPartitionsToTxnResponse() case 25: - return new(AddOffsetsToTxnResponse) + return NewPtrAddOffsetsToTxnResponse() case 26: - return new(EndTxnResponse) + return NewPtrEndTxnResponse() case 27: - return new(WriteTxnMarkersResponse) + return NewPtrWriteTxnMarkersResponse() case 28: - return new(TxnOffsetCommitResponse) + return NewPtrTxnOffsetCommitResponse() case 29: - return new(DescribeACLsResponse) + return NewPtrDescribeACLsResponse() case 30: - return new(CreateACLsResponse) + return NewPtrCreateACLsResponse() case 31: - return new(DeleteACLsResponse) + return NewPtrDeleteACLsResponse() case 32: - return new(DescribeConfigsResponse) + return NewPtrDescribeConfigsResponse() case 33: - return new(AlterConfigsResponse) + return NewPtrAlterConfigsResponse() case 34: - return new(AlterReplicaLogDirsResponse) + return NewPtrAlterReplicaLogDirsResponse() case 35: - return new(DescribeLogDirsResponse) + return NewPtrDescribeLogDirsResponse() case 36: - return new(SASLAuthenticateResponse) + return NewPtrSASLAuthenticateResponse() case 37: - return new(CreatePartitionsResponse) + return NewPtrCreatePartitionsResponse() case 38: - return new(CreateDelegationTokenResponse) + return NewPtrCreateDelegationTokenResponse() case 39: - return new(RenewDelegationTokenResponse) + return NewPtrRenewDelegationTokenResponse() case 40: - return new(ExpireDelegationTokenResponse) + return NewPtrExpireDelegationTokenResponse() case 41: - return new(DescribeDelegationTokenResponse) + return NewPtrDescribeDelegationTokenResponse() case 42: - return new(DeleteGroupsResponse) + return NewPtrDeleteGroupsResponse() case 43: - return new(ElectLeadersResponse) + return NewPtrElectLeadersResponse() case 44: - return new(IncrementalAlterConfigsResponse) + return NewPtrIncrementalAlterConfigsResponse() case 45: - return new(AlterPartitionAssignmentsResponse) + return NewPtrAlterPartitionAssignmentsResponse() case 46: - return new(ListPartitionReassignmentsResponse) + return NewPtrListPartitionReassignmentsResponse() case 47: - return new(OffsetDeleteResponse) + return NewPtrOffsetDeleteResponse() case 48: - return new(DescribeClientQuotasResponse) + return NewPtrDescribeClientQuotasResponse() case 49: - return new(AlterClientQuotasResponse) + return NewPtrAlterClientQuotasResponse() case 50: - return new(DescribeUserSCRAMCredentialsResponse) + return NewPtrDescribeUserSCRAMCredentialsResponse() case 51: - return new(AlterUserSCRAMCredentialsResponse) + return NewPtrAlterUserSCRAMCredentialsResponse() case 52: - return new(VoteResponse) + return NewPtrVoteResponse() case 53: - return new(BeginQuorumEpochResponse) + return NewPtrBeginQuorumEpochResponse() case 54: - return new(EndQuorumEpochResponse) + return NewPtrEndQuorumEpochResponse() case 55: - return new(DescribeQuorumResponse) + return NewPtrDescribeQuorumResponse() case 56: - return new(AlterISRResponse) + return NewPtrAlterISRResponse() case 57: - return new(UpdateFeaturesResponse) + return NewPtrUpdateFeaturesResponse() case 58: - return new(EnvelopeResponse) + return NewPtrEnvelopeResponse() case 59: - return new(FetchSnapshotResponse) + return NewPtrFetchSnapshotResponse() case 60: - return new(DescribeClusterResponse) + return NewPtrDescribeClusterResponse() } }