diff --git a/tests/core/swiftapiresolver/run.py b/tests/core/swiftapiresolver/run.py index c2f5a32f76..ff30361fc4 100644 --- a/tests/core/swiftapiresolver/run.py +++ b/tests/core/swiftapiresolver/run.py @@ -1,29 +1,80 @@ import frida +from frida_tools.application import Reactor from pathlib import Path +import subprocess import sys +import threading import time -def on_message(message, data): - print("on_message:", message) +class Controller: + def __init__(self): + self._stop_requested = threading.Event() + self._reactor = Reactor(run_until_return=lambda reactor: self._stop_requested.wait()) + runner_src_dir = Path(__file__).parent + self._runner_js = runner_src_dir / "runner.js" + self._runner_dylib = runner_src_dir.parent.parent.parent.parent / "build" / "tmp-macos-arm64" / "frida-gum" / "tests" / "core" / "swiftapiresolver" / "libtestswiftapiresolver.dylib" -runner_src_dir = Path(__file__).parent -runner_path = runner_src_dir.parent.parent.parent.parent / "build" / "tmp-macos-arm64" / "frida-gum" / "tests" / "core" / "swiftapiresolver" / "libtestswiftapiresolver.dylib" + self._device = None + self._session = None + self._script = None -device = frida.get_remote_device() + def run(self): + self._reactor.schedule(lambda: self._start()) + self._reactor.run() -session = device.attach("Xcode") + def _start(self): + device = frida.get_remote_device() + self._device = device -script = session.create_script((runner_src_dir / "runner.js").read_text(encoding="utf-8")) -script.on("message", on_message) -script.load() + session = device.attach("Xcode") + session.on("detached", lambda reason: self._reactor.schedule(lambda: self._on_detached(reason))) + self._session = session -script.post({ "type": "start" }, runner_path.read_bytes()) + script = session.create_script(self._runner_js.read_text(encoding="utf-8")) + script.on("message", lambda message, data: self._reactor.schedule(lambda: self._on_message(message, data))) + script.load() + self._script = script -print("Running...") -t1 = time.time() -num_matches = script.exports_sync.run("*!*") -t2 = time.time() -duration = int((t2 - t1) * 1000) -print(f"Got {num_matches} matches in {duration} ms.") + script.post({ "type": "start" }, self._runner_dylib.read_bytes()) + + worker = threading.Thread(target=self._run_tests) + worker.start() + + def _run_tests(self): + print("Running...") + t1 = time.time() + num_matches = self._script.exports_sync.run("*!*") + t2 = time.time() + duration = int((t2 - t1) * 1000) + print(f"Got {num_matches} matches in {duration} ms.") + self._stop_requested.set() + + def _on_detached(self, reason): + print(f"⚡ detached: reason='{reason}'") + self._script = None + self._session = None + self._stop_requested.set() + + def _on_message(self, message, data): + handled = False + if message["type"] == "send": + payload = message["payload"] + if payload["type"] == "ready": + self._on_ready(payload["symbols"]) + handled = True + if not handled: + print(f"⚡ message: payload={message['payload']}") + + def _on_ready(self, symbols): + for line in subprocess.run(["nm", self._runner_dylib], capture_output=True, encoding="utf-8").stdout.split("\n"): + if line.endswith(" T _init"): + tokens = line.split(" ") + init_rva = int(tokens[0], 16) + runner_base = int(symbols["init"], 16) - init_rva + print(f"Runner is loaded at 0x{runner_base:x}") + + +controller = Controller() +controller.run() diff --git a/tests/core/swiftapiresolver/runner.js b/tests/core/swiftapiresolver/runner.js index f3cb845334..e9e5e0e3b9 100644 --- a/tests/core/swiftapiresolver/runner.js +++ b/tests/core/swiftapiresolver/runner.js @@ -12,6 +12,7 @@ class Runner { _onStart = (message, data) => { this._cm = new CModule(data); this._run = new NativeFunction(this._cm.run, 'uint', ['pointer'], { exceptions: 'propagate' }); + send({ type: 'ready', symbols: this._cm }); }; } diff --git a/tools/symbolicate.py b/tools/symbolicate.py new file mode 100755 index 0000000000..a66be1f84a --- /dev/null +++ b/tools/symbolicate.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 + +import argparse +from collections.abc import Iterable, Mapping +from dataclasses import dataclass +from pathlib import Path +import re +import subprocess + + +RAW_ADDRESS_PATTERN = re.compile(r"\b(0x[0-9a-f]+)\b") + +@dataclass +class DeclaredModule: + path: Path + start: int + end: int + + def __hash__(self): + return self.path.__hash__() + +PendingAddresses = Mapping[DeclaredModule, set[int]] + + +def main(): + parser = argparse.ArgumentParser(description="Symbolicate stack traces.") + parser.add_argument("--input", dest="input", required=True, + help="the file to symbolicate") + parser.add_argument("--output", dest="output", required=True, + help="where the symbolicated file will be written") + parser.add_argument("--declare-module", dest="modules", required=True, action="append", + help="declare a module at path:base") + args = parser.parse_args() + + modules = [] + for mod in args.modules: + raw_path, raw_base = mod.split(":", maxsplit=1) + path = Path(raw_path) + base = int(raw_base, 16) + size = compute_module_size(path) + modules.append(DeclaredModule(path, base, base + size)) + + with Path(args.input).open(encoding="utf-8") as input_file: + addresses = compute_pending_addresses(input_file, modules) + + symbols = symbolicate_pending_addresses(addresses) + + def symbolicate(m): + raw_address = m.group(1) + address = int(raw_address, 16) + + name = symbols.get(address, None) + if name is not None: + return name + + return raw_address + + with Path(args.input).open(encoding="utf-8") as input_file, \ + Path(args.output).open("w", encoding="utf-8") as output_file: + for line_raw in input_file: + line_symbolicated = RAW_ADDRESS_PATTERN.sub(symbolicate, line_raw) + output_file.write(line_symbolicated) + + +def compute_pending_addresses(data: Iterable[str], modules: Iterable[DeclaredModule]) -> PendingAddresses: + addresses = {} + for raw_line in data: + for match in RAW_ADDRESS_PATTERN.finditer(raw_line): + address = int(match.group(1), 16) + module = find_declared_module_by_address(address, modules) + if module is not None: + pending = addresses.get(module, None) + if pending is None: + pending = set() + addresses[module] = pending + pending.add(address) + return addresses + + +def symbolicate_pending_addresses(addresses: PendingAddresses) -> Mapping[int, str]: + symbols = {} + for module, pending in addresses.items(): + pending = list(pending) + pending.sort() + query = subprocess.run([ + "atos", + "-o", module.path, + "-l", hex(module.start), + ] + [hex(address) for address in pending], + capture_output=True, + encoding="utf-8", + check=True) + symbols.update(dict(zip(pending, query.stdout.split("\n")))) + return symbols + + +def find_declared_module_by_address(address, modules): + for m in modules: + if address >= m.start and address < m.end: + return m + return None + + +def compute_module_size(path: Path) -> int: + for raw_line in subprocess.run(["otool", "-l", path], capture_output=True, encoding="utf-8").stdout.split("\n"): + line = raw_line.lstrip() + if line.startswith("vmsize"): + tokens = line.split(" ", maxsplit=1) + return int(tokens[1], 16) + assert False + + +if __name__ == "__main__": + main()