Skip to content

Commit

Permalink
tests: Deflake test_migrating_consume_offsets
Browse files Browse the repository at this point in the history
Fixes synchronization around failure injection and
node restarts.

Looped through the test 50 times locally and didn't
run into any issues
  • Loading branch information
bharathv committed Jun 8, 2022
1 parent bc76e1b commit 68013af
Showing 1 changed file with 38 additions and 14 deletions.
52 changes: 38 additions & 14 deletions tests/rptest/tests/consumer_offsets_migration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,48 @@ def test_migrating_consume_offsets(self, failures, cpus):

self.redpanda.start()
self._client = DefaultClient(self.redpanda)
# set of failure suppressed nodes - required to make restarts deterministic
suppressed = set()

# Set of nodes that are undergoing some destructive operations like fault injections
# or restarts. We only want to do one of those at a time to avoid conflicts.
busy_nodes = set()
# synchronize access to busy_nodes set.
busy_nodes_lock = threading.RLock()

# Atomic "mark a node busy if it is not" function.
def mark_node_busy(node_idx):
with busy_nodes_lock:
if node_idx in busy_nodes:
return False
busy_nodes.add(node_idx)
self.logger.debug(f"Marked {node_idx} busy")
return True

def mark_node_free(node_idx):
with busy_nodes_lock:
assert node_idx in busy_nodes
busy_nodes.remove(node_idx)
self.logger.debug(f"Marked {node_idx} free")

def failure_injector_loop():
f_injector = FailureInjector(self.redpanda)
while failures:
f_type = random.choice(FailureSpec.FAILURE_TYPES)
length = 0
node = random.choice(self.redpanda.nodes)
while self.redpanda.idx(node) in suppressed:
while not mark_node_busy(self.redpanda.idx(node)):
node = random.choice(self.redpanda.nodes)

# allow suspending any node
if f_type == FailureSpec.FAILURE_SUSPEND:
length = random.randint(
1,
ConsumerOffsetsMigrationTest.max_suspend_duration_sec)
try:
# allow suspending any node
if f_type == FailureSpec.FAILURE_SUSPEND:
length = random.randint(
1, ConsumerOffsetsMigrationTest.
max_suspend_duration_sec)

f_injector.inject_failure(
FailureSpec(node=node, type=f_type, length=length))
f_injector.inject_failure(
FailureSpec(node=node, type=f_type, length=length))
finally:
mark_node_free(self.redpanda.idx(node))

delay = random.randint(
ConsumerOffsetsMigrationTest.min_inter_failure_time_sec,
Expand Down Expand Up @@ -136,10 +158,12 @@ def _second_group_empty():
# enable consumer offsets support
self.redpanda.set_environment({})
for n in self.redpanda.nodes:
id = self.redpanda.idx(n)
suppressed.add(id)
self.redpanda.restart_nodes(n, stop_timeout=60)
suppressed.remove(id)
idx = self.redpanda.idx(n)
wait_until(lambda: mark_node_busy(idx), 90, backoff_sec=2)
try:
self.redpanda.restart_nodes(n, stop_timeout=60)
finally:
mark_node_free(idx)
# wait for leader balancer to start evening out leadership
wait_until(cluster_is_stable, 90, backoff_sec=2)

Expand Down

0 comments on commit 68013af

Please sign in to comment.