From b135b38703be391ec68a841a8d9d11635742cf89 Mon Sep 17 00:00:00 2001 From: Matthew Elwell Date: Thu, 6 Jun 2024 13:03:43 +0100 Subject: [PATCH] infra: run influxdb feature evaluation in thread (#4125) --- api/app_analytics/views.py | 6 +++- api/task_processor/decorators.py | 3 +- .../test_unit_app_analytics_track.py | 35 +++++++++++++++++++ .../test_unit_app_analytics_views.py | 19 ++++++---- 4 files changed, 54 insertions(+), 9 deletions(-) diff --git a/api/app_analytics/views.py b/api/app_analytics/views.py index b619c89729ff..d89b76e3fb97 100644 --- a/api/app_analytics/views.py +++ b/api/app_analytics/views.py @@ -150,7 +150,11 @@ def post(self, request, *args, **kwargs): ) ) elif settings.INFLUXDB_TOKEN: - track_feature_evaluation_influxdb.delay( + # Due to load issues on the task processor, we + # explicitly run this task in a separate thread. + # TODO: batch influx data to prevent large amounts + # of tasks. + track_feature_evaluation_influxdb.run_in_thread( args=( request.environment.id, request.data, diff --git a/api/task_processor/decorators.py b/api/task_processor/decorators.py index 793875c99a47..1399b5344608 100644 --- a/api/task_processor/decorators.py +++ b/api/task_processor/decorators.py @@ -99,9 +99,10 @@ def delay( def run_in_thread( self, *, - args: tuple[typing.Any] = (), + args: tuple[typing.Any, ...] = (), kwargs: dict[str, typing.Any] | None = None, ) -> None: + kwargs = kwargs or {} _validate_inputs(*args, **kwargs) thread = Thread(target=self.unwrapped, args=args, kwargs=kwargs, daemon=True) diff --git a/api/tests/unit/app_analytics/test_unit_app_analytics_track.py b/api/tests/unit/app_analytics/test_unit_app_analytics_track.py index aa8c8ea67ff6..624f2977a882 100644 --- a/api/tests/unit/app_analytics/test_unit_app_analytics_track.py +++ b/api/tests/unit/app_analytics/test_unit_app_analytics_track.py @@ -2,9 +2,11 @@ import pytest from app_analytics.track import ( + track_feature_evaluation_influxdb, track_request_googleanalytics, track_request_influxdb, ) +from pytest_mock import MockerFixture @pytest.mark.parametrize( @@ -129,3 +131,36 @@ def test_track_request_does_not_send_data_to_influxdb_for_not_tracked_uris( # Then MockInfluxDBWrapper.assert_not_called() + + +def test_track_feature_evaluation_influxdb(mocker: MockerFixture) -> None: + # Given + mock_influxdb_wrapper = mock.MagicMock() + mocker.patch( + "app_analytics.track.InfluxDBWrapper", return_value=mock_influxdb_wrapper + ) + + data = { + "foo": 12, + "bar": 19, + "baz": 44, + } + environment_id = 1 + + # When + track_feature_evaluation_influxdb( + environment_id=environment_id, feature_evaluations=data + ) + + # Then + calls = mock_influxdb_wrapper.add_data_point.call_args_list + assert len(calls) == 3 + for i, feature_name in enumerate(data): + assert calls[i].args[0] == "request_count" + assert calls[i].args[1] == data[feature_name] + assert calls[i].kwargs["tags"] == { + "environment_id": environment_id, + "feature_id": feature_name, + } + + mock_influxdb_wrapper.write.assert_called_once_with() diff --git a/api/tests/unit/app_analytics/test_unit_app_analytics_views.py b/api/tests/unit/app_analytics/test_unit_app_analytics_views.py index 97a87503c4ca..a9276fa868ea 100644 --- a/api/tests/unit/app_analytics/test_unit_app_analytics_views.py +++ b/api/tests/unit/app_analytics/test_unit_app_analytics_views.py @@ -70,7 +70,9 @@ def test_sdk_analytics_allows_valid_data(mocker, settings, environment, feature) # Then assert response.status_code == status.HTTP_200_OK - mocked_track_feature_eval.delay.assert_called_once_with(args=(environment.id, data)) + mocked_track_feature_eval.run_in_thread.assert_called_once_with( + args=(environment.id, data) + ) def test_get_usage_data(mocker, admin_client, organisation): @@ -445,8 +447,10 @@ def test_set_sdk_analytics_flags_v1_to_influxdb( api_client.credentials(HTTP_X_ENVIRONMENT_KEY=environment.api_key) feature_request_count = 2 data = {feature.name: feature_request_count} - mock = mocker.patch("app_analytics.track.InfluxDBWrapper") - add_data_point_mock = mock.return_value.add_data_point + + mocked_track_feature_eval = mocker.patch( + "app_analytics.views.track_feature_evaluation_influxdb" + ) # When response = api_client.post( @@ -455,8 +459,9 @@ def test_set_sdk_analytics_flags_v1_to_influxdb( # Then assert response.status_code == status.HTTP_200_OK - add_data_point_mock.assert_called_with( - "request_count", - feature_request_count, - tags={"feature_id": feature.name, "environment_id": environment.id}, + mocked_track_feature_eval.run_in_thread.assert_called_once_with( + args=( + environment.id, + data, + ) )