Skip to content

Commit

Permalink
Add test for ConsumePartition with OffsetOldest
Browse files Browse the repository at this point in the history
Also refactor similiar tests and fix docs
  • Loading branch information
grongor committed Dec 5, 2021
1 parent 635bcf3 commit aca9acd
Showing 1 changed file with 74 additions and 13 deletions.
87 changes: 74 additions & 13 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,28 @@ func TestConsumerOffsetManual(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 0)

manualOffset := int64(1234)
offsetNewest := int64(2345)
offsetNewestAfterFetchRequest := int64(3456)

mockFetchResponse := NewMockFetchResponse(t, 1)
for i := 0; i < 10; i++ {
mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)

// skipped because parseRecords(): offset < child.offset
mockFetchResponse.SetMessage("my_topic", 0, manualOffset-1, testMsg)

for i := int64(0); i < 10; i++ {
mockFetchResponse.SetMessage("my_topic", 0, i+manualOffset, testMsg)
}

mockFetchResponse.SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest)

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetOldest, 0).
SetOffset("my_topic", 0, OffsetNewest, 2345),
SetOffset("my_topic", 0, OffsetNewest, offsetNewest),
"FetchRequest": mockFetchResponse,
})

Expand All @@ -41,44 +51,50 @@ func TestConsumerOffsetManual(t *testing.T) {
t.Fatal(err)
}

consumer, err := master.ConsumePartition("my_topic", 0, 1234)
consumer, err := master.ConsumePartition("my_topic", 0, manualOffset)
if err != nil {
t.Fatal(err)
}

// Then: messages starting from offset 1234 are consumed.
for i := 0; i < 10; i++ {
// Then
for i := int64(0); i < 10; i++ {
select {
case message := <-consumer.Messages():
assertMessageOffset(t, message, int64(i+1234))
assertMessageOffset(t, message, i+manualOffset)
case err := <-consumer.Errors():
t.Error(err)
}
}

if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest {
t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo)
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

// If `OffsetNewest` is passed as the initial offset then the first consumed
// message is indeed corresponds to the offset that broker claims to be the
// message indeed corresponds to the offset that broker claims to be the
// newest in its metadata response.
func TestConsumerOffsetNewest(t *testing.T) {
// Given
offsetNewest := int64(10)
offsetNewestAfterFetchRequest := int64(50)
broker0 := NewMockBroker(t, 0)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 10).
SetOffset("my_topic", 0, OffsetNewest, offsetNewest).
SetOffset("my_topic", 0, OffsetOldest, 7),
"FetchRequest": NewMockFetchResponse(t, 1).
SetMessage("my_topic", 0, 9, testMsg).
SetMessage("my_topic", 0, 9, testMsg). // skipped because parseRecords(): offset < child.offset
SetMessage("my_topic", 0, 10, testMsg).
SetMessage("my_topic", 0, 11, testMsg).
SetHighWaterMark("my_topic", 0, 14),
SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest),
})

master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
Expand All @@ -94,8 +110,53 @@ func TestConsumerOffsetNewest(t *testing.T) {

// Then
assertMessageOffset(t, <-consumer.Messages(), 10)
if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
t.Errorf("Expected high water mark offset 14, found %d", hwmo)
if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest {
t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo)
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

// If `OffsetOldest` is passed as the initial offset then the first consumed
// message is indeed the first available in the partition.
func TestConsumerOffsetOldest(t *testing.T) {
// Given
offsetNewest := int64(10)
broker0 := NewMockBroker(t, 0)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, offsetNewest).
SetOffset("my_topic", 0, OffsetOldest, 7),
"FetchRequest": NewMockFetchResponse(t, 1).
// skipped because parseRecords(): offset < child.offset
SetMessage("my_topic", 0, 6, testMsg).
// these will get to the Messages() channel
SetMessage("my_topic", 0, 7, testMsg).
SetMessage("my_topic", 0, 8, testMsg).
SetMessage("my_topic", 0, 9, testMsg).
SetHighWaterMark("my_topic", 0, offsetNewest),
})

master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
if err != nil {
t.Fatal(err)
}

// When
consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest)
if err != nil {
t.Fatal(err)
}

// Then
assertMessageOffset(t, <-consumer.Messages(), int64(7))
if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest {
t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo)
}

safeClose(t, consumer)
Expand Down

0 comments on commit aca9acd

Please sign in to comment.