From 4efd95808458cb0608518c3f9dd3a58ff9d3a274 Mon Sep 17 00:00:00 2001 From: fauna-chase <73842483+fauna-chase@users.noreply.github.com> Date: Tue, 26 Mar 2024 09:16:23 -0500 Subject: [PATCH 1/2] allow providing stream start ts (#162) Co-authored-by: Lucas Pedroza --- fauna/client/client.py | 9 +++++-- tests/integration/test_stream.py | 40 ++++++++++++++++++++++++++++---- 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/fauna/client/client.py b/fauna/client/client.py index 581ef7f2..a4a6ce42 100644 --- a/fauna/client/client.py +++ b/fauna/client/client.py @@ -56,13 +56,16 @@ class StreamOptions: * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown. * max_backoff - The maximum backoff in seconds for an individual retry. + * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after + the timestamp. * status_events - Indicates if stream should include status events. Status events are periodic events that - * update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics - * about about the cost of maintaining the stream other than the cost of the received events. + update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics + about the cost of maintaining the stream other than the cost of the received events. """ max_attempts: Optional[int] = None max_backoff: Optional[int] = None + start_ts: Optional[int] = None status_events: bool = False @@ -550,6 +553,8 @@ def _create_stream(self): data: Dict[str, Any] = {"token": self._token.token} if self.last_ts is not None: data["start_ts"] = self.last_ts + elif self._opts.start_ts is not None: + data["start_ts"] = self._opts.start_ts return self._http_client.stream( url=self._endpoint, headers=self._headers, data=data) diff --git a/tests/integration/test_stream.py b/tests/integration/test_stream.py index 989bb1fe..484c064e 100644 --- a/tests/integration/test_stream.py +++ b/tests/integration/test_stream.py @@ -1,11 +1,8 @@ import threading import time -from datetime import timedelta - import pytest import httpx import fauna - from fauna import fql from fauna.client import Client, StreamOptions from fauna.http.httpx_client import HTTPXClient @@ -77,8 +74,6 @@ def thread_fn(): def test_error_on_stream(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) - events = [] - def thread_fn(): stream = scoped_client.stream(fql("Product.all().map(.foo / 0).toStream()")) @@ -171,6 +166,41 @@ def thread_fn(): assert events == ["add", "add", "add"] +def test_providing_start_ts(scoped_client): + scoped_client.query(fql("Collection.create({name: 'Product'})")) + + stream_token = scoped_client.query(fql("Product.all().toStream()")).data + + createOne = scoped_client.query(fql("Product.create({})")) + createTwo = scoped_client.query(fql("Product.create({})")) + createThree = scoped_client.query(fql("Product.create({})")) + + events = [] + + def thread_fn(): + # replay excludes the ts that was passed in, it provides events for all ts after the one provided + stream = scoped_client.stream(stream_token, + StreamOptions(start_ts=createOne.txn_ts)) + with stream as iter: + for event in iter: + events.append(event) + if (len(events) == 3): + iter.close() + + stream_thread = threading.Thread(target=thread_fn) + stream_thread.start() + + # adds a delay so the thread can open the stream, + # otherwise we could miss some events + time.sleep(0.5) + + createFour = scoped_client.query(fql("Product.create({})")) + stream_thread.join() + assert events[0]["txn_ts"] == createTwo.txn_ts + assert events[1]["txn_ts"] == createThree.txn_ts + assert events[2]["txn_ts"] == createFour.txn_ts + + @pytest.mark.xfail(reason="not currently supported by core") def test_handle_status_events(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) From 1f3b757b10d9df08a02247f466c64a29695853b7 Mon Sep 17 00:00:00 2001 From: Lucas Pedroza <40873230+pnwpedro@users.noreply.github.com> Date: Mon, 1 Apr 2024 14:56:34 +0200 Subject: [PATCH 2/2] Upgrade dependencies (#167) --- requirements.txt | 6 +++--- setup.py | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/requirements.txt b/requirements.txt index 2ed76a3e..197513f8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -iso8601==1.1.0 -future==0.18.3 -httpx[http2]==0.23.* +iso8601==2.1.0 +future==1.0.0 +httpx[http2]==0.27.* diff --git a/setup.py b/setup.py index 11d1f6f8..35119ecd 100644 --- a/setup.py +++ b/setup.py @@ -13,16 +13,16 @@ long_description = f.read() requires = [ - "iso8601==1.1.0", - "future==0.18.3", - "httpx[http2]==0.23.*", + "iso8601==2.1.0", + "future==1.0.0", + "httpx[http2]==0.27.*", ] extras_require = { "lint": ["yapf==0.40.1"], "test": [ - "pytest==7.3.0", "pytest-env==0.8.1", "pytest-cov==4.0.0", - "pytest-httpx==0.21.3", "pytest-subtests==0.10.0" + "pytest==8.1.1", "pytest-env==1.1.3", "pytest-cov==5.0.0", + "pytest-httpx==0.30.0", "pytest-subtests==0.12.1" ] }