fix: use primary key instead of lastmodifieddate as a placeholder when iterating through search results to avoid infinite loop

Fixes airbytehq/airbyte#43317
ehearty committed Aug 29, 2024
1 parent ed12124 commit 41a0622
Showing 4 changed files with 28 additions and 13 deletions.
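
Why the placeholder matters: HubSpot's CRM search endpoints return at most 10,000 results for any single query, so the connector must periodically restart the search with a new filter. Restarting on the last lastmodifieddate seen can loop forever when more than 10,000 records share one modification timestamp (common after a bulk update), because the restarted query keeps returning the same window. Filtering and sorting on the primary key instead guarantees forward progress, since ids are unique. The sketch below illustrates the restart pattern; search_page and the property names are illustrative stand-ins, not the connector's actual API.

# Minimal sketch of the id-based restart pattern this commit introduces.
# `search_page` is a hypothetical helper wrapping HubSpot's
# POST /crm/v3/objects/{object}/search endpoint; it returns
# (records, next_page_cursor). All names here are illustrative only.

SEARCH_CAP = 10_000  # HubSpot search stops serving results past 10,000
PAGE_SIZE = 100

def read_all(search_page, state_ms: int, init_sync_ms: int):
    max_seen_id = 0  # placeholder cursor: highest primary key read so far
    while True:
        filters = [
            {"propertyName": "hs_lastmodifieddate", "operator": "GTE", "value": state_ms},
            {"propertyName": "hs_lastmodifieddate", "operator": "LTE", "value": init_sync_ms},
            # Unique ids ensure each restarted query starts past the data
            # already read; a repeated timestamp here can stall the sync.
            {"propertyName": "hs_object_id", "operator": "GTE", "value": max_seen_id},
        ]
        sorts = [{"propertyName": "hs_object_id", "direction": "ASCENDING"}]
        after, fetched = None, 0
        while True:
            records, after = search_page(filters=filters, sorts=sorts, limit=PAGE_SIZE, after=after)
            for record in records:
                max_seen_id = max(max_seen_id, int(record["id"]))
                yield record
            fetched += len(records)
            if after is None:
                return  # no more pages anywhere: sync complete
            if fetched >= SEARCH_CAP:
                break  # cap reached: restart the query from max_seen_id

As in the diff below, the id filter uses GTE rather than GT, so the record at the boundary may be read twice after a restart.
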
airbyte-integrations/connectors/source-hubspot/metadata.yaml
@@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
- dockerImageTag: 4.2.14
+ dockerImageTag: 4.2.15
dockerRepository: airbyte/source-hubspot
documentationUrl: https://docs.airbyte.com/integrations/sources/hubspot
githubIssueLabel: source-hubspot
airbyte-integrations/connectors/source-hubspot/pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "4.2.14"
version = "4.2.15"
name = "source-hubspot"
description = "Source implementation for HubSpot."
authors = [ "Airbyte <contact@airbyte.io>",]
airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py
@@ -1128,13 +1128,18 @@ def _process_search(
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
+ last_id=0,
) -> Tuple[List, requests.Response]:
stream_records = {}
properties_list = list(self.properties.keys())
payload = (
{
"filters": [{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}],
"sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}],
"filters": [
{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"},
{"value": int(self._init_sync.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "LTE"},
{"value": last_id, "propertyName": self.primary_key, "operator": "GTE"},
],
"sorts": [{"propertyName": self.primary_key, "direction": "ASCENDING"}],
"properties": properties_list,
"limit": 100,
}
@@ -1168,6 +1173,16 @@ def _read_associations(self, records: Iterable) -> Iterable[Mapping[str, Any]]:
current_record[_slice] = associations_list
return records_by_pk.values()

+ def get_max(self, val1, val2):
+     try:
+         # Try to convert both values to integers
+         int_val1 = int(val1)
+         int_val2 = int(val2)
+         return max(int_val1, int_val2)
+     except ValueError:
+         # If conversion fails, fall back to string comparison
+         return max(str(val1), str(val2))

def read_records(
self,
sync_mode: SyncMode,
@@ -1178,14 +1193,13 @@ def read_records(
stream_state = stream_state or {}
pagination_complete = False
next_page_token = None
+ last_id = None
+ max_last_id = None

- latest_cursor = None
while not pagination_complete:
if self.state:
records, raw_response = self._process_search(
-     next_page_token=next_page_token,
-     stream_state=stream_state,
-     stream_slice=stream_slice,
+     next_page_token=next_page_token, stream_state=stream_state, stream_slice=stream_slice, last_id=max_last_id
)
records = self._read_associations(records)
else:
@@ -1199,8 +1213,8 @@ def read_records(
records = self.record_unnester.unnest(records)

for record in records:
-     cursor = self._field_to_datetime(record[self.updated_at_field])
-     latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
+     last_id = self.get_max(record[self.primary_key], last_id) if last_id else record[self.primary_key]
yield record

next_page_token = self.next_page_token(raw_response)
@@ -1210,13 +1224,13 @@ def read_records(
# Hubspot documentation states that the search endpoints are limited to 10,000 total results
# for any given query. Attempting to page beyond 10,000 will result in a 400 error.
# https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
- # start a new search query with the latest state that has been collected.
- self._update_state(latest_cursor=latest_cursor)
+ # start a new search query with the latest id that has been collected.
+ max_last_id = self.get_max(max_last_id, last_id) if max_last_id else last_id
next_page_token = None

# Since the Search stream does not have slices it is safe to save the latest
# state as the initial sync date
- self._update_state(latest_cursor=latest_cursor, is_last_record=True)
+ self._update_state(latest_cursor=self._init_sync, is_last_record=True)
# Always return an empty generator just in case no records were ever yielded
yield from []

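A note on the get_max helper added above: HubSpot delivers record ids as strings, and comparing numeric ids lexicographically misorders them ("9" > "10"), which would corrupt the restart cursor. The helper therefore compares as integers first and falls back to string comparison only for non-numeric keys. A quick illustration follows; the module-level get_max mirrors the helper's logic, and the asserts are ours, not part of the connector:

# Int-first comparison avoids the lexicographic pitfall of max() on string ids.
def get_max(val1, val2):
    try:
        return max(int(val1), int(val2))  # numeric ids compared as integers
    except ValueError:
        return max(str(val1), str(val2))  # non-numeric keys compared as strings

assert max("9", "10") == "9"           # lexicographic order: wrong for ids
assert get_max("9", "10") == 10        # integer order: correct cursor value
assert get_max("abc", "abd") == "abd"  # non-numeric keys still comparable
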
docs/integrations/sources/hubspot.md (1 change: 1 addition & 0 deletions)
@@ -337,6 +337,7 @@ The connector is restricted by normal HubSpot [rate limitations](https://legacyd

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+ | 4.2.15 | 2024-08-29 | [44899](https://github.com/airbytehq/airbyte/pull/44899) | Fix incremental search to use primary key as placeholder instead of lastModifiedDate |
| 4.2.14 | 2024-07-27 | [42688](https://github.com/airbytehq/airbyte/pull/42688) | Update dependencies |
| 4.2.13 | 2024-07-20 | [42264](https://github.com/airbytehq/airbyte/pull/42264) | Update dependencies |
| 4.2.12 | 2024-07-13 | [41766](https://github.com/airbytehq/airbyte/pull/41766) | Update dependencies |
