Skip to content

Commit

Permalink
Merge pull request #4805 from NyaliaLui/kstreams-pageview-more-logs
Browse files Browse the repository at this point in the history
tests: Kstreams PageView improve logging
  • Loading branch information
NyaliaLui committed May 24, 2022
2 parents 21f310e + a1340df commit 006f3d6
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 34 deletions.
9 changes: 7 additions & 2 deletions tests/rptest/scale_tests/franzgo_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# by the Apache License, Version 2.0

from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until
from rptest.services.compatibility.example_runner import ExampleRunner
import rptest.services.compatibility.franzgo_examples as FranzGoExamples
from rptest.tests.redpanda_test import RedpandaTest
Expand Down Expand Up @@ -66,13 +67,17 @@ def __init__(self, test_context, enable_sasl=False, group=None):
def test_franzgo_bench(self):
# Start the produce bench
self._producer.start()
self._producer.wait()
wait_until(self._producer.condition_met,
timeout_sec=self._timeout,
backoff_sec=1)

# Start the consume bench.
# Running the example sequentially because
# it's easier to debug.
self._consumer.start()
self._consumer.wait()
wait_until(self._consumer.condition_met,
timeout_sec=self._timeout,
backoff_sec=1)


class FranzGoWithoutGroupTest(FranzGoBase):
Expand Down
3 changes: 2 additions & 1 deletion tests/rptest/services/compatibility/example_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ def __init__(self, redpanda):
# Calls the internal condition and
# automatically stores the result
def condition(self, line):
self._condition_met = self._condition(line)
if not self._condition_met:
self._condition_met = self._condition(line)

# Was the internal condition met?
def condition_met(self):
Expand Down
24 changes: 21 additions & 3 deletions tests/rptest/services/compatibility/example_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from ducktape.services.background_thread import BackgroundThreadService
from ducktape.cluster.remoteaccount import RemoteCommandError

import threading


class ExampleRunner(BackgroundThreadService):
"""
Expand All @@ -25,17 +27,28 @@ def __init__(self, context, example, timeout_sec=10):

self._pid = None

self._stopping = threading.Event()

def _worker(self, idx, node):
start_time = time.time()
self._stopping.clear()

# Some examples require the hostname of the node
self._example.set_node_name(node.name)

# Run the example until the condition is met or timeout occurs
cmd = "echo $$ ; " + self._example.cmd()
output_iter = node.account.ssh_capture(cmd)
while not self._example.condition_met(
) and time.time() < start_time + self._timeout:

start_time = time.time()

while True:

# Terminate loop on timeout or stop_node
if time.time() > start_time + self._timeout:
break
if self._stopping.is_set():
break

line = next(output_iter)
line = line.strip()
self.logger.debug(line)
Expand All @@ -48,11 +61,16 @@ def _worker(self, idx, node):
# store result in a boolean variable
self._example.condition(line)

def condition_met(self):
return self._example.condition_met()

# Returns the node name that the example is running on
def node_name(self):
return self._example.node_name()

def stop_node(self, node):
self._stopping.set()

try:
if self._pid:
node.account.signal(self._pid, 9, allow_fail=True)
Expand Down
76 changes: 51 additions & 25 deletions tests/rptest/tests/compatibility/kafka_streams_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# by the Apache License, Version 2.0

from rptest.services.cluster import cluster
from ducktape.mark import ok_to_fail
from ducktape.utils.util import wait_until

from rptest.services.compatibility.example_runner import ExampleRunner
Expand Down Expand Up @@ -77,14 +78,15 @@ def test_kafka_streams(self):

# Start the example
example.start()
example.wait()
wait_until(example.condition_met,
timeout_sec=self._timeout,
backoff_sec=1)

# Start the driver
driver.start()
driver.wait()

driver.stop()
example.stop()
wait_until(driver.condition_met,
timeout_sec=self._timeout,
backoff_sec=1)


class KafkaStreamsProdConsBase(KafkaStreamsTest):
Expand Down Expand Up @@ -139,7 +141,6 @@ def try_cons():

consumer.stop()
producer.stop()
example.stop()


class KafkaStreamsTopArticles(KafkaStreamsDriverBase):
Expand Down Expand Up @@ -201,25 +202,50 @@ def __init__(self, test_context):
enable_sr=True)


# Disabled for https://github.com/redpanda-data/redpanda/issues/4637
# class KafkaStreamsPageView(KafkaStreamsDriverBase):
# """
# Test KafkaStreams PageView example which performs a join between a
# KStream and a KTable
# """
# topics = (
# TopicSpec(name="PageViews"),
# TopicSpec(name="UserProfiles"),
# TopicSpec(name="PageViewsByRegion"),
# )

# Example = KafkaStreamExamples.KafkaStreamsPageView
# Driver = KafkaStreamExamples.KafkaStreamsPageView

# def __init__(self, test_context):
# super(KafkaStreamsPageView, self).__init__(test_context=test_context,
# enable_pp=True,
# enable_sr=True)
class KafkaStreamsPageView(RedpandaTest):
"""
Test KafkaStreams PageView example which performs a join between a
KStream and a KTable
"""
topics = (
TopicSpec(name="PageViews"),
TopicSpec(name="UserProfiles"),
TopicSpec(name="PageViewsByRegion"),
)

def __init__(self, test_context):
super(KafkaStreamsPageView, self).__init__(test_context=test_context,
enable_pp=True,
enable_sr=True)

self._timeout = 300

@ok_to_fail # For https://github.com/redpanda-data/redpanda/issues/4637
@cluster(num_nodes=5)
def test_kafka_streams_page_view(self):
example_jar = KafkaStreamExamples.KafkaStreamsPageView(self.redpanda,
is_driver=False)
example = ExampleRunner(self.test_context,
example_jar,
timeout_sec=self._timeout)

driver_jar = KafkaStreamExamples.KafkaStreamsPageView(self.redpanda,
is_driver=True)
driver = ExampleRunner(self.test_context,
driver_jar,
timeout_sec=self._timeout)

# Start the example
example.start()
wait_until(example.condition_met,
timeout_sec=self._timeout,
backoff_sec=1)

# Start the driver
driver.start()
wait_until(driver.condition_met,
timeout_sec=self._timeout,
backoff_sec=1)


class KafkaStreamsSumLambda(KafkaStreamsDriverBase):
Expand Down
12 changes: 9 additions & 3 deletions tests/rptest/tests/compatibility/sarama_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def test_sarama_interceptors(self):
# Start the example
example.start()

example.wait()
wait_until(example.condition_met,
timeout_sec=self._timeout,
backoff_sec=1)

@cluster(num_nodes=4)
def test_sarama_http_server(self):
Expand All @@ -62,7 +64,9 @@ def test_sarama_http_server(self):
example.start()

# Wait for the server to load
example.wait()
wait_until(example.condition_met,
timeout_sec=self._timeout,
backoff_sec=1)

# Get the node the server is on and
# a ducktape node
Expand Down Expand Up @@ -123,7 +127,9 @@ def until_partitions():
example.start()

# Wait until the example is OK to terminate
example.wait()
wait_until(example.condition_met,
timeout_sec=self._timeout,
backoff_sec=1)


class SaramaScramTest(RedpandaTest):
Expand Down

0 comments on commit 006f3d6

Please sign in to comment.