Skip to content

Commit

Permalink
Try out msgspec in our msgpack stream channel
Browse files Browse the repository at this point in the history
Can only really use an encoder currently since there is no streaming api
in `msgspec` as of currently. See jcrist/msgspec#27.

Not sure if any encoding speedups are currently noticeable especially
without any validation going on yet XD.

First experiments toward #196
  • Loading branch information
goodboy committed Sep 5, 2021
1 parent 93a83ea commit adc7786
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions tractor/_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
"""
import platform
import typing
from typing import Any, Tuple, Optional
from typing import Any, Tuple, Optional, Callable
from functools import partial

import msgpack
import msgspec
import trio
from async_generator import asynccontextmanager

Expand All @@ -27,6 +28,9 @@
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)


ms_decode = msgspec.Encoder().encode


class MsgpackTCPStream:
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgpack-python``.
Expand All @@ -35,15 +39,16 @@ class MsgpackTCPStream:
def __init__(
self,
stream: trio.SocketStream,

) -> None:

self.stream = stream
assert self.stream.socket

# should both be IP sockets
lsockname = stream.socket.getsockname()
assert isinstance(lsockname, tuple)
self._laddr = lsockname[:2]

rsockname = stream.socket.getpeername()
assert isinstance(rsockname, tuple)
self._raddr = rsockname[:2]
Expand All @@ -61,6 +66,7 @@ async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
raw=False,
use_list=False,
)
# decoder = msgspec.Decoder() #dict[str, Any])
while True:
try:
data = await self.stream.receive_some(2**10)
Expand Down Expand Up @@ -95,6 +101,7 @@ async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
f'transport {self} was already closed prior ro read'
)

# yield decoder.decode(data)
unpacker.feed(data)
for packet in unpacker:
yield packet
Expand All @@ -111,7 +118,9 @@ def raddr(self) -> Tuple[Any, ...]:
async def send(self, data: Any) -> None:
async with self._send_lock:
return await self.stream.send_all(
msgpack.dumps(data, use_bin_type=True))
# msgpack.dumps(data, use_bin_type=True))
ms_decode(data)
)

async def recv(self) -> Any:
return await self._agen.asend(None)
Expand Down

0 comments on commit adc7786

Please sign in to comment.