Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(escalating-issues): Update cron task to use nodestore #47496

Merged
merged 6 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions src/sentry/issues/escalating_group_forecast.py
jangjodi marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from __future__ import annotations

import hashlib
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional, TypedDict, cast

from sentry import nodestore
from sentry.utils.dates import parse_timestamp

# Retention window for forecasts persisted in nodestore: entries expire after
# two weeks (used as the `ttl` in EscalatingGroupForecast.save).
TWO_WEEKS_IN_DAYS = 14

class EscalatingGroupForecastData(TypedDict):
    """Serialized form of EscalatingGroupForecast as stored in nodestore."""

    project_id: int
    group_id: int
    # Forecast values for the group (one int per forecasted interval).
    forecast: List[int]
    # POSIX timestamp produced by datetime.timestamp() in to_dict().
    date_added: float


@dataclass(frozen=True)
class EscalatingGroupForecast:
    """
    A group's escalating forecast, persisted in nodestore.

    Attributes:
        project_id: ID of the project the group belongs to.
        group_id: ID of the group being forecasted.
        forecast: Forecast values for the group.
        date_added: When this forecast was generated.
    """

    project_id: int
    group_id: int
    forecast: List[int]
    date_added: datetime

    def to_dict(self) -> EscalatingGroupForecastData:
        """Serialize to the plain-dict payload stored in nodestore."""
        return {
            "project_id": self.project_id,
            "group_id": self.group_id,
            "forecast": self.forecast,
            # Stored as a POSIX timestamp so the payload stays serializable.
            "date_added": self.date_added.timestamp(),
        }

    @classmethod
    def from_dict(cls, data: EscalatingGroupForecastData) -> EscalatingGroupForecast:
        """Deserialize a payload previously produced by `to_dict`."""
        return cls(
            data["project_id"],
            data["group_id"],
            data["forecast"],
            # parse_timestamp returns Optional[datetime]; the cast asserts that
            # a stored float timestamp always parses back.
            cast(datetime, parse_timestamp(data["date_added"])),
        )

    @classmethod
    def build_storage_identifier(cls, project_id: int, group_id: int) -> str:
        """Return the deterministic nodestore key for (project_id, group_id)."""
        identifier = hashlib.md5(f"{project_id}::{group_id}".encode()).hexdigest()
        return f"e-g-f:{identifier}"

    def save(self) -> None:
        """Persist this forecast in nodestore; it expires after two weeks."""
        nodestore.set(
            self.build_storage_identifier(self.project_id, self.group_id),
            self.to_dict(),
            # timedelta's first positional parameter is `days`; spell it out so
            # the two-week TTL is unmistakable.
            ttl=timedelta(days=TWO_WEEKS_IN_DAYS),
        )

    @classmethod
    def fetch(cls, project_id: int, group_id: int) -> Optional[EscalatingGroupForecast]:
        """Return the stored forecast for the group, or None if absent/expired."""
        results = nodestore.get(cls.build_storage_identifier(project_id, group_id))
        if results:
            return EscalatingGroupForecast.from_dict(results)
        return None
54 changes: 20 additions & 34 deletions src/sentry/tasks/weekly_escalating_forecast.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
import logging
import math
from datetime import datetime
from typing import Dict, List, Tuple, TypedDict
from typing import Dict, List, TypedDict

from django.db import transaction
from sentry_sdk.crons.decorator import monitor

from sentry.issues.escalating import GroupsCountResponse, query_groups_past_counts
from sentry.issues.escalating_group_forecast import EscalatingGroupForecast
from sentry.issues.escalating_issues_alg import generate_issue_forecast
from sentry.models import Group, GroupStatus
from sentry.models.group import GroupSubStatus
from sentry.models.groupforecast import GroupForecast
from sentry.tasks.base import instrumented_task

BATCH_SIZE = 500


class GroupCount(TypedDict):
intervals: List[str]
Expand Down Expand Up @@ -53,28 +49,7 @@ def run_escalating_forecast() -> None:
if not until_escalating_groups:
return

response = query_groups_past_counts(until_escalating_groups)
group_counts = parse_groups_past_counts(response)
group_forecast_list = get_forecast_per_group(until_escalating_groups, group_counts)

# Delete and bulk create GroupForecasts in batches
num_batches = math.ceil(len(group_forecast_list) / BATCH_SIZE)
start_index = 0
for batch_num in range(1, num_batches + 1):
end_index = BATCH_SIZE * batch_num
group_forecast_batch = group_forecast_list[start_index:end_index]
with transaction.atomic():
GroupForecast.objects.filter(
group__in=[group for group, forecast in group_forecast_batch]
).delete()

GroupForecast.objects.bulk_create(
[
GroupForecast(group=group, forecast=forecast)
for group, forecast in group_forecast_batch
]
)
start_index = end_index
get_forecasts(until_escalating_groups)


def parse_groups_past_counts(response: List[GroupsCountResponse]) -> ParsedGroupsCount:
Expand All @@ -99,20 +74,31 @@ def parse_groups_past_counts(response: List[GroupsCountResponse]) -> ParsedGroup
return group_counts


def save_forecast_per_group(
    until_escalating_groups: List[Group], group_counts: ParsedGroupsCount
) -> None:
    """
    Save the forecasted values for each group in nodestore.

    `until_escalating_groups`: List of archived-until-escalating groups to be forecasted
    `group_counts`: Parsed snuba response of group counts
    """
    # Capture one timestamp so every forecast in this run shares the same
    # baseline and `date_added`. Named `now` to avoid shadowing the stdlib
    # `time` module.
    now = datetime.now()
    group_dict = {group.id: group for group in until_escalating_groups}
    for group_id, counts in group_counts.items():
        forecasts = generate_issue_forecast(counts, now)
        forecasts_list = [forecast["forecasted_value"] for forecast in forecasts]
        EscalatingGroupForecast(
            group_dict[group_id].project.id, group_id, forecasts_list, now
        ).save()


def get_forecasts(groups: List[Group]) -> None:
    """
    Compute and save (in nodestore) the forecasted values for each group.
    Despite the name, nothing is returned; results are persisted via
    `save_forecast_per_group`.

    `groups`: List of groups to be forecasted
    """
    past_counts = query_groups_past_counts(groups)
    group_counts = parse_groups_past_counts(past_counts)
    save_forecast_per_group(groups, group_counts)
53 changes: 34 additions & 19 deletions tests/sentry/tasks/test_weekly_escalating_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
from unittest.mock import patch

from sentry.issues.escalating import GroupsCountResponse
from sentry.issues.escalating_group_forecast import EscalatingGroupForecast
from sentry.models.group import Group, GroupStatus, GroupSubStatus
from sentry.models.groupforecast import GroupForecast
from sentry.models.project import Project
from sentry.tasks.weekly_escalating_forecast import run_escalating_forecast
from sentry.testutils.cases import APITestCase, SnubaTestCase

# The forecast algorithm falls back to a low-end default of 200 per day when
# there is little past data, so these mocked runs yield 14 days of 200.
FORECAST_LIST_MOCK = [200] * 14


class TestWeeklyEscalatingForecast(APITestCase, SnubaTestCase):
def get_mock_groups_past_counts_response(
Expand Down Expand Up @@ -52,6 +54,16 @@ def create_archived_until_escalating_groups(self, num_groups: int) -> List[Group
group_list.append(group)
return group_list

    @patch("sentry.tasks.weekly_escalating_forecast.query_groups_past_counts")
    def test_empty_escalating_forecast(self, mock_query_groups_past_counts):
        """No forecast is stored when the past-counts query returns nothing."""
        group_list = self.create_archived_until_escalating_groups(num_groups=1)

        # Empty snuba response: the task should exit before saving anything.
        mock_query_groups_past_counts.return_value = {}

        run_escalating_forecast()
        fetched_forecast = EscalatingGroupForecast.fetch(group_list[0].project.id, group_list[0].id)
        assert fetched_forecast is None

@patch("sentry.tasks.weekly_escalating_forecast.query_groups_past_counts")
def test_single_group_escalating_forecast(self, mock_query_groups_past_counts):
group_list = self.create_archived_until_escalating_groups(num_groups=1)
Expand All @@ -61,9 +73,10 @@ def test_single_group_escalating_forecast(self, mock_query_groups_past_counts):
)

run_escalating_forecast()
group_forecast = GroupForecast.objects.all()
assert len(group_forecast) == 1
assert group_forecast[0].group == group_list[0]
fetched_forecast = EscalatingGroupForecast.fetch(group_list[0].project.id, group_list[0].id)
jangjodi marked this conversation as resolved.
Show resolved Hide resolved
assert fetched_forecast.project_id == group_list[0].project.id
assert fetched_forecast.group_id == group_list[0].id
jangjodi marked this conversation as resolved.
Show resolved Hide resolved
assert fetched_forecast.forecast == FORECAST_LIST_MOCK
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm all confused. Why is the forecast 14 times the number 200?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The forecast is [200] * 14 because I didn't add a lot of data from the past week in the tests, and it defaults to 200 when this happens (see here). I figured the tests for the algorithm itself should suffice to actually test the forecast values

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. This is low end forecast. Thank you for clarifying.


@patch("sentry.tasks.weekly_escalating_forecast.query_groups_past_counts")
def test_multiple_groups_escalating_forecast(self, mock_query_groups_past_counts):
Expand All @@ -74,28 +87,30 @@ def test_multiple_groups_escalating_forecast(self, mock_query_groups_past_counts
)

run_escalating_forecast()
group_forecast = GroupForecast.objects.all()
assert len(group_forecast) == 3
for i in range(len(group_forecast)):
assert group_forecast[i].group in group_list
for i in range(len(group_list)):
fetched_forecast = EscalatingGroupForecast.fetch(
group_list[i].project.id, group_list[i].id
)
assert fetched_forecast.project_id == group_list[i].project.id
assert fetched_forecast.group_id == group_list[i].id
assert fetched_forecast.forecast == FORECAST_LIST_MOCK

@patch("sentry.tasks.weekly_escalating_forecast.query_groups_past_counts")
def test_no_duped_groups_escalating_forecast(self, mock_query_groups_past_counts):
group_list = self.create_archived_until_escalating_groups(num_groups=3)
def test_update_group_escalating_forecast(self, mock_query_groups_past_counts):
group_list = self.create_archived_until_escalating_groups(num_groups=1)

mock_query_groups_past_counts.return_value = self.get_mock_groups_past_counts_response(
num_days=7, num_hours=2, groups=group_list
)

run_escalating_forecast()
group_forecast = GroupForecast.objects.all()
assert len(group_forecast) == 3
for i in range(len(group_forecast)):
assert group_forecast[i].group in group_list
first_fetched_forecast = EscalatingGroupForecast.fetch(
group_list[0].project.id, group_list[0].id
)

# Assert no duplicates when this is run twice
# Assert update when this is run twice
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably fine, however, we should have it in mind if re-runs of the tasks happened often and we wanted the re-runs to be faster.

run_escalating_forecast()
group_forecast = GroupForecast.objects.all()
assert len(group_forecast) == 3
for i in range(len(group_forecast)):
assert group_forecast[i].group in group_list
second_fetched_forecast = EscalatingGroupForecast.fetch(
group_list[0].project.id, group_list[0].id
)
assert first_fetched_forecast.date_added < second_fetched_forecast.date_added