diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index a35e4aea2..16069a055 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -24,6 +24,7 @@ 'binance', 'ib', 'kraken', + 'deribit', # broken but used to work # 'questrade', diff --git a/piker/brokers/deribit/__init__.py b/piker/brokers/deribit/__init__.py index f5c48b58d..4b71e22c0 100644 --- a/piker/brokers/deribit/__init__.py +++ b/piker/brokers/deribit/__init__.py @@ -30,7 +30,6 @@ open_history_client, open_symbol_search, stream_quotes, - backfill_bars ) # from .broker import ( # trades_dialogue, @@ -53,13 +52,3 @@ 'feed', # 'broker', ] - -# passed to ``tractor.ActorNursery.start_actor()`` -_spawn_kwargs = { - 'infect_asyncio': True, -} - -# annotation to let backend agnostic code -# know if ``brokerd`` should be spawned with -# ``tractor``'s aio mode. -_infect_asyncio: bool = True diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 4159b18a7..94d17ca08 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) +# Copyright (C) Guillermo Rodriguez (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -18,58 +18,42 @@ Deribit backend. ''' -import json +from __future__ import annotations import time -import asyncio -from contextlib import asynccontextmanager as acm, AsyncExitStack +from contextlib import asynccontextmanager as acm from functools import partial from datetime import datetime -from typing import Any, Optional, Iterable, Callable +from typing import ( + Any, + Optional, + Callable, +) import pendulum import asks import trio -from trio_typing import Nursery, TaskStatus +from trio_typing import TaskStatus from fuzzywuzzy import process as fuzzy import numpy as np +from tractor.trionics import ( + broadcast_receiver, + maybe_open_context +) from piker.data.types import Struct from piker.data._web_bs import ( - NoBsWs, - open_autorecon_ws, open_jsonrpc_session ) -from .._util import resproc - from piker import config from piker.log import get_logger +from piker._cacheables import open_cached_client -from tractor.trionics import ( - broadcast_receiver, - BroadcastReceiver, - maybe_open_context -) -from tractor import to_asyncio - -from cryptofeed import FeedHandler - -from cryptofeed.defines import ( - DERIBIT, - L1_BOOK, TRADES, - OPTION, CALL, PUT -) -from cryptofeed.symbols import Symbol log = get_logger(__name__) -_spawn_kwargs = { - 'infect_asyncio': True, -} - - _url = 'https://www.deribit.com' _ws_url = 'wss://www.deribit.com/ws/api/v2' _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2' @@ -89,19 +73,20 @@ class JSONRPCResult(Struct): - jsonrpc: str = '2.0' id: int - result: Optional[dict] = None - error: Optional[dict] = None - usIn: int - usOut: int + usIn: int + usOut: int usDiff: int testnet: bool + jsonrpc: str = '2.0' + result: Optional[dict] = None + error: Optional[dict] = None + class JSONRPCChannel(Struct): - jsonrpc: str = '2.0' method: str params: dict + jsonrpc: str = '2.0' class KLinesResult(Struct): @@ -114,6 +99,7 @@ class KLinesResult(Struct): ticks: list[int] volume: list[float] + class Trade(Struct): trade_seq: int trade_id: str @@ -125,9 +111,11 @@ class Trade(Struct): instrument_name: str index_price: float direction: str - combo_trade_id: Optional[int] = 0, - combo_id: Optional[str] = '', amount: float + combo_trade_id: Optional[int] = 0 + combo_id: Optional[str] = '' + block_trade_id: str | None = '' + class LastTradesResult(Struct): trades: list[Trade] @@ -139,65 +127,12 @@ def deribit_timestamp(when): return int((when.timestamp() * 1000) + (when.microsecond / 1000)) -def str_to_cb_sym(name: str) -> Symbol: - base, strike_price, expiry_date, option_type = name.split('-') - - quote = base - - if option_type == 'put': - option_type = PUT - elif option_type == 'call': - option_type = CALL - else: - raise Exception("Couldn\'t parse option type") - - return Symbol( - base, quote, - type=OPTION, - strike_price=strike_price, - option_type=option_type, - expiry_date=expiry_date, - expiry_normalize=False) - - -def piker_sym_to_cb_sym(name: str) -> Symbol: - base, expiry_date, strike_price, option_type = tuple( - name.upper().split('-')) - - quote = base - - if option_type == 'P': - option_type = PUT - elif option_type == 'C': - option_type = CALL - else: - raise Exception("Couldn\'t parse option type") - - return Symbol( - base, quote, - type=OPTION, - strike_price=strike_price, - option_type=option_type, - expiry_date=expiry_date.upper()) +def sym_fmt_piker_to_deribit(sym: str) -> str: + return sym.upper() -def cb_sym_to_deribit_inst(sym: Symbol): - # cryptofeed normalized - cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] - - # deribit specific - months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] - - exp = sym.expiry_date - - # YYMDD - # 01234 - year, month, day = ( - exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) - - otype = 'C' if sym.option_type == CALL else 'P' - - return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' +def sym_fmt_deribit_to_piker(sym: str): + return sym.lower() def get_config() -> dict[str, Any]: @@ -206,32 +141,30 @@ def get_config() -> dict[str, Any]: section = conf.get('deribit') - # TODO: document why we send this, basically because logging params for cryptofeed - conf['log'] = {} - conf['log']['disabled'] = True - if section is None: log.warning(f'No config section found for deribit in {path}') - return conf + return conf class Client: - def __init__(self, json_rpc: Callable) -> None: - self._pairs: dict[str, Any] = None - - config = get_config().get('deribit', {}) - - if ('key_id' in config) and ('key_secret' in config): - self._key_id = config['key_id'] - self._key_secret = config['key_secret'] + def __init__( + self, + json_rpc: Callable, + append_hooks: Callable, + update_types: Callable, + key_id: str | None = None, + key_secret: str | None = None + ) -> None: - else: - self._key_id = None - self._key_secret = None + self._pairs: dict[str, Any] = None + self._key_id = key_id + self._key_secret = key_secret self.json_rpc = json_rpc + self.append_hooks = append_hooks + self.update_types = update_types @property def currencies(self): @@ -278,7 +211,7 @@ async def submit_limit( """Place an order """ params = { - 'instrument_name': symbol.upper(), + 'instrument_name': sym_fmt_piker_to_deribit(symbol), 'amount': size, 'type': 'limit', 'price': price, @@ -319,7 +252,7 @@ async def symbol_info( results = resp.result instruments = { - item['instrument_name'].lower(): item + sym_fmt_deribit_to_piker(item['instrument_name']): item for item in results } @@ -350,8 +283,10 @@ async def search_symbols( limit=limit ) # repack in dict form - return {item[0]['instrument_name'].lower(): item[0] - for item in matches} + return { + sym_fmt_deribit_to_piker(item[0]['instrument_name']): item[0] + for item in matches + } async def bars( self, @@ -360,6 +295,7 @@ async def bars( end_dt: Optional[datetime] = None, limit: int = 1000, as_np: bool = True, + ) -> dict: instrument = symbol @@ -377,7 +313,7 @@ async def bars( resp = await self.json_rpc( 'public/get_tradingview_chart_data', params={ - 'instrument_name': instrument.upper(), + 'instrument_name': sym_fmt_piker_to_deribit(instrument), 'start_timestamp': start_time, 'end_timestamp': end_time, 'resolution': '1' @@ -385,14 +321,8 @@ async def bars( result = KLinesResult(**resp.result) new_bars = [] - for i in range(len(result.close)): - - _open = result.open[i] - high = result.high[i] - low = result.low[i] - close = result.close[i] - volume = result.volume[i] + for i in range(len(result.close)): row = [ (start_time + (i * (60 * 1000))) / 1000.0, # time result.open[i], @@ -405,7 +335,7 @@ async def bars( new_bars.append((i,) + tuple(row)) - array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else klines + array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else new_bars return array async def last_trades( @@ -416,24 +346,55 @@ async def last_trades( resp = await self.json_rpc( 'public/get_last_trades_by_instrument', params={ - 'instrument_name': instrument, + 'instrument_name': sym_fmt_piker_to_deribit(instrument), 'count': count }) return LastTradesResult(**resp.result) + async def get_book_summary( + self, + currency: str, + kind: str = 'option' + ): + return await self.json_rpc( + 'public/get_book_summary_by_currency', + params={ + 'currency': currency, + 'kind': kind + }) + + + +class JSONRPCSubRequest(Struct): + method: str + params: dict + jsonrpc: str = '2.0' + @acm async def get_client( is_brokercheck: bool = False ) -> Client: + config = get_config().get('deribit', {}) + + ws_url = config.get('ws_url', _ws_url) + key_id = config.get('key_id', None) + key_secret = config.get('key_secret', None) + async with ( trio.open_nursery() as n, open_jsonrpc_session( - _testnet_ws_url, dtype=JSONRPCResult) as json_rpc + ws_url, + response_type=JSONRPCResult + ) as control_functions ): - client = Client(json_rpc) + client = Client( + *control_functions, + key_id=key_id, + key_secret=key_secret + ) _refresh_token: Optional[str] = None _access_token: Optional[str] = None @@ -446,7 +407,7 @@ async def _auth_loop( https://docs.deribit.com/?python#authentication-2 """ - renew_time = 10 + renew_time = 240 access_scope = 'trade:read_write' _expiry_time = time.time() got_access = False @@ -457,7 +418,7 @@ async def _auth_loop( if time.time() - _expiry_time < renew_time: # if we are close to token expiry time - if _refresh_token != None: + if _refresh_token is not None: # if we have a refresh token already dont need to send # secret params = { @@ -467,7 +428,8 @@ async def _auth_loop( } else: - # we don't have refresh token, send secret to initialize + # we don't have refresh token, send secret to + # initialize params = { 'grant_type': 'client_credentials', 'client_id': client._key_id, @@ -475,7 +437,7 @@ async def _auth_loop( 'scope': access_scope } - resp = await json_rpc('public/auth', params) + resp = await client.json_rpc('public/auth', params) result = resp.result _expiry_time = time.time() + result['expires_in'] @@ -503,87 +465,75 @@ async def _auth_loop( @acm -async def open_feed_handler(): - fh = FeedHandler(config=get_config()) - yield fh - await to_asyncio.run_task(fh.stop_async) - +async def open_price_feed( + instrument: str +) -> trio.abc.ReceiveStream: -@acm -async def maybe_open_feed_handler() -> trio.abc.ReceiveStream: - async with maybe_open_context( - acm_func=open_feed_handler, - key='feedhandler', - ) as (cache_hit, fh): - yield fh - - -async def aio_price_feed_relay( - fh: FeedHandler, - instrument: Symbol, - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, -) -> None: - async def _trade(data: dict, receipt_timestamp): - to_trio.send_nowait(('trade', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'last': data, - 'broker_ts': time.time(), - 'data': data.to_dict(), - 'receipt': receipt_timestamp - })) - - async def _l1(data: dict, receipt_timestamp): - to_trio.send_nowait(('l1', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'ticks': [ - {'type': 'bid', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'bsize', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'ask', - 'price': float(data.ask_price), 'size': float(data.ask_size)}, - {'type': 'asize', - 'price': float(data.ask_price), 'size': float(data.ask_size)} - ] - })) - - fh.add_feed( - DERIBIT, - channels=[TRADES, L1_BOOK], - symbols=[piker_sym_to_cb_sym(instrument)], - callbacks={ - TRADES: _trade, - L1_BOOK: _l1 + instrument_db = sym_fmt_piker_to_deribit(instrument) + + trades_chan = f'trades.{instrument_db}.raw' + book_chan = f'book.{instrument_db}.none.1.100ms' + + channels = [trades_chan, book_chan] + + send_chann, recv_chann = trio.open_memory_channel(0) + async def sub_hook(msg): + chan = msg.params['channel'] + data = msg.params['data'] + if chan == trades_chan: + await send_chann.send(( + 'trade', { + 'symbol': instrument, + 'last': data['price'], + 'brokerd_ts': time.time(), + 'ticks': [{ + 'type': 'trade', + 'price': data['price'], + 'size': data['amount'], + 'broker_ts': data['timestamp'] + }] + } + )) + return True + + elif chan == book_chan: + bid, bsize = data['bids'][0] + ask, asize = data['asks'][0] + await send_chann.send(( + 'l1', { + 'symbol': instrument, + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize} + ]} + )) + return True + + return False + + async with open_cached_client('deribit') as client: + + client.append_hooks({ + 'request': [sub_hook] + }) + client.update_types({ + 'request': JSONRPCSubRequest }) - if not fh.running: - fh.run( - start_loop=False, - install_signal_handlers=False) + resp = await client.json_rpc( + 'private/subscribe', {'channels': channels}) - # sync with trio - to_trio.send_nowait(None) + assert not resp.error - await asyncio.sleep(float('inf')) + log.info(f'Subscribed to {channels}') + yield recv_chann -@acm -async def open_price_feed( - instrument: str -) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler() as fh: - async with to_asyncio.open_channel_from( - partial( - aio_price_feed_relay, - fh, - instrument - ) - ) as (first, chan): - yield chan + resp = await client.json_rpc('private/unsubscribe', {'channels': channels}) + assert not resp.error @acm async def maybe_open_price_feed( @@ -604,67 +554,65 @@ async def maybe_open_price_feed( yield feed +@acm +async def open_ticker_feed( + instrument: str +) -> trio.abc.ReceiveStream: + + instrument_db = sym_fmt_piker_to_deribit(instrument) + + ticker_chan = f'incremental_ticker.{instrument_db}' + + channels = [ticker_chan] + + send_chann, recv_chann = trio.open_memory_channel(0) + async def sub_hook(msg): + chann = msg.params['channel'] + if chann == ticker_chan: + data = msg.params['data'] + await send_chann.send(( + 'ticker', { + 'symbol': instrument, + 'data': data + } + )) + return True -async def aio_order_feed_relay( - fh: FeedHandler, - instrument: Symbol, - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, -) -> None: - async def _fill(data: dict, receipt_timestamp): - breakpoint() + return False - async def _order_info(data: dict, receipt_timestamp): - breakpoint() + async with open_cached_client('deribit') as client: - fh.add_feed( - DERIBIT, - channels=[FILLS, ORDER_INFO], - symbols=[instrument.upper()], - callbacks={ - FILLS: _fill, - ORDER_INFO: _order_info, + client.append_hooks({ + 'request': [sub_hook] + }) + client.update_types({ + 'request': JSONRPCSubRequest }) - if not fh.running: - fh.run( - start_loop=False, - install_signal_handlers=False) + resp = await client.json_rpc( + 'private/subscribe', {'channels': channels}) - # sync with trio - to_trio.send_nowait(None) + assert not resp.error - await asyncio.sleep(float('inf')) + log.info(f'Subscribed to {channels}') + yield recv_chann -@acm -async def open_order_feed( - instrument: list[str] -) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler() as fh: - async with to_asyncio.open_channel_from( - partial( - aio_order_feed_relay, - fh, - instrument - ) - ) as (first, chan): - yield chan + resp = await client.json_rpc('private/unsubscribe', {'channels': channels}) + assert not resp.error @acm -async def maybe_open_order_feed( +async def maybe_open_ticker_feed( instrument: str ) -> trio.abc.ReceiveStream: - # TODO: add a predicate to maybe_open_context async with maybe_open_context( - acm_func=open_order_feed, + acm_func=open_ticker_feed, kwargs={ - 'instrument': instrument, - 'fh': fh + 'instrument': instrument }, - key=f'{instrument}-order', + key=f'{instrument}-ticker', ) as (cache_hit, feed): if cache_hit: yield broadcast_receiver(feed, 10) diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index deb0422f8..84e04fa17 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -20,43 +20,29 @@ ''' from contextlib import asynccontextmanager as acm from datetime import datetime -from typing import Any, Optional, Callable -import time +from typing import ( + Callable, +) import trio from trio_typing import TaskStatus import pendulum -from fuzzywuzzy import process as fuzzy import numpy as np import tractor from piker._cacheables import open_cached_client from piker.log import get_logger, get_console_log -from piker.data import ShmArray from piker.brokers._util import ( - BrokerError, DataUnavailable, ) -from cryptofeed import FeedHandler - -from cryptofeed.defines import ( - DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT -) -from cryptofeed.symbols import Symbol from .api import ( - Client, Trade, - get_config, - str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, + Client, + Trade, maybe_open_price_feed ) -_spawn_kwargs = { - 'infect_asyncio': True, -} - - log = get_logger(__name__) @@ -69,20 +55,24 @@ async def open_history_client( async with open_cached_client('deribit') as client: async def get_ohlc( - end_dt: Optional[datetime] = None, - start_dt: Optional[datetime] = None, + timeframe: float, + end_dt: datetime | None = None, + start_dt: datetime | None = None, ) -> tuple[ np.ndarray, datetime, # start datetime, # end ]: + if timeframe != 60: + raise DataUnavailable('Only 1m bars are supported') array = await client.bars( instrument, start_dt=start_dt, end_dt=end_dt, ) + if len(array) == 0: raise DataUnavailable @@ -110,10 +100,7 @@ async def stream_quotes( sym = symbols[0] - async with ( - open_cached_client('deribit') as client, - send_chan as send_chan - ): + async with open_cached_client('deribit') as client: init_msgs = { # pass back token, and bool, signalling if we're the writer @@ -121,21 +108,18 @@ async def stream_quotes( sym: { 'symbol_info': { 'asset_type': 'option', - 'price_tick_size': 0.0005 + 'price_tick_size': 0.0005, + 'lot_tick_size': 0.1 }, 'shm_write_opts': {'sum_tick_vml': False}, 'fqsn': sym, }, } - nsym = piker_sym_to_cb_sym(sym) - async with maybe_open_price_feed(sym) as stream: + last_trades = (await client.last_trades(sym, count=1)).trades - cache = await client.cache_symbols() - - last_trades = (await client.last_trades( - cb_sym_to_deribit_inst(nsym), count=1)).trades + async with maybe_open_price_feed(sym) as stream: if len(last_trades) == 0: last_trade = None @@ -174,7 +158,7 @@ async def open_symbol_search( async with open_cached_client('deribit') as client: # load all symbols locally for fast search - cache = await client.cache_symbols() + await client.cache_symbols() await ctx.started() async with ctx.open_stream() as stream: diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 2dd7f4afe..2780a75a1 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -187,7 +187,6 @@ async def open_autorecon_ws( ''' - class JSONRPCResult(Struct): id: int jsonrpc: str = '2.0' @@ -202,9 +201,41 @@ async def open_jsonrpc_session( response_type: type = JSONRPCResult, request_type: Optional[type] = None, request_hook: Optional[Callable] = None, - error_hook: Optional[Callable] = None, + error_hook: Optional[Callable] = None ) -> Callable[[str, dict], dict]: + # xor: this two params need to be passed together or not at all + if bool(request_type) ^ bool(request_hook): + raise ValueError( + 'Need to path both a request_type and request_hook') + + req_hooks = [] + if request_hook: + req_hooks.append(request_hook) + + err_hooks = [] + if error_hook: + err_hooks.append(error_hook) + + hook_table = { + 'request': req_hooks, + 'error': err_hooks + } + + types_table = { + 'response': response_type, + 'request': request_type + } + + def append_hooks(new_hooks: dict): + nonlocal hook_table + for htype, hooks in new_hooks.items(): + hook_table[htype] += hooks + + def update_types(new_types: dict): + nonlocal types_table + types_table.update(new_types) + async with ( trio.open_nursery() as n, open_autorecon_ws(url) as ws @@ -212,7 +243,7 @@ async def open_jsonrpc_session( rpc_id: Iterable = count(start_id) rpc_results: dict[int, dict] = {} - async def json_rpc(method: str, params: dict) -> dict: + async def json_rpc(method: str, params: dict = {}) -> dict: ''' perform a json rpc call and wait for the result, raise exception in case of error field present on response @@ -257,8 +288,7 @@ async def recv_task(): 'result': _, 'id': mid, } if res_entry := rpc_results.get(mid): - - res_entry['result'] = response_type(**msg) + res_entry['result'] = types_table['response'](**msg) res_entry['event'].set() case { @@ -269,24 +299,38 @@ async def recv_task(): f'Unexpected ws msg: {json.dumps(msg, indent=4)}' ) + case { + 'error': error, + 'id': mid + } if res_entry := rpc_results.get(mid): + + res_entry['result'] = types_table['response'](**msg) + res_entry['event'].set() + case { 'method': _, 'params': _, }: - log.debug(f'Recieved\n{msg}') - if request_hook: - await request_hook(request_type(**msg)) + log.info(f'Recieved\n{msg}') + if len(hook_table['request']) > 0: + for hook in hook_table['request']: + result = await hook(types_table['request'](**msg)) + if result: + break case { - 'error': error + 'error': error, }: log.warning(f'Recieved\n{error}') - if error_hook: - await error_hook(response_type(**msg)) + if len(hook_table['error']) > 0: + for hook in hook_table['error']: + result = await hook(types_table['response'](**msg)) + if result: + break case _: log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') n.start_soon(recv_task) - yield json_rpc + yield json_rpc, append_hooks, update_types n.cancel_scope.cancel() diff --git a/tests/test_deribit.py b/tests/test_deribit.py new file mode 100644 index 000000000..ac37534f0 --- /dev/null +++ b/tests/test_deribit.py @@ -0,0 +1,46 @@ +import trio +import pytest +import tractor + +from piker import config + +from piker.brokers.deribit import api as deribit +from piker.brokers.deribit.api import _testnet_ws_url + +from piker._cacheables import open_cached_client + + +TESTNET_KEY_ID: str | None = None +TESTNET_KEY_SECRET: str | None = None + +@pytest.mark.skipif( + not TESTNET_KEY_ID or not TESTNET_KEY_SECRET, + reason='configure a deribit testnet key pair before running this test' +) +def test_deribit_get_ticker(open_test_pikerd): + + async def _test_main(): + async with open_test_pikerd() as _: + async with open_cached_client('deribit') as client: + + symbols = await client.symbol_info() + + syms = list(symbols.keys()) + sym = syms[int(len(syms) / 2)] + + async with deribit.maybe_open_ticker_feed(sym) as tick_stream: + async for typ, msg in tick_stream: + assert typ == 'ticker' + assert 'open_interest' in msg['data'] + break + + + + config.write({ + 'deribit': { + 'ws_url': _testnet_ws_url, + 'key_id': TESTNET_KEY_ID, + 'key_secret': TESTNET_KEY_SECRET + } + }) + trio.run(_test_main)