Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
rystsov committed May 6, 2022
1 parent feb93d5 commit d1c4176
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions tests/rptest/clients/ping_pong.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ def check(ro, rk, rv):
class PingPong:
@staticmethod
def cold(brokers, topic, partition, logger):
random.shuffle(brokers)
bootstrap = ",".join(brokers)
consumer = LogReader(bootstrap)
consumer.init(group="ping_ponger1", topic=topic, partition=partition)
producer = SyncProducer(bootstrap)
producer.init()
return PingPong(consumer, producer, topic, partition, logger)
return PingPong(brokers, consumer, producer, topic, partition, logger)

@staticmethod
def warmed(brokers, topic, partition, logger, timeout_s):
Expand Down Expand Up @@ -141,9 +142,10 @@ def warmed(brokers, topic, partition, logger, timeout_s):
bootstrap = ",".join(brokers)
consumer = LogReader(bootstrap)
consumer.init(group="ping_ponger1", topic=topic, partition=partition)
return PingPong(consumer, producer, topic, partition, logger)
return PingPong(brokers, consumer, producer, topic, partition, logger)

def __init__(self, consumer, producer, topic, partition, logger):
def __init__(self, brokers, consumer, producer, topic, partition, logger):
self.brokers = brokers
self.logger = logger
self.topic = topic
self.partition = partition
Expand Down Expand Up @@ -171,14 +173,16 @@ def ping_pong(self, timeout_s=5, retries=0):
if count > retries:
raise
if e.args[0].code() == KafkaError._MSG_TIMED_OUT:
self.logger.info(
f"produce request {key}={value} timed out")
continue
pass
elif e.args[0].code() == KafkaError._TIMED_OUT:
self.logger.info(
f"produce request {key}={value} timed out")
continue
raise
pass
else:
raise
random.shuffle(self.brokers)
bootstrap = ",".join(self.brokers)
self.producer = SyncProducer(bootstrap)
self.producer.init()
self.logger.info(f"produce request {key}={value} timed out")

self.consumer.read_until(expect(offset, key, value),
timeout_s=timeout_s)
Expand Down

0 comments on commit d1c4176

Please sign in to comment.