Skip to content

Commit

Permalink
Merge pull request #5907 from VadimPlh/rebase
Browse files Browse the repository at this point in the history
[transaction] Compare record from input and output
  • Loading branch information
VadimPlh committed Aug 9, 2022
2 parents 5fb180d + c7be281 commit c324db7
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions tests/rptest/tests/transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def simple_test(self):
consumer1.subscribe([self.input_t])

num_consumed_records = 0
consumed_from_input_topic = []
while num_consumed_records != self.max_records:
# Imagine that consume got broken, we read the same record twice and overshoot the condition
assert num_consumed_records < self.max_records
Expand All @@ -104,6 +105,7 @@ def simple_test(self):

for record in records:
assert (record.error() == None)
consumed_from_input_topic.append(record)
producer.produce(self.output_t.name,
record.value(),
record.key(),
Expand All @@ -119,6 +121,7 @@ def simple_test(self):

producer.flush()
consumer1.close()
assert len(consumed_from_input_topic) == self.max_records

consumer2 = ck.Consumer({
'group.id': "testtest",
Expand All @@ -127,17 +130,19 @@ def simple_test(self):
})
consumer2.subscribe([self.output_t])

final_consume_cnt = 0
expected = bytes("0", 'UTF-8')
index_from_input = 0

while final_consume_cnt != self.max_records:
while index_from_input < self.max_records:
records = self.consume(consumer2)

for record in records:
assert (record.key() == expected)
assert (record.value() == expected)
expected = bytes(str(int(record.key()) + 1), 'UTF-8')
final_consume_cnt += len(records)
assert record.key(
) == consumed_from_input_topic[index_from_input].key(
), f'Records key does not match from input {consumed_from_input_topic[index_from_input].key()}, from output {record.key()}'
assert record.value(
) == consumed_from_input_topic[index_from_input].value(
), f'Records value does not match from input {consumed_from_input_topic[index_from_input].value()}, from output {record.value()}'
index_from_input += 1

@cluster(num_nodes=3)
def rejoin_member_test(self):
Expand Down

0 comments on commit c324db7

Please sign in to comment.