-
Notifications
You must be signed in to change notification settings - Fork 573
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
tools[log_viewer]: Populate txn control markers
Moves the logic in KafkaLog like other methods, cleans up duplicate code. Introduces KafkaControlRecordType For transactional && control records we extract the exact control record type(commit/abort).
- Loading branch information
Showing
3 changed files
with
56 additions
and
34 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,57 @@ | ||
from storage import Segment | ||
from enum import Enum | ||
from io import BytesIO | ||
from reader import Reader | ||
import struct | ||
|
||
|
||
class KafkaControlRecordType(Enum): | ||
"""Valid control record types defined by Kafka, currently only | ||
used for transactional control records. | ||
Keep this in sync with model::control_record_type | ||
""" | ||
tx_abort = 0 | ||
tx_commit = 1 | ||
unknown = -1 | ||
|
||
|
||
class KafkaLog: | ||
def __init__(self, ntp): | ||
def __init__(self, ntp, headers_only): | ||
self.ntp = ntp | ||
self.headers_only = headers_only | ||
self.results = None | ||
|
||
def batch_headers(self): | ||
def get_control_record_type(self, key): | ||
rdr = Reader(BytesIO(key)) | ||
rdr.skip(2) # skip the 16bit version. | ||
# Encoded as big endian | ||
type_rdr = Reader(BytesIO(struct.pack(">h", rdr.read_int16()))) | ||
return KafkaControlRecordType(type_rdr.read_int16()).name | ||
|
||
def decode(self): | ||
self.results = [] | ||
for batch in self.batches(): | ||
yield batch.header_dict() | ||
header = batch.header_dict() | ||
self.results.append(header) | ||
if not self.headers_only: | ||
records = [] | ||
for record in batch: | ||
attrs = header["expanded_attrs"] | ||
is_tx_ctrl = attrs["transactional"] and attrs[ | ||
"control_batch"] | ||
record_dict = record.kv_dict() | ||
records.append(record_dict) | ||
if is_tx_ctrl: | ||
record_dict["type"] = self.get_control_record_type( | ||
record.key) | ||
self.results.append(records) | ||
|
||
def result(self): | ||
assert self.results | ||
return self.results | ||
|
||
def batches(self): | ||
for path in self.ntp.segments: | ||
s = Segment(path) | ||
for batch in s: | ||
yield batch | ||
yield batch |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters