From 52ce878fb457f389c89f62532ff942a8d752d8a2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Mar 2023 17:19:39 -0500 Subject: [PATCH] Break out old `.pp` components into submods: `._ledger` and `._pos` --- piker/accounting/__init__.py | 1041 +--------------------------------- piker/accounting/_ledger.py | 125 ++++ piker/accounting/_pos.py | 961 +++++++++++++++++++++++++++++++ 3 files changed, 1108 insertions(+), 1019 deletions(-) create mode 100644 piker/accounting/_ledger.py create mode 100644 piker/accounting/_pos.py diff --git a/piker/accounting/__init__.py b/piker/accounting/__init__.py index cc8af87762..a371f7c218 100644 --- a/piker/accounting/__init__.py +++ b/piker/accounting/__init__.py @@ -16,1031 +16,33 @@ # along with this program. If not, see . ''' -Personal/Private position parsing, calculating, summarizing in a way -that doesn't try to cuk most humans who prefer to not lose their moneys.. -(looking at you `ib` and dirt-bird friends) +"Accounting for degens": count dem numberz that tracks how much you got +for tendiez. ''' -from __future__ import annotations -from contextlib import contextmanager as cm -from pprint import pformat -import os -from os import path -from math import copysign -import re -import time -from typing import ( - Any, - Iterator, - Optional, - Union, - Generator -) - -import pendulum -from pendulum import datetime, now -import tomli -import toml - -from .. import config -from ..brokers import get_brokermod -from ..clearing._messages import BrokerdPosition, Status -from ..data._source import Symbol, unpack_fqsn -from ..data.types import Struct from ..log import get_logger -log = get_logger(__name__) - - -@cm -def open_trade_ledger( - broker: str, - account: str, - -) -> Generator[dict, None, None]: - ''' - Indempotently create and read in a trade log file from the - ``/ledgers/`` directory. - - Files are named per broker account of the form - ``_.toml``. The ``accountname`` here is the - name as defined in the user's ``brokers.toml`` config. - - ''' - ldir = path.join(config._config_dir, 'ledgers') - if not path.isdir(ldir): - os.makedirs(ldir) - - fname = f'trades_{broker}_{account}.toml' - tradesfile = path.join(ldir, fname) - - if not path.isfile(tradesfile): - log.info( - f'Creating new local trades ledger: {tradesfile}' - ) - with open(tradesfile, 'w') as cf: - pass # touch - with open(tradesfile, 'rb') as cf: - start = time.time() - ledger = tomli.load(cf) - log.info(f'Ledger load took {time.time() - start}s') - cpy = ledger.copy() - - try: - yield cpy - finally: - if cpy != ledger: - - # TODO: show diff output? - # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries - log.info(f'Updating ledger for {tradesfile}:\n') - ledger.update(cpy) - - # we write on close the mutated ledger data - with open(tradesfile, 'w') as cf: - toml.dump(ledger, cf) - - -class Transaction(Struct, frozen=True): - # TODO: should this be ``.to`` (see below)? - fqsn: str - - sym: Symbol - tid: Union[str, int] # unique transaction id - size: float - price: float - cost: float # commisions or other additional costs - dt: datetime - expiry: datetime | None = None - - # optional key normally derived from the broker - # backend which ensures the instrument-symbol this record - # is for is truly unique. - bsuid: Union[str, int] | None = None - - # optional fqsn for the source "asset"/money symbol? 
- # from: Optional[str] = None - - -def iter_by_dt( - clears: dict[str, Any], -) -> Iterator[tuple[str, dict]]: - ''' - Iterate entries of a ``clears: dict`` table sorted by entry recorded - datetime presumably set at the ``'dt'`` field in each entry. - - ''' - for tid, data in sorted( - list(clears.items()), - key=lambda item: item[1]['dt'], - ): - yield tid, data - - -class Position(Struct): - ''' - Basic pp (personal/piker position) model with attached clearing - transaction history. - - ''' - symbol: Symbol - - # can be +ve or -ve for long/short - size: float - - # "breakeven price" above or below which pnl moves above and below - # zero for the entirety of the current "trade state". - ppu: float - - # unique backend symbol id - bsuid: str - - split_ratio: Optional[int] = None - - # ordered record of known constituent trade messages - clears: dict[ - Union[str, int, Status], # trade id - dict[str, Any], # transaction history summaries - ] = {} - first_clear_dt: Optional[datetime] = None - - expiry: Optional[datetime] = None - - def to_dict(self) -> dict: - return { - f: getattr(self, f) - for f in self.__struct_fields__ - } - - def to_pretoml(self) -> tuple[str, dict]: - ''' - Prep this position's data contents for export to toml including - re-structuring of the ``.clears`` table to an array of - inline-subtables for better ``pps.toml`` compactness. - - ''' - d = self.to_dict() - clears = d.pop('clears') - expiry = d.pop('expiry') - - if self.split_ratio is None: - d.pop('split_ratio') - - # should be obvious from clears/event table - d.pop('first_clear_dt') - - # TODO: we need to figure out how to have one top level - # listing venue here even when the backend isn't providing - # it via the trades ledger.. - # drop symbol obj in serialized form - s = d.pop('symbol') - fqsn = s.front_fqsn() - - broker, key, suffix = unpack_fqsn(fqsn) - sym_info = s.broker_info[broker] - - d['asset_type'] = sym_info['asset_type'] - d['price_tick_size'] = ( - sym_info.get('price_tick_size') - or - s.tick_size - ) - d['lot_tick_size'] = ( - sym_info.get('lot_tick_size') - or - s.lot_tick_size - ) - - if self.expiry is None: - d.pop('expiry', None) - elif expiry: - d['expiry'] = str(expiry) - - toml_clears_list = [] - - # reverse sort so latest clears are at top of section? - for tid, data in iter_by_dt(clears): - inline_table = toml.TomlDecoder().get_empty_inline_table() - - # serialize datetime to parsable `str` - inline_table['dt'] = str(data['dt']) - - # insert optional clear fields in column order - for k in ['ppu', 'accum_size']: - val = data.get(k) - if val: - inline_table[k] = val - - # insert required fields - for k in ['price', 'size', 'cost']: - inline_table[k] = data[k] - - inline_table['tid'] = tid - toml_clears_list.append(inline_table) - - d['clears'] = toml_clears_list - - return fqsn, d - - def ensure_state(self) -> None: - ''' - Audit either the `.size` and `.ppu` local instance vars against - the clears table calculations and return the calc-ed values if - they differ and log warnings to console. - - ''' - clears = list(self.clears.values()) - self.first_clear_dt = min(list(entry['dt'] for entry in clears)) - last_clear = clears[-1] - - csize = self.calc_size() - accum = last_clear['accum_size'] - if not self.expired(): - if ( - csize != accum - and csize != round(accum * self.split_ratio or 1) - ): - raise ValueError(f'Size mismatch: {csize}') - else: - assert csize == 0, 'Contract is expired but non-zero size?' 
- - if self.size != csize: - log.warning( - 'Position state mismatch:\n' - f'{self.size} => {csize}' - ) - self.size = csize - - cppu = self.calc_ppu() - ppu = last_clear['ppu'] - if ( - cppu != ppu - and self.split_ratio is not None - # handle any split info entered (for now) manually by user - and cppu != (ppu / self.split_ratio) - ): - raise ValueError(f'PPU mismatch: {cppu}') - - if self.ppu != cppu: - log.warning( - 'Position state mismatch:\n' - f'{self.ppu} => {cppu}' - ) - self.ppu = cppu - - def update_from_msg( - self, - msg: BrokerdPosition, - - ) -> None: - - # XXX: better place to do this? - symbol = self.symbol - - lot_size_digits = symbol.lot_size_digits - ppu, size = ( - round( - msg['avg_price'], - ndigits=symbol.tick_size_digits - ), - round( - msg['size'], - ndigits=lot_size_digits - ), - ) - - self.ppu = ppu - self.size = size - - @property - def dsize(self) -> float: - ''' - The "dollar" size of the pp, normally in trading (fiat) unit - terms. - - ''' - return self.ppu * self.size - - # TODO: idea: "real LIFO" dynamic positioning. - # - when a trade takes place where the pnl for - # the (set of) trade(s) is below the breakeven price - # it may be that the trader took a +ve pnl on a short(er) - # term trade in the same account. - # - in this case we could recalc the be price to - # be reverted back to it's prior value before the nearest term - # trade was opened.? - # def lifo_price() -> float: - # ... - - def iter_clears(self) -> Iterator[tuple[str, dict]]: - ''' - Iterate the internally managed ``.clears: dict`` table in - datetime-stamped order. - - ''' - return iter_by_dt(self.clears) - - def calc_ppu( - self, - # include transaction cost in breakeven price - # and presume the worst case of the same cost - # to exit this transaction (even though in reality - # it will be dynamic based on exit stratetgy). - cost_scalar: float = 2, - - ) -> float: - ''' - Compute the "price-per-unit" price for the given non-zero sized - rolling position. - - The recurrence relation which computes this (exponential) mean - per new clear which **increases** the accumulative postiion size - is: - - ppu[-1] = ( - ppu[-2] * accum_size[-2] - + - ppu[-1] * size - ) / accum_size[-1] - - where `cost_basis` for the current step is simply the price - * size of the most recent clearing transaction. - - ''' - asize_h: list[float] = [] # historical accumulative size - ppu_h: list[float] = [] # historical price-per-unit - - tid: str - entry: dict[str, Any] - for (tid, entry) in self.iter_clears(): - clear_size = entry['size'] - clear_price = entry['price'] - - last_accum_size = asize_h[-1] if asize_h else 0 - accum_size = last_accum_size + clear_size - accum_sign = copysign(1, accum_size) - - sign_change: bool = False - - if accum_size == 0: - ppu_h.append(0) - asize_h.append(0) - continue - - if accum_size == 0: - ppu_h.append(0) - asize_h.append(0) - continue - - # test if the pp somehow went "passed" a net zero size state - # resulting in a change of the "sign" of the size (+ve for - # long, -ve for short). - sign_change = ( - copysign(1, last_accum_size) + accum_sign == 0 - and last_accum_size != 0 - ) - - # since we passed the net-zero-size state the new size - # after sum should be the remaining size the new - # "direction" (aka, long vs. short) for this clear. 
- if sign_change: - clear_size = accum_size - abs_diff = abs(accum_size) - asize_h.append(0) - ppu_h.append(0) - - else: - # old size minus the new size gives us size diff with - # +ve -> increase in pp size - # -ve -> decrease in pp size - abs_diff = abs(accum_size) - abs(last_accum_size) - - # XXX: LIFO breakeven price update. only an increaze in size - # of the position contributes the breakeven price, - # a decrease does not (i.e. the position is being made - # smaller). - # abs_clear_size = abs(clear_size) - abs_new_size = abs(accum_size) - - if abs_diff > 0: - - cost_basis = ( - # cost basis for this clear - clear_price * abs(clear_size) - + - # transaction cost - accum_sign * cost_scalar * entry['cost'] - ) - - if asize_h: - size_last = abs(asize_h[-1]) - cb_last = ppu_h[-1] * size_last - ppu = (cost_basis + cb_last) / abs_new_size - - else: - ppu = cost_basis / abs_new_size - - ppu_h.append(ppu) - asize_h.append(accum_size) - - else: - # on "exit" clears from a given direction, - # only the size changes not the price-per-unit - # need to be updated since the ppu remains constant - # and gets weighted by the new size. - asize_h.append(accum_size) - ppu_h.append(ppu_h[-1]) - - final_ppu = ppu_h[-1] if ppu_h else 0 - - # handle any split info entered (for now) manually by user - if self.split_ratio is not None: - final_ppu /= self.split_ratio - - return final_ppu - - def expired(self) -> bool: - ''' - Predicate which checks if the contract/instrument is past its expiry. - - ''' - return bool(self.expiry) and self.expiry < now() - - def calc_size(self) -> float: - ''' - Calculate the unit size of this position in the destination - asset using the clears/trade event table; zero if expired. - - ''' - size: float = 0 - - # time-expired pps (normally derivatives) are "closed" - # and have a zero size. - if self.expired(): - return 0 - - for tid, entry in self.clears.items(): - size += entry['size'] - - if self.split_ratio is not None: - size = round(size * self.split_ratio) - - return float( - self.symbol.quantize_size(size), - ) - - def minimize_clears( - self, - - ) -> dict[str, dict]: - ''' - Minimize the position's clears entries by removing - all transactions before the last net zero size to avoid - unecessary history irrelevant to the current pp state. - - ''' - size: float = 0 - clears_since_zero: list[tuple(str, dict)] = [] - - # TODO: we might just want to always do this when iterating - # a ledger? keep a state of the last net-zero and only do the - # full iterate when no state was stashed? - - # scan for the last "net zero" position by iterating - # transactions until the next net-zero size, rinse, repeat. - for tid, clear in self.clears.items(): - size += clear['size'] - clears_since_zero.append((tid, clear)) - - if size == 0: - clears_since_zero.clear() - - self.clears = dict(clears_since_zero) - return self.clears - - def add_clear( - self, - t: Transaction, - ) -> dict: - ''' - Update clearing table and populate rolling ppu and accumulative - size in both the clears entry and local attrs state. - - ''' - clear = self.clears[t.tid] = { - 'cost': t.cost, - 'price': t.price, - 'size': t.size, - 'dt': t.dt - } - - # TODO: compute these incrementally instead - # of re-looping through each time resulting in O(n**2) - # behaviour..? - - # NOTE: we compute these **after** adding the entry in order to - # make the recurrence relation math work inside - # ``.calc_size()``. 
- self.size = clear['accum_size'] = self.calc_size() - self.ppu = clear['ppu'] = self.calc_ppu() - - return clear - - def sugest_split(self) -> float: - ... - - -class PpTable(Struct): - - brokername: str - acctid: str - pps: dict[str, Position] - conf: Optional[dict] = {} - - def update_from_trans( - self, - trans: dict[str, Transaction], - cost_scalar: float = 2, - - ) -> dict[str, Position]: - - pps = self.pps - updated: dict[str, Position] = {} - - # lifo update all pps from records, ensuring - # we compute the PPU and size sorted in time! - for t in sorted( - trans.values(), - key=lambda t: t.dt, - reverse=True, - ): - pp = pps.setdefault( - t.bsuid, - - # if no existing pp, allocate fresh one. - Position( - Symbol.from_fqsn( - t.fqsn, - info={}, - ) if not t.sym else t.sym, - size=0.0, - ppu=0.0, - bsuid=t.bsuid, - expiry=t.expiry, - ) - ) - clears = pp.clears - if clears: - first_clear_dt = pp.first_clear_dt - - # don't do updates for ledger records we already have - # included in the current pps state. - if ( - t.tid in clears - or ( - first_clear_dt - and t.dt < first_clear_dt - ) - ): - # NOTE: likely you'll see repeats of the same - # ``Transaction`` passed in here if/when you are restarting - # a ``brokerd.ib`` where the API will re-report trades from - # the current session, so we need to make sure we don't - # "double count" these in pp calculations. - continue - - # update clearing table - pp.add_clear(t) - updated[t.bsuid] = pp - - # minimize clears tables and update sizing. - for bsuid, pp in updated.items(): - pp.ensure_state() - - # deliver only the position entries that were actually updated - # (modified the state) from the input transaction set. - return updated - - def dump_active( - self, - ) -> tuple[ - dict[str, Position], - dict[str, Position] - ]: - ''' - Iterate all tabulated positions, render active positions to - a ``dict`` format amenable to serialization (via TOML) and drop - from state (``.pps``) as well as return in a ``dict`` all - ``Position``s which have recently closed. - - ''' - # NOTE: newly closed position are also important to report/return - # since a consumer, like an order mode UI ;), might want to react - # based on the closure (for example removing the breakeven line - # and clearing the entry from any lists/monitors). - closed_pp_objs: dict[str, Position] = {} - open_pp_objs: dict[str, Position] = {} - - pp_objs = self.pps - for bsuid in list(pp_objs): - pp = pp_objs[bsuid] - - # XXX: debug hook for size mismatches - # qqqbsuid = 320227571 - # if bsuid == qqqbsuid: - # breakpoint() - - pp.ensure_state() - - if ( - # "net-zero" is a "closed" position - pp.size == 0 - - # time-expired pps (normally derivatives) are "closed" - or (pp.expiry and pp.expiry < now()) - ): - # for expired cases - pp.size = 0 - - # NOTE: we DO NOT pop the pp here since it can still be - # used to check for duplicate clears that may come in as - # new transaction from some backend API and need to be - # ignored; the closed positions won't be written to the - # ``pps.toml`` since ``pp_active_entries`` above is what's - # written. - closed_pp_objs[bsuid] = pp - - else: - open_pp_objs[bsuid] = pp - - return open_pp_objs, closed_pp_objs - - def to_toml( - self, - ) -> dict[str, Any]: - - active, closed = self.dump_active() - - # ONLY dict-serialize all active positions; those that are closed - # we don't store in the ``pps.toml``. 
- to_toml_dict = {} - - for bsuid, pos in active.items(): - - # keep the minimal amount of clears that make up this - # position since the last net-zero state. - pos.minimize_clears() - pos.ensure_state() - - # serialize to pre-toml form - fqsn, asdict = pos.to_pretoml() - log.info(f'Updating active pp: {fqsn}') - - # XXX: ugh, it's cuz we push the section under - # the broker name.. maybe we need to rethink this? - brokerless_key = fqsn.removeprefix(f'{self.brokername}.') - to_toml_dict[brokerless_key] = asdict - - return to_toml_dict - - def write_config(self) -> None: - ''' - Write the current position table to the user's ``pps.toml``. - - ''' - # TODO: show diff output? - # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries - # active, closed_pp_objs = table.dump_active() - pp_entries = self.to_toml() - if pp_entries: - log.info(f'Updating ``pps.toml`` for {path}:\n') - log.info(f'Current positions:\n{pp_entries}') - self.conf[self.brokername][self.acctid] = pp_entries - - elif ( - self.brokername in self.conf and - self.acctid in self.conf[self.brokername] - ): - del self.conf[self.brokername][self.acctid] - if len(self.conf[self.brokername]) == 0: - del self.conf[self.brokername] - - # TODO: why tf haven't they already done this for inline - # tables smh.. - enc = PpsEncoder(preserve=True) - # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) - enc.dump_funcs[ - toml.decoder.InlineTableDict - ] = enc.dump_inline_table - - config.write( - self.conf, - 'pps', - encoder=enc, - fail_empty=False - ) - - -def load_pps_from_ledger( - - brokername: str, - acctname: str, - - # post normalization filter on ledger entries to be processed - filter_by: Optional[list[dict]] = None, - -) -> tuple[ - dict[str, Transaction], - dict[str, Position], -]: - ''' - Open a ledger file by broker name and account and read in and - process any trade records into our normalized ``Transaction`` form - and then update the equivalent ``Pptable`` and deliver the two - bsuid-mapped dict-sets of the transactions and pps. - - ''' - with ( - open_trade_ledger(brokername, acctname) as ledger, - open_pps(brokername, acctname) as table, - ): - if not ledger: - # null case, no ledger file with content - return {} - - mod = get_brokermod(brokername) - src_records: dict[str, Transaction] = mod.norm_trade_records(ledger) - - if filter_by: - records = {} - bsuids = set(filter_by) - for tid, r in src_records.items(): - if r.bsuid in bsuids: - records[tid] = r - else: - records = src_records - - updated = table.update_from_trans(records) - - return records, updated - - -# TODO: instead see if we can hack tomli and tomli-w to do the same: -# - https://github.com/hukkin/tomli -# - https://github.com/hukkin/tomli-w -class PpsEncoder(toml.TomlEncoder): - ''' - Special "styled" encoder that makes a ``pps.toml`` redable and - compact by putting `.clears` tables inline and everything else - flat-ish. - - ''' - separator = ',' - - def dump_list(self, v): - ''' - Dump an inline list with a newline after every element and - with consideration for denoted inline table types. - - ''' - retval = "[\n" - for u in v: - if isinstance(u, toml.decoder.InlineTableDict): - out = self.dump_inline_table(u) - else: - out = str(self.dump_value(u)) - - retval += " " + out + "," + "\n" - retval += "]" - return retval - - def dump_inline_table(self, section): - """Preserve inline table in its compact syntax instead of expanding - into subsection. 
- https://github.com/toml-lang/toml#user-content-inline-table - """ - val_list = [] - for k, v in section.items(): - # if isinstance(v, toml.decoder.InlineTableDict): - if isinstance(v, dict): - val = self.dump_inline_table(v) - else: - val = str(self.dump_value(v)) - - val_list.append(k + " = " + val) - - retval = "{ " + ", ".join(val_list) + " }" - return retval - - def dump_sections(self, o, sup): - retstr = "" - if sup != "" and sup[-1] != ".": - sup += '.' - retdict = self._dict() - arraystr = "" - for section in o: - qsection = str(section) - value = o[section] - - if not re.match(r'^[A-Za-z0-9_-]+$', section): - qsection = toml.encoder._dump_str(section) - - # arrayoftables = False - if ( - self.preserve - and isinstance(value, toml.decoder.InlineTableDict) - ): - retstr += ( - qsection - + - " = " - + - self.dump_inline_table(o[section]) - + - '\n' # only on the final terminating left brace - ) - - # XXX: this code i'm pretty sure is just blatantly bad - # and/or wrong.. - # if isinstance(o[section], list): - # for a in o[section]: - # if isinstance(a, dict): - # arrayoftables = True - # if arrayoftables: - # for a in o[section]: - # arraytabstr = "\n" - # arraystr += "[[" + sup + qsection + "]]\n" - # s, d = self.dump_sections(a, sup + qsection) - # if s: - # if s[0] == "[": - # arraytabstr += s - # else: - # arraystr += s - # while d: - # newd = self._dict() - # for dsec in d: - # s1, d1 = self.dump_sections(d[dsec], sup + - # qsection + "." + - # dsec) - # if s1: - # arraytabstr += ("[" + sup + qsection + - # "." + dsec + "]\n") - # arraytabstr += s1 - # for s1 in d1: - # newd[dsec + "." + s1] = d1[s1] - # d = newd - # arraystr += arraytabstr - - elif isinstance(value, dict): - retdict[qsection] = o[section] - - elif o[section] is not None: - retstr += ( - qsection - + - " = " - + - str(self.dump_value(o[section])) - ) - - # if not isinstance(value, dict): - if not isinstance(value, toml.decoder.InlineTableDict): - # inline tables should not contain newlines: - # https://toml.io/en/v1.0.0#inline-table - retstr += '\n' - - else: - raise ValueError(value) - - retstr += arraystr - return (retstr, retdict) - - -@cm -def open_pps( - brokername: str, - acctid: str, - write_on_exit: bool = False, -) -> Generator[PpTable, None, None]: - ''' - Read out broker-specific position entries from - incremental update file: ``pps.toml``. - - ''' - conf, path = config.load('pps') - brokersection = conf.setdefault(brokername, {}) - pps = brokersection.setdefault(acctid, {}) - - # TODO: ideally we can pass in an existing - # pps state to this right? such that we - # don't have to do a ledger reload all the - # time.. a couple ideas I can think of, - # - mirror this in some client side actor which - # does the actual ledger updates (say the paper - # engine proc if we decide to always spawn it?), - # - do diffs against updates from the ledger writer - # actor and the in-mem state here? - - pp_objs = {} - table = PpTable( - brokername, - acctid, - pp_objs, - conf=conf, - ) - - # unmarshal/load ``pps.toml`` config entries into object form - # and update `PpTable` obj entries. - for fqsn, entry in pps.items(): - bsuid = entry['bsuid'] - symbol = Symbol.from_fqsn( - fqsn, - - # NOTE & TODO: right now we fill in the defaults from - # `.data._source.Symbol` but eventually these should always - # either be already written to the pos table or provided at - # write time to ensure always having these values somewhere - # and thus allowing us to get our pos sizing precision - # correct! 
-            info={
-                'asset_type': entry.get('asset_type', ''),
-                'price_tick_size': entry.get('price_tick_size', 0.01),
-                'lot_tick_size': entry.get('lot_tick_size', 0.0),
-            }
-        )
-
-        # convert clears sub-tables (only in this form
-        # for toml re-presentation) back into a master table.
-        clears_list = entry['clears']
-
-        # index clears entries in "object" form by tid in a top
-        # level dict instead of a list (as is presented in our
-        # ``pps.toml``).
-        clears = pp_objs.setdefault(bsuid, {})
-
-        # TODO: should be make a ``Struct`` for clear/event entries?
-        # convert "clear events table" from the toml config (list of
-        # a dicts) and load it into object form for use in position
-        # processing of new clear events.
-        trans: list[Transaction] = []
-
-        for clears_table in clears_list:
-            tid = clears_table.pop('tid')
-            dtstr = clears_table['dt']
-            dt = pendulum.parse(dtstr)
-            clears_table['dt'] = dt
-
-            trans.append(Transaction(
-                fqsn=bsuid,
-                sym=symbol,
-                bsuid=bsuid,
-                tid=tid,
-                size=clears_table['size'],
-                price=clears_table['price'],
-                cost=clears_table['cost'],
-                dt=dt,
-            ))
-            clears[tid] = clears_table
-
-        size = entry['size']
-
-        # TODO: remove but, handle old field name for now
-        ppu = entry.get(
-            'ppu',
-            entry.get('be_price', 0),
-        )
-
-        split_ratio = entry.get('split_ratio')
-
-        expiry = entry.get('expiry')
-        if expiry:
-            expiry = pendulum.parse(expiry)
-
-        pp = pp_objs[bsuid] = Position(
-            symbol,
-            size=size,
-            ppu=ppu,
-            split_ratio=split_ratio,
-            expiry=expiry,
-            bsuid=entry['bsuid'],
-        )
-
-        # XXX: super critical, we need to be sure to include
-        # all pps.toml clears to avoid reusing clears that were
-        # already included in the current incremental update
-        # state, since today's records may have already been
-        # processed!
-        for t in trans:
-            pp.add_clear(t)
+from ._ledger import (
+    Transaction,
+    open_trade_ledger,
+)
+from ._pos import (
+    PpTable,
+    open_pps,
+    load_pps_from_ledger,
+    Position,
+)

-        # audit entries loaded from toml
-        pp.ensure_state()
+log = get_logger(__name__)

-    try:
-        yield table
-    finally:
-        if write_on_exit:
-            table.write_config()
+__all__ = [
+    'Transaction',
+    'open_trade_ledger',
+    'PpTable',
+    'open_pps',
+    'load_pps_from_ledger',
+    'Position',
+]


 def get_likely_pair(
@@ -1075,6 +77,7 @@ def get_likely_pair(

 if __name__ == '__main__':
     import sys
+    from pprint import pformat

     args = sys.argv
     assert len(args) > 1, 'Specifiy account(s) from `brokers.toml`'
diff --git a/piker/accounting/_ledger.py b/piker/accounting/_ledger.py
new file mode 100644
index 0000000000..74bee9ad86
--- /dev/null
+++ b/piker/accounting/_ledger.py
@@ -0,0 +1,125 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from __future__ import annotations
+from contextlib import contextmanager as cm
+import os
+from os import path
+import time
+from typing import (
+    Any,
+    Iterator,
+    Union,
+    Generator
+)
+
+from pendulum import (
+    datetime,
+)
+import tomli
+import toml
+
+from .. import config
+from ..data._source import Symbol
+from ..data.types import Struct
+from ..log import get_logger
+
+log = get_logger(__name__)
+
+
+@cm
+def open_trade_ledger(
+    broker: str,
+    account: str,
+
+) -> Generator[dict, None, None]:
+    '''
+    Idempotently create and read in a trade log file from the
+    ``<configdir>/ledgers/`` directory.
+
+    Files are named per broker account of the form
+    ``trades_<brokername>_<accountname>.toml``. The ``accountname``
+    here is the name as defined in the user's ``brokers.toml`` config.
+
+    '''
+    ldir = path.join(config._config_dir, 'ledgers')
+    if not path.isdir(ldir):
+        os.makedirs(ldir)
+
+    fname = f'trades_{broker}_{account}.toml'
+    tradesfile = path.join(ldir, fname)
+
+    if not path.isfile(tradesfile):
+        log.info(
+            f'Creating new local trades ledger: {tradesfile}'
+        )
+        with open(tradesfile, 'w') as cf:
+            pass  # touch
+    with open(tradesfile, 'rb') as cf:
+        start = time.time()
+        ledger = tomli.load(cf)
+        log.info(f'Ledger load took {time.time() - start}s')
+        cpy = ledger.copy()
+
+    try:
+        yield cpy
+    finally:
+        if cpy != ledger:
+
+            # TODO: show diff output?
+            # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
+            log.info(f'Updating ledger for {tradesfile}:\n')
+            ledger.update(cpy)
+
+            # we write on close the mutated ledger data
+            with open(tradesfile, 'w') as cf:
+                toml.dump(ledger, cf)
+
+
+class Transaction(Struct, frozen=True):
+    # TODO: should this be ``.to`` (see below)?
+    fqsn: str
+
+    sym: Symbol
+    tid: Union[str, int]  # unique transaction id
+    size: float
+    price: float
+    cost: float  # commissions or other additional costs
+    dt: datetime
+    expiry: datetime | None = None
+
+    # optional key normally derived from the broker
+    # backend which ensures the instrument-symbol this record
+    # is for is truly unique.
+    bsuid: Union[str, int] | None = None
+
+    # optional fqsn for the source "asset"/money symbol?
+    # from: Optional[str] = None
+
+
+def iter_by_dt(
+    clears: dict[str, Any],
+) -> Iterator[tuple[str, dict]]:
+    '''
+    Iterate entries of a ``clears: dict`` table sorted by entry recorded
+    datetime presumably set at the ``'dt'`` field in each entry.
+
+    '''
+    for tid, data in sorted(
+        list(clears.items()),
+        key=lambda item: item[1]['dt'],
+    ):
+        yield tid, data
diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py
new file mode 100644
index 0000000000..2a9ca0d8e8
--- /dev/null
+++ b/piker/accounting/_pos.py
@@ -0,0 +1,961 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+'''
+Personal/Private position parsing, calculating, summarizing in a way
+that doesn't try to cuk most humans who prefer to not lose their moneys..
+
+(looking at you `ib` and dirt-bird friends)
+
+'''
+from __future__ import annotations
+from contextlib import contextmanager as cm
+from math import copysign
+import re
+from typing import (
+    Any,
+    Iterator,
+    Optional,
+    Union,
+    Generator
+)
+
+import pendulum
+from pendulum import datetime, now
+import toml
+
+from ._ledger import (
+    Transaction,
+    iter_by_dt,
+    open_trade_ledger,
+)
+from .. import config
+from ..brokers import get_brokermod
+from ..clearing._messages import BrokerdPosition, Status
+from ..data._source import Symbol, unpack_fqsn
+from ..data.types import Struct
+from ..log import get_logger
+
+log = get_logger(__name__)
+
+
+class Position(Struct):
+    '''
+    Basic pp (personal/piker position) model with attached clearing
+    transaction history.
+
+    '''
+    symbol: Symbol
+
+    # can be +ve or -ve for long/short
+    size: float
+
+    # "breakeven price" above or below which pnl moves above and below
+    # zero for the entirety of the current "trade state".
+    ppu: float
+
+    # unique backend symbol id
+    bsuid: str
+
+    split_ratio: Optional[int] = None
+
+    # ordered record of known constituent trade messages
+    clears: dict[
+        Union[str, int, Status],  # trade id
+        dict[str, Any],  # transaction history summaries
+    ] = {}
+    first_clear_dt: Optional[datetime] = None
+
+    expiry: Optional[datetime] = None
+
+    def to_dict(self) -> dict:
+        return {
+            f: getattr(self, f)
+            for f in self.__struct_fields__
+        }
+
+    def to_pretoml(self) -> tuple[str, dict]:
+        '''
+        Prep this position's data contents for export to toml including
+        re-structuring of the ``.clears`` table to an array of
+        inline-subtables for better ``pps.toml`` compactness.
+
+        '''
+        d = self.to_dict()
+        clears = d.pop('clears')
+        expiry = d.pop('expiry')
+
+        if self.split_ratio is None:
+            d.pop('split_ratio')
+
+        # should be obvious from clears/event table
+        d.pop('first_clear_dt')
+
+        # TODO: we need to figure out how to have one top level
+        # listing venue here even when the backend isn't providing
+        # it via the trades ledger..
+        # drop symbol obj in serialized form
+        s = d.pop('symbol')
+        fqsn = s.front_fqsn()
+
+        broker, key, suffix = unpack_fqsn(fqsn)
+        sym_info = s.broker_info[broker]
+
+        d['asset_type'] = sym_info['asset_type']
+        d['price_tick_size'] = (
+            sym_info.get('price_tick_size')
+            or
+            s.tick_size
+        )
+        d['lot_tick_size'] = (
+            sym_info.get('lot_tick_size')
+            or
+            s.lot_tick_size
+        )
+
+        if self.expiry is None:
+            d.pop('expiry', None)
+        elif expiry:
+            d['expiry'] = str(expiry)
+
+        toml_clears_list = []
+
+        # reverse sort so latest clears are at top of section?
+        for tid, data in iter_by_dt(clears):
+            inline_table = toml.TomlDecoder().get_empty_inline_table()
+
+            # serialize datetime to parsable `str`
+            inline_table['dt'] = str(data['dt'])
+
+            # insert optional clear fields in column order
+            for k in ['ppu', 'accum_size']:
+                val = data.get(k)
+                if val:
+                    inline_table[k] = val
+
+            # insert required fields
+            for k in ['price', 'size', 'cost']:
+                inline_table[k] = data[k]
+
+            inline_table['tid'] = tid
+            toml_clears_list.append(inline_table)
+
+        d['clears'] = toml_clears_list
+
+        return fqsn, d
+
+    def ensure_state(self) -> None:
+        '''
+        Audit both the `.size` and `.ppu` local instance vars against
+        the clears table calculations; if they differ, log warnings to
+        console and overwrite the instance vars with the calc-ed values.
+
+        '''
+        clears = list(self.clears.values())
+        self.first_clear_dt = min(list(entry['dt'] for entry in clears))
+        last_clear = clears[-1]
+
+        csize = self.calc_size()
+        accum = last_clear['accum_size']
+        if not self.expired():
+            if (
+                csize != accum
+                and csize != round(accum * (self.split_ratio or 1))
+            ):
+                raise ValueError(f'Size mismatch: {csize}')
+        else:
+            assert csize == 0, 'Contract is expired but non-zero size?'
+
+        if self.size != csize:
+            log.warning(
+                'Position state mismatch:\n'
+                f'{self.size} => {csize}'
+            )
+            self.size = csize
+
+        cppu = self.calc_ppu()
+        ppu = last_clear['ppu']
+        if (
+            cppu != ppu
+            and self.split_ratio is not None
+            # handle any split info entered (for now) manually by user
+            and cppu != (ppu / self.split_ratio)
+        ):
+            raise ValueError(f'PPU mismatch: {cppu}')
+
+        if self.ppu != cppu:
+            log.warning(
+                'Position state mismatch:\n'
+                f'{self.ppu} => {cppu}'
+            )
+            self.ppu = cppu
+
+    def update_from_msg(
+        self,
+        msg: BrokerdPosition,
+
+    ) -> None:
+
+        # XXX: better place to do this?
+        symbol = self.symbol
+
+        lot_size_digits = symbol.lot_size_digits
+        ppu, size = (
+            round(
+                msg['avg_price'],
+                ndigits=symbol.tick_size_digits
+            ),
+            round(
+                msg['size'],
+                ndigits=lot_size_digits
+            ),
+        )
+
+        self.ppu = ppu
+        self.size = size
+
+    @property
+    def dsize(self) -> float:
+        '''
+        The "dollar" size of the pp, normally in trading (fiat) unit
+        terms.
+
+        '''
+        return self.ppu * self.size
+
+    # TODO: idea: "real LIFO" dynamic positioning.
+    # - when a trade takes place where the pnl for
+    # the (set of) trade(s) is below the breakeven price
+    # it may be that the trader took a +ve pnl on a short(er)
+    # term trade in the same account.
+    # - in this case we could recalc the be price to
+    # be reverted back to it's prior value before the nearest term
+    # trade was opened.?
+    # def lifo_price() -> float:
+    #     ...
+
+    def iter_clears(self) -> Iterator[tuple[str, dict]]:
+        '''
+        Iterate the internally managed ``.clears: dict`` table in
+        datetime-stamped order.
+
+        '''
+        return iter_by_dt(self.clears)
+
+    def calc_ppu(
+        self,
+        # include transaction cost in breakeven price
+        # and presume the worst case of the same cost
+        # to exit this transaction (even though in reality
+        # it will be dynamic based on exit strategy).
+        cost_scalar: float = 2,
+
+    ) -> float:
+        '''
+        Compute the "price-per-unit" price for the given non-zero sized
+        rolling position.
+
+        The recurrence relation which computes this (exponential) mean
+        per new clear which **increases** the accumulative position size
+        is:
+
+        ppu[-1] = (
+            ppu[-2] * accum_size[-2]
+            +
+            price * size
+        ) / accum_size[-1]
+
+        where the `price * size` term, aka the `cost_basis` for the
+        current step, is simply that of the most recent clearing
+        transaction.
+
+        '''
+        asize_h: list[float] = []  # historical accumulative size
+        ppu_h: list[float] = []  # historical price-per-unit
+
+        tid: str
+        entry: dict[str, Any]
+        for (tid, entry) in self.iter_clears():
+            clear_size = entry['size']
+            clear_price = entry['price']
+
+            last_accum_size = asize_h[-1] if asize_h else 0
+            accum_size = last_accum_size + clear_size
+            accum_sign = copysign(1, accum_size)
+
+            sign_change: bool = False
+
+            if accum_size == 0:
+                ppu_h.append(0)
+                asize_h.append(0)
+                continue
+
+            # test if the pp somehow went "past" a net zero size state
+            # resulting in a change of the "sign" of the size (+ve for
+            # long, -ve for short).
+            sign_change = (
+                copysign(1, last_accum_size) + accum_sign == 0
+                and last_accum_size != 0
+            )
+
+            # since we passed the net-zero-size state, the new size
+            # after summing should be the remaining size in the new
+            # "direction" (aka, long vs. short) for this clear.
+            if sign_change:
+                clear_size = accum_size
+                abs_diff = abs(accum_size)
+                asize_h.append(0)
+                ppu_h.append(0)
+
+            else:
+                # old size minus the new size gives us size diff with
+                # +ve -> increase in pp size
+                # -ve -> decrease in pp size
+                abs_diff = abs(accum_size) - abs(last_accum_size)
+
+            # XXX: LIFO breakeven price update. only an increase in size
+            # of the position contributes to the breakeven price,
+            # a decrease does not (i.e. the position is being made
+            # smaller).
+            # abs_clear_size = abs(clear_size)
+            abs_new_size = abs(accum_size)
+
+            if abs_diff > 0:
+
+                cost_basis = (
+                    # cost basis for this clear
+                    clear_price * abs(clear_size)
+                    +
+                    # transaction cost
+                    accum_sign * cost_scalar * entry['cost']
+                )
+
+                if asize_h:
+                    size_last = abs(asize_h[-1])
+                    cb_last = ppu_h[-1] * size_last
+                    ppu = (cost_basis + cb_last) / abs_new_size
+
+                else:
+                    ppu = cost_basis / abs_new_size
+
+                ppu_h.append(ppu)
+                asize_h.append(accum_size)
+
+            else:
+                # on "exit" clears from a given direction, only the
+                # size changes; the price-per-unit does not need to be
+                # updated since the ppu remains constant and simply
+                # gets weighted by the new size.
+                asize_h.append(accum_size)
+                ppu_h.append(ppu_h[-1])
+
+        final_ppu = ppu_h[-1] if ppu_h else 0
+
+        # handle any split info entered (for now) manually by user
+        if self.split_ratio is not None:
+            final_ppu /= self.split_ratio
+
+        return final_ppu
+
+    def expired(self) -> bool:
+        '''
+        Predicate which checks if the contract/instrument is past its expiry.
+
+        '''
+        return bool(self.expiry) and self.expiry < now()
+
+    def calc_size(self) -> float:
+        '''
+        Calculate the unit size of this position in the destination
+        asset using the clears/trade event table; zero if expired.
+
+        '''
+        size: float = 0
+
+        # time-expired pps (normally derivatives) are "closed"
+        # and have a zero size.
+        if self.expired():
+            return 0
+
+        for tid, entry in self.clears.items():
+            size += entry['size']
+
+        if self.split_ratio is not None:
+            size = round(size * self.split_ratio)
+
+        return float(
+            self.symbol.quantize_size(size),
+        )
+
+    def minimize_clears(
+        self,
+
+    ) -> dict[str, dict]:
+        '''
+        Minimize the position's clears entries by removing
+        all transactions before the last net zero size to avoid
+        unnecessary history irrelevant to the current pp state.
+
+        '''
+        size: float = 0
+        clears_since_zero: list[tuple[str, dict]] = []
+
+        # TODO: we might just want to always do this when iterating
+        # a ledger? keep a state of the last net-zero and only do the
+        # full iterate when no state was stashed?
+
+        # scan for the last "net zero" position by iterating
+        # transactions until the next net-zero size, rinse, repeat.
+        for tid, clear in self.clears.items():
+            size += clear['size']
+            clears_since_zero.append((tid, clear))
+
+            if size == 0:
+                clears_since_zero.clear()
+
+        self.clears = dict(clears_since_zero)
+        return self.clears
+
+    def add_clear(
+        self,
+        t: Transaction,
+    ) -> dict:
+        '''
+        Update clearing table and populate rolling ppu and accumulative
+        size in both the clears entry and local attrs state.
+
+        '''
+        clear = self.clears[t.tid] = {
+            'cost': t.cost,
+            'price': t.price,
+            'size': t.size,
+            'dt': t.dt
+        }
+
+        # TODO: compute these incrementally instead
+        # of re-looping through each time resulting in O(n**2)
+        # behaviour..?
+
+        # NOTE: we compute these **after** adding the entry in order to
+        # make the recurrence relation math work inside
+        # ``.calc_ppu()``.
+        self.size = clear['accum_size'] = self.calc_size()
+        self.ppu = clear['ppu'] = self.calc_ppu()
+
+        return clear
+
+    def suggest_split(self) -> float:
+        ...
+
+
+class PpTable(Struct):
+
+    brokername: str
+    acctid: str
+    pps: dict[str, Position]
+    conf: Optional[dict] = {}
+
+    def update_from_trans(
+        self,
+        trans: dict[str, Transaction],
+        cost_scalar: float = 2,
+
+    ) -> dict[str, Position]:
+
+        pps = self.pps
+        updated: dict[str, Position] = {}
+
+        # lifo update all pps from records, ensuring
+        # we compute the PPU and size sorted in time!
+        for t in sorted(
+            trans.values(),
+            key=lambda t: t.dt,
+            reverse=True,
+        ):
+            pp = pps.setdefault(
+                t.bsuid,
+
+                # if no existing pp, allocate fresh one.
+                Position(
+                    Symbol.from_fqsn(
+                        t.fqsn,
+                        info={},
+                    ) if not t.sym else t.sym,
+                    size=0.0,
+                    ppu=0.0,
+                    bsuid=t.bsuid,
+                    expiry=t.expiry,
+                )
+            )
+            clears = pp.clears
+            if clears:
+                first_clear_dt = pp.first_clear_dt
+
+                # don't do updates for ledger records we already have
+                # included in the current pps state.
+                if (
+                    t.tid in clears
+                    or (
+                        first_clear_dt
+                        and t.dt < first_clear_dt
+                    )
+                ):
+                    # NOTE: likely you'll see repeats of the same
+                    # ``Transaction`` passed in here if/when you are restarting
+                    # a ``brokerd.ib`` where the API will re-report trades from
+                    # the current session, so we need to make sure we don't
+                    # "double count" these in pp calculations.
+                    continue
+
+            # update clearing table
+            pp.add_clear(t)
+            updated[t.bsuid] = pp
+
+        # minimize clears tables and update sizing.
+        for bsuid, pp in updated.items():
+            pp.ensure_state()
+
+        # deliver only the position entries that were actually updated
+        # (modified the state) from the input transaction set.
+        return updated
+
+    def dump_active(
+        self,
+    ) -> tuple[
+        dict[str, Position],
+        dict[str, Position]
+    ]:
+        '''
+        Iterate all tabulated positions, render active positions to
+        a ``dict`` format amenable to serialization (via TOML), and
+        also return, in a separate ``dict``, all ``Position``s which
+        have recently closed.
+
+        '''
+        # NOTE: newly closed positions are also important to report/return
+        # since a consumer, like an order mode UI ;), might want to react
+        # based on the closure (for example removing the breakeven line
+        # and clearing the entry from any lists/monitors).
+        closed_pp_objs: dict[str, Position] = {}
+        open_pp_objs: dict[str, Position] = {}
+
+        pp_objs = self.pps
+        for bsuid in list(pp_objs):
+            pp = pp_objs[bsuid]
+
+            # XXX: debug hook for size mismatches
+            # qqqbsuid = 320227571
+            # if bsuid == qqqbsuid:
+            #     breakpoint()
+
+            pp.ensure_state()
+
+            if (
+                # "net-zero" is a "closed" position
+                pp.size == 0
+
+                # time-expired pps (normally derivatives) are "closed"
+                or (pp.expiry and pp.expiry < now())
+            ):
+                # for expired cases
+                pp.size = 0
+
+                # NOTE: we DO NOT pop the pp here since it can still be
+                # used to check for duplicate clears that may come in as
+                # new transactions from some backend API and need to be
+                # ignored; the closed positions won't be written to the
+                # ``pps.toml`` since ``pp_active_entries`` above is what's
+                # written.
+                closed_pp_objs[bsuid] = pp
+
+            else:
+                open_pp_objs[bsuid] = pp
+
+        return open_pp_objs, closed_pp_objs
+
+    def to_toml(
+        self,
+    ) -> dict[str, Any]:
+
+        active, closed = self.dump_active()
+
+        # ONLY dict-serialize all active positions; those that are closed
+        # we don't store in the ``pps.toml``.
+        to_toml_dict = {}
+
+        for bsuid, pos in active.items():
+
+            # keep the minimal amount of clears that make up this
+            # position since the last net-zero state.
+            pos.minimize_clears()
+            pos.ensure_state()
+
+            # serialize to pre-toml form
+            fqsn, asdict = pos.to_pretoml()
+            log.info(f'Updating active pp: {fqsn}')
+
+            # XXX: ugh, it's cuz we push the section under
+            # the broker name.. maybe we need to rethink this?
+            brokerless_key = fqsn.removeprefix(f'{self.brokername}.')
+            to_toml_dict[brokerless_key] = asdict
+
+        return to_toml_dict
+
+    def write_config(self) -> None:
+        '''
+        Write the current position table to the user's ``pps.toml``.
+
+        '''
+        # TODO: show diff output?
+        # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
+        # active, closed_pp_objs = table.dump_active()
+        pp_entries = self.to_toml()
+        if pp_entries:
+            log.info(
+                f'Updating ``pps.toml``:\n'
+                f'Current positions:\n{pp_entries}'
+            )
+            self.conf[self.brokername][self.acctid] = pp_entries
+
+        elif (
+            self.brokername in self.conf and
+            self.acctid in self.conf[self.brokername]
+        ):
+            del self.conf[self.brokername][self.acctid]
+            if len(self.conf[self.brokername]) == 0:
+                del self.conf[self.brokername]
+
+        # TODO: why tf haven't they already done this for inline
+        # tables smh..
+        enc = PpsEncoder(preserve=True)
+        # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
+        enc.dump_funcs[
+            toml.decoder.InlineTableDict
+        ] = enc.dump_inline_table
+
+        config.write(
+            self.conf,
+            'pps',
+            encoder=enc,
+            fail_empty=False
+        )
+
+
+def load_pps_from_ledger(
+
+    brokername: str,
+    acctname: str,
+
+    # post normalization filter on ledger entries to be processed
+    filter_by: Optional[list[dict]] = None,
+
+) -> tuple[
+    dict[str, Transaction],
+    dict[str, Position],
+]:
+    '''
+    Open a ledger file by broker name and account, read in and
+    process any trade records into our normalized ``Transaction``
+    form, update the equivalent ``PpTable``, and deliver the two
+    bsuid-mapped dict-sets of the transactions and pps.
+
+    '''
+    with (
+        open_trade_ledger(brokername, acctname) as ledger,
+        open_pps(brokername, acctname) as table,
+    ):
+        if not ledger:
+            # null case, no ledger file with content
+            return {}, {}
+
+        mod = get_brokermod(brokername)
+        src_records: dict[str, Transaction] = mod.norm_trade_records(ledger)
+
+        if filter_by:
+            records = {}
+            bsuids = set(filter_by)
+            for tid, r in src_records.items():
+                if r.bsuid in bsuids:
+                    records[tid] = r
+        else:
+            records = src_records
+
+        updated = table.update_from_trans(records)
+
+    return records, updated
+
+
+# TODO: instead see if we can hack tomli and tomli-w to do the same:
+# - https://github.com/hukkin/tomli
+# - https://github.com/hukkin/tomli-w
+class PpsEncoder(toml.TomlEncoder):
+    '''
+    Special "styled" encoder that makes a ``pps.toml`` readable and
+    compact by putting `.clears` tables inline and everything else
+    flat-ish.
+
+    '''
+    separator = ','
+
+    def dump_list(self, v):
+        '''
+        Dump an inline list with a newline after every element and
+        with consideration for denoted inline table types.
+ + ''' + retval = "[\n" + for u in v: + if isinstance(u, toml.decoder.InlineTableDict): + out = self.dump_inline_table(u) + else: + out = str(self.dump_value(u)) + + retval += " " + out + "," + "\n" + retval += "]" + return retval + + def dump_inline_table(self, section): + """Preserve inline table in its compact syntax instead of expanding + into subsection. + https://github.com/toml-lang/toml#user-content-inline-table + """ + val_list = [] + for k, v in section.items(): + # if isinstance(v, toml.decoder.InlineTableDict): + if isinstance(v, dict): + val = self.dump_inline_table(v) + else: + val = str(self.dump_value(v)) + + val_list.append(k + " = " + val) + + retval = "{ " + ", ".join(val_list) + " }" + return retval + + def dump_sections(self, o, sup): + retstr = "" + if sup != "" and sup[-1] != ".": + sup += '.' + retdict = self._dict() + arraystr = "" + for section in o: + qsection = str(section) + value = o[section] + + if not re.match(r'^[A-Za-z0-9_-]+$', section): + qsection = toml.encoder._dump_str(section) + + # arrayoftables = False + if ( + self.preserve + and isinstance(value, toml.decoder.InlineTableDict) + ): + retstr += ( + qsection + + + " = " + + + self.dump_inline_table(o[section]) + + + '\n' # only on the final terminating left brace + ) + + # XXX: this code i'm pretty sure is just blatantly bad + # and/or wrong.. + # if isinstance(o[section], list): + # for a in o[section]: + # if isinstance(a, dict): + # arrayoftables = True + # if arrayoftables: + # for a in o[section]: + # arraytabstr = "\n" + # arraystr += "[[" + sup + qsection + "]]\n" + # s, d = self.dump_sections(a, sup + qsection) + # if s: + # if s[0] == "[": + # arraytabstr += s + # else: + # arraystr += s + # while d: + # newd = self._dict() + # for dsec in d: + # s1, d1 = self.dump_sections(d[dsec], sup + + # qsection + "." + + # dsec) + # if s1: + # arraytabstr += ("[" + sup + qsection + + # "." + dsec + "]\n") + # arraytabstr += s1 + # for s1 in d1: + # newd[dsec + "." + s1] = d1[s1] + # d = newd + # arraystr += arraytabstr + + elif isinstance(value, dict): + retdict[qsection] = o[section] + + elif o[section] is not None: + retstr += ( + qsection + + + " = " + + + str(self.dump_value(o[section])) + ) + + # if not isinstance(value, dict): + if not isinstance(value, toml.decoder.InlineTableDict): + # inline tables should not contain newlines: + # https://toml.io/en/v1.0.0#inline-table + retstr += '\n' + + else: + raise ValueError(value) + + retstr += arraystr + return (retstr, retdict) + + +@cm +def open_pps( + brokername: str, + acctid: str, + write_on_exit: bool = False, +) -> Generator[PpTable, None, None]: + ''' + Read out broker-specific position entries from + incremental update file: ``pps.toml``. + + ''' + conf, path = config.load('pps') + brokersection = conf.setdefault(brokername, {}) + pps = brokersection.setdefault(acctid, {}) + + # TODO: ideally we can pass in an existing + # pps state to this right? such that we + # don't have to do a ledger reload all the + # time.. a couple ideas I can think of, + # - mirror this in some client side actor which + # does the actual ledger updates (say the paper + # engine proc if we decide to always spawn it?), + # - do diffs against updates from the ledger writer + # actor and the in-mem state here? + + pp_objs = {} + table = PpTable( + brokername, + acctid, + pp_objs, + conf=conf, + ) + + # unmarshal/load ``pps.toml`` config entries into object form + # and update `PpTable` obj entries. 
+    for fqsn, entry in pps.items():
+        bsuid = entry['bsuid']
+        symbol = Symbol.from_fqsn(
+            fqsn,
+
+            # NOTE & TODO: right now we fill in the defaults from
+            # `.data._source.Symbol` but eventually these should always
+            # either be already written to the pos table or provided at
+            # write time to ensure always having these values somewhere
+            # and thus allowing us to get our pos sizing precision
+            # correct!
+            info={
+                'asset_type': entry.get('asset_type', ''),
+                'price_tick_size': entry.get('price_tick_size', 0.01),
+                'lot_tick_size': entry.get('lot_tick_size', 0.0),
+            }
+        )
+
+        # convert clears sub-tables (only in this form
+        # for toml re-presentation) back into a master table.
+        clears_list = entry['clears']
+
+        # index clears entries in "object" form by tid in a top
+        # level dict instead of a list (as is presented in our
+        # ``pps.toml``).
+        clears = pp_objs.setdefault(bsuid, {})
+
+        # TODO: should we make a ``Struct`` for clear/event entries?
+        # convert "clear events table" from the toml config (list of
+        # dicts) and load it into object form for use in position
+        # processing of new clear events.
+        trans: list[Transaction] = []
+
+        for clears_table in clears_list:
+            tid = clears_table.pop('tid')
+            dtstr = clears_table['dt']
+            dt = pendulum.parse(dtstr)
+            clears_table['dt'] = dt
+
+            trans.append(Transaction(
+                fqsn=bsuid,
+                sym=symbol,
+                bsuid=bsuid,
+                tid=tid,
+                size=clears_table['size'],
+                price=clears_table['price'],
+                cost=clears_table['cost'],
+                dt=dt,
+            ))
+            clears[tid] = clears_table
+
+        size = entry['size']
+
+        # TODO: remove, but handle the old field name for now
+        ppu = entry.get(
+            'ppu',
+            entry.get('be_price', 0),
+        )
+
+        split_ratio = entry.get('split_ratio')
+
+        expiry = entry.get('expiry')
+        if expiry:
+            expiry = pendulum.parse(expiry)
+
+        pp = pp_objs[bsuid] = Position(
+            symbol,
+            size=size,
+            ppu=ppu,
+            split_ratio=split_ratio,
+            expiry=expiry,
+            bsuid=entry['bsuid'],
+        )
+
+        # XXX: super critical, we need to be sure to include
+        # all pps.toml clears to avoid reusing clears that were
+        # already included in the current incremental update
+        # state, since today's records may have already been
+        # processed!
+        for t in trans:
+            pp.add_clear(t)
+
+        # audit entries loaded from toml
+        pp.ensure_state()
+
+    try:
+        yield table
+    finally:
+        if write_on_exit:
+            table.write_config()
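
For reviewers who want to kick the tires on the new module split, a minimal smoke-test sketch of the re-exported API follows; the `binance`/`paper` broker-account pair is a placeholder and assumes matching `brokers.toml` and ledger entries already exist on disk:

    from piker.accounting import (
        open_trade_ledger,
        open_pps,
    )

    # both ctx-mngrs flush back to disk on exit: the ledger only when
    # mutated, the pps table only when `write_on_exit=True` is passed.
    with (
        open_trade_ledger('binance', 'paper') as ledger,
        open_pps('binance', 'paper') as table,
    ):
        # `ledger` is a plain `dict` loaded from
        # `<configdir>/ledgers/trades_binance_paper.toml`.
        print(f'{len(ledger)} ledger entries')

        # audit each tracked position against its clears table.
        for bsuid, pp in table.pps.items():
            pp.ensure_state()
            print(pp.symbol.front_fqsn(), pp.size, pp.ppu)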
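The `calc_ppu()` recurrence is easier to eyeball with numbers in hand. Here's a hand-rolled rendition (editor's illustration only: transaction costs and net-zero sign flips are elided) of the entry/exit asymmetry, i.e. that only size *increases* re-weight the breakeven price:

    clears = [
        {'size': 10, 'price': 100.0},   # open long
        {'size': 10, 'price': 110.0},   # add to it
        {'size': -5, 'price': 120.0},   # partial exit
    ]

    accum, ppu = 0.0, 0.0
    for c in clears:
        new_accum = accum + c['size']
        # only an increase in absolute size re-weights the ppu..
        if abs(new_accum) > abs(accum):
            ppu = (
                ppu * abs(accum) + c['price'] * abs(c['size'])
            ) / abs(new_accum)
        # ..an exit clear leaves it untouched.
        accum = new_accum

    # (100 * 10 + 110 * 10) / 20 == 105.0; the exit @ 120 didn't move it.
    assert (accum, ppu) == (15.0, 105.0)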
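And for context on what `PpsEncoder` is styling: a `pps.toml` section comes out roughly like the following (schematic, all values invented), one flat sub-table per brokerless fqsn under the `[<broker>.<account>]` section with `clears` rendered as an array of inline tables:

    [binance.paper."xbtusdt.binance"]
    size = 15.0
    ppu = 105.0
    bsuid = "XBTUSDT"
    asset_type = "crypto"
    price_tick_size = 0.1
    lot_tick_size = 0.001
    clears = [
        { dt = "2023-03-10T17:19:39-05:00", price = 100.0, size = 10.0, cost = 0.0, tid = "t-1" },
    ]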