Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow providing stream start ts #162

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions fauna/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@ class StreamOptions:

* max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
* max_backoff - The maximum backoff in seconds for an individual retry.
* start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
the timestamp.
* status_events - Indicates if stream should include status events. Status events are periodic events that
* update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
* about about the cost of maintaining the stream other than the cost of the received events.
update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
about the cost of maintaining the stream other than the cost of the received events.
"""

max_attempts: Optional[int] = None
max_backoff: Optional[int] = None
start_ts: Optional[int] = None
status_events: bool = False


Expand Down Expand Up @@ -550,6 +553,8 @@ def _create_stream(self):
data: Dict[str, Any] = {"token": self._token.token}
if self.last_ts is not None:
data["start_ts"] = self.last_ts
elif self._opts.start_ts is not None:
data["start_ts"] = self._opts.start_ts

return self._http_client.stream(
url=self._endpoint, headers=self._headers, data=data)
Expand Down
40 changes: 35 additions & 5 deletions tests/integration/test_stream.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import threading
import time
from datetime import timedelta

import pytest
import httpx
import fauna

from fauna import fql
from fauna.client import Client, StreamOptions
from fauna.http.httpx_client import HTTPXClient
Expand Down Expand Up @@ -77,8 +74,6 @@ def thread_fn():
def test_error_on_stream(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

events = []

def thread_fn():
stream = scoped_client.stream(fql("Product.all().map(.foo / 0).toStream()"))

Expand Down Expand Up @@ -171,6 +166,41 @@ def thread_fn():
assert events == ["add", "add", "add"]


def test_providing_start_ts(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

stream_token = scoped_client.query(fql("Product.all().toStream()")).data

createOne = scoped_client.query(fql("Product.create({})"))
createTwo = scoped_client.query(fql("Product.create({})"))
createThree = scoped_client.query(fql("Product.create({})"))

events = []

def thread_fn():
# replay excludes the ts that was passed in, it provides events for all ts after the one provided
stream = scoped_client.stream(stream_token,
StreamOptions(start_ts=createOne.txn_ts))
with stream as iter:
for event in iter:
events.append(event)
if (len(events) == 3):
iter.close()

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()

# adds a delay so the thread can open the stream,
# otherwise we could miss some events
time.sleep(0.5)

createFour = scoped_client.query(fql("Product.create({})"))
stream_thread.join()
assert events[0]["txn_ts"] == createTwo.txn_ts
assert events[1]["txn_ts"] == createThree.txn_ts
assert events[2]["txn_ts"] == createFour.txn_ts


@pytest.mark.xfail(reason="not currently supported by core")
def test_handle_status_events(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))
Expand Down
Loading