Simple Producer Consumer Streaming API Example

hmanfarmer edited this page Sep 3, 2020 · 4 revisions

This example demonstrates the following:

  • Implementing a producer-consumer topology to gain finer control over the message stream.
  • Using an asyncio.Queue to buffer messages and discard older ones, so that a slow consumer does not cause the stream to back up.
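
The drop-oldest idea from the second point can be tried in isolation, without a TDA account. The following is a minimal sketch, not part of tda-api: the helper name put_dropping_oldest and the queue size of 3 are assumptions chosen for illustration (the full example below uses a size of 1).

```python
import asyncio


def put_dropping_oldest(queue: asyncio.Queue, item) -> None:
    """Hypothetical helper: enqueue without awaiting, evicting the oldest entry if full."""
    if queue.full():
        try:
            queue.get_nowait()  # discard the oldest message to make room
        except asyncio.QueueEmpty:
            pass
    queue.put_nowait(item)


async def demo() -> list:
    # Create the queue inside a coroutine so it binds to the running event loop.
    queue = asyncio.Queue(maxsize=3)
    for i in range(10):
        put_dropping_oldest(queue, i)

    # Drain whatever survived; only the newest entries should remain.
    remaining = []
    while not queue.empty():
        remaining.append(queue.get_nowait())
    return remaining


remaining = asyncio.run(demo())
print(remaining)  # [7, 8, 9]
```

With a larger maxsize the consumer sees a short backlog of recent messages; with maxsize=1 it only ever sees the most recent one.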

Code

from tda.auth import easy_client
from tda.client import Client
from tda.streaming import StreamClient
import asyncio

client = easy_client(
        api_key='11111111111',
        redirect_uri='https://localhost:8080',
        token_path='tokens.ini')
stream_client = StreamClient(client, account_id=1111111)

# Log in to TDA, set up the stream, subscribe to symbols, and register a stream handler.
# This is the producer loop; it passes data to the consumer loop through an asyncio.Queue of size 1.
async def read_stream(message_queue):

    await stream_client.login()
    await stream_client.quality_of_service(StreamClient.QOSLevel.EXPRESS)
    # Register the handler before subscribing, so that messages arriving right after
    # the subscription succeeds are not dropped for lack of a handler.
    stream_client.add_timesale_equity_handler(lambda msg: handler(msg, message_queue))

    await stream_client.timesale_equity_subs(
        ['GOOG', 'GOOGL', 'BP', 'CVS', 'ADBE', 'CRM', 'SNAP', 'AMZN', 'BABA', 'DIS', 'TWTR', 'M', 'USO',
        'AAPL', 'NFLX', 'GE', 'TSLA', 'F', 'SPY', 'FDX', 'UBER', 'ROKU', 'X', 'FB', 'BIDU', 'FIT']
        )

    while True:
        await stream_client.handle_message()

# The handler passed to add_timesale_equity_handler is a plain (non-async) function, so it
# cannot await asyncio.Queue.put() or get(). Only the non-blocking put_nowait() and
# get_nowait() methods are used.
def handler(msg, message_queue):
    
    # If queue full, remove item to make space.
    if message_queue.full():
        try:
            # Defensive: if the queue has been emptied in the meantime, ignore the resulting exception.
            message_queue.get_nowait()
        except asyncio.QueueEmpty:
            pass
    
    message_queue.put_nowait(msg)

async def main():
    message_queue = asyncio.Queue(1)
    # Keep a reference to the task so it is not garbage-collected while running.
    producer_task = asyncio.create_task(read_stream(message_queue))

    while True:
        timesales = await message_queue.get()
        print(timesales)
        
asyncio.run(main())
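
To see the topology in action without credentials or a network connection, the stream can be replaced by a stand-in producer. This is only a sketch under stated assumptions: fake_stream and simulate are hypothetical names, the integers stand in for time-and-sale messages, and the sleep durations are arbitrary values chosen so that the consumer is slower than the producer.

```python
import asyncio


def handler(msg, message_queue):
    # Same drop-oldest logic as the handler above.
    if message_queue.full():
        try:
            message_queue.get_nowait()
        except asyncio.QueueEmpty:
            pass
    message_queue.put_nowait(msg)


async def fake_stream(message_queue, count):
    # Hypothetical stand-in for read_stream(): emits integers instead of TDA messages.
    for i in range(count):
        handler(i, message_queue)
        await asyncio.sleep(0.001)  # fast producer


async def simulate(count=50):
    message_queue = asyncio.Queue(1)
    producer_task = asyncio.create_task(fake_stream(message_queue, count))

    seen = []
    while True:
        msg = await message_queue.get()
        seen.append(msg)
        if msg == count - 1:  # the final message is never evicted, so it always arrives
            break
        await asyncio.sleep(0.005)  # slow consumer

    await producer_task
    return seen


seen = asyncio.run(simulate())
print(f"consumed {len(seen)} of 50 messages; last = {seen[-1]}")
```

The consumer usually receives fewer messages than were produced, always in increasing order, and it ends on the newest message. That is the intended trade-off: stale messages are skipped in exchange for staying current.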
