Skip to content

Commit

Permalink
fix: consumer committed metrics should use committed and not last_sta…
Browse files Browse the repository at this point in the history
…ble_offset
  • Loading branch information
marcosschroh committed Jul 20, 2023
1 parent f8f3158 commit 84beb8b
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 9 deletions.
8 changes: 4 additions & 4 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ In order to run the examples you need `docker-compose`. In the ptoject root you

### Examples

1. `simple.py` example: minimal `kstream` example that `produces` and `consumes` events.
2. `consumer_multiple_topics.py`: A streams that consumes from multiple kafka topics.
3. `json_serialization.py`: Example of how to use custom `serializers` and `deserializers` with `kstreams`. In this case we want `json`.
4. `fastapi_example`: An example of how to integrate `kstreams` with `FastAPI`. Execute it with `python -m fastapi_example`
1. `simple.py` example: minimal `kstream` example that `produces` and `consumes` events. The consumed events are printed in the terminal. Execute it with `poetry run python simple.py`
2. `consumer_multiple_topics.py`: A streams that consumes from multiple kafka topics. Execute it with `poetry run python consumer_multiple_topics.py`
3. `json_serialization.py`: Example of how to use custom `serializers` and `deserializers` with `kstreams`. In this case we want `json`. Execute it with `poetry run python json_serialization.py`
4. [fastapi-webapp](https://github.com/kpn/kstreams/tree/0.11.8/examples/fastapi-webapp): An example of how to integrate `kstreams` with `FastAPI`.
1 change: 1 addition & 0 deletions examples/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def total(self):
@stream_engine.stream(topic, group_id="example-group")
async def consume(stream: Stream):
async for cr in stream:
print(cr)
event_store.add(cr)


Expand Down
2 changes: 1 addition & 1 deletion kstreams/prometheus/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async def generate_consumer_metrics(self, consumer: ConsumerType):
topic_partitions = consumer.assignment()

for topic_partition in topic_partitions:
committed = consumer.last_stable_offset(topic_partition)
committed = await consumer.committed(topic_partition) or 0
position = await consumer.position(topic_partition)
highwater = consumer.highwater(topic_partition)

Expand Down
2 changes: 1 addition & 1 deletion kstreams/test_utils/test_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ async def commit(self, offsets: Optional[Dict[TopicPartition, int]] = None) -> N
return None

async def committed(self, topic_partition: TopicPartition) -> Optional[int]:
return self.partitions_committed.get(topic_partition)
return self.partitions_committed.get(topic_partition, 0)

async def end_offsets(
self, partitions: List[TopicPartition]
Expand Down
9 changes: 6 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,16 @@ def subscribe(
def assignment(self):
return self._assigments

def last_stable_offset(self, topic_partition: TopicPartition):
def last_stable_offset(self, _: TopicPartition):
return 10

async def position(self, topic_partition: TopicPartition):
async def position(self, _: TopicPartition):
return 10

def highwater(self, topic_partition: TopicPartition):
def highwater(self, _: TopicPartition):
return 10

async def committed(self, _: TopicPartition):
return 10


Expand Down

0 comments on commit 84beb8b

Please sign in to comment.