Skip to content

Commit

Permalink
First attempt data feed side quote throttling
Browse files Browse the repository at this point in the history
Adding binance's "hft" ws feeds has resulted in a lot of context
switching in our Qt charts, so much so it's chewin CPU and definitely
worth it to throttle to the detected display rate as per discussion in
issue #192.

This is a first very very naive attempt at throttling L1 tick feeds on
the `brokerd` end (producer side) using a constant and uniform delivery
rate by way of a `trio` task + mem chan.  The new func is
`data._sampling.uniform_rate_send()`. Basically if a client request
a feed and provides a throttle rate we just spawn a task and queue up
ticks until approximately the next display rate's worth period of time
has passed before forwarding. It's definitely nothing fancy but does
provide fodder and a start point for an up and coming queueing eng to
start digging into both #107 and #109 ;)
  • Loading branch information
goodboy committed Jun 15, 2021
1 parent 57a35a3 commit ccf8152
Showing 1 changed file with 73 additions and 16 deletions.
89 changes: 73 additions & 16 deletions piker/data/_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""
Data buffers for fast shared humpy.
"""
import time
from typing import Dict, List

import tractor
Expand Down Expand Up @@ -152,10 +153,12 @@ async def iter_ohlc_periods(


async def sample_and_broadcast(

bus: '_FeedBus', # noqa
shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel,
sum_tick_vlm: bool = True,

) -> None:

log.info("Started shared mem bar writer")
Expand All @@ -177,11 +180,10 @@ async def sample_and_broadcast(
# trade data
for tick in quote['ticks']:

# if tick['type'] in ('utrade',):
# print(tick)
ticktype = tick['type']

# write trade events to shm last OHLC sample
if tick['type'] in ('trade', 'utrade'):
if ticktype in ('trade', 'utrade'):

last = tick['price']

Expand Down Expand Up @@ -229,16 +231,71 @@ async def sample_and_broadcast(
# thus other consumers still attached.
subs = bus._subscribers[sym.lower()]

for stream in subs:
# print(f'sub is {ctx.chan.uid}')
try:
await stream.send({sym: quote})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
# XXX: do we need to deregister here
# if it's done in the fee bus code?
# so far seems like no since this should all
# be single-threaded.
log.error(f'{stream._ctx.chan.uid} dropped connection')
for (stream, tick_throttle) in subs:

if tick_throttle:
await stream.send(quote)

else:
try:
await stream.send({sym: quote})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
# XXX: do we need to deregister here
# if it's done in the fee bus code?
# so far seems like no since this should all
# be single-threaded.
log.error(f'{stream._ctx.chan.uid} dropped connection')


async def uniform_rate_send(
rate: float,
quote_stream: trio.abc.ReceiveChannel,
stream: tractor.MsgStream,
) -> None:

sleep_period = 1/rate - 0.000616
last_send = time.time()

while True:

first_quote = await quote_stream.receive()
start = time.time()

# append quotes since last iteration into the last quote's
# tick array/buffer.

# TODO: once we decide to get fancy really we should have
# a shared mem tick buffer that is just continually filled and
# the UI just ready from it at it's display rate.
# we'll likely head toward this once we get this issue going:
#
while True:
try:
next_quote = quote_stream.receive_nowait()
ticks = next_quote.get('ticks')

if ticks:
first_quote['ticks'].extend(ticks)

except trio.WouldBlock:
now = time.time()
rate = 1 / (now - last_send)
last_send = now

# print(f'{rate} Hz sending quotes\n{first_quote}')

# TODO: now if only we could sync this to the display
# rate timing exactly lul
await stream.send({first_quote['symbol']: first_quote})
break

end = time.time()
diff = end - start

# throttle to provided transmit rate
period = max(sleep_period - diff, 0)
if period > 0:
await trio.sleep(period)

0 comments on commit ccf8152

Please sign in to comment.