diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 5c14ba710f..5abffbb090 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -74,6 +74,7 @@ def get_config() -> 'configparser.ConfigParser': _url = 'https://api.binance.com' +_fapi_url = 'https://testnet.binancefuture.com' # XXX: some additional fields are defined in the docs: # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data @@ -157,6 +158,10 @@ class Client: def __init__(self) -> None: self._sesh = asks.Session(connections=4) self._sesh.base_location = _url + + self._fapi_sesh = ask.Session(connections=4) + self._fapi_sesh.base_location = _fapi_url + self._pairs: dict[str, Any] = {} conf = get_config() @@ -164,9 +169,15 @@ def __init__(self) -> None: self.api_secret = conf.get('api', {}).get('secret') if self.api_key: - self._sesh.headers.update({'X-MBX-APIKEY': self.api_key}) + api_key_header = {'X-MBX-APIKEY': self.api_key} + self._sesh.headers.update(api_key_header) + self._fapi_sesh.headers.update(api_key_header) def _get_signature(self, data: OrderedDict) -> str: + + # XXX: Info on security and authentification + # https://binance-docs.github.io/apidocs/#endpoint-security-type + if not self.api_secret: raise BrokerConfigurationError( 'Attempt to get a signature without setting up credentials' @@ -202,6 +213,25 @@ async def _api( return resproc(resp, log) + async def _fapi( + self, + method: str, + params: Union[dict, OrderedDict], + signed: bool = False, + action: str = 'get' + ) -> Dict[str, Any]: + + if signed: + params['signature'] = self._get_signature(params) + + resp = await getattr(self._fapi_sesh, action)( + path=f'/fapi/v1/{method}', + params=params, + timeout=float('inf') + ) + + return resproc(resp, log) + async def symbol_info( self, @@ -365,6 +395,70 @@ async def submit_limit( # return resp['orderId'] return oid + async def submit_cancel( + self, + symbol: str, + oid: str, + recv_window: int = 60000 + ) -> None: + symbol = symbol.upper() + + params = OrderedDict([ + ('symbol', symbol), + ('orderId', oid), + ('recvWindow', recv_window), + ('timestamp', binance_timestamp(arrow.utcnow())) + ]) + + await self._api( + 'order', + params=params, + signed=True, + action='delete' + ) + + async def get_listen_key(self) -> str: + return await self._api( + 'userDataStream', + params={}, + action='post' + )['listenKey'] + + async def keep_alive_key(self, listen_key: str) -> None: + await self._fapi( + 'userDataStream', + params={'listenKey': listen_key}, + action='put' + ) + + async def close_listen_key(self, listen_key: str) -> None: + await self._fapi( + 'userDataStream', + params={'listenKey': listen_key}, + action='delete' + ) + + @asynccontextmanager + async def manage_listen_key(self): + + async def periodic_keep_alive( + self, + listen_key: str, + timeout=60 * 29 # 29 minutes + ): + while True: + await trio.sleep(timeout) + await self.keep_alive_key(listen_key) + + key = await self.get_listen_key() + + async with trio.open_nursery() as n: + n.start_soon(periodic_keep_alive, key) + yield key + + await self.close_listen_key(key) + + @asynccontextmanager async def get_client() -> Client: client = Client() @@ -591,7 +685,8 @@ async def subscribe(ws: wsproto.WSConnection): async def handle_order_requests( - ems_order_stream: tractor.MsgStream + ems_order_stream: tractor.MsgStream, + symbol: str ) -> None: async with open_cached_client('binance') as client: async for request_msg in ems_order_stream: @@ -625,7 +720,8 @@ async def handle_order_requests( elif action == 'cancel': # msg = BrokerdCancel(**request_msg) - # await run_client_method + # + # await client.submit_cancel(symbol, msg.reqid) ... else: @@ -647,10 +743,16 @@ async def trades_dialogue( async with ( ctx.open_stream() as ems_stream, - trio.open_nursery() as n + trio.open_nursery() as n, + open_cached_client('binance') as client, + client.manage_listen_key() as listen_key, ): n.start_soon(handle_order_requests, ems_stream) await trio.sleep_forever() + # async with open_autorecon_ws( + # f'wss://stream.binance.com:9443/ws/{listen_key}', + # ) as ws: + # ... @tractor.context diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index 825517baab..185fd19b94 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -43,7 +43,7 @@ @cli.command() @click.option('--keys', '-k', multiple=True, - help='Return results only for these keys') + help='Return results only for these keys') @click.argument('meth', nargs=1) @click.argument('kwargs', nargs=-1) @click.pass_obj @@ -301,64 +301,33 @@ async def main(func): @cli.command() -@click.argument('section', required=True) +@click.argument('section', required=False) @click.argument('value', required=False) @click.option('--delete', '-d', flag_value=True, help='Delete section') @click.pass_obj def brokercfg(config, section, value, delete): - conf, path = broker_conf.load() - - # XXX: Recursive getting & setting - - def get_value(_dict, _section): - subs = _section.split('.') - if len(subs) > 1: - return get_value( - _dict[subs[0]], - '.'.join(subs[1:]), - ) - - else: - return _dict[_section] - - def set_value(_dict, _section, val): - subs = _section.split('.') - if len(subs) > 1: - if subs[0] not in _dict: - _dict[subs[0]] = {} - - return set_value( - _dict[subs[0]], - '.'.join(subs[1:]), - val - ) + """If invoked with no arguments, open an editor to edit broker configs file + or get / update an individual section. + """ - else: - _dict[_section] = val + if section: + conf, path = broker_conf.load() - def del_value(_dict, _section): - subs = _section.split('.') - if len(subs) > 1: - if subs[0] not in _dict: - return + if not delete: + if value: + broker_conf.set_value(conf, section, value) - return del_value( - _dict[subs[0]], - '.'.join(subs[1:]) + click.echo( + colorize_json( + broker_conf.get_value(conf, section)) ) - else: - if _section not in _dict: - return + broker_conf.del_value(conf, section) - del _dict[_section] + broker_conf.write(config=conf) - if not delete: - if value: - set_value(conf, section, value) - - click.echo(colorize_json(get_value(conf, section))) else: - del_value(conf, section) - - broker_conf.write(conf) + conf, path = broker_conf.load(raw=True) + broker_conf.write( + raw=click.edit(text=conf) + ) diff --git a/piker/brokers/config.py b/piker/brokers/config.py index 9208a4b9a2..3ab9db24ba 100644 --- a/piker/brokers/config.py +++ b/piker/brokers/config.py @@ -21,6 +21,7 @@ from os.path import dirname import shutil +from typing import Optional from pathlib import Path import toml @@ -70,7 +71,8 @@ def repodir(): def load( - conf_path: str = None + conf_path: str = None, + raw: bool = False ) -> (dict, str): """Load broker config. """ @@ -82,13 +84,18 @@ def load( conf_path, ) - config = toml.load(conf_path) + if raw: + with open(conf_path, 'r') as cf: + config = cf.read() + else: + config = toml.load(conf_path) log.debug(f"Read config file {conf_path}") return config, conf_path def write( - config: dict, # toml config as dict + config: Optional[dict] = None, # toml config as dict + raw: Optional[str] = None, # toml config as string path: str = None, ) -> None: """Write broker config to disk. @@ -96,15 +103,63 @@ def write( Create a ``brokers.ini`` file if one does not exist. """ path = path or get_broker_conf_path() - dirname = os.path.dirname(path) - if not os.path.isdir(dirname): - log.debug(f"Creating config dir {_config_dir}") - os.makedirs(dirname) + Path(path).parent.mkdir(exist_ok=True) - if not config: + if not config and not raw: raise ValueError( "Watch out you're trying to write a blank config!") log.debug(f"Writing config file {path}") with open(path, 'w') as cf: - return toml.dump(config, cf) + if config: + return toml.dump(config, cf) + elif raw: + return cf.write(raw) + + +# XXX: Recursive getting & setting + +def get_value(_dict, _section): + subs = _section.split('.') + if len(subs) > 1: + return get_value( + _dict[subs[0]], + '.'.join(subs[1:]), + ) + + else: + return _dict[_section] + + +def set_value(_dict, _section, val): + subs = _section.split('.') + if len(subs) > 1: + if subs[0] not in _dict: + _dict[subs[0]] = {} + + return set_value( + _dict[subs[0]], + '.'.join(subs[1:]), + val + ) + + else: + _dict[_section] = val + + +def del_value(_dict, _section): + subs = _section.split('.') + if len(subs) > 1: + if subs[0] not in _dict: + return + + return del_value( + _dict[subs[0]], + '.'.join(subs[1:]) + ) + + else: + if _section not in _dict: + return + + del _dict[_section]