Commit

chore: make mypy happy with types (#113)

cheahjs committed Feb 2, 2024
1 parent e13b2f3 commit a8b560d
Showing 20 changed files with 126 additions and 109 deletions.
99 changes: 51 additions & 48 deletions lib/archive.py
@@ -2,20 +2,25 @@
import os
import struct
import uuid
from typing import Any, Callable, Optional, Union
from typing import Any, Callable, Optional, Sequence, Union

# Alias stdlib types to avoid name conflicts
_float = float
_bytes = bytes


class UUID:
"""Wrapper around uuid.UUID to delay evaluation of UUIDs until necessary"""

__slots__ = ("raw_bytes", "parsed_uuid")
raw_bytes: bytes
parsed_uuid: uuid.UUID
parsed_uuid: Optional[uuid.UUID]

def __init__(self, raw_bytes: bytes) -> None:
self.raw_bytes = raw_bytes
self.parsed_uuid = None

@staticmethod
def from_str(s: str) -> "UUID":
b = uuid.UUID(s).bytes
return UUID(
@@ -69,6 +74,12 @@ def __eq__(self, __value: object) -> bool:
return str(self) == str(__value)


# Specify a type for JSON-serializable objects
JSON = Union[
None, bool, int, float, str, list["JSON"], dict[str, "JSON"], UUID, uuid.UUID
]


def instance_id_reader(reader: "FArchiveReader") -> dict[str, UUID]:
return {
"guid": reader.guid(),
@@ -132,7 +143,7 @@ def bool(self) -> bool:
def fstring(self) -> str:
# in the hot loop, avoid function calls
reader = self.data
(size,) = self.unpack_i32(reader.read(4))
(size,) = FArchiveReader.unpack_i32(reader.read(4))

if size == 0:
return ""
@@ -153,60 +164,60 @@ def fstring(self) -> str:
try:
escaped = data.decode(encoding, errors="surrogatepass")
print(
f"Error decoding {encoding} string of length {size}, data loss may occur! {bytes(data)}"
f"Error decoding {encoding} string of length {size}, data loss may occur! {bytes(data)!r}"
)
return escaped
except Exception as e:
raise Exception(
f"Error decoding {encoding} string of length {size}: {bytes(data)}"
f"Error decoding {encoding} string of length {size}: {bytes(data)!r}"
) from e

unpack_i16 = struct.Struct("h").unpack

def i16(self) -> int:
return self.unpack_i16(self.data.read(2))[0]
return FArchiveReader.unpack_i16(self.data.read(2))[0]

unpack_u16 = struct.Struct("H").unpack

def u16(self) -> int:
return self.unpack_u16(self.data.read(2))[0]
return FArchiveReader.unpack_u16(self.data.read(2))[0]

unpack_i32 = struct.Struct("i").unpack

def i32(self) -> int:
return self.unpack_i32(self.data.read(4))[0]
return FArchiveReader.unpack_i32(self.data.read(4))[0]

unpack_u32 = struct.Struct("I").unpack

def u32(self) -> int:
return self.unpack_u32(self.data.read(4))[0]
return FArchiveReader.unpack_u32(self.data.read(4))[0]

unpack_i64 = struct.Struct("q").unpack

def i64(self) -> int:
return self.unpack_i64(self.data.read(8))[0]
return FArchiveReader.unpack_i64(self.data.read(8))[0]

unpack_u64 = struct.Struct("Q").unpack

def u64(self) -> int:
return self.unpack_u64(self.data.read(8))[0]
return FArchiveReader.unpack_u64(self.data.read(8))[0]

unpack_float = struct.Struct("f").unpack

def float(self) -> float:
return self.unpack_float(self.data.read(4))[0]
def float(self) -> _float:
return FArchiveReader.unpack_float(self.data.read(4))[0]

unpack_double = struct.Struct("d").unpack

def double(self) -> float:
return self.unpack_double(self.data.read(8))[0]
def double(self) -> _float:
return FArchiveReader.unpack_double(self.data.read(8))[0]

unpack_byte = struct.Struct("B").unpack

def byte(self) -> int:
return self.unpack_byte(self.data.read(1))[0]
return FArchiveReader.unpack_byte(self.data.read(1))[0]

def byte_list(self, size: int) -> list[int]:
def byte_list(self, size: int) -> Sequence[int]:
return struct.unpack(str(size) + "B", self.data.read(size))

def skip(self, size: int) -> None:
@@ -222,9 +233,7 @@ def optional_guid(self) -> Optional[UUID]:
return UUID(self.data.read(16))
return None

def tarray(
self, type_reader: Callable[["FArchiveReader"], dict[str, Any]]
) -> list[dict[str, Any]]:
def tarray(self, type_reader: Callable[["FArchiveReader"], Any]) -> list[Any]:
count = self.u32()
array = []
for _ in range(count):
@@ -312,7 +321,6 @@ def property(
_id = self.optional_guid()
self.u32()
count = self.u32()
values = {}
key_path = path + ".Key"
if key_type == "StructProperty":
key_struct_type = self.get_type_or(key_path, "Guid")
@@ -323,7 +331,7 @@ def property(
value_struct_type = self.get_type_or(value_path, "StructProperty")
else:
value_struct_type = None
values = []
values: list[dict[str, Any]] = []
for _ in range(count):
key = self.prop_value(key_type, key_struct_type, key_path)
value = self.prop_value(value_type, value_struct_type, value_path)
@@ -441,14 +449,14 @@ def array_value(self, array_type: str, count: int, size: int, path: str):

return values

def compressed_short_rotator(self) -> tuple[float, float, float]:
def compressed_short_rotator(self) -> tuple[_float, _float, _float]:
short_pitch = self.u16() if self.bool() else 0
short_yaw = self.u16() if self.bool() else 0
short_roll = self.u16() if self.bool() else 0
pitch = short_pitch * (360.0 / 65536.0)
yaw = short_yaw * (360.0 / 65536.0)
roll = short_roll * (360.0 / 65536.0)
return [pitch, yaw, roll]
return (pitch, yaw, roll)

def serializeint(self, component_bit_count: int) -> int:
b = bytearray(self.read((component_bit_count + 7) // 8))
@@ -457,7 +465,7 @@ def serializeint(self, component_bit_count: int) -> int:
value = int.from_bytes(b, "little")
return value

def packed_vector(self, scale_factor: int) -> tuple[float, float, float]:
def packed_vector(self, scale_factor: int) -> tuple[_float, _float, _float]:
component_bit_count_and_extra_info = self.u32()
component_bit_count = component_bit_count_and_extra_info & 63
extra_info = component_bit_count_and_extra_info >> 6
@@ -471,42 +479,37 @@ def packed_vector(self, scale_factor: int) -> tuple[float, float, float]:
z = (z & (sign_bit - 1)) - (z & sign_bit)

if extra_info:
x /= scale_factor
y /= scale_factor
z /= scale_factor
return (x / scale_factor, y / scale_factor, z / scale_factor)
return (x, y, z)
else:
received_scaler_type_size = 8 if extra_info else 4
if received_scaler_type_size == 8:
return self.vector()
else:
x = self.float()
y = self.float()
z = self.float()
return (x, y, z)
return (self.float(), self.float(), self.float())

def vector(self) -> tuple[float, float, float]:
def vector(self) -> tuple[_float, _float, _float]:
return (self.double(), self.double(), self.double())

def vector_dict(self) -> dict[str, float]:
def vector_dict(self) -> dict[str, _float]:
return {
"x": self.double(),
"y": self.double(),
"z": self.double(),
}

def quat(self) -> tuple[float, float, float, float]:
def quat(self) -> tuple[_float, _float, _float, _float]:
return (self.double(), self.double(), self.double(), self.double())

def quat_dict(self) -> dict[str, float]:
def quat_dict(self) -> dict[str, _float]:
return {
"x": self.double(),
"y": self.double(),
"z": self.double(),
"w": self.double(),
}

def ftransform(self) -> dict[str, dict[str, float]]:
def ftransform(self) -> dict[str, dict[str, _float]]:
return {
"rotation": self.quat_dict(),
"translation": self.vector_dict(),
@@ -581,7 +584,7 @@ def bytes(self) -> bytes:
self.data.seek(pos)
return b

def write(self, data: bytes):
def write(self, data: _bytes):
self.data.write(data)

def bool(self, bool: bool):
@@ -625,7 +628,7 @@ def u64(self, i: int):
def float(self, i: float):
self.data.write(struct.pack("f", i))

def double(self, i: float):
def double(self, i: _float):
self.data.write(struct.pack("d", i))

def byte(self, b: int):
@@ -645,7 +648,7 @@ def optional_guid(self, u: Optional[Union[str, uuid.UUID, UUID]]):
uuid_writer(self, u)

def tarray(
self, type_writer: Callable[["FArchiveWriter", dict[str, Any]], None], array
self, type_writer: Callable[["FArchiveWriter", Any], None], array: list[Any]
):
self.u32(len(array))
for i in range(len(array)):
@@ -822,7 +825,7 @@ def array_value(self, array_type: str, count: int, values: list[Any]):
else:
raise Exception(f"Unknown array type: {array_type}")

def compressed_short_rotator(self, pitch: float, yaw: float, roll: float):
def compressed_short_rotator(self, pitch: _float, yaw: _float, roll: _float):
short_pitch = round(pitch * (65536.0 / 360.0)) & 0xFFFF
short_yaw = round(yaw * (65536.0 / 360.0)) & 0xFFFF
short_roll = round(roll * (65536.0 / 360.0)) & 0xFFFF
@@ -843,7 +846,7 @@ def compressed_short_rotator(self, pitch: float, yaw: float, roll: float):
self.bool(False)

@staticmethod
def unreal_round_float_to_int(value: float) -> int:
def unreal_round_float_to_int(value: _float) -> int:
return int(value)

@staticmethod
Expand All @@ -860,7 +863,7 @@ def serializeint(self, component_bit_count: int, value: int):
int.to_bytes(value, (component_bit_count + 7) // 8, "little", signed=True)
)

def packed_vector(self, scale_factor: int, x: float, y: float, z: float):
def packed_vector(self, scale_factor: int, x: _float, y: _float, z: _float):
max_exponent_for_scaling = 52
max_value_to_scale = 1 << max_exponent_for_scaling
max_exponent_after_scaling = 62
Expand Down Expand Up @@ -899,29 +902,29 @@ def packed_vector(self, scale_factor: int, x: float, y: float, z: float):
self.double(y)
self.double(z)

def vector(self, x: float, y: float, z: float):
def vector(self, x: _float, y: _float, z: _float):
self.double(x)
self.double(y)
self.double(z)

def vector_dict(self, value: dict[str, float]):
def vector_dict(self, value: dict[str, _float]):
self.double(value["x"])
self.double(value["y"])
self.double(value["z"])

def quat(self, x: float, y: float, z: float, w: float):
def quat(self, x: _float, y: _float, z: _float, w: _float):
self.double(x)
self.double(y)
self.double(z)
self.double(w)

def quat_dict(self, value: dict[str, float]):
def quat_dict(self, value: dict[str, _float]):
self.double(value["x"])
self.double(value["y"])
self.double(value["z"])
self.double(value["w"])

def ftransform(self, value: dict[str, dict[str, float]]):
def ftransform(self, value: dict[str, dict[str, _float]]):
self.quat_dict(value["rotation"])
self.vector_dict(value["translation"])
self.vector_dict(value["scale3d"])
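
Most of the lib/archive.py changes apply two typing patterns: stdlib builtins are aliased (_float, _bytes) so that methods named float, double, bytes and so on can still reference the builtin types in their annotations, and the precompiled struct unpackers are called through the class name (FArchiveReader.unpack_i32 and friends) rather than through self, so the class attributes are not mistaken for instance methods. The UUID wrapper also gains an explicit Optional[uuid.UUID] annotation and a None initializer for its lazily parsed value. A minimal, standalone sketch of the first two patterns (illustrative names, not the project's code):

import io
import struct

# Alias the builtin so annotations below still mean the builtin float, even
# though the class defines a method named "float".
_float = float


class Reader:
    # Precompiled unpacker stored on the class; called via the class name so a
    # type checker does not treat it as a bound instance method.
    unpack_float = struct.Struct("f").unpack

    def __init__(self, data: bytes) -> None:
        self.data = io.BytesIO(data)

    def float(self) -> _float:
        return Reader.unpack_float(self.data.read(4))[0]


print(Reader(struct.pack("f", 1.5)).float())  # prints 1.5
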
2 changes: 1 addition & 1 deletion lib/palsav.py
@@ -19,7 +19,7 @@ def decompress_sav_to_gvas(data: bytes) -> tuple[bytes, int]:
f"not a compressed Palworld save, found too many null bytes, this is likely corrupted"
)
raise Exception(
f"not a compressed Palworld save, found {magic_bytes} instead of {MAGIC_BYTES}"
f"not a compressed Palworld save, found {magic_bytes!r} instead of {MAGIC_BYTES!r}"
)
# Valid save types
if save_type not in [0x30, 0x31, 0x32]:
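
The palsav.py change only adds !r when interpolating the raw bytes into the error message. A likely reason (the commit message does not say): interpolating bytes without a conversion goes through str(), which emits a BytesWarning under python -b (an error under -bb), while !r requests the repr explicitly and produces the same text. A small illustration with a made-up value:

magic_bytes = b"\x00\x00\x00\x01"
print(f"found {magic_bytes} instead")    # implicit str(); warns under python -b
print(f"found {magic_bytes!r} instead")  # explicit repr, same output, no warning
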
1 change: 0 additions & 1 deletion lib/paltypes.py
@@ -3,7 +3,6 @@
from lib.archive import FArchiveReader, FArchiveWriter
from lib.rawdata import (
base_camp,
base_camp_module,
build_process,
character,
character_container,
19 changes: 10 additions & 9 deletions lib/rawdata/base_camp.py
@@ -16,15 +16,16 @@ def decode(

def decode_bytes(b_bytes: Sequence[int]) -> dict[str, Any]:
reader = FArchiveReader(bytes(b_bytes), debug=False)
data = {}
data["id"] = reader.guid()
data["name"] = reader.fstring()
data["state"] = reader.byte()
data["transform"] = reader.ftransform()
data["area_range"] = reader.float()
data["group_id_belong_to"] = reader.guid()
data["fast_travel_local_transform"] = reader.ftransform()
data["owner_map_object_instance_id"] = reader.guid()
data = {
"id": reader.guid(),
"name": reader.fstring(),
"state": reader.byte(),
"transform": reader.ftransform(),
"area_range": reader.float(),
"group_id_belong_to": reader.guid(),
"fast_travel_local_transform": reader.ftransform(),
"owner_map_object_instance_id": reader.guid(),
}
if not reader.eof():
raise Exception("Warning: EOF not reached")
return data
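
In base_camp.py the decoded fields are now collected in a single dict literal instead of key-by-key assignments into an empty dict. My reading of the motivation (not stated in the commit): mypy infers a partial type for the empty dict from the first assignment and then rejects values of other types, whereas a literal with mixed values is inferred as one compatible dict type. A toy example of the difference:

from typing import Any


def decode_old() -> dict[str, Any]:
    data = {}              # mypy infers dict[str, int] from the next line...
    data["state"] = 1
    data["name"] = "camp"  # ...and then flags this str value as incompatible
    return data


def decode_new() -> dict[str, Any]:
    # One literal lets mypy join the value types without extra annotations.
    return {"state": 1, "name": "camp"}
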