Skip to content

Commit

Permalink
Move dict modification helper functions to config module
Browse files Browse the repository at this point in the history
Add submit_cancel & listen key managment to binance backend
  • Loading branch information
Guillermo Rodriguez committed Jun 22, 2021
1 parent 5263eb5 commit 14449dd
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 63 deletions.
110 changes: 106 additions & 4 deletions piker/brokers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def get_config() -> 'configparser.ConfigParser':


_url = 'https://api.binance.com'
_fapi_url = 'https://testnet.binancefuture.com'

# XXX: some additional fields are defined in the docs:
# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
Expand Down Expand Up @@ -157,16 +158,26 @@ class Client:
def __init__(self) -> None:
self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url

self._fapi_sesh = ask.Session(connections=4)
self._fapi_sesh.base_location = _fapi_url

self._pairs: dict[str, Any] = {}

conf = get_config()
self.api_key = conf.get('api', {}).get('key')
self.api_secret = conf.get('api', {}).get('secret')

if self.api_key:
self._sesh.headers.update({'X-MBX-APIKEY': self.api_key})
api_key_header = {'X-MBX-APIKEY': self.api_key}
self._sesh.headers.update(api_key_header)
self._fapi_sesh.headers.update(api_key_header)

def _get_signature(self, data: OrderedDict) -> str:

# XXX: Info on security and authentification
# https://binance-docs.github.io/apidocs/#endpoint-security-type

if not self.api_secret:
raise BrokerConfigurationError(
'Attempt to get a signature without setting up credentials'
Expand Down Expand Up @@ -202,6 +213,25 @@ async def _api(

return resproc(resp, log)

async def _fapi(
self,
method: str,
params: Union[dict, OrderedDict],
signed: bool = False,
action: str = 'get'
) -> Dict[str, Any]:

if signed:
params['signature'] = self._get_signature(params)

resp = await getattr(self._fapi_sesh, action)(
path=f'/fapi/v1/{method}',
params=params,
timeout=float('inf')
)

return resproc(resp, log)

async def symbol_info(

self,
Expand Down Expand Up @@ -365,6 +395,70 @@ async def submit_limit(
# return resp['orderId']
return oid

async def submit_cancel(
self,
symbol: str,
oid: str,
recv_window: int = 60000
) -> None:
symbol = symbol.upper()

params = OrderedDict([
('symbol', symbol),
('orderId', oid),
('recvWindow', recv_window),
('timestamp', binance_timestamp(arrow.utcnow()))
])

await self._api(
'order',
params=params,
signed=True,
action='delete'
)

async def get_listen_key(self) -> str:
return await self._api(
'userDataStream',
params={},
action='post'
)['listenKey']

async def keep_alive_key(self, listen_key: str) -> None:
await self._fapi(
'userDataStream',
params={'listenKey': listen_key},
action='put'
)

async def close_listen_key(self, listen_key: str) -> None:
await self._fapi(
'userDataStream',
params={'listenKey': listen_key},
action='delete'
)

@asynccontextmanager
async def manage_listen_key(self):

async def periodic_keep_alive(
self,
listen_key: str,
timeout=60 * 29 # 29 minutes
):
while True:
await trio.sleep(timeout)
await self.keep_alive_key(listen_key)

key = await self.get_listen_key()

async with trio.open_nursery() as n:
n.start_soon(periodic_keep_alive, key)
yield key

await self.close_listen_key(key)


@asynccontextmanager
async def get_client() -> Client:
client = Client()
Expand Down Expand Up @@ -591,7 +685,8 @@ async def subscribe(ws: wsproto.WSConnection):


async def handle_order_requests(
ems_order_stream: tractor.MsgStream
ems_order_stream: tractor.MsgStream,
symbol: str
) -> None:
async with open_cached_client('binance') as client:
async for request_msg in ems_order_stream:
Expand Down Expand Up @@ -625,7 +720,8 @@ async def handle_order_requests(

elif action == 'cancel':
# msg = BrokerdCancel(**request_msg)
# await run_client_method
#
# await client.submit_cancel(symbol, msg.reqid)
...

else:
Expand All @@ -647,10 +743,16 @@ async def trades_dialogue(

async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n
trio.open_nursery() as n,
open_cached_client('binance') as client,
client.manage_listen_key() as listen_key,
):
n.start_soon(handle_order_requests, ems_stream)
await trio.sleep_forever()
# async with open_autorecon_ws(
# f'wss://stream.binance.com:9443/ws/{listen_key}',
# ) as ws:
# ...


@tractor.context
Expand Down
69 changes: 19 additions & 50 deletions piker/brokers/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

@cli.command()
@click.option('--keys', '-k', multiple=True,
help='Return results only for these keys')
help='Return results only for these keys')
@click.argument('meth', nargs=1)
@click.argument('kwargs', nargs=-1)
@click.pass_obj
Expand Down Expand Up @@ -301,64 +301,33 @@ async def main(func):


@cli.command()
@click.argument('section', required=True)
@click.argument('section', required=False)
@click.argument('value', required=False)
@click.option('--delete', '-d', flag_value=True, help='Delete section')
@click.pass_obj
def brokercfg(config, section, value, delete):
conf, path = broker_conf.load()

# XXX: Recursive getting & setting

def get_value(_dict, _section):
subs = _section.split('.')
if len(subs) > 1:
return get_value(
_dict[subs[0]],
'.'.join(subs[1:]),
)

else:
return _dict[_section]

def set_value(_dict, _section, val):
subs = _section.split('.')
if len(subs) > 1:
if subs[0] not in _dict:
_dict[subs[0]] = {}

return set_value(
_dict[subs[0]],
'.'.join(subs[1:]),
val
)
"""If invoked with no arguments, open an editor to edit broker configs file
or get / update an individual section.
"""

else:
_dict[_section] = val
if section:
conf, path = broker_conf.load()

def del_value(_dict, _section):
subs = _section.split('.')
if len(subs) > 1:
if subs[0] not in _dict:
return
if not delete:
if value:
broker_conf.set_value(conf, section, value)

return del_value(
_dict[subs[0]],
'.'.join(subs[1:])
click.echo(
colorize_json(
broker_conf.get_value(conf, section))
)

else:
if _section not in _dict:
return
broker_conf.del_value(conf, section)

del _dict[_section]
broker_conf.write(config=conf)

if not delete:
if value:
set_value(conf, section, value)

click.echo(colorize_json(get_value(conf, section)))
else:
del_value(conf, section)

broker_conf.write(conf)
conf, path = broker_conf.load(raw=True)
broker_conf.write(
raw=click.edit(text=conf)
)
73 changes: 64 additions & 9 deletions piker/brokers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from os.path import dirname
import shutil

from typing import Optional
from pathlib import Path

import toml
Expand Down Expand Up @@ -70,7 +71,8 @@ def repodir():


def load(
conf_path: str = None
conf_path: str = None,
raw: bool = False
) -> (dict, str):
"""Load broker config.
"""
Expand All @@ -82,29 +84,82 @@ def load(
conf_path,
)

config = toml.load(conf_path)
if raw:
with open(conf_path, 'r') as cf:
config = cf.read()
else:
config = toml.load(conf_path)
log.debug(f"Read config file {conf_path}")
return config, conf_path


def write(
config: dict, # toml config as dict
config: Optional[dict] = None, # toml config as dict
raw: Optional[str] = None, # toml config as string
path: str = None,
) -> None:
"""Write broker config to disk.
Create a ``brokers.ini`` file if one does not exist.
"""
path = path or get_broker_conf_path()
dirname = os.path.dirname(path)
if not os.path.isdir(dirname):
log.debug(f"Creating config dir {_config_dir}")
os.makedirs(dirname)
Path(path).parent.mkdir(exist_ok=True)

if not config:
if not config and not raw:
raise ValueError(
"Watch out you're trying to write a blank config!")

log.debug(f"Writing config file {path}")
with open(path, 'w') as cf:
return toml.dump(config, cf)
if config:
return toml.dump(config, cf)
elif raw:
return cf.write(raw)


# XXX: Recursive getting & setting

def get_value(_dict, _section):
subs = _section.split('.')
if len(subs) > 1:
return get_value(
_dict[subs[0]],
'.'.join(subs[1:]),
)

else:
return _dict[_section]


def set_value(_dict, _section, val):
subs = _section.split('.')
if len(subs) > 1:
if subs[0] not in _dict:
_dict[subs[0]] = {}

return set_value(
_dict[subs[0]],
'.'.join(subs[1:]),
val
)

else:
_dict[_section] = val


def del_value(_dict, _section):
subs = _section.split('.')
if len(subs) > 1:
if subs[0] not in _dict:
return

return del_value(
_dict[subs[0]],
'.'.join(subs[1:])
)

else:
if _section not in _dict:
return

del _dict[_section]

0 comments on commit 14449dd

Please sign in to comment.