Skip to content

Commit

Permalink
txns/compaction: Improve test coverage.
Browse files Browse the repository at this point in the history
Summary of changes:
* Make repeatingKeys config a long to accept larger inputs
* Use "read_committed" for the verifiable consumer so it reads only
committed batches
* Add test coverage for high cardinality inputs where nothing is
compacted.
* Improve consumer output validation for transactional verifier.
  • Loading branch information
bharathv committed Oct 20, 2022
1 parent f65e2aa commit e5a4f2f
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void send(List<Map.Entry<String, String>> kvs) {
// Send messages with a key that starts at 0 and increments by 1 for
// each message produced; when the specified number is reached, the
// key is reset to 0.
private final Integer repeatingKeys;
private final Long repeatingKeys;

private int keyCounter;

Expand All @@ -217,7 +217,7 @@ public void send(List<Map.Entry<String, String>> kvs) {

public VerifiableProducer(
Properties properties, String topic, int throughput, int maxMessages,
Integer valuePrefix, Long createTime, Integer repeatingKeys,
Integer valuePrefix, Long createTime, Long repeatingKeys,
boolean isTransactional, boolean enableRandomAborts, long batchSize) {

this.topic = topic;
Expand Down Expand Up @@ -332,7 +332,7 @@ private static ArgumentParser argParser() {
parser.addArgument("--repeating-keys")
.action(store())
.required(false)
.type(Integer.class)
.type(Long.class)
.metavar("REPEATING-KEYS")
.dest("repeatingKeys")
.help(
Expand Down Expand Up @@ -420,7 +420,7 @@ public static Properties loadProps(String filename) throws IOException {
String configFile = res.getString("producer.config");
Integer valuePrefix = res.getInt("valuePrefix");
Long createTime = res.getLong("createTime");
Integer repeatingKeys = res.getInt("repeatingKeys");
Long repeatingKeys = res.getLong("repeatingKeys");

if (createTime == -1L) createTime = null;

Expand Down
2 changes: 1 addition & 1 deletion tests/rptest/services/verifiable_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def __init__(self,
self.session_timeout_sec = session_timeout_sec
self.enable_autocommit = enable_autocommit
self.assignment_strategy = assignment_strategy
self.prop_file = ""
self.prop_file = "isolation.level=read_committed"
self.stop_timeout_sec = stop_timeout_sec
self.on_record_consumed = on_record_consumed
self.verify_offsets = verify_offsets
Expand Down
11 changes: 9 additions & 2 deletions tests/rptest/tests/compaction_end_to_end_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import sys
from ducktape.mark import matrix
from ducktape.utils.util import wait_until
from rptest.clients.rpk import RpkTool
Expand Down Expand Up @@ -64,7 +65,11 @@ def topic_segments(self):
return [len(p.segments) for p in topic_partitions]

@cluster(num_nodes=5, log_allow_list=RESTART_LOG_ALLOW_LIST)
@matrix(key_set_cardinality=[10],
# sys.maxsize is a special high-cardinality case where no keys should be
# compacted away. With transactions enabled, all the aborted batches
# should be compacted away from the log and should not show up in
# the consumed list.
@matrix(key_set_cardinality=[10, sys.maxsize],
initial_cleanup_policy=[
TopicSpec.CLEANUP_COMPACT, TopicSpec.CLEANUP_DELETE
],
Expand Down Expand Up @@ -144,4 +149,6 @@ def segment_number_matches(predicate):
# enable consumer and validate consumed records
self.start_consumer(num_nodes=1, verify_offsets=False)

self.run_validation(enable_compaction=True)
self.run_validation(enable_compaction=True,
enable_transactions=transactions,
consumer_timeout_sec=90)
57 changes: 37 additions & 20 deletions tests/rptest/tests/end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ def run_validation(self,
producer_timeout_sec=30,
consumer_timeout_sec=30,
enable_idempotence=False,
enable_compaction=False):
enable_compaction=False,
enable_transactions=False):
try:
self.await_num_produced(min_records, producer_timeout_sec)

Expand All @@ -252,15 +253,17 @@ def run_validation(self,
self.run_consumer_validation(
consumer_timeout_sec=consumer_timeout_sec,
enable_idempotence=enable_idempotence,
enable_compaction=enable_compaction)
enable_compaction=enable_compaction,
enable_transactions=enable_transactions)
except BaseException:
self._collect_all_logs()
raise

def run_consumer_validation(self,
consumer_timeout_sec=30,
enable_idempotence=False,
enable_compaction=False) -> None:
enable_compaction=False,
enable_transactions=False) -> None:
try:
# Take copy of this dict in case a rogue VerifiableProducer
# thread modifies it.
Expand All @@ -275,12 +278,13 @@ def run_consumer_validation(self,

self.consumer.stop()

self.validate(enable_idempotence, enable_compaction)
self.validate(enable_idempotence, enable_compaction,
enable_transactions)
except BaseException:
self._collect_all_logs()
raise

def validate_compacted(self):
def validate_compacted(self, enable_transactions):

consumer_state = {}

Expand All @@ -295,12 +299,23 @@ def validate_compacted(self):
for k, v in self.producer.not_acked:
not_acked_producer_state[k].add(v)

for k, v in self.records_consumed:
consumer_state[k] = v

msg = ""
success = True
errors = []
aborted_kvs_found = []

for k, v in self.records_consumed:
if enable_transactions:
# When transactions are enabled, unacked state corresponds to aborted transactions.
# Make sure no consumed (key, value) shows up from that list.
if k in not_acked_producer_state and v in not_acked_producer_state[
k]:
aborted_kvs_found((k, v))
consumer_state[k] = v

if aborted_kvs_found:
return False, "Aborted kvs found: \n" + "\n".join(
aborted_kvs_found)

for consumed_key, consumed_value in consumer_state.items():
# invalid key consumed
Expand All @@ -311,15 +326,16 @@ def validate_compacted(self):
if acked_producer_state[consumed_key] == consumed_value:
continue

# we must check not acked state as it might have been caused
# by request timeout and a message might still have been consumed by consumer
self.logger.debug(
f"Checking not acked produced messages for key: {k}, previous acked value: {acked_producer_state[consumed_key]}, consumed value: {v}"
)
# consumed value is one of the not acked produced values
if consumed_key in not_acked_producer_state and consumed_value in not_acked_producer_state[
consumed_key]:
continue
if not enable_transactions:
# we must check not acked state as it might have been caused
# by request timeout and a message might still have been consumed by consumer
self.logger.debug(
f"Checking not acked produced messages for key: {k}, previous acked value: {acked_producer_state[consumed_key]}, consumed value: {v}"
)
# consumed value is one of the not acked produced values
if consumed_key in not_acked_producer_state and consumed_value in not_acked_producer_state[
consumed_key]:
continue

# consumed value is not equal to last acked produced value and any of not acked value, error out
success = False
Expand All @@ -330,12 +346,13 @@ def validate_compacted(self):
if not success:
msg += "Invalid value detected for consumed compacted topic records. errors: ["
for key, consumed_value, produced_acked, producer_not_acked in errors:
msg += f"key: {k} consumed value: {consumed_value}, produced values: (acked: {produced_acked}, not_acked: {producer_not_acked}) "
msg += f"key: {key} consumed value: {consumed_value}, produced values: (acked: {produced_acked}, not_acked: {producer_not_acked}) \n"
msg += "]"

return success, msg

def validate(self, enable_idempotence, enable_compaction):
def validate(self, enable_idempotence, enable_compaction,
enable_transactions):
self.logger.info("Number of acked records: %d" %
len(self.producer.acked))
self.logger.info("Number of consumed records: %d" %
Expand All @@ -344,7 +361,7 @@ def validate(self, enable_idempotence, enable_compaction):
success = True
msg = ""
if enable_compaction:
success, msg = self.validate_compacted()
success, msg = self.validate_compacted(enable_transactions)
else:
# Correctness of the set difference operation depends on using equivalent
# message_validators in producer and consumer
Expand Down

0 comments on commit e5a4f2f

Please sign in to comment.