From cff5e214dab3be900553befd91470bcda309c485 Mon Sep 17 00:00:00 2001 From: marcosschroh Date: Wed, 26 Jul 2023 13:22:41 +0200 Subject: [PATCH] fix: Prometheus scrape metrics task fixed in order to have a proper shutdown The monitor shutdown will wait until the scraping Task is done rather than canceling it. Parameter metrics_scrape_time added. --- kstreams/engine.py | 4 ++-- kstreams/prometheus/monitor.py | 30 ++++++++++++++++++++---------- kstreams/streams.py | 14 +++++++++----- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/kstreams/engine.py b/kstreams/engine.py index 2c23d20..1b44333 100644 --- a/kstreams/engine.py +++ b/kstreams/engine.py @@ -139,9 +139,9 @@ async def start(self) -> None: self.monitor.start() async def stop(self) -> None: - await self.stop_streams() - await self.stop_producer() await self.monitor.stop() + await self.stop_producer() + await self.stop_streams() async def stop_producer(self): logger.info("Waiting Producer to STOP....") diff --git a/kstreams/prometheus/monitor.py b/kstreams/prometheus/monitor.py index 8f1f4ea..0ac1582 100644 --- a/kstreams/prometheus/monitor.py +++ b/kstreams/prometheus/monitor.py @@ -1,6 +1,6 @@ import asyncio import logging -from typing import DefaultDict, Dict, Optional, TypeVar +from typing import DefaultDict, Dict, List, Optional, TypeVar from prometheus_client import Gauge @@ -17,6 +17,10 @@ class PrometheusMonitor: """ Metrics monitor to keep track of Producers and Consumers. + + Attributes: + metrics_scrape_time float: Amount of seconds that the monitor + will wait until next scrape iteration """ # Producer metrics @@ -51,23 +55,26 @@ class PrometheusMonitor: ["topic", "partition", "consumer_group"], ) - def __init__(self): + def __init__(self, metrics_scrape_time: float = 3): + self.metrics_scrape_time = metrics_scrape_time + self.running = False self._producer = None - self._streams = [] + self._streams: List[Stream] = [] self._task: Optional[asyncio.Task] = None def start(self) -> None: logger.info("Starting Prometheus metrics...") + self.running = True self._task = asyncio.create_task(self._metrics_task()) async def stop(self) -> None: logger.info("Stoping Prometheus metrics...") - if self._task is not None: - self._task.cancel() + self.running = False - # we need to make sure that the task is cancelled + if self._task is not None: + # we need to make sure that the task is `done` # to clean up properly - while not self._task.cancelled(): + while not self._task.done(): await asyncio.sleep(0.1) self._clean_consumer_metrics() @@ -214,10 +221,13 @@ async def generate_consumer_metrics(self, consumer: ConsumerType): async def _metrics_task(self) -> None: """ Asyncio Task that runs in `backgroud` to generate - consumer metrics + consumer metrics. + + When self.running is False the task will finish and it + will be safe to stop consumers and producers. """ - while True: - await asyncio.sleep(3) + while self.running: + await asyncio.sleep(self.metrics_scrape_time) for stream in self._streams: if stream.consumer is not None: try: diff --git a/kstreams/streams.py b/kstreams/streams.py index 0ac7f71..b7236d4 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -99,7 +99,7 @@ def __init__( self.func = func self.backend = backend self.consumer_class = consumer_class - self.consumer: Optional[Type[ConsumerType]] = None + self.consumer: Optional[ConsumerType] = None self.config = config or {} self._consumer_task: Optional[asyncio.Task] = None self.name = name or str(uuid.uuid4()) @@ -115,7 +115,7 @@ def __init__( # so we always create a list and then we expand it with *topics self.topics = [topics] if isinstance(topics, str) else topics - def _create_consumer(self) -> Type[ConsumerType]: + def _create_consumer(self) -> ConsumerType: if self.backend is None: raise BackendNotSet("A backend has not been set for this stream") config = {**self.backend.dict(), **self.config} @@ -135,10 +135,14 @@ async def stop(self) -> None: async def _subscribe(self) -> None: # Always create a consumer on stream.start self.consumer = self._create_consumer() - await self.consumer.start() - self.running = True - self.consumer.subscribe(topics=self.topics, listener=self.rebalance_listener) + # add the chech tp avoid `mypy` complains + if self.consumer is not None: + await self.consumer.start() + self.consumer.subscribe( + topics=self.topics, listener=self.rebalance_listener + ) + self.running = True async def commit(self, offsets: Optional[Dict[TopicPartition, int]] = None): await self.consumer.commit(offsets=offsets) # type: ignore