diff --git a/.secrets b/.secrets
new file mode 120000
index 000000000000..b776cdb724f5
--- /dev/null
+++ b/.secrets
@@ -0,0 +1 @@
+airbyte-integrations/connectors/source-hubspot/.secrets
\ No newline at end of file
diff --git a/airbyte-integrations/connectors/source-hubspot/metadata.yaml b/airbyte-integrations/connectors/source-hubspot/metadata.yaml
index bd5c884497a6..17b78f9f89cb 100644
--- a/airbyte-integrations/connectors/source-hubspot/metadata.yaml
+++ b/airbyte-integrations/connectors/source-hubspot/metadata.yaml
@@ -10,7 +10,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
-  dockerImageTag: 4.2.19
+  dockerImageTag: 4.2.20
   dockerRepository: airbyte/source-hubspot
   documentationUrl: https://docs.airbyte.com/integrations/sources/hubspot
   erdUrl: https://dbdocs.io/airbyteio/source-hubspot?view=relationships
diff --git a/airbyte-integrations/connectors/source-hubspot/pyproject.toml b/airbyte-integrations/connectors/source-hubspot/pyproject.toml
index 1273c68e265f..46d114b9f9e6 100644
--- a/airbyte-integrations/connectors/source-hubspot/pyproject.toml
+++ b/airbyte-integrations/connectors/source-hubspot/pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
 build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
-version = "4.2.19"
+version = "4.2.20"
 name = "source-hubspot"
 description = "Source implementation for HubSpot."
 authors = [ "Airbyte <contact@airbyte.io>",]
diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py
index 5b7772d88346..73b519359108 100644
--- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py
+++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py
@@ -1128,13 +1128,31 @@ def _process_search(
         stream_slice: Mapping[str, Any] = None,
         stream_state: Mapping[str, Any] = None,
         next_page_token: Mapping[str, Any] = None,
+        last_id=None,
     ) -> Tuple[List, requests.Response]:
         stream_records = {}
         properties_list = list(self.properties.keys())
+        if last_id is None:
+            last_id = 0
+        # The search query below uses the following criteria:
+        # - Last modified >= timestamp of the previous sync
+        # - Last modified <= timestamp of the current sync, to avoid open-ended queries
+        # - Object primary key >= last_id, starting at 0 and then set to the max(last_id) returned by the previous pagination loop
+        # - Sort results by primary key ASC
+        # Note: Although results return out of chronological order, sorting on primary key ensures retrieval of *all* records
+        # once the final pagination loop completes. This is preferable to sorting by a non-unique value, such as
+        # last modified date, which may result in an infinite loop in some edge cases.
+        key = self.primary_key
+        if key == "id":
+            key = "hs_object_id"
         payload = (
             {
-                "filters": [{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}],
-                "sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}],
+                "filters": [
+                    {"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"},
+                    {"value": int(self._init_sync.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "LTE"},
+                    {"value": last_id, "propertyName": key, "operator": "GTE"},
+                ],
+                "sorts": [{"propertyName": key, "direction": "ASCENDING"}],
                 "properties": properties_list,
                 "limit": 100,
             }
@@ -1168,6 +1186,16 @@ def _read_associations(self, records: Iterable) -> Iterable[Mapping[str, Any]]:
             current_record[_slice] = associations_list
         return records_by_pk.values()
 
+    def get_max(self, val1, val2):
+        try:
+            # Try to compare both values as integers
+            int_val1 = int(val1)
+            int_val2 = int(val2)
+            return max(int_val1, int_val2)
+        except ValueError:
+            # If conversion fails, fall back to string comparison
+            return max(str(val1), str(val2))
+
     def read_records(
         self,
         sync_mode: SyncMode,
@@ -1178,14 +1206,13 @@ def read_records(
         stream_state = stream_state or {}
         pagination_complete = False
         next_page_token = None
+        last_id = None
+        max_last_id = None
 
-        latest_cursor = None
         while not pagination_complete:
             if self.state:
                 records, raw_response = self._process_search(
-                    next_page_token=next_page_token,
-                    stream_state=stream_state,
-                    stream_slice=stream_slice,
+                    next_page_token=next_page_token, stream_state=stream_state, stream_slice=stream_slice, last_id=max_last_id
                 )
                 if self.associations:
                     records = self._read_associations(records)
@@ -1200,8 +1227,7 @@
             records = self.record_unnester.unnest(records)
 
             for record in records:
-                cursor = self._field_to_datetime(record[self.updated_at_field])
-                latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
+                last_id = self.get_max(record[self.primary_key], last_id) if last_id else record[self.primary_key]
                 yield record
 
             next_page_token = self.next_page_token(raw_response)
@@ -1211,13 +1237,13 @@
                 # Hubspot documentation states that the search endpoints are limited to 10,000 total results
                 # for any given query. Attempting to page beyond 10,000 will result in a 400 error.
                 # https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
-                # start a new search query with the latest state that has been collected.
-                self._update_state(latest_cursor=latest_cursor)
+                # start a new search query with the latest id that has been collected.
+                max_last_id = self.get_max(max_last_id, last_id) if max_last_id else last_id
                 next_page_token = None
 
         # Since Search stream does not have slices is safe to save the latest
         # state as the initial sync date
-        self._update_state(latest_cursor=latest_cursor, is_last_record=True)
+        self._update_state(latest_cursor=self._init_sync, is_last_record=True)
         # Always return an empty generator just in case no records were ever yielded
         yield from []
 
diff --git a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py
index 2c6609ca0498..d383065a1cf8 100644
--- a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py
+++ b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py
@@ -4,6 +4,7 @@
 
 
 import logging
+import random
 from datetime import timedelta
 from http import HTTPStatus
 from unittest.mock import MagicMock
@@ -487,6 +488,75 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req
     assert len(records) == 11000
     assert test_stream.state["updatedAt"] == test_stream._init_sync.to_iso8601_string()
 
+
+def test_search_based_incremental_stream_should_sort_by_id(requests_mock, common_params, fake_properties_list):
+    """
+    When the Hubspot search endpoint would return more than 10,000 records, the CRMSearchStream
+    instance should sort on the primary key and restart each new search query from the largest id
+    collected so far, so that every record is eventually retrieved.
+    """
+    # Create test_stream instance with some state
+    test_stream = Companies(**common_params)
+    test_stream._init_sync = pendulum.parse("2022-02-24T16:43:11Z")
+    test_stream.state = {"updatedAt": "2022-01-24T16:43:11Z"}
+    test_stream.associations = []
+
+    def random_date(start, end):
+        return pendulum.from_timestamp(random.randint(start, end) / 1000).to_iso8601_string()
+
+    after = 0
+
+    # Custom callback that mocks the search endpoint's filter and sort behavior, returning 100 records per request.
+    # See _process_search in streams.py for details on the structure of the filter and sort parameters.
+    # Each generated record gets an id offset by the id filter value, and an updatedAt field that is a random
+    # date between min_time and max_time. The current "after" value is stored in the record so the test can
+    # check that it resets after 10k records.
+    def custom_callback(request, context):
+        post_data = request.json()  # Access JSON data from the request body
+        after = int(post_data.get("after", 0))
+        filters = post_data.get("filters", [])
+        min_time = int(filters[0].get("value", 0))
+        max_time = int(filters[1].get("value", 0))
+        id = int(filters[2].get("value", 0))
+        next = int(after) + 100
+        results = [
+            {
+                "id": f"{y + id}",
+                "updatedAt": random_date(min_time, max_time),
+                "after": after,
+            }
+            for y in range(int(after) + 1, next + 1)
+        ]
+        context.status_code = 200
+        if (id + next) < 11000:
+            return {"results": results, "paging": {"next": {"after": f"{next}"}}}
+        else:
+            return {"results": results, "paging": {}}  # Last page
+
+    properties_response = [
+        {
+            "json": [],
+            "status_code": 200,
+        }
+    ]
+
+    # Mocking Request
+    test_stream._sync_mode = SyncMode.incremental
+    requests_mock.register_uri("POST", test_stream.url, json=custom_callback)
+    # test_stream._sync_mode = None
+    requests_mock.register_uri("GET", "/properties/v2/company/properties", properties_response)
+    records, _ = read_incremental(test_stream, {})
+    # The stream should not attempt to page a single search query past 10K records.
+    # Instead, it should start a new search query from the largest id collected so far.
+    assert len(records) == 11000
+    # Check that the records are sorted by id and that "after" resets after 10k records
+    assert records[0]["id"] == "1"
+    assert records[0]["after"] == 0
+    assert records[10000 - 1]["id"] == "10000"
+    assert records[10000 - 1]["after"] == 9900
+    assert records[10000]["id"] == "10001"
+    assert records[10000]["after"] == 0
+    assert records[-1]["id"] == "11000"
+    assert records[-1]["after"] == 900
+    assert test_stream.state["updatedAt"] == test_stream._init_sync.to_iso8601_string()
+
 
 def test_engagements_stream_pagination_works(requests_mock, common_params):
     """
diff --git a/docs/integrations/sources/hubspot.md b/docs/integrations/sources/hubspot.md
index 98b5f610609a..bfd2a4c9ccc3 100644
--- a/docs/integrations/sources/hubspot.md
+++ b/docs/integrations/sources/hubspot.md
@@ -336,6 +336,7 @@ The connector is restricted by normal HubSpot [rate limitations](https://legacyd
 
 | Version | Date       | Pull Request                                              | Subject                                                                               |
 |:--------|:-----------|:----------------------------------------------------------|:--------------------------------------------------------------------------------------|
+| 4.2.20  | 2024-09-23 | [44899](https://github.com/airbytehq/airbyte/pull/44899)  | Fix incremental search to use primary key as placeholder instead of lastModifiedDate  |
 | 4.2.19  | 2024-09-14 | [45018](https://github.com/airbytehq/airbyte/pull/45018)  | Update dependencies                                                                   |
 | 4.2.18  | 2024-08-24 | [43762](https://github.com/airbytehq/airbyte/pull/43762)  | Update dependencies                                                                   |
 | 4.2.17  | 2024-08-21 | [44538](https://github.com/airbytehq/airbyte/pull/44538)  | Fix issue with CRM search streams when they have no `associations`                    |
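
For reviewers, the pagination strategy adopted by this change can be illustrated outside the connector. The sketch below is a minimal, hypothetical example, not the connector's code: it assumes a HubSpot private-app token in the HUBSPOT_TOKEN environment variable, targets the companies object (whose last-modified property is hs_lastmodifieddate), and mirrors the top-level filters/sorts payload shape used by _process_search above. Each query filters on a last-modified window, sorts ascending by hs_object_id, and whenever the search API's 10,000-result cap is reached it restarts from the largest id seen so far; because the restart filter uses GTE, the boundary record is returned again and callers should de-duplicate on id.

# Not part of the PR: a minimal sketch of id-based search pagination using `requests`.
import os

import requests

SEARCH_URL = "https://api.hubapi.com/crm/v3/objects/companies/search"
HEADERS = {"Authorization": f"Bearer {os.environ['HUBSPOT_TOKEN']}"}  # assumed private-app token


def fetch_all(lower_ms: int, upper_ms: int):
    """Yield every company modified within [lower_ms, upper_ms] (epoch milliseconds)."""
    last_id = 0   # primary-key placeholder: 0 initially, then the max id seen so far
    after = None  # HubSpot paging token; a single search query is capped at 10,000 results
    while True:
        payload = {
            "filters": [
                {"propertyName": "hs_lastmodifieddate", "operator": "GTE", "value": lower_ms},
                {"propertyName": "hs_lastmodifieddate", "operator": "LTE", "value": upper_ms},
                {"propertyName": "hs_object_id", "operator": "GTE", "value": last_id},
            ],
            "sorts": [{"propertyName": "hs_object_id", "direction": "ASCENDING"}],
            "limit": 100,
        }
        if after is not None:
            payload["after"] = after
        data = requests.post(SEARCH_URL, headers=HEADERS, json=payload).json()
        for record in data.get("results", []):
            last_id = max(last_id, int(record["id"]))
            yield record
        after = data.get("paging", {}).get("next", {}).get("after")
        if after is None:
            return  # no more pages: the sync window has been read completely
        if int(after) >= 10000:
            after = None  # hit the 10K search cap: restart the query from the max id seen

Because the cursor is the unique primary key rather than the non-unique last-modified date, every restart strictly advances through the result set, which is the infinite-loop edge case the comment in _process_search calls out.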