Skip to content

Commit

Permalink
Fix offset when a batch ends with compacted records
Browse files Browse the repository at this point in the history
Saves the lastOffset and jumps past it when compacted
records are detected at the end of a batch.

- Adds a test for batches that end with compacted
records
- Adds a test for batches truncated due to MaxBytes


Co-authored-by: iddqdeika <iddqdeika@gmail.com>
  • Loading branch information
nlsun and iddqdeika committed Feb 9, 2022
1 parent 54559ab commit 999d0b3
Show file tree
Hide file tree
Showing 4 changed files with 311 additions and 7 deletions.
32 changes: 31 additions & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ type Batch struct {
offset int64
highWaterMark int64
err error
// The last offset in the batch.
//
// We use lastOffset to skip offsets that have been compacted away.
//
// We store lastOffset because we get lastOffset when we read a new message
// but only try to handle compaction when we receive an EOF. However, when
// we get an EOF we do not get the lastOffset. So there is a mismatch
// between when we receive it and need to use it.
lastOffset int64
}

// Throttle gives the throttling duration applied by the kafka server on the
Expand Down Expand Up @@ -190,6 +199,8 @@ func (batch *Batch) ReadMessage() (Message, error) {
return
},
)
// A batch may start before the requested offset so skip messages
// until the requested offset is reached.
for batch.conn != nil && offset < batch.conn.offset {
if err != nil {
break
Expand Down Expand Up @@ -225,10 +236,12 @@ func (batch *Batch) readMessage(
return
}

offset, timestamp, headers, err = batch.msgs.readMessage(batch.offset, key, val)
var lastOffset int64
offset, lastOffset, timestamp, headers, err = batch.msgs.readMessage(batch.offset, key, val)
switch err {
case nil:
batch.offset = offset + 1
batch.lastOffset = lastOffset
case errShortRead:
// As an "optimization" kafka truncates the returned response after
// producing MaxBytes, which could then cause the code to return
Expand All @@ -252,6 +265,23 @@ func (batch *Batch) readMessage(
// read deadline management.
err = checkTimeoutErr(batch.deadline)
batch.err = err

// Checks the following:
// - `batch.err` for a "success" from the previous timeout check
// - `batch.msgs.lengthRemain` to ensure that this EOF is not due
// to MaxBytes truncation
// - `batch.lastOffset` to ensure that the message format contains
// `lastOffset`
if batch.err == io.EOF && batch.msgs.lengthRemain == 0 && batch.lastOffset != -1 {
// Log compaction can create batches that end with compacted
// records so the normal strategy that increments the "next"
// offset as records are read doesn't work as the compacted
// records are "missing" and never get "read".
//
// In order to reliably reach the next non-compacted offset we
// jump past the saved lastOffset.
batch.offset = batch.lastOffset + 1
}
}
default:
// Since io.EOF is used by the batch to indicate that there are
Expand Down
24 changes: 21 additions & 3 deletions message_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type messageSetReader struct {
*readerStack // used for decompressing compressed messages and record batches
empty bool // if true, short circuits messageSetReader methods
debug bool // enable debug log messages
// How many bytes are expected to remain in the response.
//
// This is used to detect truncation of the response.
lengthRemain int
}

type readerStack struct {
Expand Down Expand Up @@ -114,7 +118,7 @@ func (r *messageSetReader) discard() (err error) {
}

func (r *messageSetReader) readMessage(min int64, key readBytesFunc, val readBytesFunc) (
offset int64, timestamp int64, headers []Header, err error) {
offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {

if r.empty {
err = RequestTimedOut
Expand All @@ -126,8 +130,10 @@ func (r *messageSetReader) readMessage(min int64, key readBytesFunc, val readByt
switch r.header.magic {
case 0, 1:
offset, timestamp, headers, err = r.readMessageV1(min, key, val)
// Set an invalid value so that it can be ignored
lastOffset = -1
case 2:
offset, timestamp, headers, err = r.readMessageV2(min, key, val)
offset, lastOffset, timestamp, headers, err = r.readMessageV2(min, key, val)
default:
err = r.header.badMagic()
}
Expand Down Expand Up @@ -239,7 +245,7 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
}

func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readBytesFunc) (
offset int64, timestamp int64, headers []Header, err error) {
offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {
if err = r.readHeader(); err != nil {
return
}
Expand Down Expand Up @@ -282,10 +288,12 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt
r.readerStack.parent.count = 0
}
}
remainBefore := r.remain
var length int64
if err = r.readVarInt(&length); err != nil {
return
}
lengthOfLength := remainBefore - r.remain
var attrs int8
if err = r.readInt8(&attrs); err != nil {
return
Expand Down Expand Up @@ -316,6 +324,8 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt
return
}
}
lastOffset = r.header.firstOffset + int64(r.header.v2.lastOffsetDelta)
r.lengthRemain -= int(length) + lengthOfLength
r.markRead()
return
}
Expand Down Expand Up @@ -407,6 +417,9 @@ func (r *messageSetReader) readHeader() (err error) {
return
}
r.count = 1
// Set arbitrary non-zero length so that we always assume the
// message is truncated since bytes remain.
r.lengthRemain = 1
r.log("Read v0 header with offset=%d len=%d magic=%d attributes=%d", r.header.firstOffset, r.header.length, r.header.magic, r.header.v1.attributes)
case 1:
r.header.crc = crcOrLeaderEpoch
Expand All @@ -417,6 +430,9 @@ func (r *messageSetReader) readHeader() (err error) {
return
}
r.count = 1
// Set arbitrary non-zero length so that we always assume the
// message is truncated since bytes remain.
r.lengthRemain = 1
r.log("Read v1 header with remain=%d offset=%d magic=%d and attributes=%d", r.remain, r.header.firstOffset, r.header.magic, r.header.v1.attributes)
case 2:
r.header.v2.leaderEpoch = crcOrLeaderEpoch
Expand Down Expand Up @@ -448,6 +464,8 @@ func (r *messageSetReader) readHeader() (err error) {
return
}
r.count = int(r.header.v2.count)
// Subtracts the header bytes from the length
r.lengthRemain = int(r.header.length) - 49
r.log("Read v2 header with count=%d offset=%d len=%d magic=%d attributes=%d", r.count, r.header.firstOffset, r.header.length, r.header.magic, r.header.v2.attributes)
default:
err = r.header.badMagic()
Expand Down
6 changes: 3 additions & 3 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ func TestMessageSetReaderEmpty(t *testing.T) {
return 0, nil
}

offset, timestamp, headers, err := m.readMessage(0, noop, noop)
offset, _, timestamp, headers, err := m.readMessage(0, noop, noop)
if offset != 0 {
t.Errorf("expected offset of 0, get %d", offset)
}
Expand Down Expand Up @@ -737,12 +737,12 @@ func (r *readerHelper) readMessageErr() (msg Message, err error) {
}
var timestamp int64
var headers []Header
r.offset, timestamp, headers, err = r.messageSetReader.readMessage(r.offset, keyFunc, valueFunc)
r.offset, _, timestamp, headers, err = r.messageSetReader.readMessage(r.offset, keyFunc, valueFunc)
if err != nil {
return
}
msg.Offset = r.offset
msg.Time = time.Unix(timestamp / 1000, (timestamp % 1000) * 1000000)
msg.Time = time.Unix(timestamp/1000, (timestamp%1000)*1000000)
msg.Headers = headers
return
}
Expand Down
Loading

0 comments on commit 999d0b3

Please sign in to comment.