Make kafka output store offset for successfully delivered events #516

Draft
wants to merge 9 commits into base: main
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -11,7 +11,8 @@
* add a `number_of_successful_writes` metric to the s3 connector, which counts how many events were successfully written to s3
* make the s3 connector work with the new `_write_backlog` method introduced by the `confluent_kafka` commit bugfix in v9.0.0
* add option to Opensearch Output Connector to use parallel bulk implementation (default is True)

* add `input_connector_metadata` preprocessor that allows input connectors to add a `_metadata` field to events.
* make the confluent kafka output store offsets only for successfully delivered events if configured to do so (see the sketch below).

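To make these two entries concrete, here is a rough sketch of the intended flow; the output-side call site and the concrete partition and offset values are assumptions, only the option names come from this PR.

```python
# Rough sketch of the intended flow (output-side call site and values assumed):
#
# 1. With the `input_connector_metadata` preprocessor enabled, ConfluentKafkaInput
#    attaches event["_metadata"] = {"last_partition": p, "last_offset": o} to every
#    event it consumes.
# 2. After the output connector has successfully delivered an event, it passes that
#    metadata back: input.batch_finished_callback(metadata=event["_metadata"]).
# 3. With `use_metadata_for_offsets: True`, the input stores/commits exactly the
#    partition/offset named in the metadata instead of the last record it consumed.
```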
### Improvements

36 changes: 34 additions & 2 deletions logprep/abc/input.py
@@ -112,6 +112,7 @@ class Config(Connector.Config):
"log_arrival_time_target_field": Optional[str],
"log_arrival_timedelta": Optional[TimeDeltaConfig],
"enrich_by_env_variables": Optional[dict],
"input_connector_metadata": Optional[bool],
},
),
],
@@ -172,6 +173,8 @@ class Config(Connector.Config):
- `enrich_by_env_variables` - If required it is possible to automatically enrich incoming
events by environment variables. To activate this preprocessor the fields value has to be
a mapping from the target field name (key) to the environment variable name (value).
- `input_connector_metadata` - If set to `True`, the input connector adds a `_metadata` field
to each event, provided it implements `_add_input_connector_metadata_to_event`.
"""

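As a minimal, hedged illustration of the option documented above, the preprocessing mapping validated by this Config schema could look like this (all other preprocessing keys omitted):

```python
# Illustrative only: a `preprocessing` mapping as validated by the Config schema
# above, with just the new flag enabled.
preprocessing = {
    "input_connector_metadata": True,  # new option added by this PR
}
```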
_version_information: dict = field(
@@ -186,6 +189,11 @@ class Config(Connector.Config):
output_connector: Optional["Output"]
__slots__ = ["pipeline_index", "output_connector"]

@property
def _add_input_connector_metadata(self):
"""Check and return if input connector metadata should be added or not."""
return bool(self._config.preprocessing.get("input_connector_metadata"))

@property
def _add_hmac(self):
"""Check and return if a hmac should be added or not."""
@@ -283,6 +291,8 @@ def get_next(self, timeout: float) -> Tuple[Optional[dict], Optional[str]]:
self.metrics.number_of_processed_events += 1
if not isinstance(event, dict):
raise CriticalInputError(self, "not a dict", event)
if self._add_input_connector_metadata:
event, non_critical_error_msg = self._add_input_connector_metadata_to_event(event)
if self._add_hmac:
event, non_critical_error_msg = self._add_hmac_to(event, raw_event)
if self._add_version_info:
@@ -295,8 +305,30 @@ def get_next(self, timeout: float) -> Tuple[Optional[dict], Optional[str]]:
self._add_env_enrichment_to_event(event)
return event, non_critical_error_msg

def batch_finished_callback(self):
"""Can be called by output connectors after processing a batch of one or more records."""
def batch_finished_callback(self, metadata: Optional[dict] = None):
"""Can be called by output connectors after processing a batch of one or more records.

Parameters
----------
metadata: dict, optional
Metadata that output connectors can pass back, for example partition and offset
information of delivered records.
"""

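For context, a hypothetical output connector could hand the metadata back roughly like this. This is a sketch only: the class, the method name, and the assumption that the output holds a reference to its input connector are illustrative and not part of this diff.

```python
from typing import Optional


class SketchOutput:
    """Hypothetical output connector fragment (illustrative only)."""

    def __init__(self, input_connector) -> None:
        self.input_connector = input_connector

    def _on_successful_delivery(self, delivered_event: dict) -> None:
        # Pass back the metadata the input connector attached to the event, so
        # that only offsets of successfully delivered events are stored.
        metadata: Optional[dict] = delivered_event.get("_metadata")
        self.input_connector.batch_finished_callback(metadata=metadata)
```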
def _add_input_connector_metadata_to_event(self, event: dict) -> Tuple[dict, Optional[str]]:
"""Add input connector metadata to the event.

Does nothing unless implemented by an input connector.

Parameters
----------
event: dict
The event to which the metadata should be added.

Returns
-------
event: dict
The original event extended with metadata from the input connector.
non_critical_error_msg: Optional[str]
An error message if the metadata could not be added, otherwise None.
"""

def _add_env_enrichment_to_event(self, event: dict):
"""Add the env enrichment information to the event"""
88 changes: 76 additions & 12 deletions logprep/connector/confluent_kafka/input.py
@@ -43,6 +43,7 @@
Consumer,
KafkaException,
TopicPartition,
Message,
)

from logprep.abc.connector import Connector
@@ -209,6 +210,15 @@ class Config(Input.Config):
topic: str = field(validator=validators.instance_of(str))
"""The topic from which new log messages will be fetched."""

use_metadata_for_offsets: bool = field(
validator=validators.instance_of(bool), default=False
)
"""Use metadata to set offsets if this is set to True (default is False).

Whether this should be enabled depends on the output connector: set it to True only if the
output connector passes the delivered partition and offset back via `batch_finished_callback`,
otherwise the offsets cannot be resolved correctly.
"""

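A hedged sketch of how this flag could be combined with the offset-related kafka settings checked by the `_enable_auto_offset_store` and `_enable_auto_commit` properties further down; the bootstrap server and group id are placeholders:

```python
# Illustrative only: kafka input configuration with metadata-driven offsets.
kafka_input_config = {
    "topic": "consumer",
    "use_metadata_for_offsets": True,  # new option added by this PR
    "kafka_config": {
        "bootstrap.servers": "localhost:9092",  # placeholder
        "group.id": "cgroup",                   # placeholder
        "enable.auto.offset.store": "false",    # let batch_finished_callback store offsets
        "enable.auto.commit": "true",           # committing stays automatic in this sketch
    },
}
```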
kafka_config: Optional[dict] = field(
validator=[
validators.instance_of(dict),
@@ -234,12 +244,14 @@ class Config(Input.Config):
"""

_last_valid_records: dict
_last_valid_record: Optional[Message]

__slots__ = ["_last_valid_records"]
__slots__ = ["_last_valid_records", "_last_valid_record"]

def __init__(self, name: str, configuration: "Connector.Config", logger: Logger) -> None:
super().__init__(name, configuration, logger)
self._last_valid_records = {}
self._last_valid_record = None

@cached_property
def _consumer(self) -> Consumer:
@@ -356,7 +368,7 @@ def describe(self) -> str:
base_description = super().describe()
return f"{base_description} - Kafka Input: {self._config.kafka_config['bootstrap.servers']}"

def _get_raw_event(self, timeout: float) -> bytearray:
def _get_raw_event(self, timeout: float) -> Optional[bytearray]:
"""Get next raw Message from Kafka.

Parameters
@@ -386,6 +398,7 @@ def _get_raw_event(self, timeout: float) -> bytearray:
self, "A confluent-kafka record contains an error code", kafka_error
)
self._last_valid_records[message.partition()] = message
self._last_valid_record = message
labels = {"description": f"topic: {self._config.topic} - partition: {message.partition()}"}
self.metrics.current_offsets.add_with_labels(message.offset() + 1, labels)
return message.value()
@@ -421,6 +434,31 @@ def _get_event(self, timeout: float) -> Union[Tuple[None, None], Tuple[dict, dic
) from error
return event_dict, raw_event

def _add_input_connector_metadata_to_event(self, event: dict) -> Tuple[dict, Optional[str]]:
"""Add last_partition and last_offset to _metadata.

Pop previous last_partition and last_offset to ensure no incorrect values are set.
Catch AttributeError, since `_metadata` could already exist but not be a dict.
"""
metadata = event.get("_metadata", {})
try:
metadata.pop("last_partition", None)
metadata.pop("last_offset", None)
except AttributeError:
pass

if metadata:
non_critical_error_msg = (
"Couldn't add metadata to the input event as the field '_metadata' already exists."
)
return event, non_critical_error_msg

event["_metadata"] = {
"last_partition": self._last_valid_record.partition(),
"last_offset": self._last_valid_record.offset(),
}
return event, None
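To illustrate the behaviour of the method above (partition and offset values are made up):

```python
# Assuming the last consumed kafka message came from partition 0 at offset 41:
event = {"message": "foo"}
# -> ({"message": "foo", "_metadata": {"last_partition": 0, "last_offset": 41}}, None)

event = {"message": "foo", "_metadata": {"other": "data"}}
# -> returned unchanged together with the non-critical error message, because
#    "_metadata" already contains fields other than last_partition/last_offset.
```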

@property
def _enable_auto_offset_store(self) -> bool:
return self._config.kafka_config.get("enable.auto.offset.store") == "true"
@@ -429,24 +467,49 @@ def _enable_auto_offset_store(self) -> bool:
def _enable_auto_commit(self) -> bool:
return self._config.kafka_config.get("enable.auto.commit") == "true"

def batch_finished_callback(self) -> None:
def _get_delivered_partition_offset(self, metadata: dict) -> TopicPartition:
try:
last_partition = metadata["last_partition"]
last_offset = metadata["last_offset"]
except KeyError as error:
raise FatalInputError(
self,
"Missing fields in metadata for setting offsets: "
"'last_partition' and 'last_offset' required",
) from error
return TopicPartition(
self._config.topic,
partition=last_partition,
offset=last_offset if isinstance(last_offset, int) else last_offset[0],
)
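Roughly, the helper above maps delivery metadata to a `TopicPartition` as follows (values assumed):

```python
# metadata = {"last_partition": 2, "last_offset": 1337}
# -> TopicPartition("<configured topic>", partition=2, offset=1337)
#
# If "last_offset" arrives as a sequence, its first element is used; a missing
# key raises FatalInputError, since offsets cannot be stored without both values.
```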

def batch_finished_callback(self, metadata: Optional[dict] = None) -> None:
"""Store offsets for each kafka partition in `self._last_valid_records`
and if configured commit them. Should be called by output connectors if
they are finished processing a batch of records.
or instead use `metadata` to obtain offsets. If configured commit them.
Should be called by output connectors if they are finished processing a batch of records.
"""
Collaborator comment on lines +489 to 490:

Suggested change:
Should be called by output connectors if they are finished processing a batch of records.
"""
metadata = {} if metadata is None else metadata

so we ensure it can't be 'None' in further processing. Please adjust the type hints accordingly.

Collaborator (Author): Thanks, I've added that suggestion.
metadata = {} if metadata is None else metadata
if self._enable_auto_offset_store:
return
self._handle_offsets(self._consumer.store_offsets)
self._handle_offsets(self._consumer.store_offsets, metadata)
if not self._enable_auto_commit:
self._handle_offsets(self._consumer.commit)
self._handle_offsets(self._consumer.commit, metadata)
self._last_valid_records.clear()

def _handle_offsets(self, offset_handler: Callable) -> None:
for message in self._last_valid_records.values():
def _handle_offsets(self, offset_handler: Callable, metadata: Optional[dict]) -> None:
if self._config.use_metadata_for_offsets:
delivered_offset = self._get_delivered_partition_offset(metadata)
try:
offset_handler(message=message)
offset_handler(offsets=[delivered_offset])
except KafkaException as error:
raise InputWarning(self, f"{error}, {message}") from error
raise InputWarning(self, f"{error}, {delivered_offset}") from error
else:
records = self._last_valid_records.values()
for record in records:
try:
offset_handler(message=record)
except KafkaException as error:
raise InputWarning(self, f"{error}, {record}") from error
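Putting the two branches together, the callback can be driven in two ways; `kafka_input` and the partition and offset values below are assumed:

```python
# `kafka_input` stands for an already configured ConfluentKafkaInput instance.

# Legacy mode (use_metadata_for_offsets=False): offsets come from the records the
# input tracked itself in _last_valid_records; a metadata argument is ignored.
kafka_input.batch_finished_callback()

# Metadata mode (use_metadata_for_offsets=True): the output connector must supply
# the delivered partition/offset, otherwise FatalInputError is raised.
kafka_input.batch_finished_callback(metadata={"last_partition": 0, "last_offset": 41})
```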

def _assign_callback(self, consumer, topic_partitions):
for topic_partition in topic_partitions:
@@ -472,7 +535,8 @@ def _revoke_callback(self, consumer, topic_partitions):
f"partition {topic_partition.partition}"
)
self.output_connector._write_backlog()
self.batch_finished_callback()
if not self._config.use_metadata_for_offsets:
self.batch_finished_callback()

def _lost_callback(self, consumer, topic_partitions):
for topic_partition in topic_partitions: