Skip to content

Commit

Permalink
Fix wrong offsets in mock Consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
grongor committed Dec 17, 2021
1 parent 6a2f0c2 commit f05189d
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 7 deletions.
20 changes: 13 additions & 7 deletions mocks/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,19 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
}

if c.partitionConsumers[topic][partition] == nil {
highWatermarkOffset := offset
if offset == sarama.OffsetOldest {
highWatermarkOffset = 0
}

c.partitionConsumers[topic][partition] = &PartitionConsumer{
t: c.t,
topic: topic,
partition: partition,
offset: offset,
messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
highWaterMarkOffset: highWatermarkOffset,
t: c.t,
topic: topic,
partition: partition,
offset: offset,
messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
}
}

Expand Down Expand Up @@ -282,7 +288,7 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *Partitio

msg.Topic = pc.topic
msg.Partition = pc.partition
msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1)
msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) - 1

pc.messages <- msg

Expand Down
65 changes: 65 additions & 0 deletions mocks/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,68 @@ func TestConsumerUnexpectedTopicMetadata(t *testing.T) {
t.Errorf("Expected an expectation failure to be set on the error reporter.")
}
}

func TestConsumerOffsetsAreManagedCorrectlyWithOffsetOldest(t *testing.T) {
trm := newTestReporterMock()
consumer := NewConsumer(trm, NewTestConfig())
pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest)
pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
pcmock.ExpectMessagesDrainedOnClose()

pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
if err != nil {
t.Error(err)
}

message1 := <-pc.Messages()
if message1.Offset != 0 {
t.Errorf("Expected offset of first message in the partition to be 0, got %d", message1.Offset)
}

message2 := <-pc.Messages()
if message2.Offset != 1 {
t.Errorf("Expected offset of second message in the partition to be 1, got %d", message2.Offset)
}

if err := consumer.Close(); err != nil {
t.Error(err)
}

if len(trm.errors) != 0 {
t.Errorf("Expected to not report any errors, found: %v", trm.errors)
}
}

func TestConsumerOffsetsAreManagedCorrectlyWithSpecifiedOffset(t *testing.T) {
startingOffset := int64(123)
trm := newTestReporterMock()
consumer := NewConsumer(trm, NewTestConfig())
pcmock := consumer.ExpectConsumePartition("test", 0, startingOffset)
pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
pcmock.ExpectMessagesDrainedOnClose()

pc, err := consumer.ConsumePartition("test", 0, startingOffset)
if err != nil {
t.Error(err)
}

message1 := <-pc.Messages()
if message1.Offset != startingOffset {
t.Errorf("Expected offset of first message to be %d, got %d", startingOffset, message1.Offset)
}

message2 := <-pc.Messages()
if message2.Offset != startingOffset+1 {
t.Errorf("Expected offset of second message to be %d, got %d", startingOffset+1, message2.Offset)
}

if err := consumer.Close(); err != nil {
t.Error(err)
}

if len(trm.errors) != 0 {
t.Errorf("Expected to not report any errors, found: %v", trm.errors)
}
}

0 comments on commit f05189d

Please sign in to comment.