Enable queries using project slug as filter and groupby in Metrics API (
shellmayr committed Apr 18, 2024
1 parent 5f79cb2 commit 7780004
Showing 12 changed files with 666 additions and 12 deletions.
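In practical terms, the change lets Metrics API queries filter and group by the project slug rather than only the numeric project_id. A rough illustration of the kind of MQL this enables (the metric MRI and the slug below are made up for the example):

mql = 'sum(d:transactions/duration@millisecond){project:"my-frontend"} by (project)'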
18 changes: 14 additions & 4 deletions src/sentry/sentry_metrics/querying/data/api.py
@@ -1,25 +1,31 @@
from collections.abc import Sequence
from datetime import datetime
from typing import cast

from snuba_sdk import MetricsQuery, MetricsScope, Rollup

from sentry import features
from sentry.models.environment import Environment
from sentry.models.organization import Organization
from sentry.models.project import Project
from sentry.sentry_metrics.querying.data.execution import QueryExecutor, QueryResult
from sentry.sentry_metrics.querying.data.execution import QueryExecutor
from sentry.sentry_metrics.querying.data.mapping.mapper import MapperConfig, Project2ProjectIDMapper
from sentry.sentry_metrics.querying.data.parsing import QueryParser
from sentry.sentry_metrics.querying.data.postprocessing.base import run_post_processing_steps
from sentry.sentry_metrics.querying.data.postprocessing.remapping import QueryRemappingStep
from sentry.sentry_metrics.querying.data.preparation.base import (
    IntermediateQuery,
    PreparationStep,
    run_preparation_steps,
)
from sentry.sentry_metrics.querying.data.preparation.mapping import QueryMappingStep
from sentry.sentry_metrics.querying.data.preparation.units_normalization import (
    UnitsNormalizationStep,
)
from sentry.sentry_metrics.querying.data.query import MQLQueriesResult, MQLQuery
from sentry.sentry_metrics.querying.types import QueryType

DEFAULT_MAPPINGS: MapperConfig = MapperConfig().add(Project2ProjectIDMapper)


def run_queries(
    mql_queries: Sequence[MQLQuery],
@@ -62,12 +68,15 @@ def run_queries(
            )
        )

    preparation_steps = []
    preparation_steps: list[PreparationStep] = []

    if features.has(
        "organizations:ddm-metrics-api-unit-normalization", organization=organization, actor=None
    ):
        preparation_steps.append(UnitsNormalizationStep())

    preparation_steps.append(QueryMappingStep(projects, DEFAULT_MAPPINGS))

    # We run a series of preparation steps which operate on the entire list of queries.
    intermediate_queries = run_preparation_steps(intermediate_queries, *preparation_steps)

@@ -77,6 +86,7 @@ def run_queries(
        executor.schedule(intermediate_query=intermediate_query, query_type=query_type)

    results = executor.execute()
    results = run_post_processing_steps(results, QueryRemappingStep(projects))

    # We wrap the result in a class that exposes some utils methods to operate on results.
    return MQLQueriesResult(cast(list[QueryResult], results))
    return MQLQueriesResult(results)
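For orientation, a condensed sketch of the reshaped run_queries() flow introduced above (simplified; the feature-flag check and the scheduling loop are omitted):

# Preparation: rewrite `project` filters/group bys into `project_id` and record the mappers used.
intermediate_queries = run_preparation_steps(
    intermediate_queries,
    UnitsNormalizationStep(),                      # only when the unit-normalization flag is enabled
    QueryMappingStep(projects, DEFAULT_MAPPINGS),  # project slug -> project_id
)
# Execution: run the mapped queries.
results = executor.execute()
# Post-processing: map project_id values in the results back to project slugs.
results = run_post_processing_steps(results, QueryRemappingStep(projects))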
29 changes: 24 additions & 5 deletions src/sentry/sentry_metrics/querying/data/execution.py
@@ -1,5 +1,5 @@
from collections.abc import Mapping, Sequence
from dataclasses import dataclass, replace
from dataclasses import dataclass, field, replace
from datetime import datetime
from enum import Enum
from typing import Any, Union, cast
@@ -11,6 +11,7 @@
from sentry.models.organization import Organization
from sentry.models.project import Project
from sentry.sentry_metrics.querying.constants import SNUBA_QUERY_LIMIT
from sentry.sentry_metrics.querying.data.mapping.mapper import Mapper
from sentry.sentry_metrics.querying.data.preparation.base import IntermediateQuery
from sentry.sentry_metrics.querying.data.utils import adjust_time_bounds_with_interval
from sentry.sentry_metrics.querying.errors import (
@@ -145,6 +146,7 @@ class ScheduledQuery:
    unit_family: UnitFamily | None = None
    unit: MeasurementUnit | None = None
    scaling_factor: float | None = None
    mappers: list[Mapper] = field(default_factory=list)

    def initialize(
        self,
@@ -318,7 +320,7 @@ def _align_date_range(cls, metrics_query: MetricsQuery) -> tuple[MetricsQuery, i
        return metrics_query, None


@dataclass(frozen=True)
@dataclass
class QueryResult:
    """
    Represents the result of a ScheduledQuery containing its associated series and totals results.
@@ -445,12 +447,24 @@ def modified_end(self) -> datetime:

    @property
    def series(self) -> Sequence[Mapping[str, Any]]:
        if "series" not in self.result:
            return []
        return self.result["series"]["data"]

    @series.setter
    def series(self, value: Sequence[Mapping[str, Any]]) -> None:
        self.result["series"]["data"] = value

    @property
    def totals(self) -> Sequence[Mapping[str, Any]]:
        if "totals" not in self.result:
            return []
        return self.result["totals"]["data"]

    @totals.setter
    def totals(self, value: Sequence[Mapping[str, Any]]) -> None:
        self.result["totals"]["data"] = value

    @property
    def meta(self) -> Sequence[Mapping[str, str]]:
        # By default, we extract the metadata from the totals query, if that is not there we extract from the series
@@ -464,7 +478,11 @@ def group_bys(self) -> list[str]:
        # that we can correctly render groups in case they are not returned from the db because of missing data.
        #
        # Sorting of the groups is done to maintain consistency across function calls.
        return sorted(UsedGroupBysVisitor().visit(self._any_query().metrics_query.query))
        scheduled_query = self._any_query()
        mappers = [mapper for mapper in scheduled_query.mappers if mapper.applied_on_groupby]
        return sorted(
            UsedGroupBysVisitor(mappers=mappers).visit(scheduled_query.metrics_query.query)
        )

    @property
    def interval(self) -> int | None:
@@ -774,7 +792,7 @@ def _execution_loop(self):
        while continue_execution:
            continue_execution = self._bulk_execute()

    def execute(self) -> Sequence[QueryResult]:
    def execute(self) -> list[QueryResult]:
        """
        Executes the scheduled queries in the execution loop.
@@ -798,7 +816,7 @@ def execute(self) -> Sequence[QueryResult]:
"Not all queries were executed in the execution loop"
)

return cast(Sequence[QueryResult], self._query_results)
return cast(list[QueryResult], self._query_results)

def schedule(self, intermediate_query: IntermediateQuery, query_type: QueryType):
"""
@@ -813,6 +831,7 @@ def schedule(self, intermediate_query: IntermediateQuery, query_type: QueryType)
            unit_family=intermediate_query.unit_family,
            unit=intermediate_query.unit,
            scaling_factor=intermediate_query.scaling_factor,
            mappers=intermediate_query.mappers,
        )

        # In case the user chooses to run also a series query, we will duplicate the query and chain it after totals.
Empty file.
94 changes: 94 additions & 0 deletions src/sentry/sentry_metrics/querying/data/mapping/mapper.py
@@ -0,0 +1,94 @@
import abc
from collections.abc import Sequence
from typing import Any, TypeVar

from sentry.models.project import Project


class Mapper(abc.ABC):
    from_key: str = ""
    to_key: str = ""
    applied_on_groupby: bool = False

    def __init__(self):
        # This exists to satisfy mypy, which complains otherwise
        self.map: dict[Any, Any] = {}

    def __hash__(self):
        return hash((self.from_key, self.to_key))

    @abc.abstractmethod
    def forward(self, projects: Sequence[Project], value: Any) -> Any:
        return value

    @abc.abstractmethod
    def backward(self, projects: Sequence[Project], value: Any) -> Any:
        return value


TMapper = TypeVar("TMapper", bound=Mapper)


class MapperConfig:
    def __init__(self):
        self.mappers: set[type[Mapper]] = set()

    def add(self, mapper: type[Mapper]) -> "MapperConfig":
        self.mappers.add(mapper)
        return self

    def get(self, from_key: str | None = None, to_key: str | None = None) -> type[Mapper] | None:
        for mapper in self.mappers:
            if mapper.from_key == from_key:
                return mapper
            if mapper.to_key == to_key:
                return mapper
        return None


def get_or_create_mapper(
    mapper_config: MapperConfig,
    mappers: list[Mapper],
    from_key: str | None = None,
    to_key: str | None = None,
) -> Mapper | None:
    # retrieve the mapper type that is applicable for the given key
    mapper_class = mapper_config.get(from_key=from_key, to_key=to_key)
    # check if a mapper of the type already exists
    if mapper_class:
        for mapper in mappers:
            if mapper_class == type(mapper):
                # if a mapper already exists, return the existing mapper
                return mapper
        else:
            # if no mapper exists yet, instantiate the object and append it to the mappers list
            mapper_instance = mapper_class()
            mappers.append(mapper_instance)
            return mapper_instance
    else:
        # if no mapper is configured for the key, return None
        return None


class Project2ProjectIDMapper(Mapper):
    from_key: str = "project"
    to_key: str = "project_id"

    def __init__(self):
        super().__init__()

    def forward(self, projects: Sequence[Project], value: str) -> int:
        if value not in self.map:
            self.map[value] = None
            for project in projects:
                if project.slug == value:
                    self.map[value] = project.id
        return self.map[value]

    def backward(self, projects: Sequence[Project], value: int) -> str:
        if value not in self.map:
            for project in projects:
                if project.id == value:
                    self.map[value] = project.slug

        return self.map[value]
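A minimal sketch of how these primitives compose; the unsaved Project instances and slugs exist only for this example:

from sentry.models.project import Project
from sentry.sentry_metrics.querying.data.mapping.mapper import (
    Mapper,
    MapperConfig,
    Project2ProjectIDMapper,
    get_or_create_mapper,
)

projects = [Project(id=1, slug="frontend"), Project(id=2, slug="backend")]
config = MapperConfig().add(Project2ProjectIDMapper)

mappers: list[Mapper] = []
mapper = get_or_create_mapper(config, mappers, from_key="project")  # instantiates Project2ProjectIDMapper

assert mapper is not None
assert mapper.forward(projects, "frontend") == 1   # slug -> project id
assert mapper.backward(projects, 1) == "frontend"  # project id -> slug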
Empty file.
37 changes: 37 additions & 0 deletions src/sentry/sentry_metrics/querying/data/postprocessing/base.py
@@ -0,0 +1,37 @@
from abc import ABC, abstractmethod

from sentry.sentry_metrics.querying.data.execution import QueryResult


class PostProcessingStep(ABC):
    """
    Represents an abstract step that post-processes a collection of QueryResult objects.

    The post-processing of these objects might include transforming them or just obtaining some intermediate data that
    is useful to compute other things before returning the results.
    """

    @abstractmethod
    def run(self, query_results: list[QueryResult]) -> list[QueryResult]:
        """
        Runs the post-processing step on a list of query results.

        Returns:
            A list of post-processed query results.
        """
        raise NotImplementedError


def run_post_processing_steps(query_results: list[QueryResult], *steps) -> list[QueryResult]:
    """
    Takes a series of query results and steps and runs the post-processing steps one after another, in the order they
    are supplied in.

    Returns:
        A list of query results after running the post-processing steps.
    """
    for step in steps:
        if isinstance(step, PostProcessingStep):
            query_results = step.run(query_results=query_results)

    return query_results
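A hypothetical custom step built on this abstraction, to show the intended extension point; the step name and the "aggregate_value" row key are assumptions for illustration, not part of the commit:

from sentry.sentry_metrics.querying.data.execution import QueryResult
from sentry.sentry_metrics.querying.data.postprocessing.base import PostProcessingStep


class DropEmptyTotalsRowsStep(PostProcessingStep):
    """Illustrative step that drops totals rows whose aggregate value is None."""

    def run(self, query_results: list[QueryResult]) -> list[QueryResult]:
        for query_result in query_results:
            if len(query_result.totals) > 0:
                query_result.totals = [
                    row for row in query_result.totals if row.get("aggregate_value") is not None
                ]
        return query_results


# Steps are chained in the order they are passed, e.g.:
# results = run_post_processing_steps(results, QueryRemappingStep(projects), DropEmptyTotalsRowsStep())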
54 changes: 54 additions & 0 deletions src/sentry/sentry_metrics/querying/data/postprocessing/remapping.py
@@ -0,0 +1,54 @@
from collections.abc import Mapping, Sequence
from copy import deepcopy
from typing import Any, cast

from sentry.models.project import Project
from sentry.sentry_metrics.querying.data.execution import QueryResult
from sentry.sentry_metrics.querying.data.mapping.mapper import Mapper
from sentry.sentry_metrics.querying.data.postprocessing.base import PostProcessingStep


class QueryRemappingStep(PostProcessingStep):
    def __init__(self, projects: Sequence[Project]):
        self.projects = projects

    def run(self, query_results: list[QueryResult]) -> list[QueryResult]:
        for query_result in query_results:
            if (
                query_result.totals is not None
                and query_result.totals_query is not None
                and len(query_result.totals) > 0
            ):
                query_result.totals = self._unmap_data(
                    query_result.totals, query_result.totals_query.mappers
                )
            if (
                query_result.series is not None
                and query_result.series_query is not None
                and len(query_result.series) > 0
            ):
                query_result.series = self._unmap_data(
                    query_result.series, query_result.series_query.mappers
                )

        return query_results

    def _unmap_data(
        self, data: Sequence[Mapping[str, Any]], mappers: list[Mapper]
    ) -> Sequence[Mapping[str, Any]]:
        unmapped_data: list[dict[str, Any]] = cast(list[dict[str, Any]], deepcopy(data))
        for element in unmapped_data:
            updated_element = dict()
            keys_to_delete = []
            for result_key in element.keys():
                for mapper in mappers:
                    if mapper.to_key == result_key and mapper.applied_on_groupby:
                        original_value = mapper.backward(self.projects, element[result_key])
                        updated_element[mapper.from_key] = original_value
                        keys_to_delete.append(result_key)

            for key in keys_to_delete:
                del element[key]
            element.update(updated_element)

        return cast(Sequence[Mapping[str, Any]], unmapped_data)
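Concretely, for a query grouped by project, the remapping step rewrites each result row roughly like this (the values and the "aggregate_value" key are made up for the example):

# before unmapping: {"project_id": 1, "aggregate_value": 42.0}
# after unmapping:  {"project": "frontend", "aggregate_value": 42.0}

The project_id key is dropped because Project2ProjectIDMapper declares to_key = "project_id" and was applied on a group by; its backward() call restores the slug under the original project key.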
4 changes: 3 additions & 1 deletion src/sentry/sentry_metrics/querying/data/preparation/base.py
@@ -1,8 +1,9 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from dataclasses import dataclass, field

from snuba_sdk import MetricsQuery

from sentry.sentry_metrics.querying.data.mapping.mapper import Mapper
from sentry.sentry_metrics.querying.types import QueryOrder
from sentry.sentry_metrics.querying.units import MeasurementUnit, UnitFamily

@@ -27,6 +28,7 @@ class IntermediateQuery:
    unit_family: UnitFamily | None = None
    unit: MeasurementUnit | None = None
    scaling_factor: float | None = None
    mappers: list[Mapper] = field(default_factory=list)


class PreparationStep(ABC):
35 changes: 35 additions & 0 deletions src/sentry/sentry_metrics/querying/data/preparation/mapping.py
@@ -0,0 +1,35 @@
from collections.abc import Sequence
from dataclasses import replace

from sentry.models.project import Project
from sentry.sentry_metrics.querying.data.mapping.mapper import MapperConfig
from sentry.sentry_metrics.querying.data.preparation.base import IntermediateQuery, PreparationStep
from sentry.sentry_metrics.querying.visitors.query_expression import MapperVisitor


class QueryMappingStep(PreparationStep):
    def __init__(self, projects: Sequence[Project], mapper_config: MapperConfig):
        self.projects = projects
        self.mapper_config = mapper_config

    def _get_mapped_intermediate_query(
        self, intermediate_query: IntermediateQuery
    ) -> IntermediateQuery:
        visitor = MapperVisitor(self.projects, self.mapper_config)
        mapped_query = visitor.visit(intermediate_query.metrics_query.query)

        return replace(
            intermediate_query,
            metrics_query=intermediate_query.metrics_query.set_query(mapped_query),
            mappers=visitor.mappers,
        )

    def run(self, intermediate_queries: list[IntermediateQuery]) -> list[IntermediateQuery]:
        mapped_intermediate_queries = []

        for intermediate_query in intermediate_queries:
            mapped_intermediate_queries.append(
                self._get_mapped_intermediate_query(intermediate_query)
            )

        return mapped_intermediate_queries
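A sketch of wiring the step up outside of api.py, assuming projects and intermediate_queries already exist; it mirrors what run_queries() does with DEFAULT_MAPPINGS:

from sentry.sentry_metrics.querying.data.mapping.mapper import MapperConfig, Project2ProjectIDMapper
from sentry.sentry_metrics.querying.data.preparation.base import run_preparation_steps
from sentry.sentry_metrics.querying.data.preparation.mapping import QueryMappingStep

mapper_config = MapperConfig().add(Project2ProjectIDMapper)
intermediate_queries = run_preparation_steps(
    intermediate_queries, QueryMappingStep(projects, mapper_config)
)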
