Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: option to disable monitoring during testing added #125

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/test_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,13 @@ async def test_event_produced():
it shows what to do when the `procuder code` is encapsulated in other functions,
for example a `FastAPI` view.
Then you don't want to use `client.send` directly, just called the function that contains `stream_engine.send(...)`

## Disabling monitoring during testing

Monitoring streams and producers is vital for streaming application but it requires extra effort. Sometimes during testing,
monitoring is not required as we only want to focus on testing the buisness logic. In order to disable monitoring
during testing use:

```python
client = TestStreamClient(stream_engine, monitoring_enabled=False)
```
33 changes: 32 additions & 1 deletion kstreams/test_utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from kstreams import ConsumerRecord
from kstreams.engine import StreamEngine
from kstreams.prometheus.monitor import PrometheusMonitor
from kstreams.serializers import Serializer
from kstreams.streams import Stream
from kstreams.types import Headers
Expand All @@ -12,19 +13,48 @@
from .topics import Topic, TopicManager


class TestMonitor(PrometheusMonitor):
__test__ = False

def start(self, *args, **kwargs) -> None:
print("herte....")
# ...

async def stop(self, *args, **kwargs) -> None:
...

def add_topic_partition_offset(self, *args, **kwargs) -> None:
...

def clean_stream_consumer_metrics(self, *args, **kwargs) -> None:
...

Check warning on line 30 in kstreams/test_utils/test_utils.py

View check run for this annotation

Codecov / codecov/patch

kstreams/test_utils/test_utils.py#L30

Added line #L30 was not covered by tests

def add_producer(self, *args, **kwargs):
...

def add_streams(self, *args, **kwargs):
...


class TestStreamClient:
__test__ = False

def __init__(self, stream_engine: StreamEngine) -> None:
def __init__(
self, stream_engine: StreamEngine, monitoring_enabled: bool = True
) -> None:
self.stream_engine = stream_engine

# store the user clients to restore them later
self.monitor = stream_engine.monitor
self.producer_class = self.stream_engine.producer_class
self.consumer_class = self.stream_engine.consumer_class

self.stream_engine.producer_class = TestProducer
self.stream_engine.consumer_class = TestConsumer

if not monitoring_enabled:
self.stream_engine.monitor = TestMonitor()

def mock_streams(self) -> None:
streams: List[Stream] = self.stream_engine._streams
for stream in streams:
Expand All @@ -46,6 +76,7 @@
# restore original config
self.stream_engine.producer_class = self.producer_class
self.stream_engine.consumer_class = self.consumer_class
self.stream_engine.monitor = self.monitor

# clean the topics after finishing the test to make sure that
# no data is left tover
Expand Down
24 changes: 20 additions & 4 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,18 @@ async def test_engine_clients(stream_engine: StreamEngine):


@pytest.mark.asyncio
async def test_send_event_with_test_client(stream_engine: StreamEngine):
@pytest.mark.parametrize(
"monitoring_enabled",
(
True,
False,
),
)
async def test_send_event_with_test_client(
stream_engine: StreamEngine, monitoring_enabled: bool
):
topic = "local--kstreams"
client = TestStreamClient(stream_engine)
client = TestStreamClient(stream_engine, monitoring_enabled=monitoring_enabled)

async with client:
metadata = await client.send(
Expand Down Expand Up @@ -278,11 +287,18 @@ async def test_e2e_example():


@pytest.mark.asyncio
async def test_e2e_consume_multiple_topics():
@pytest.mark.parametrize(
"monitoring_enabled",
(
True,
False,
),
)
async def test_e2e_consume_multiple_topics(monitoring_enabled):
from examples.consume_multiple_topics import produce, stream_engine, topics

total_events = 2
client = TestStreamClient(stream_engine)
client = TestStreamClient(stream_engine, monitoring_enabled=monitoring_enabled)

async with client:
await produce(total_events)
Expand Down
Loading