Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce raft_availability ducktape execution time #4598

Merged
merged 2 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions tests/rptest/clients/ping_pong.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
from confluent_kafka import TopicPartition, OFFSET_BEGINNING
import random

import time


class SyncProducer:
    """Thin synchronous wrapper around confluent_kafka.Producer.

    Each produce() call blocks (via flush) until the broker acks the
    record or the timeout expires, and surfaces any failure as a
    KafkaException instead of a silent delivery report.
    """
    def __init__(self, bootstrap):
        # bootstrap: comma-separated broker list, e.g. "h1:9092,h2:9092"
        self.bootstrap = bootstrap
        self.producer = None
        # Delivery report captured by on_delivery for the most recent
        # produce() call; reset to None at the start of each produce().
        self.last_msg = None

    def init(self):
        """Create the underlying producer (acks=all, idempotent)."""
        self.producer = Producer({
            "bootstrap.servers": self.bootstrap,
            "request.required.acks": -1,
            "retries": 5,
            "enable.idempotence": True
        })

    def on_delivery(self, err, msg):
        # Invoked from within flush(); err is set on transport-level
        # failure, per-message errors arrive via msg.error() below.
        if err is not None:
            raise KafkaException(err)
        self.last_msg = msg

    def produce(self, topic, partition, key, value, timeout_s):
        """Produce one record and wait for its ack; return its offset.

        Raises KafkaException with _MSG_TIMED_OUT if the delivery
        callback did not fire within timeout_s, or with the message's
        own error if delivery failed.
        """
        self.last_msg = None
        self.producer.produce(topic,
                              key=key.encode('utf-8'),
                              value=value.encode('utf-8'),
                              partition=partition,
                              callback=self.on_delivery)
        self.producer.flush(timeout_s)
        msg = self.last_msg
        if msg is None:
            # flush() returned without ever invoking the delivery
            # callback: treat it as a produce timeout.
            raise KafkaException(KafkaError(KafkaError._MSG_TIMED_OUT))
        if msg.error() is not None:
            raise KafkaException(msg.error())
        assert msg.offset() is not None
        return msg.offset()


class LogReader:
    """Streaming read_committed consumer used to verify that a produced
    record actually landed in the log at the expected offset."""
    def __init__(self, bootstrap):
        # bootstrap: comma-separated broker list
        self.bootstrap = bootstrap
        self.consumer = None
        # Lazily-created infinite generator over consumed messages;
        # shared across read_until calls so batches are not re-fetched.
        self.stream = None

    def init(self, group, topic, partition):
        """Assign the consumer to (topic, partition) from the beginning
        of the log and start the message stream."""
        self.consumer = Consumer({
            "bootstrap.servers": self.bootstrap,
            "group.id": group,
            "enable.auto.commit": False,
            "auto.offset.reset": "earliest",
            "isolation.level": "read_committed"
        })
        self.consumer.assign(
            [TopicPartition(topic, partition, OFFSET_BEGINNING)])
        self.stream = self.stream_gen()

    def stream_gen(self):
        # Yield messages forever.  After every poll (including an empty
        # one) yield None so that read_until regains control and can
        # enforce its deadline even when no messages are arriving —
        # previously an empty consume() just looped here forever and the
        # caller's timeout check was unreachable.
        while True:
            msgs = self.consumer.consume(timeout=1)
            for msg in msgs:
                yield msg
            yield None

    def read_until(self, check, timeout_s):
        """Consume until check(offset, key, value) returns True.

        Raises KafkaException(_TIMED_OUT) once timeout_s has elapsed
        without a match.  check may itself raise to flag an unexpected
        record (see expect()).
        """
        deadline = time.time() + timeout_s
        for msg in self.stream:
            if time.time() > deadline:
                raise KafkaException(KafkaError(KafkaError._TIMED_OUT))
            if msg is None:
                # Empty poll — just a chance to re-check the deadline.
                continue
            offset = msg.offset()
            value = msg.value().decode('utf-8')
            key = msg.key().decode('utf-8')
            if check(offset, key, value):
                return


def expect(offset, key, value):
    """Build a predicate for LogReader.read_until that waits for the
    record (key, value) to appear at exactly `offset`.

    The returned check(offset, key, value) callable:
      * returns False for records before `offset` (keep reading),
      * returns True when the expected record is seen,
      * raises RuntimeError if the record at `offset` carries a
        different key or value, or if `offset` was skipped entirely.
    """
    def check(read_offset, read_key, read_value):
        if read_offset < offset:
            # Not there yet; keep consuming.
            return False
        if read_offset > offset:
            raise RuntimeError(f"read offset={read_offset} but skipped {offset}")
        # Exact offset match: the key and value must be ours.
        if read_key != key:
            raise RuntimeError(f"expected key='{key}' got '{read_key}'")
        if read_value != value:
            raise RuntimeError(f"expected value='{value}' got '{read_value}'")
        return True

    return check


class PingPong:
    """Round-trip availability probe: produce one random record, then
    consume it back and verify it landed at the produced offset."""
    def __init__(self, brokers, topic, partition, logger):
        # Copy before shuffling so the caller's list is not mutated
        # (random.shuffle works in place).
        self.brokers = list(brokers)
        random.shuffle(self.brokers)
        bootstrap = ",".join(self.brokers)
        self.consumer = LogReader(bootstrap)
        self.consumer.init(group="ping_ponger1",
                           topic=topic,
                           partition=partition)
        self.producer = SyncProducer(bootstrap)
        self.producer.init()
        self.logger = logger
        self.topic = topic
        self.partition = partition

    def ping_pong(self, timeout_s=5, retries=0):
        """Produce a random key/value pair and read it back.

        timeout_s: per-attempt produce timeout and the consume timeout.
        retries:   number of additional produce attempts allowed after a
                   timeout error (safe thanks to the idempotent
                   producer); any other error is re-raised immediately.

        Raises KafkaException if production keeps timing out or the
        read-back does not complete within timeout_s.
        """
        key = str(random.randint(0, 1000))
        value = str(random.randint(0, 1000))

        start = time.time()

        offset = None
        attempt = 0
        while True:
            attempt += 1
            try:
                offset = self.producer.produce(topic=self.topic,
                                               partition=self.partition,
                                               key=key,
                                               value=value,
                                               timeout_s=timeout_s)
                break
            except KafkaException as e:
                if attempt > retries:
                    raise
                # Only timeout-class errors are retried; anything else
                # (auth, unknown topic, ...) propagates immediately.
                if e.args[0].code() not in (KafkaError._MSG_TIMED_OUT,
                                            KafkaError._TIMED_OUT):
                    raise
                random.shuffle(self.brokers)
                bootstrap = ",".join(self.brokers)
                # recreating a producer to overcome this issue
                # https://github.com/confluentinc/confluent-kafka-python/issues/1335
                # once it's fixed we should rely on the internal confluent_kafka's
                # ability to retry the init_producer_id request
                self.producer = SyncProducer(bootstrap)
                self.producer.init()
                self.logger.info(f"produce request {key}={value} timed out")

        self.consumer.read_until(expect(offset, key, value),
                                 timeout_s=timeout_s)
        latency = time.time() - start
        self.logger.info(
            f"ping_pong produced and consumed {key}={value}@{offset} in {(latency)*1000.0:.2f} ms"
        )
52 changes: 24 additions & 28 deletions tests/rptest/tests/raft_availability_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from rptest.clients.kafka_cat import KafkaCat
from rptest.clients.rpk import RpkTool, RpkException
from rptest.clients.types import TopicSpec
from rptest.clients.ping_pong import PingPong
from rptest.services.failure_injector import FailureInjector, FailureSpec
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.rpk_producer import RpkProducer
Expand Down Expand Up @@ -95,41 +96,30 @@ def check():
assert result[0] is not None
return result[0]

def _ping_pong(self, timeout=5):
kc = KafkaCat(self.redpanda)
rpk = RpkTool(self.redpanda)
def ping_pong(self):
return PingPong(self.redpanda.brokers_list(), self.topic, 0,
self.logger)

payload = str(random.randint(0, 1000))
start = time.time()
offset = rpk.produce(self.topic, "tkey", payload, timeout=timeout)
consumed = kc.consume_one(self.topic, 0, offset)
latency = time.time() - start
self.logger.info(
f"_ping_pong produced '{payload}' consumed '{consumed}' in {(latency)*1000.0:.2f} ms"
)
if consumed['payload'] != payload:
raise RuntimeError(f"expected '{payload}' got '{consumed}'")

def _is_available(self, timeout=5):
def _is_available(self, timeout_s=5):
try:
# Should fail
self._ping_pong(timeout)
except RpkException:
self.ping_pong().ping_pong(timeout_s)
except:
return False
else:
return True

def _expect_unavailable(self):
try:
# Should fail
self._ping_pong()
except RpkException:
self.logger.info("Cluster is unavailable as expected")
self.ping_pong().ping_pong()
except:
self.logger.exception("Cluster is unavailable as expected")
else:
assert False, "ping_pong should not have worked "

def _expect_available(self):
self._ping_pong()
self.ping_pong().ping_pong()
self.logger.info("Cluster is available as expected")

def _transfer_leadership(self, admin: Admin, namespace: str, topic: str,
Expand Down Expand Up @@ -276,7 +266,7 @@ def test_two_nodes_down(self):
# Find which node is the leader
initial_leader_id, replicas = self._wait_for_leader()

self._ping_pong()
self.ping_pong().ping_pong()

leader_node = self.redpanda.get_node(initial_leader_id)
other_node_id = (set(replicas) - {initial_leader_id}).pop()
Expand Down Expand Up @@ -474,8 +464,10 @@ def test_follower_isolation(self):
self.redpanda.get_node(follower)))

# expect messages to be produced and consumed without a timeout
for i in range(0, 128):
self._ping_pong(ELECTION_TIMEOUT * 6)
connection = self.ping_pong()
connection.ping_pong(timeout_s=10, retries=10)
for i in range(0, 127):
connection.ping_pong()

@cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
def test_id_allocator_leader_isolation(self):
Expand Down Expand Up @@ -505,8 +497,10 @@ def test_id_allocator_leader_isolation(self):
self.redpanda.get_node(initial_leader_id)))

# expect messages to be produced and consumed without a timeout
for i in range(0, 128):
self._ping_pong(ELECTION_TIMEOUT * 6)
connection = self.ping_pong()
connection.ping_pong(timeout_s=10, retries=10)
for i in range(0, 127):
connection.ping_pong()

@cluster(num_nodes=3)
def test_initial_leader_stability(self):
Expand Down Expand Up @@ -580,5 +574,7 @@ def controller_available():
hosts=hosts,
check=lambda node_id: node_id != controller_id)

for i in range(0, 128):
self._ping_pong()
connection = self.ping_pong()
connection.ping_pong(timeout_s=10, retries=10)
for i in range(0, 127):
connection.ping_pong()