Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: Deflake test_migrating_consume_offsets #5074

Merged
merged 1 commit into from
Jun 11, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 38 additions & 14 deletions tests/rptest/tests/consumer_offsets_migration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,48 @@ def test_migrating_consume_offsets(self, failures, cpus):

self.redpanda.start()
self._client = DefaultClient(self.redpanda)
# set of failure suppressed nodes - required to make restarts deterministic
suppressed = set()

# Set of nodes that are undergoing some destructive operations like fault injections
# or restarts. We only want to do one of those at a time to avoid conflicts.
busy_nodes = set()
# synchronize access to busy_nodes set.
busy_nodes_lock = threading.RLock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes synchronization around failure injection and
node restarts.

could you add a little context into the commit message about what was wrong and how it was fixed?

Looped through the test 50 times locally and didn't
run into any issues

this is great context to add to the PR cover letter, but doesn't add much value in the commit message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the commit message and did a force push.


# Atomic "mark a node busy if it is not" function.
def mark_node_busy(node_idx):
    """Atomically claim node_idx for a destructive operation.

    Returns True when the node was free and is now marked busy,
    False when another actor already holds it.
    """
    with busy_nodes_lock:
        if node_idx not in busy_nodes:
            busy_nodes.add(node_idx)
            self.logger.debug(f"Marked {node_idx} busy")
            return True
        return False

def mark_node_free(node_idx):
    """Release a node previously claimed with mark_node_busy.

    The node must currently be marked busy; freeing a node that was
    never claimed indicates a bookkeeping bug in the test.
    """
    busy_nodes_lock.acquire()
    try:
        # Only a node that was claimed may be released.
        assert node_idx in busy_nodes
        busy_nodes.remove(node_idx)
        self.logger.debug(f"Marked {node_idx} free")
    finally:
        busy_nodes_lock.release()

def failure_injector_loop():
f_injector = FailureInjector(self.redpanda)
while failures:
f_type = random.choice(FailureSpec.FAILURE_TYPES)
length = 0
node = random.choice(self.redpanda.nodes)
while self.redpanda.idx(node) in suppressed:
while not mark_node_busy(self.redpanda.idx(node)):
node = random.choice(self.redpanda.nodes)

# allow suspending any node
if f_type == FailureSpec.FAILURE_SUSPEND:
length = random.randint(
1,
ConsumerOffsetsMigrationTest.max_suspend_duration_sec)
try:
# allow suspending any node
if f_type == FailureSpec.FAILURE_SUSPEND:
length = random.randint(
1, ConsumerOffsetsMigrationTest.
max_suspend_duration_sec)

f_injector.inject_failure(
FailureSpec(node=node, type=f_type, length=length))
f_injector.inject_failure(
FailureSpec(node=node, type=f_type, length=length))
finally:
mark_node_free(self.redpanda.idx(node))

delay = random.randint(
ConsumerOffsetsMigrationTest.min_inter_failure_time_sec,
Expand Down Expand Up @@ -136,10 +158,12 @@ def _second_group_empty():
# enable consumer offsets support
self.redpanda.set_environment({})
for n in self.redpanda.nodes:
id = self.redpanda.idx(n)
suppressed.add(id)
self.redpanda.restart_nodes(n, stop_timeout=60)
suppressed.remove(id)
idx = self.redpanda.idx(n)
wait_until(lambda: mark_node_busy(idx), 90, backoff_sec=2)
try:
self.redpanda.restart_nodes(n, stop_timeout=60)
finally:
mark_node_free(idx)
# wait for leader balancer to start evening out leadership
wait_until(cluster_is_stable, 90, backoff_sec=2)

Expand Down