fix: use primary key instead of lastmodifieddate as a placeholder when iterating through search results to avoid infinite loop

fixes airbytehq/airbyte/#43317
ehearty committed Sep 20, 2024
1 parent 253e789 commit 2291e73
Showing 6 changed files with 111 additions and 13 deletions.
1 change: 1 addition & 0 deletions .secrets
@@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
- dockerImageTag: 4.2.19
+ dockerImageTag: 4.2.20
dockerRepository: airbyte/source-hubspot
documentationUrl: https://docs.airbyte.com/integrations/sources/hubspot
erdUrl: https://dbdocs.io/airbyteio/source-hubspot?view=relationships
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "4.2.19"
version = "4.2.20"
name = "source-hubspot"
description = "Source implementation for HubSpot."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -1128,13 +1128,31 @@ def _process_search(
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
last_id=None,
) -> Tuple[List, requests.Response]:
stream_records = {}
properties_list = list(self.properties.keys())
if last_id is None:
last_id = 0
# The search query below uses the following criteria:
# - Last modified >= timestamp of previous sync
# - Last modified <= timestamp of current sync to avoid open-ended queries
# - Object primary key >= last_id, with initial value 0, then max(last_id) returned from the previous pagination loop
# - Sort results by primary key ASC
# Note: Although results are returned out of chronological order, sorting on primary key ensures retrieval of *all* records
# once the final pagination loop completes. This is preferable to sorting by a non-unique value, such as
# last modified date, which may result in an infinite loop in some edge cases.
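# Note: HubSpot's CRM search API exposes the record id as the "hs_object_id" property, which is why the
# generic "id" primary key is mapped below before being used in the filters and sort.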
key = self.primary_key
if key == "id":
key = "hs_object_id"
payload = (
{
"filters": [{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}],
"sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}],
"filters": [
{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"},
{"value": int(self._init_sync.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "LTE"},
{"value": last_id, "propertyName": key, "operator": "GTE"},
],
"sorts": [{"propertyName": key, "direction": "ASCENDING"}],
"properties": properties_list,
"limit": 100,
}
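For illustration only (timestamps, the property list, and the hs_lastmodifieddate property name are example values, and 5432100 stands in for the max primary key returned by a previous pagination pass), the request body built here for a follow-up pass would look roughly like:

    {
        "filters": [
            {"value": 1643042591000, "propertyName": "hs_lastmodifieddate", "operator": "GTE"},  # state of the previous sync
            {"value": 1645720991000, "propertyName": "hs_lastmodifieddate", "operator": "LTE"},  # start of the current sync
            {"value": 5432100, "propertyName": "hs_object_id", "operator": "GTE"},  # max id seen so far
        ],
        "sorts": [{"propertyName": "hs_object_id", "direction": "ASCENDING"}],
        "properties": ["name", "domain"],
        "limit": 100,
    }

The "after" paging cursor (driven by next_page_token) is handled outside this snippet.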
@@ -1168,6 +1186,16 @@ def _read_associations(self, records: Iterable) -> Iterable[Mapping[str, Any]]:
current_record[_slice] = associations_list
return records_by_pk.values()

def get_max(self, val1, val2):
try:
# Try to convert both values to integers
int_val1 = int(val1)
int_val2 = int(val2)
return max(int_val1, int_val2)
except ValueError:
# If conversion fails, fall back to string comparison
return max(str(val1), str(val2))
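# Examples (illustrative values only): get_max("9900", "10000") returns 10000, where a plain string
# max() would return "9900"; non-numeric values such as get_max("abc", "abd") fall back to
# lexicographic comparison and return "abd".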

def read_records(
self,
sync_mode: SyncMode,
@@ -1178,14 +1206,13 @@ def read_records(
stream_state = stream_state or {}
pagination_complete = False
next_page_token = None
+ last_id = None
+ max_last_id = None

- latest_cursor = None
while not pagination_complete:
if self.state:
records, raw_response = self._process_search(
- next_page_token=next_page_token,
- stream_state=stream_state,
- stream_slice=stream_slice,
+ next_page_token=next_page_token, stream_state=stream_state, stream_slice=stream_slice, last_id=max_last_id
)
if self.associations:
records = self._read_associations(records)
@@ -1200,8 +1227,7 @@
records = self.record_unnester.unnest(records)

for record in records:
- cursor = self._field_to_datetime(record[self.updated_at_field])
- latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
+ last_id = self.get_max(record[self.primary_key], last_id) if last_id else record[self.primary_key]
yield record

next_page_token = self.next_page_token(raw_response)
@@ -1211,13 +1237,13 @@
# Hubspot documentation states that the search endpoints are limited to 10,000 total results
# for any given query. Attempting to page beyond 10,000 will result in a 400 error.
# https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
- # start a new search query with the latest state that has been collected.
- self._update_state(latest_cursor=latest_cursor)
+ # start a new search query with the latest id that has been collected.
+ max_last_id = self.get_max(max_last_id, last_id) if max_last_id else last_id
next_page_token = None

# Since the Search stream does not have slices, it is safe to save the latest
# state as the initial sync date
- self._update_state(latest_cursor=latest_cursor, is_last_record=True)
+ self._update_state(latest_cursor=self._init_sync, is_last_record=True)
# Always return an empty generator just in case no records were ever yielded
yield from []
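To make the 10,000-result restart described in the comments above concrete, here is a minimal standalone sketch (not the connector's code; search_page is a hypothetical callable wrapping the CRM search endpoint) showing why paging on a monotonically increasing primary key always makes progress, even when a single query's result window is exhausted:

    def fetch_all(search_page, since_ms, until_ms):
        """Yield every matching record, restarting the search whenever the 10K cap is reached."""
        last_id = 0
        while True:
            results_in_query = 0
            after = None
            while True:
                # search_page applies: last_modified >= since_ms, last_modified <= until_ms,
                # primary key >= last_id, results sorted by primary key ascending.
                page, after = search_page(min_id=last_id, since=since_ms, until=until_ms, after=after)
                for record in page:
                    last_id = max(last_id, int(record["id"]))  # the placeholder only ever moves forward
                    yield record
                results_in_query += len(page)
                if after is None:
                    return  # query exhausted: every matching record has been yielded
                if results_in_query >= 10_000:
                    break  # HubSpot caps a single query at 10,000 results; restart anchored at last_id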

@@ -4,6 +4,7 @@


import logging
import random
from datetime import timedelta
from http import HTTPStatus
from unittest.mock import MagicMock
@@ -487,6 +488,75 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req
assert len(records) == 11000
assert test_stream.state["updatedAt"] == test_stream._init_sync.to_iso8601_string()

def test_search_based_incremental_stream_should_sort_by_id(requests_mock, common_params, fake_properties_list):
"""
If there are more than 10,000 records that would be returned by the Hubspot search endpoint,
the CRMSearchStream instance should stop at the 10Kth record
"""
# Create test_stream instance with some state
test_stream = Companies(**common_params)
test_stream._init_sync = pendulum.parse("2022-02-24T16:43:11Z")
test_stream.state = {"updatedAt": "2022-01-24T16:43:11Z"}
test_stream.associations = []

def random_date(start, end):
return pendulum.from_timestamp(random.randint(start, end)/1000).to_iso8601_string()

after = 0

# Custom callback to mock search endpoint filter and sort behavior, returns 100 records per request.
# See _process_search in stream.py for details on the structure of the filter and sort parameters.
# The generated records will have an id that is the sum of the current id and the current "after" value
# and the updatedAt field will be a random date between min_time and max_time.
# Store "after" value in the record to check if it resets after 10k records.
def custom_callback(request, context):
post_data = request.json() # Access JSON data from the request body
after = int(post_data.get("after", 0))
filters = post_data.get("filters", [])
min_time = int(filters[0].get("value", 0))
max_time = int(filters[1].get("value", 0))
id = int(filters[2].get("value", 0))
next = int(after) + 100
results = [
{
"id": f"{y + id}",
"updatedAt": random_date(min_time, max_time),
"after": after
} for y in range(int(after) + 1, next + 1)
]
context.status_code = 200
if ((id + next) < 11000):
return {"results": results, "paging": {"next": {"after": f"{next}"}}}
else:
return {"results": results, "paging": {}} # Last page

properties_response = [
{
"json": [],
"status_code": 200,
}
]

# Mocking Request
test_stream._sync_mode = SyncMode.incremental
requests_mock.register_uri("POST", test_stream.url, json=custom_callback)
# test_stream._sync_mode = None
requests_mock.register_uri("GET", "/properties/v2/company/properties", properties_response)
records, _ = read_incremental(test_stream, {})
# The stream should not attempt to get more than 10K records from a single search query.
# Instead, it should start a new search query anchored at the last collected id.
assert len(records) == 11000
# Check that the records are sorted by id and that "after" resets after 10k records
assert records[0]["id"] == "1"
assert records[0]["after"] == 0
assert records[10000 - 1]["id"] == "10000"
assert records[10000 - 1]["after"] == 9900
assert records[10000]["id"] == "10001"
assert records[10000]["after"] == 0
assert records[-1]["id"] == "11000"
assert records[-1]["after"] == 900
assert test_stream.state["updatedAt"] == test_stream._init_sync.to_iso8601_string()


def test_engagements_stream_pagination_works(requests_mock, common_params):
"""
1 change: 1 addition & 0 deletions docs/integrations/sources/hubspot.md
@@ -336,6 +336,7 @@ The connector is restricted by normal HubSpot [rate limitations](https://legacyd

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.2.20 | 2024-09-23 | [44899](https://github.com/airbytehq/airbyte/pull/44899) | Fix incremental search to use primary key as placeholder instead of lastModifiedDate |
| 4.2.19 | 2024-09-14 | [45018](https://github.com/airbytehq/airbyte/pull/45018) | Update dependencies |
| 4.2.18 | 2024-08-24 | [43762](https://github.com/airbytehq/airbyte/pull/43762) | Update dependencies |
| 4.2.17 | 2024-08-21 | [44538](https://github.com/airbytehq/airbyte/pull/44538) | Fix issue with CRM search streams when they have no `associations` |