Skip to content

Commit

Permalink
fetch sessions: kill harder
Browse files Browse the repository at this point in the history
- properly uses id of 0 for not using fetch sessions
- does not bump epochs when not using fetch sessions
  • Loading branch information
twmb committed Jan 28, 2021
1 parent 2cbe6fb commit 59c935c
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,7 @@ type fetchSession struct {
}

func (s *fetchSession) kill() {
s.id = -1
s.id = 0
s.epoch = -1
s.used = nil
s.killed = true
Expand All @@ -1174,18 +1174,27 @@ func (s *fetchSession) kill() {
// We do not reset the ID; using epoch 0 for an existing ID unregisters the
// prior session.
func (s *fetchSession) reset() {
if s.killed {
return
}
s.epoch = 0
s.used = nil
}

// bumpEpoch bumps the epoch and saves the session id.
//
// Kafka replies with the session ID of the session to use. When it does, we
// start from epoch 1, wrapping back to 1 if we go negative.
func (s *fetchSession) bumpEpoch(id int32) {
if s.killed {
return
}
if id != s.id {
s.epoch = 0
s.epoch = 0 // new session: reset to 0 for the increment below
}
s.epoch++
if s.epoch < 0 {
s.epoch = 1
s.epoch = 1 // we wrapped: reset back to 1 to continue this session
}
s.id = id
}
Expand Down

0 comments on commit 59c935c

Please sign in to comment.