From fdb323ca2da10027b9cbd272b3dde4777993ed12 Mon Sep 17 00:00:00 2001 From: Jodi Jang Date: Mon, 17 Apr 2023 14:16:23 -0700 Subject: [PATCH 1/5] ref(escalating-issues): Update cron task to use nodestore WOR-2968 --- .../issues/escalating_group_forecast.py | 68 +++++++++++++++++++ .../tasks/weekly_escalating_forecast.py | 54 ++++++--------- .../tasks/test_weekly_escalating_forecast.py | 53 +++++++++------ 3 files changed, 122 insertions(+), 53 deletions(-) create mode 100644 src/sentry/issues/escalating_group_forecast.py diff --git a/src/sentry/issues/escalating_group_forecast.py b/src/sentry/issues/escalating_group_forecast.py new file mode 100644 index 00000000000000..c817353176a1a6 --- /dev/null +++ b/src/sentry/issues/escalating_group_forecast.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +import hashlib +from dataclasses import dataclass +from datetime import datetime +from typing import List, Optional, TypedDict, cast + +from sentry import nodestore +from sentry.utils.dates import parse_timestamp + +TWO_WEEKS_IN_SECONDS = 1209600 + + +class EscalatingGroupForecastData(TypedDict): + project_id: int + group_id: int + forecast: List[int] + date_added: float + + +@dataclass(frozen=True) +class EscalatingGroupForecast: + """ + A class representing a group's escalating forecast. 
+ """ + + project_id: int + group_id: int + forecast: List[int] + date_added: datetime + + def to_dict( + self, + ) -> EscalatingGroupForecastData: + return { + "project_id": self.project_id, + "group_id": self.group_id, + "forecast": self.forecast, + "date_added": self.date_added.timestamp(), + } + + @classmethod + def from_dict(cls, data: EscalatingGroupForecastData) -> EscalatingGroupForecast: + return cls( + data["project_id"], + data["group_id"], + data["forecast"], + cast(datetime, parse_timestamp(data["date_added"])), + ) + + @classmethod + def build_storage_identifier(cls, project_id: int, group_id: int) -> str: + identifier = hashlib.md5(f"{project_id}::{group_id}".encode()).hexdigest() + return f"e-g-f:{identifier}" + + def save(self) -> None: + nodestore.set( + self.build_storage_identifier(self.project_id, self.group_id), + self.to_dict(), + ttl=TWO_WEEKS_IN_SECONDS, + ) + + @classmethod + def fetch(cls, project_id: int, group_id: int) -> Optional[EscalatingGroupForecast]: + results = nodestore.get(cls.build_storage_identifier(project_id, group_id)) + if results: + return EscalatingGroupForecast.from_dict(results) + return None diff --git a/src/sentry/tasks/weekly_escalating_forecast.py b/src/sentry/tasks/weekly_escalating_forecast.py index 28e8827c0d6d52..5382f62f5e72e3 100644 --- a/src/sentry/tasks/weekly_escalating_forecast.py +++ b/src/sentry/tasks/weekly_escalating_forecast.py @@ -1,20 +1,16 @@ import logging -import math from datetime import datetime -from typing import Dict, List, Tuple, TypedDict +from typing import Dict, List, TypedDict -from django.db import transaction from sentry_sdk.crons.decorator import monitor from sentry.issues.escalating import GroupsCountResponse, query_groups_past_counts +from sentry.issues.escalating_group_forecast import EscalatingGroupForecast from sentry.issues.escalating_issues_alg import generate_issue_forecast from sentry.models import Group, GroupStatus from sentry.models.group import GroupSubStatus -from 
sentry.models.groupforecast import GroupForecast from sentry.tasks.base import instrumented_task -BATCH_SIZE = 500 - class GroupCount(TypedDict): intervals: List[str] @@ -53,28 +49,7 @@ def run_escalating_forecast() -> None: if not until_escalating_groups: return - response = query_groups_past_counts(until_escalating_groups) - group_counts = parse_groups_past_counts(response) - group_forecast_list = get_forecast_per_group(until_escalating_groups, group_counts) - - # Delete and bulk create GroupForecasts in batches - num_batches = math.ceil(len(group_forecast_list) / BATCH_SIZE) - start_index = 0 - for batch_num in range(1, num_batches + 1): - end_index = BATCH_SIZE * batch_num - group_forecast_batch = group_forecast_list[start_index:end_index] - with transaction.atomic(): - GroupForecast.objects.filter( - group__in=[group for group, forecast in group_forecast_batch] - ).delete() - - GroupForecast.objects.bulk_create( - [ - GroupForecast(group=group, forecast=forecast) - for group, forecast in group_forecast_batch - ] - ) - start_index = end_index + get_forecasts(until_escalating_groups) def parse_groups_past_counts(response: List[GroupsCountResponse]) -> ParsedGroupsCount: @@ -99,20 +74,31 @@ def parse_groups_past_counts(response: List[GroupsCountResponse]) -> ParsedGroup return group_counts -def get_forecast_per_group( +def save_forecast_per_group( until_escalating_groups: List[Group], group_counts: ParsedGroupsCount -) -> List[Tuple[Group, List[int]]]: +) -> None: """ - Returns a list of forecasted values for each group. + Saves the list of forecasted values for each group in nodestore. 
`until_escalating_groups`: List of archived until escalating groups to be forecasted `group_counts`: Parsed snuba response of group counts """ time = datetime.now() - group_forecast_list = [] group_dict = {group.id: group for group in until_escalating_groups} for group_id in group_counts.keys(): forecasts = generate_issue_forecast(group_counts[group_id], time) forecasts_list = [forecast["forecasted_value"] for forecast in forecasts] - group_forecast_list.append((group_dict[group_id], forecasts_list)) - return group_forecast_list + escalating_group_forecast = EscalatingGroupForecast( + group_dict[group_id].project.id, group_id, forecasts_list, datetime.now() + ) + escalating_group_forecast.save() + + +def get_forecasts(groups: List[Group]) -> None: + """ + Generates and saves the forecasted values for each group in nodestore. + `groups`: List of groups to be forecasted + """ + past_counts = query_groups_past_counts(groups) + group_counts = parse_groups_past_counts(past_counts) + save_forecast_per_group(groups, group_counts) diff --git a/tests/sentry/tasks/test_weekly_escalating_forecast.py b/tests/sentry/tasks/test_weekly_escalating_forecast.py index 4f0f31453081e3..ef32c44a340eaf 100644 --- a/tests/sentry/tasks/test_weekly_escalating_forecast.py +++ b/tests/sentry/tasks/test_weekly_escalating_forecast.py @@ -4,12 +4,14 @@ from unittest.mock import patch from sentry.issues.escalating import GroupsCountResponse +from sentry.issues.escalating_group_forecast import EscalatingGroupForecast from sentry.models.group import Group, GroupStatus, GroupSubStatus -from sentry.models.groupforecast import GroupForecast from sentry.models.project import Project from sentry.tasks.weekly_escalating_forecast import run_escalating_forecast from sentry.testutils.cases import APITestCase, SnubaTestCase +FORECAST_LIST_MOCK = [200] * 14 + class TestWeeklyEscalatingForecast(APITestCase, SnubaTestCase): def get_mock_groups_past_counts_response( @@ -52,6 +54,16 @@ def 
create_archived_until_escalating_groups(self, num_groups: int) -> List[Group group_list.append(group) return group_list + @patch("sentry.tasks.weekly_escalating_forecast.query_groups_past_counts") + def test_empty_escalating_forecast(self, mock_query_groups_past_counts): + group_list = self.create_archived_until_escalating_groups(num_groups=1) + + mock_query_groups_past_counts.return_value = {} + + run_escalating_forecast() + fetched_forecast = EscalatingGroupForecast.fetch(group_list[0].project.id, group_list[0].id) + assert fetched_forecast is None + @patch("sentry.tasks.weekly_escalating_forecast.query_groups_past_counts") def test_single_group_escalating_forecast(self, mock_query_groups_past_counts): group_list = self.create_archived_until_escalating_groups(num_groups=1) @@ -61,9 +73,10 @@ def test_single_group_escalating_forecast(self, mock_query_groups_past_counts): ) run_escalating_forecast() - group_forecast = GroupForecast.objects.all() - assert len(group_forecast) == 1 - assert group_forecast[0].group == group_list[0] + fetched_forecast = EscalatingGroupForecast.fetch(group_list[0].project.id, group_list[0].id) + assert fetched_forecast.project_id == group_list[0].project.id + assert fetched_forecast.group_id == group_list[0].id + assert fetched_forecast.forecast == FORECAST_LIST_MOCK @patch("sentry.tasks.weekly_escalating_forecast.query_groups_past_counts") def test_multiple_groups_escalating_forecast(self, mock_query_groups_past_counts): @@ -74,28 +87,30 @@ def test_multiple_groups_escalating_forecast(self, mock_query_groups_past_counts ) run_escalating_forecast() - group_forecast = GroupForecast.objects.all() - assert len(group_forecast) == 3 - for i in range(len(group_forecast)): - assert group_forecast[i].group in group_list + for i in range(len(group_list)): + fetched_forecast = EscalatingGroupForecast.fetch( + group_list[i].project.id, group_list[i].id + ) + assert fetched_forecast.project_id == group_list[i].project.id + assert 
fetched_forecast.group_id == group_list[i].id + assert fetched_forecast.forecast == FORECAST_LIST_MOCK @patch("sentry.tasks.weekly_escalating_forecast.query_groups_past_counts") - def test_no_duped_groups_escalating_forecast(self, mock_query_groups_past_counts): - group_list = self.create_archived_until_escalating_groups(num_groups=3) + def test_update_group_escalating_forecast(self, mock_query_groups_past_counts): + group_list = self.create_archived_until_escalating_groups(num_groups=1) mock_query_groups_past_counts.return_value = self.get_mock_groups_past_counts_response( num_days=7, num_hours=2, groups=group_list ) run_escalating_forecast() - group_forecast = GroupForecast.objects.all() - assert len(group_forecast) == 3 - for i in range(len(group_forecast)): - assert group_forecast[i].group in group_list + first_fetched_forecast = EscalatingGroupForecast.fetch( + group_list[0].project.id, group_list[0].id + ) - # Assert no duplicates when this is run twice + # Assert update when this is run twice run_escalating_forecast() - group_forecast = GroupForecast.objects.all() - assert len(group_forecast) == 3 - for i in range(len(group_forecast)): - assert group_forecast[i].group in group_list + second_fetched_forecast = EscalatingGroupForecast.fetch( + group_list[0].project.id, group_list[0].id + ) + assert first_fetched_forecast.date_added < second_fetched_forecast.date_added From 0975e83642ad559f49ea8071b355a020b3de752d Mon Sep 17 00:00:00 2001 From: Jodi Jang Date: Mon, 17 Apr 2023 14:46:54 -0700 Subject: [PATCH 2/5] fix: Change ttl to timedelta --- src/sentry/issues/escalating_group_forecast.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sentry/issues/escalating_group_forecast.py b/src/sentry/issues/escalating_group_forecast.py index c817353176a1a6..706ae108e5205b 100644 --- a/src/sentry/issues/escalating_group_forecast.py +++ b/src/sentry/issues/escalating_group_forecast.py @@ -2,13 +2,13 @@ import hashlib from dataclasses import 
dataclass -from datetime import datetime +from datetime import datetime, timedelta from typing import List, Optional, TypedDict, cast from sentry import nodestore from sentry.utils.dates import parse_timestamp -TWO_WEEKS_IN_SECONDS = 1209600 +TWO_WEEKS_IN_DAYS = 14 class EscalatingGroupForecastData(TypedDict): @@ -57,7 +57,7 @@ def save(self) -> None: nodestore.set( self.build_storage_identifier(self.project_id, self.group_id), self.to_dict(), - ttl=TWO_WEEKS_IN_SECONDS, + ttl=timedelta(TWO_WEEKS_IN_DAYS), ) @classmethod From 2be9e7cccce4edab4b2df321bd4ae047ff12b790 Mon Sep 17 00:00:00 2001 From: Jodi Jang Date: Tue, 18 Apr 2023 09:50:44 -0700 Subject: [PATCH 3/5] ref: Address PR comments --- .../issues/escalating_group_forecast.py | 48 +++++++++++-------- .../tasks/test_weekly_escalating_forecast.py | 12 +++++ 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/src/sentry/issues/escalating_group_forecast.py b/src/sentry/issues/escalating_group_forecast.py index 706ae108e5205b..fda203cf02e4f7 100644 --- a/src/sentry/issues/escalating_group_forecast.py +++ b/src/sentry/issues/escalating_group_forecast.py @@ -1,3 +1,8 @@ +""" +This module represents a group's escalating forecast and has the logic to retrieve/store it in +Sentry's NodeStore. The forecasts are stored for 2 weeks. +""" + from __future__ import annotations import hashlib @@ -8,7 +13,7 @@ from sentry import nodestore from sentry.utils.dates import parse_timestamp -TWO_WEEKS_IN_DAYS = 14 +TWO_WEEKS_IN_DAYS_TTL = 14 class EscalatingGroupForecastData(TypedDict): @@ -21,7 +26,8 @@ class EscalatingGroupForecastData(TypedDict): @dataclass(frozen=True) class EscalatingGroupForecast: """ - A class representing a group's escalating forecast. + This class represents a group's escalating forecast and has the logic to retrieve/store it in + Sentry's NodeStore. 
""" project_id: int @@ -29,6 +35,25 @@ class EscalatingGroupForecast: forecast: List[int] date_added: datetime + def save(self) -> None: + nodestore.set( + self.build_storage_identifier(self.project_id, self.group_id), + self.to_dict(), + ttl=timedelta(TWO_WEEKS_IN_DAYS_TTL), + ) + + @classmethod + def fetch(cls, project_id: int, group_id: int) -> Optional[EscalatingGroupForecast]: + results = nodestore.get(cls.build_storage_identifier(project_id, group_id)) + if results: + return EscalatingGroupForecast.from_dict(results) + return None + + @classmethod + def build_storage_identifier(cls, project_id: int, group_id: int) -> str: + identifier = hashlib.md5(f"{project_id}::{group_id}".encode()).hexdigest() + return f"e-g-f:{identifier}" + def to_dict( self, ) -> EscalatingGroupForecastData: @@ -47,22 +72,3 @@ def from_dict(cls, data: EscalatingGroupForecastData) -> EscalatingGroupForecast data["forecast"], cast(datetime, parse_timestamp(data["date_added"])), ) - - @classmethod - def build_storage_identifier(cls, project_id: int, group_id: int) -> str: - identifier = hashlib.md5(f"{project_id}::{group_id}".encode()).hexdigest() - return f"e-g-f:{identifier}" - - def save(self) -> None: - nodestore.set( - self.build_storage_identifier(self.project_id, self.group_id), - self.to_dict(), - ttl=timedelta(TWO_WEEKS_IN_DAYS), - ) - - @classmethod - def fetch(cls, project_id: int, group_id: int) -> Optional[EscalatingGroupForecast]: - results = nodestore.get(cls.build_storage_identifier(project_id, group_id)) - if results: - return EscalatingGroupForecast.from_dict(results) - return None diff --git a/tests/sentry/tasks/test_weekly_escalating_forecast.py b/tests/sentry/tasks/test_weekly_escalating_forecast.py index ef32c44a340eaf..2f4a72fea03e7c 100644 --- a/tests/sentry/tasks/test_weekly_escalating_forecast.py +++ b/tests/sentry/tasks/test_weekly_escalating_forecast.py @@ -3,6 +3,8 @@ from typing import List from unittest.mock import patch +import pytz + from 
sentry.issues.escalating import GroupsCountResponse from sentry.issues.escalating_group_forecast import EscalatingGroupForecast from sentry.models.group import Group, GroupStatus, GroupSubStatus @@ -73,10 +75,15 @@ def test_single_group_escalating_forecast(self, mock_query_groups_past_counts): ) run_escalating_forecast() + approximate_date_added = datetime.now(pytz.utc) fetched_forecast = EscalatingGroupForecast.fetch(group_list[0].project.id, group_list[0].id) assert fetched_forecast.project_id == group_list[0].project.id assert fetched_forecast.group_id == group_list[0].id assert fetched_forecast.forecast == FORECAST_LIST_MOCK + assert fetched_forecast.date_added.replace( + second=0, microsecond=0 + ) == approximate_date_added.replace(second=0, microsecond=0) + assert fetched_forecast.date_added < approximate_date_added @patch("sentry.tasks.weekly_escalating_forecast.query_groups_past_counts") def test_multiple_groups_escalating_forecast(self, mock_query_groups_past_counts): @@ -87,6 +94,7 @@ def test_multiple_groups_escalating_forecast(self, mock_query_groups_past_counts ) run_escalating_forecast() + approximate_date_added = datetime.now(pytz.utc) for i in range(len(group_list)): fetched_forecast = EscalatingGroupForecast.fetch( group_list[i].project.id, group_list[i].id @@ -94,6 +102,10 @@ def test_multiple_groups_escalating_forecast(self, mock_query_groups_past_counts assert fetched_forecast.project_id == group_list[i].project.id assert fetched_forecast.group_id == group_list[i].id assert fetched_forecast.forecast == FORECAST_LIST_MOCK + assert fetched_forecast.date_added.replace( + second=0, microsecond=0 + ) == approximate_date_added.replace(second=0, microsecond=0) + assert fetched_forecast.date_added < approximate_date_added @patch("sentry.tasks.weekly_escalating_forecast.query_groups_past_counts") def test_update_group_escalating_forecast(self, mock_query_groups_past_counts): From c7e31b3e915d800528b959c4dbf0c2c83dc9ee9a Mon Sep 17 00:00:00 2001 From: 
Jodi Jang <116035587+jangjodi@users.noreply.github.com> Date: Tue, 18 Apr 2023 09:52:56 -0700 Subject: [PATCH 4/5] ref: Change variable to make it more clear Co-authored-by: Armen Zambrano G. --- tests/sentry/tasks/test_weekly_escalating_forecast.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sentry/tasks/test_weekly_escalating_forecast.py b/tests/sentry/tasks/test_weekly_escalating_forecast.py index 2f4a72fea03e7c..715316b0bc8a23 100644 --- a/tests/sentry/tasks/test_weekly_escalating_forecast.py +++ b/tests/sentry/tasks/test_weekly_escalating_forecast.py @@ -12,7 +12,7 @@ from sentry.tasks.weekly_escalating_forecast import run_escalating_forecast from sentry.testutils.cases import APITestCase, SnubaTestCase -FORECAST_LIST_MOCK = [200] * 14 +DEFAULT_MINIMUM_CEILING_FORECAST = [200] * 14 class TestWeeklyEscalatingForecast(APITestCase, SnubaTestCase): From fe0bd02fd44da8367ee5727d6a39d5f6aa45a689 Mon Sep 17 00:00:00 2001 From: Jodi Jang Date: Tue, 18 Apr 2023 10:19:23 -0700 Subject: [PATCH 5/5] fix: Change variable name everywhere --- tests/sentry/tasks/test_weekly_escalating_forecast.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/sentry/tasks/test_weekly_escalating_forecast.py b/tests/sentry/tasks/test_weekly_escalating_forecast.py index 715316b0bc8a23..55d2d4fb5c9e9b 100644 --- a/tests/sentry/tasks/test_weekly_escalating_forecast.py +++ b/tests/sentry/tasks/test_weekly_escalating_forecast.py @@ -79,7 +79,7 @@ def test_single_group_escalating_forecast(self, mock_query_groups_past_counts): fetched_forecast = EscalatingGroupForecast.fetch(group_list[0].project.id, group_list[0].id) assert fetched_forecast.project_id == group_list[0].project.id assert fetched_forecast.group_id == group_list[0].id - assert fetched_forecast.forecast == FORECAST_LIST_MOCK + assert fetched_forecast.forecast == DEFAULT_MINIMUM_CEILING_FORECAST assert fetched_forecast.date_added.replace( second=0, microsecond=0 ) == 
approximate_date_added.replace(second=0, microsecond=0) @@ -101,7 +101,7 @@ def test_multiple_groups_escalating_forecast(self, mock_query_groups_past_counts ) assert fetched_forecast.project_id == group_list[i].project.id assert fetched_forecast.group_id == group_list[i].id - assert fetched_forecast.forecast == FORECAST_LIST_MOCK + assert fetched_forecast.forecast == DEFAULT_MINIMUM_CEILING_FORECAST assert fetched_forecast.date_added.replace( second=0, microsecond=0 ) == approximate_date_added.replace(second=0, microsecond=0)