ducktape: python lint partition_balancer_test.py
ZeDRoman committed Jul 19, 2022
1 parent a5f1fc0 commit 919eafc
Showing 1 changed file with 27 additions and 29 deletions.
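Note: the new style throughout the diff (double-quoted strings, trailing commas, one argument or list element per line) matches what the black formatter produces with default settings. The commit message only says "python lint", so black is an assumption here; under that assumption, the same rewrite could be reproduced through black's Python API:

    # Hypothetical illustration; assumes the lint step runs black with defaults.
    import black

    src = "extra_rp_conf = {'partition_autobalancing_mode': 'continuous'}\n"
    print(black.format_str(src, mode=black.Mode()), end="")
    # prints: extra_rp_conf = {"partition_autobalancing_mode": "continuous"}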
56 changes: 27 additions & 29 deletions tests/rptest/tests/partition_balancer_test.py
@@ -27,13 +27,13 @@ def __init__(self, ctx, *args, **kwargs):
             ctx,
             *args,
             extra_rp_conf={
-                'partition_autobalancing_mode': 'continuous',
-                'partition_autobalancing_node_availability_timeout_sec':
-                self.NODE_AVAILABILITY_TIMEOUT,
-                'partition_autobalancing_tick_interval_ms': 5000,
-                'raft_learner_recovery_rate': 1_000_000,
+                "partition_autobalancing_mode": "continuous",
+                "partition_autobalancing_node_availability_timeout_sec": self.NODE_AVAILABILITY_TIMEOUT,
+                "partition_autobalancing_tick_interval_ms": 5000,
+                "raft_learner_recovery_rate": 1_000_000,
             },
-            **kwargs)
+            **kwargs,
+        )
 
     def node2partition_count(self):
         topics = [self.topic]
@@ -53,7 +53,8 @@ def all_partitions_ready():
             all_partitions_ready,
             timeout_sec=30,
             backoff_sec=1,
-            err_msg="failed to wait until all partitions have leaders")
+            err_msg="failed to wait until all partitions have leaders",
+        )
 
         for p in partitions:
             for r in p.replicas:
@@ -69,23 +70,27 @@ def check():
             req_start = time.time()
 
             status = admin.get_partition_balancer_status(timeout=1)
-            self.logger.info(f'partition balancer status: {status}')
+            self.logger.info(f"partition balancer status: {status}")
 
-            if 'seconds_since_last_tick' not in status:
+            if "seconds_since_last_tick" not in status:
                 return False
-            return (req_start - status['seconds_since_last_tick'] - 1 > start
-                    and predicate(status), status)
+            return (
+                req_start - status["seconds_since_last_tick"] - 1 > start
+                and predicate(status),
+                status,
+            )
 
         return wait_until_result(
             check,
             timeout_sec=timeout_sec,
             backoff_sec=2,
-            err_msg="failed to wait until status condition")
+            err_msg="failed to wait until status condition",
+        )
 
     def wait_until_ready(self, timeout_sec=60):
         return self.wait_until_status(
-            lambda status: status['status'] == 'ready',
-            timeout_sec=timeout_sec)
+            lambda status: status["status"] == "ready", timeout_sec=timeout_sec
+        )
 
     @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
     def test_unavailable_nodes(self):
@@ -106,8 +111,9 @@ def test_unavailable_nodes(self):
         for n in range(10):
             node = self.redpanda.nodes[n % 5]
             failure_types = [
-                FailureSpec.FAILURE_KILL, FailureSpec.FAILURE_TERMINATE,
-                FailureSpec.FAILURE_SUSPEND
+                FailureSpec.FAILURE_KILL,
+                FailureSpec.FAILURE_TERMINATE,
+                FailureSpec.FAILURE_SUSPEND,
             ]
             failure = FailureSpec(random.choice(failure_types), node)
             f_injector._start_func(failure.type)(failure.node)
@@ -118,21 +124,13 @@
 
             time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
 
-            # TODO: enable when cancellation gets implemented
-            # wait_for_quiescent_state = random.random() < 0.5
-            wait_for_quiescent_state = True
+            self.wait_until_ready()
 
-            if wait_for_quiescent_state:
-                self.wait_until_ready()
+            node2pc = self.node2partition_count()
+            self.logger.info(f"partition counts after: {node2pc}")
 
-                node2pc = self.node2partition_count()
-                self.logger.info(f'partition counts after: {node2pc}')
-
-                assert sum(node2pc.values()) == total_replicas
-                assert self.redpanda.idx(node) not in node2pc
-            else:
-                self.wait_until_status(lambda s: s['status'] == 'in_progress'
-                                       or s['status'] == 'ready')
+            assert sum(node2pc.values()) == total_replicas
+            assert self.redpanda.idx(node) not in node2pc
 
             prev_failure = failure
 
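For context on the check() change above: check() now returns a multi-line (condition, status) tuple, and the helper wait_until_result from rptest's utilities is what consumes it. A minimal sketch of that contract, assuming (this is not code from the commit) a condition may return either a bare bool or a (done, value) pair, with the value handed back once done:

    # Sketch only; the real rptest helper may differ in signature and details.
    import time

    def wait_until_result(condition, timeout_sec, backoff_sec=1, err_msg=""):
        deadline = time.time() + timeout_sec
        while time.time() < deadline:
            res = condition()
            # A (done, value) tuple carries a result; a bare bool does not.
            done, value = res if isinstance(res, tuple) else (res, None)
            if done:
                return value
            time.sleep(backoff_sec)
        raise TimeoutError(err_msg)

This is why check() packs the predicate result and the status dict into one tuple: the first element drives the polling loop, while the second becomes the return value of wait_until_status.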