Commit a2ffc65

Merge pull request redpanda-data#21 from twmb/twmb/bugs

Bugfixes

twmb committed Jan 29, 2021
2 parents 59c935c + c05572d

Showing 6 changed files with 160 additions and 140 deletions.
generate/main.go (4 changes: 2 additions & 2 deletions)
@@ -442,7 +442,7 @@ func main() {
 	l.Write("switch key {")
 	l.Write("default: return nil")
 	for _, key2struct := range name2structs {
-		l.Write("case %d: return new(%s)", key2struct.Key, key2struct.Name)
+		l.Write("case %d: return NewPtr%s()", key2struct.Key, key2struct.Name)
 	}
 	l.Write("}")
 	l.Write("}")
@@ -453,7 +453,7 @@ func main() {
 	l.Write("switch key {")
 	l.Write("default: return nil")
 	for _, key2struct := range name2structs {
-		l.Write("case %d: return new(%s)", key2struct.Key, strings.TrimSuffix(key2struct.Name, "Request")+"Response")
+		l.Write("case %d: return NewPtr%s()", key2struct.Key, strings.TrimSuffix(key2struct.Name, "Request")+"Response")
 	}
 	l.Write("}")
 	l.Write("}")
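Both generated lookup switches (protocol key to request type, and key to the paired response type) now call generated NewPtr constructors instead of new(T). The practical difference, sketched below with a hypothetical FooRequest (the real constructors are generated per message type): a NewPtr constructor can apply non-zero protocol defaults before returning, while new(FooRequest) leaves every field at its Go zero value.

// A sketch of the constructor shape the switch now targets; FooRequest and
// its Default method are illustrative, not copied from the repo.
func NewPtrFooRequest() *FooRequest {
	var v FooRequest
	v.Default() // apply any non-zero protocol defaults
	return &v
}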
pkg/kgo/client.go (7 changes: 1 addition & 6 deletions)
@@ -201,16 +201,11 @@ func NewClient(opts ...Opt) (*Client, error) {
 	return cl, nil
 }
 
-func connTimeoutBuilder(defaultTimeout time.Duration) func(kmsg.Request) (time.Duration, time.Duration) {
+func connTimeoutBuilder(def time.Duration) func(kmsg.Request) (time.Duration, time.Duration) {
 	var joinMu sync.Mutex
 	var lastRebalanceTimeout time.Duration
 
 	return func(req kmsg.Request) (read, write time.Duration) {
-		// We use a default of 15s for all write timeouts. Since we
-		// build requests in memory and flush in one go, we expect
-		// the process of writing to the connection to be quick.
-		// 15s is mighty generous.
-		const def = 15 * time.Second
 		millis := func(m int32) time.Duration { return time.Duration(m) * time.Millisecond }
 		switch t := req.(type) {
 		default:
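The bug here: the closure's baseline came from a hard-coded const def = 15 * time.Second rather than from the duration passed into connTimeoutBuilder, so the configured overhead never took effect. The fix renames the parameter to def and deletes the const, letting configuration flow through. A minimal sketch of the closure's overall shape, under assumed names (the repo's real switch covers many more request types):

// Sketch only: derive per-request deadlines from one overhead value.
func timeoutBuilder(overhead time.Duration) func(kmsg.Request) (read, write time.Duration) {
	millis := func(m int32) time.Duration { return time.Duration(m) * time.Millisecond }
	return func(req kmsg.Request) (read, write time.Duration) {
		switch t := req.(type) {
		default: // no timeout field: the overhead alone bounds the request
			return overhead, overhead
		case *kmsg.ProduceRequest: // timeout-bearing: overhead on top of TimeoutMillis
			return overhead + millis(t.TimeoutMillis), overhead
		}
	}
}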
pkg/kgo/config.go (10 changes: 8 additions & 2 deletions)
@@ -172,6 +172,10 @@ func (cfg *cfg) validate() error {
 		{v: int64(cfg.maxBrokerWriteBytes), allowed: int64(cfg.maxRecordBatchBytes), badcmp: i64lt, fmt: "max broker write bytes %v is erroneously less than max record batch bytes %v"},
 		{v: int64(cfg.maxBrokerReadBytes), allowed: int64(cfg.maxBytes), badcmp: i64lt, fmt: "max broker read bytes %v is erroneously less than max fetch bytes %v"},
 
+		// 1s <= conn timeout overhead <= 15m
+		{name: "conn timeout max overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(15 * time.Minute), badcmp: i64gt, durs: true},
+		{name: "conn timeout min overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(time.Second), badcmp: i64lt, durs: true},
+
 		// 10ms <= metadata <= 1hr
 		{name: "metadata max age", v: int64(cfg.metadataMaxAge), allowed: int64(time.Hour), badcmp: i64gt, durs: true},
 		{name: "metadata min age", v: int64(cfg.metadataMinAge), allowed: int64(10 * time.Millisecond), badcmp: i64lt, durs: true},
@@ -217,6 +221,8 @@ func defaultCfg() cfg {
 		id: &defaultID,
 		dialFn: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
 
+		connTimeoutOverhead: 20 * time.Second,
+
 		softwareName: "kgo",
 		softwareVersion: "0.1.0",
 
@@ -326,9 +332,9 @@ func WithLogger(l Logger) Opt {
 }
 
 // ConnTimeoutOverhead uses the given time as overhead while deadlining
-// requests, overriding the default overhead of 5s.
+// requests, overriding the default overhead of 20s.
 //
-// For most requests, the overhead will simply be the timeout. However, for any
+// For most requests, the overhead will simply be this timeout. However, for any
 // request with a TimeoutMillis field, the overhead is added on top of the
 // request's TimeoutMillis. This ensures that we give Kafka enough time to
 // actually process the request given the timeout, while still having a
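Taken together, the config changes give the conn timeout overhead a real default (20s; the option docs previously claimed 5s), clamp it to the 1s to 15m range at validation, and correct the docs. A usage sketch; the broker address and the 10s override are hypothetical, and the module path is assumed:

package main

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo" // assumed import path for pkg/kgo
)

func main() {
	// With a 10s overhead, a JoinGroup carrying a 45s rebalance timeout is
	// deadlined at roughly 45s + 10s, while a request with no TimeoutMillis
	// field gets just the 10s overhead.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // hypothetical broker
		kgo.ConnTimeoutOverhead(10*time.Second),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}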
pkg/kgo/consumer.go (13 changes: 13 additions & 0 deletions)
@@ -700,6 +700,15 @@ func (c *consumer) stopSession() listOrEpochLoads {
 
 	// At this point, all fetches, lists, and loads are dead.
 
+	c.cl.sinksAndSourcesMu.Lock()
+	for _, sns := range c.cl.sinksAndSources {
+		sns.source.session.reset()
+	}
+	c.cl.sinksAndSourcesMu.Unlock()
+
+	// At this point, if we begin fetching anew, then the sources will not
+	// be using stale sessions.
+
 	c.sourcesReadyMu.Lock()
 	defer c.sourcesReadyMu.Unlock()
 	for _, ready := range c.sourcesReadyForDraining {
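Stopping a consumer session now also resets every source's fetch session, so that fetching anew (for example after a rebalance) starts a fresh broker-side session instead of referencing stale state. For context, a sketch of KIP-227 fetch-session state; the names are illustrative, the real type lives in pkg/kgo/source.go:

// Illustrative KIP-227 fetch-session state, not the repo's actual struct.
type fetchSession struct {
	id    int32 // broker-assigned session id
	epoch int32 // incremented per fetch; 0 asks the broker for a new session
}

// reset forgets the current session so the next fetch negotiates a new one.
func (s *fetchSession) reset() { s.id, s.epoch = 0, 0 }

// kill disables fetch sessions entirely; the source.go diff recognizes a
// killed session by id -1.
func (s *fetchSession) kill() { s.id, s.epoch = -1, -1 }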
@@ -981,6 +990,9 @@ func (cl *Client) listOffsetsForBrokerLoad(ctx context.Context, broker *broker,
 	}
 
 	offset := rPartition.Offset + loadPart.relative
+	if len(rPartition.OldStyleOffsets) > 0 { // if we have any, we used list offsets v0
+		offset = rPartition.OldStyleOffsets[0] + loadPart.relative
+	}
 	if loadPart.at >= 0 {
 		offset = loadPart.at + loadPart.relative // we obey exact requests, even if they end up past the end
 	}
@@ -1095,6 +1107,7 @@ func (o offsetLoadMap) buildListReq(isolationLevel int8) *kmsg.ListOffsetsRequest {
 			Partition: partition,
 			CurrentLeaderEpoch: offset.currentEpoch, // KIP-320
 			Timestamp: offset.at,
+			MaxNumOffsets: 1,
 		})
 	}
 	req.Topics = append(req.Topics, kmsg.ListOffsetsRequestTopic{
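Both offset fixes concern ListOffsets v0, the only version very old brokers speak: a v0 request must state how many offsets it wants back via MaxNumOffsets, and a v0 response carries its answer in OldStyleOffsets rather than the single Offset field of v1+. A request-side sketch; the nested type name follows kmsg's naming convention and the values are illustrative:

// v0 needs MaxNumOffsets; later versions simply never serialize the field.
part := kmsg.ListOffsetsRequestTopicPartition{
	Partition:     0,
	Timestamp:     -1, // -1 = latest offset, -2 = earliest
	MaxNumOffsets: 1,  // v0: hand back exactly one offset, in OldStyleOffsets
}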
pkg/kgo/source.go (22 changes: 14 additions & 8 deletions)
@@ -433,6 +433,7 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
 	case <-requested:
 		fetched = true
 	case <-ctx.Done():
+		s.session.reset()
 		req.usedOffsets.finishUsingAll()
 		return
 	}
@@ -449,6 +450,7 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
 		case <-after.C:
 		case <-ctx.Done():
 		}
+		s.session.reset()
 		req.usedOffsets.finishUsingAll()
 		return
 	}
@@ -478,6 +480,7 @@ func (s *source) fetch(consumerSession *consumerSession) (fetched bool) {
 	case <-handled:
 	case <-ctx.Done():
 		req.usedOffsets.finishUsingAll()
+		s.session.reset()
 		return
 	}
 
@@ -519,21 +522,24 @@
 			// If the epoch was zero, the broker did not even
 			// establish a session for us (and thus is maxed on
 			// sessions). We stop trying.
+			s.cl.cfg.logger.Log(LogLevelInfo, "session failed with SessionIDNotFound while trying to establish a session; broker likely maxed on sessions; continuing on without using sessions")
 			s.session.kill()
-			s.cl.cfg.logger.Log(LogLevelInfo,
-				"session failed with SessionIDNotFound while trying to establish a session; broker likely maxed on sessions; continuing on without using sessions")
-		}
-		fallthrough
-	case kerr.InvalidFetchSessionEpoch:
-		if s.session.id != -1 { // if -1, the session was killed just above
-			s.cl.cfg.logger.Log(LogLevelInfo, "resetting fetch session", "err", err)
+		} else {
+			s.cl.cfg.logger.Log(LogLevelInfo, "received SessionIDNotFound from our in use session, our session was likely evicted; resetting session")
 			s.session.reset()
 		}
 		req.usedOffsets.finishUsingAll()
 		return
+	case kerr.InvalidFetchSessionEpoch:
+		s.cl.cfg.logger.Log(LogLevelInfo, "resetting fetch session", "err", err)
+		s.session.reset()
+		req.usedOffsets.finishUsingAll()
+		return
 	}
 
-	s.session.bumpEpoch(resp.SessionID)
+	if resp.SessionID > 0 {
+		s.session.bumpEpoch(resp.SessionID)
+	}
 
 	// If we moved any partitions to preferred replicas, we reset the
 	// session. We do this after bumping the epoch just to ensure that we
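In sum, the source.go changes keep session state honest: an in-flight fetch aborted by context cancellation now resets the session, since the broker's view of it can no longer be trusted; SessionIDNotFound is split into never-got-a-session (kill, and fetch sessionless from then on) versus our-session-was-evicted (reset and renegotiate); and the epoch is bumped only when the broker actually assigned a session id (resp.SessionID > 0). A compact sketch of the error decision, reusing the illustrative fetchSession type from the consumer.go note above:

// Illustrative only; mirrors the decision encoded in the final hunk above.
func handleFetchSessionError(s *fetchSession, err error) {
	switch err {
	case kerr.FetchSessionIDNotFound:
		if s.epoch == 0 {
			// The broker would not even open a session for us; it is
			// likely at its session limit. Stop using sessions.
			s.kill()
		} else {
			// Our live session was evicted; negotiate a fresh one.
			s.reset()
		}
	case kerr.InvalidFetchSessionEpoch:
		// Our epoch is out of sync with the broker; start over.
		s.reset()
	}
}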
[diff of the sixth changed file not shown]
