Skip to content

Commit

Permalink
Use buffer pool for decompressed buffer (#1063)
Browse files Browse the repository at this point in the history
* Use buffer pool for decompressed buffer

* Address review comments
  • Loading branch information
ashishkf committed Feb 17, 2023
1 parent dc0faf5 commit f6986fb
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
6 changes: 6 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,16 @@ func (batch *Batch) close() (err error) {

batch.conn = nil
batch.lock = nil

if batch.msgs != nil {
batch.msgs.discard()
}

if batch.msgs != nil && batch.msgs.decompressed != nil {
releaseBuffer(batch.msgs.decompressed)
batch.msgs.decompressed = nil
}

if err = batch.err; errors.Is(batch.err, io.EOF) {
err = nil
}
Expand Down
7 changes: 4 additions & 3 deletions message_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type messageSetReader struct {
// This is used to detect truncation of the response.
lengthRemain int

decompressed bytes.Buffer
decompressed *bytes.Buffer
}

type readerStack struct {
Expand Down Expand Up @@ -87,6 +87,7 @@ func newMessageSetReader(reader *bufio.Reader, remain int) (*messageSetReader, e
reader: reader,
remain: remain,
},
decompressed: acquireBuffer(),
}
err := res.readHeader()
return res, err
Expand Down Expand Up @@ -199,7 +200,7 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
// Allocate a buffer of size 0, which gets capped at 16 bytes
// by the bufio package. We are already reading buffered data
// here, no need to reserve another 4KB buffer.
reader: bufio.NewReaderSize(&r.decompressed, 0),
reader: bufio.NewReaderSize(r.decompressed, 0),
remain: r.decompressed.Len(),
base: offset,
parent: r.readerStack,
Expand Down Expand Up @@ -278,7 +279,7 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt
}
r.remain -= batchRemain - int(limitReader.N)
r.readerStack = &readerStack{
reader: bufio.NewReaderSize(&r.decompressed, 0), // the new stack reads from the decompressed buffer
reader: bufio.NewReaderSize(r.decompressed, 0), // the new stack reads from the decompressed buffer
remain: r.decompressed.Len(),
base: -1, // base is unused here
parent: r.readerStack,
Expand Down

0 comments on commit f6986fb

Please sign in to comment.