Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Prometheus scrape metrics task fixed in order to have a proper s… #124

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kstreams/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ async def start(self) -> None:
self.monitor.start()

async def stop(self) -> None:
await self.stop_streams()
await self.stop_producer()
await self.monitor.stop()
await self.stop_producer()
await self.stop_streams()

async def stop_producer(self):
logger.info("Waiting Producer to STOP....")
Expand Down
30 changes: 20 additions & 10 deletions kstreams/prometheus/monitor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import logging
from typing import DefaultDict, Dict, Optional, TypeVar
from typing import DefaultDict, Dict, List, Optional, TypeVar

from prometheus_client import Gauge

Expand All @@ -17,6 +17,10 @@
class PrometheusMonitor:
"""
Metrics monitor to keep track of Producers and Consumers.

Attributes:
        metrics_scrape_time (float): Number of seconds that the monitor
            will wait until the next scrape iteration
"""

# Producer metrics
Expand Down Expand Up @@ -51,23 +55,26 @@ class PrometheusMonitor:
["topic", "partition", "consumer_group"],
)

def __init__(self):
def __init__(self, metrics_scrape_time: float = 3):
self.metrics_scrape_time = metrics_scrape_time
self.running = False
self._producer = None
self._streams = []
self._streams: List[Stream] = []
self._task: Optional[asyncio.Task] = None

def start(self) -> None:
logger.info("Starting Prometheus metrics...")
self.running = True
self._task = asyncio.create_task(self._metrics_task())

async def stop(self) -> None:
        logger.info("Stopping Prometheus metrics...")
if self._task is not None:
self._task.cancel()
self.running = False

# we need to make sure that the task is cancelled
if self._task is not None:
# we need to make sure that the task is `done`
# to clean up properly
while not self._task.cancelled():
while not self._task.done():
await asyncio.sleep(0.1)

self._clean_consumer_metrics()
Expand Down Expand Up @@ -214,10 +221,13 @@ async def generate_consumer_metrics(self, consumer: ConsumerType):
async def _metrics_task(self) -> None:
"""
        Asyncio Task that runs in the `background` to generate
consumer metrics
consumer metrics.

When self.running is False the task will finish and it
will be safe to stop consumers and producers.
"""
while True:
await asyncio.sleep(3)
while self.running:
await asyncio.sleep(self.metrics_scrape_time)
for stream in self._streams:
if stream.consumer is not None:
try:
Expand Down
14 changes: 9 additions & 5 deletions kstreams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __init__(
self.func = func
self.backend = backend
self.consumer_class = consumer_class
self.consumer: Optional[Type[ConsumerType]] = None
self.consumer: Optional[ConsumerType] = None
self.config = config or {}
self._consumer_task: Optional[asyncio.Task] = None
self.name = name or str(uuid.uuid4())
Expand All @@ -115,7 +115,7 @@ def __init__(
# so we always create a list and then we expand it with *topics
self.topics = [topics] if isinstance(topics, str) else topics

def _create_consumer(self) -> Type[ConsumerType]:
def _create_consumer(self) -> ConsumerType:
if self.backend is None:
raise BackendNotSet("A backend has not been set for this stream")
config = {**self.backend.dict(), **self.config}
Expand All @@ -135,10 +135,14 @@ async def stop(self) -> None:
async def _subscribe(self) -> None:
# Always create a consumer on stream.start
self.consumer = self._create_consumer()
await self.consumer.start()
self.running = True

self.consumer.subscribe(topics=self.topics, listener=self.rebalance_listener)
        # add the check to avoid `mypy` complaints
if self.consumer is not None:
await self.consumer.start()
self.consumer.subscribe(
topics=self.topics, listener=self.rebalance_listener
)
self.running = True

async def commit(self, offsets: Optional[Dict[TopicPartition, int]] = None):
await self.consumer.commit(offsets=offsets) # type: ignore
Expand Down
Loading