diff --git a/mocks/consumer.go b/mocks/consumer.go index c10ae4765..0d9396f34 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -155,13 +155,19 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset } if c.partitionConsumers[topic][partition] == nil { + highWatermarkOffset := offset + if offset == sarama.OffsetOldest { + highWatermarkOffset = 0 + } + c.partitionConsumers[topic][partition] = &PartitionConsumer{ - t: c.t, - topic: topic, - partition: partition, - offset: offset, - messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize), - errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize), + highWaterMarkOffset: highWatermarkOffset, + t: c.t, + topic: topic, + partition: partition, + offset: offset, + messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize), + errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize), } } @@ -282,7 +288,7 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *Partitio msg.Topic = pc.topic msg.Partition = pc.partition - msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) + msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) - 1 pc.messages <- msg diff --git a/mocks/consumer_test.go b/mocks/consumer_test.go index 9367d6aa1..c46212fcf 100644 --- a/mocks/consumer_test.go +++ b/mocks/consumer_test.go @@ -247,3 +247,68 @@ func TestConsumerUnexpectedTopicMetadata(t *testing.T) { t.Errorf("Expected an expectation failure to be set on the error reporter.") } } + +func TestConsumerOffsetsAreManagedCorrectlyWithOffsetOldest(t *testing.T) { + trm := newTestReporterMock() + consumer := NewConsumer(trm, NewTestConfig()) + pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) + pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) + pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) + pcmock.ExpectMessagesDrainedOnClose() + + pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest) + if err != nil { + t.Error(err) + } + + message1 := <-pc.Messages() + if message1.Offset != 0 { + t.Errorf("Expected offset of first message in the partition to be 0, got %d", message1.Offset) + } + + message2 := <-pc.Messages() + if message2.Offset != 1 { + t.Errorf("Expected offset of second message in the partition to be 1, got %d", message2.Offset) + } + + if err := consumer.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 0 { + t.Errorf("Expected to not report any errors, found: %v", trm.errors) + } +} + +func TestConsumerOffsetsAreManagedCorrectlyWithSpecifiedOffset(t *testing.T) { + startingOffset := int64(123) + trm := newTestReporterMock() + consumer := NewConsumer(trm, NewTestConfig()) + pcmock := consumer.ExpectConsumePartition("test", 0, startingOffset) + pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) + pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) + pcmock.ExpectMessagesDrainedOnClose() + + pc, err := consumer.ConsumePartition("test", 0, startingOffset) + if err != nil { + t.Error(err) + } + + message1 := <-pc.Messages() + if message1.Offset != startingOffset { + t.Errorf("Expected offset of first message to be %d, got %d", startingOffset, message1.Offset) + } + + message2 := <-pc.Messages() + if message2.Offset != startingOffset+1 { + t.Errorf("Expected offset of second message to be %d, got %d", startingOffset+1, message2.Offset) + } + + if err := consumer.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 0 { + t.Errorf("Expected to not report any errors, found: %v", trm.errors) + } +}