diff --git a/consumer_test.go b/consumer_test.go index 7f9ff2c2e..8d8c33b8c 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -20,18 +20,28 @@ func TestConsumerOffsetManual(t *testing.T) { // Given broker0 := NewMockBroker(t, 0) + manualOffset := int64(1234) + offsetNewest := int64(2345) + offsetNewestAfterFetchRequest := int64(3456) + mockFetchResponse := NewMockFetchResponse(t, 1) - for i := 0; i < 10; i++ { - mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg) + + // skipped because parseRecords(): offset < child.offset + mockFetchResponse.SetMessage("my_topic", 0, manualOffset-1, testMsg) + + for i := int64(0); i < 10; i++ { + mockFetchResponse.SetMessage("my_topic", 0, i+manualOffset, testMsg) } + mockFetchResponse.SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest) + broker0.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetOldest, 0). - SetOffset("my_topic", 0, OffsetNewest, 2345), + SetOffset("my_topic", 0, OffsetNewest, offsetNewest), "FetchRequest": mockFetchResponse, }) @@ -41,44 +51,50 @@ func TestConsumerOffsetManual(t *testing.T) { t.Fatal(err) } - consumer, err := master.ConsumePartition("my_topic", 0, 1234) + consumer, err := master.ConsumePartition("my_topic", 0, manualOffset) if err != nil { t.Fatal(err) } - // Then: messages starting from offset 1234 are consumed. - for i := 0; i < 10; i++ { + // Then + for i := int64(0); i < 10; i++ { select { case message := <-consumer.Messages(): - assertMessageOffset(t, message, int64(i+1234)) + assertMessageOffset(t, message, i+manualOffset) case err := <-consumer.Errors(): t.Error(err) } } + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo) + } + safeClose(t, consumer) safeClose(t, master) broker0.Close() } // If `OffsetNewest` is passed as the initial offset then the first consumed -// message is indeed corresponds to the offset that broker claims to be the +// message indeed corresponds to the offset that broker claims to be the // newest in its metadata response. func TestConsumerOffsetNewest(t *testing.T) { // Given + offsetNewest := int64(10) + offsetNewestAfterFetchRequest := int64(50) broker0 := NewMockBroker(t, 0) broker0.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetOffset("my_topic", 0, OffsetNewest, 10). + SetOffset("my_topic", 0, OffsetNewest, offsetNewest). SetOffset("my_topic", 0, OffsetOldest, 7), "FetchRequest": NewMockFetchResponse(t, 1). - SetMessage("my_topic", 0, 9, testMsg). + SetMessage("my_topic", 0, 9, testMsg). // skipped because parseRecords(): offset < child.offset SetMessage("my_topic", 0, 10, testMsg). SetMessage("my_topic", 0, 11, testMsg). - SetHighWaterMark("my_topic", 0, 14), + SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest), }) master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) @@ -94,8 +110,53 @@ func TestConsumerOffsetNewest(t *testing.T) { // Then assertMessageOffset(t, <-consumer.Messages(), 10) - if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 { - t.Errorf("Expected high water mark offset 14, found %d", hwmo) + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo) + } + + safeClose(t, consumer) + safeClose(t, master) + broker0.Close() +} + +// If `OffsetOldest` is passed as the initial offset then the first consumed +// message is indeed the first available in the partition. +func TestConsumerOffsetOldest(t *testing.T) { + // Given + offsetNewest := int64(10) + broker0 := NewMockBroker(t, 0) + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetNewest, offsetNewest). + SetOffset("my_topic", 0, OffsetOldest, 7), + "FetchRequest": NewMockFetchResponse(t, 1). + // skipped because parseRecords(): offset < child.offset + SetMessage("my_topic", 0, 6, testMsg). + // these will get to the Messages() channel + SetMessage("my_topic", 0, 7, testMsg). + SetMessage("my_topic", 0, 8, testMsg). + SetMessage("my_topic", 0, 9, testMsg). + SetHighWaterMark("my_topic", 0, offsetNewest), + }) + + master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) + if err != nil { + t.Fatal(err) + } + + // When + consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest) + if err != nil { + t.Fatal(err) + } + + // Then + assertMessageOffset(t, <-consumer.Messages(), int64(7)) + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) } safeClose(t, consumer)