Skip to content

Commit

Permalink
allow providing stream start ts (#162)
Browse files Browse the repository at this point in the history
Co-authored-by: Lucas Pedroza <lucas.pedroza@fauna.com>
  • Loading branch information
fauna-chase and pnwpedro committed Mar 26, 2024
1 parent 9fd6dbe commit 4efd958
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 7 deletions.
9 changes: 7 additions & 2 deletions fauna/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@ class StreamOptions:
* max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
* max_backoff - The maximum backoff in seconds for an individual retry.
* start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
the timestamp.
* status_events - Indicates if stream should include status events. Status events are periodic events that
* update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
* about about the cost of maintaining the stream other than the cost of the received events.
update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
about the cost of maintaining the stream other than the cost of the received events.
"""

max_attempts: Optional[int] = None
max_backoff: Optional[int] = None
start_ts: Optional[int] = None
status_events: bool = False


Expand Down Expand Up @@ -550,6 +553,8 @@ def _create_stream(self):
data: Dict[str, Any] = {"token": self._token.token}
if self.last_ts is not None:
data["start_ts"] = self.last_ts
elif self._opts.start_ts is not None:
data["start_ts"] = self._opts.start_ts

return self._http_client.stream(
url=self._endpoint, headers=self._headers, data=data)
Expand Down
40 changes: 35 additions & 5 deletions tests/integration/test_stream.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import threading
import time
from datetime import timedelta

import pytest
import httpx
import fauna

from fauna import fql
from fauna.client import Client, StreamOptions
from fauna.http.httpx_client import HTTPXClient
Expand Down Expand Up @@ -77,8 +74,6 @@ def thread_fn():
def test_error_on_stream(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

events = []

def thread_fn():
stream = scoped_client.stream(fql("Product.all().map(.foo / 0).toStream()"))

Expand Down Expand Up @@ -171,6 +166,41 @@ def thread_fn():
assert events == ["add", "add", "add"]


def test_providing_start_ts(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

stream_token = scoped_client.query(fql("Product.all().toStream()")).data

createOne = scoped_client.query(fql("Product.create({})"))
createTwo = scoped_client.query(fql("Product.create({})"))
createThree = scoped_client.query(fql("Product.create({})"))

events = []

def thread_fn():
# replay excludes the ts that was passed in, it provides events for all ts after the one provided
stream = scoped_client.stream(stream_token,
StreamOptions(start_ts=createOne.txn_ts))
with stream as iter:
for event in iter:
events.append(event)
if (len(events) == 3):
iter.close()

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()

# adds a delay so the thread can open the stream,
# otherwise we could miss some events
time.sleep(0.5)

createFour = scoped_client.query(fql("Product.create({})"))
stream_thread.join()
assert events[0]["txn_ts"] == createTwo.txn_ts
assert events[1]["txn_ts"] == createThree.txn_ts
assert events[2]["txn_ts"] == createFour.txn_ts


@pytest.mark.xfail(reason="not currently supported by core")
def test_handle_status_events(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))
Expand Down

0 comments on commit 4efd958

Please sign in to comment.