diff --git a/src/sentry/replays/scripts/delete_replays.py b/src/sentry/replays/scripts/delete_replays.py index 773b6d733cefc..0926480409e2c 100644 --- a/src/sentry/replays/scripts/delete_replays.py +++ b/src/sentry/replays/scripts/delete_replays.py @@ -3,7 +3,7 @@ import contextlib import logging from collections.abc import Sequence -from datetime import datetime +from datetime import datetime, timezone from sentry.api.event_search import SearchFilter, parse_search_query from sentry.models.organization import Organization @@ -28,6 +28,9 @@ def delete_replays( search_filters = translate_cli_tags_param_to_snuba_tag_param(tags) offset = 0 + start_utc = start_utc.replace(tzinfo=timezone.utc) + end_utc = end_utc.replace(tzinfo=timezone.utc) + has_more = True while has_more: response = query_replays_collection_paginated( diff --git a/src/sentry/replays/usecases/query/__init__.py b/src/sentry/replays/usecases/query/__init__.py index 6d86a1a81e325..5b1bbe2edaf5a 100644 --- a/src/sentry/replays/usecases/query/__init__.py +++ b/src/sentry/replays/usecases/query/__init__.py @@ -41,6 +41,7 @@ from sentry.models.organization import Organization from sentry.replays.lib.new_query.errors import CouldNotParseValue, OperatorNotSupported from sentry.replays.lib.new_query.fields import ColumnField, ExpressionField, FieldProtocol +from sentry.replays.usecases.query.errors import RetryAggregated from sentry.replays.usecases.query.fields import ComputedField, TagField from sentry.utils.snuba import RateLimitExceeded, raw_snql_query @@ -339,9 +340,9 @@ def _query_using_scalar_strategy( period_start: datetime, period_stop: datetime, ): - if not can_scalar_search_subquery(search_filters) or not sort_is_scalar_compatible( - sort or DEFAULT_SORT_FIELD - ): + can_scalar_search = can_scalar_search_subquery(search_filters, period_start) + can_scalar_sort = sort_is_scalar_compatible(sort or DEFAULT_SORT_FIELD) + if not can_scalar_search or not can_scalar_sort: return 
_query_using_aggregated_strategy( search_filters, sort, @@ -357,8 +358,17 @@ def _query_using_scalar_strategy( # To fix this issue remove the ability to search against "varying" columns and apply a # "segment_id = 0" condition to the WHERE clause. - where = handle_search_filters(scalar_search_config, search_filters) - orderby = handle_ordering(agg_sort_config, sort or "-" + DEFAULT_SORT_FIELD) + try: + where = handle_search_filters(scalar_search_config, search_filters) + orderby = handle_ordering(agg_sort_config, sort or "-" + DEFAULT_SORT_FIELD) + except RetryAggregated: + return _query_using_aggregated_strategy( + search_filters, + sort, + project_ids, + period_start, + period_stop, + ) query = Query( match=Entity("replays"), diff --git a/src/sentry/replays/usecases/query/conditions/__init__.py b/src/sentry/replays/usecases/query/conditions/__init__.py index 42168dee31ecb..c4ca15cd99723 100644 --- a/src/sentry/replays/usecases/query/conditions/__init__.py +++ b/src/sentry/replays/usecases/query/conditions/__init__.py @@ -16,9 +16,9 @@ "SumOfRageClickSelectorComposite", "SumOfStringArray", "SumOfStringScalar", - "SumOfTagScalar", + "SumOfTagAggregate", "SumOfUUIDArray", - "TagScalar", + "TagAggregate", ] @@ -44,4 +44,4 @@ SumOfDeadClickSelectorComposite, SumOfRageClickSelectorComposite, ) -from .tags import SumOfTagScalar, TagScalar +from .tags import SumOfTagAggregate, TagAggregate diff --git a/src/sentry/replays/usecases/query/conditions/tags.py b/src/sentry/replays/usecases/query/conditions/tags.py index 52e504cd96a97..b2807720fc8da 100644 --- a/src/sentry/replays/usecases/query/conditions/tags.py +++ b/src/sentry/replays/usecases/query/conditions/tags.py @@ -4,11 +4,47 @@ from snuba_sdk.expressions import Expression from sentry.replays.lib.new_query.conditions import GenericBase -from sentry.replays.lib.new_query.utils import contains, does_not_contain +from sentry.replays.lib.new_query.utils import ( + contains, + does_not_contain, + 
translate_condition_to_function, +) +from sentry.replays.usecases.query.errors import RetryAggregated class TagScalar(GenericBase): - """Tag scalar condition class.""" + @staticmethod + def visit_eq(expression_name: str, value: str) -> Condition: + hashed_needle = Function("cityHash64", parameters=[f"{expression_name}={value}"]) + expression = Function("has", parameters=[Column("_tags_hash_map"), hashed_needle]) + return Condition(expression, Op.EQ, 1) + + @staticmethod + def visit_in(expression_name: str, value: list[str]) -> Condition: + expressions = [ + translate_condition_to_function(TagScalar.visit_eq(expression_name, v)) for v in value + ] + return Condition(Function("or", parameters=expressions), Op.EQ, 1) + + @staticmethod + def visit_neq(expression_name: str, value: str) -> Condition: + raise RetryAggregated + + @staticmethod + def visit_not_in(expression_name: str, value: list[str]) -> Condition: + raise RetryAggregated + + @staticmethod + def visit_match(expression_name: str, value: str) -> Condition: + raise RetryAggregated + + @staticmethod + def visit_not_match(expression_name: str, value: str) -> Condition: + raise RetryAggregated + + +class TagAggregate(GenericBase): + """Tag aggregate condition class.""" @staticmethod def visit_eq(expression_name: str, value: str) -> Condition: @@ -35,30 +71,30 @@ def visit_not_match(expression_name: str, value: str) -> Condition: return Condition(_match_key_value_wildcard(expression_name, value), Op.EQ, 0) -class SumOfTagScalar(GenericBase): +class SumOfTagAggregate(GenericBase): @staticmethod def visit_eq(expression: Expression, value: str) -> Condition: - return contains(TagScalar.visit_eq(expression, value)) + return contains(TagAggregate.visit_eq(expression, value)) @staticmethod def visit_neq(expression: Expression, value: str) -> Condition: - return does_not_contain(TagScalar.visit_eq(expression, value)) + return does_not_contain(TagAggregate.visit_eq(expression, value)) @staticmethod def 
visit_match(expression: Expression, value: str) -> Condition: - return contains(TagScalar.visit_match(expression, value)) + return contains(TagAggregate.visit_match(expression, value)) @staticmethod def visit_not_match(expression: Expression, value: str) -> Condition: - return does_not_contain(TagScalar.visit_match(expression, value)) + return does_not_contain(TagAggregate.visit_match(expression, value)) @staticmethod def visit_in(expression: Expression, value: list[str]) -> Condition: - return contains(TagScalar.visit_in(expression, value)) + return contains(TagAggregate.visit_in(expression, value)) @staticmethod def visit_not_in(expression: Expression, value: list[str]) -> Condition: - return does_not_contain(TagScalar.visit_in(expression, value)) + return does_not_contain(TagAggregate.visit_in(expression, value)) def _match_key_value_exact(key: str, value: str) -> Function: diff --git a/src/sentry/replays/usecases/query/configs/aggregate.py b/src/sentry/replays/usecases/query/configs/aggregate.py index e22170a072ea1..740a9e333528e 100644 --- a/src/sentry/replays/usecases/query/configs/aggregate.py +++ b/src/sentry/replays/usecases/query/configs/aggregate.py @@ -42,6 +42,7 @@ ) from sentry.replays.usecases.query.conditions.aggregate import SumOfUUIDScalar from sentry.replays.usecases.query.conditions.event_ids import SumOfErrorIdScalar, SumOfInfoIdScalar +from sentry.replays.usecases.query.conditions.tags import SumOfTagAggregate from sentry.replays.usecases.query.fields import ComputedField, TagField @@ -154,4 +155,4 @@ def array_string_field(column_name: str) -> StringColumnField: # Field-names which could not be found in the set are tag-keys and will, by default, look for # the `*` key to find their search instructions. If this is not defined an error is returned. 
-search_config["*"] = TagField() +search_config["*"] = TagField(query=SumOfTagAggregate) diff --git a/src/sentry/replays/usecases/query/configs/scalar.py b/src/sentry/replays/usecases/query/configs/scalar.py index f8e2c1c0d6615..efb3a022589e0 100644 --- a/src/sentry/replays/usecases/query/configs/scalar.py +++ b/src/sentry/replays/usecases/query/configs/scalar.py @@ -1,7 +1,9 @@ """Scalar query filtering configuration module.""" + from __future__ import annotations from collections.abc import Sequence +from datetime import datetime, timezone from sentry.api.event_search import ParenExpression, SearchFilter from sentry.replays.lib.new_query.conditions import ( @@ -20,7 +22,9 @@ RageClickSelectorComposite, ) from sentry.replays.usecases.query.conditions.event_ids import ErrorIdScalar -from sentry.replays.usecases.query.fields import ComputedField +from sentry.replays.usecases.query.conditions.tags import TagScalar +from sentry.replays.usecases.query.configs.aggregate import search_config as aggregate_search_config +from sentry.replays.usecases.query.fields import ComputedField, TagField def string_field(column_name: str) -> StringColumnField: @@ -71,6 +75,7 @@ def string_field(column_name: str) -> StringColumnField: varying_search_config["trace"] = varying_search_config["trace_ids"] varying_search_config["url"] = varying_search_config["urls"] varying_search_config["user.ip"] = varying_search_config["user.ip_address"] +varying_search_config["*"] = TagField(query=TagScalar) # Click Search Config @@ -98,6 +103,7 @@ def string_field(column_name: str) -> StringColumnField: def can_scalar_search_subquery( search_filters: Sequence[ParenExpression | SearchFilter | str], + started_at: datetime, ) -> bool: """Return "True" if a scalar event search can be performed.""" has_seen_varying_field = False @@ -109,7 +115,7 @@ def can_scalar_search_subquery( # ParenExpressions are recursive. So we recursively call our own function and return early # if any of the fields fail. 
elif isinstance(search_filter, ParenExpression): - is_ok = can_scalar_search_subquery(search_filter.children) + is_ok = can_scalar_search_subquery(search_filter.children, started_at) if not is_ok: return False else: @@ -117,7 +123,18 @@ def can_scalar_search_subquery( # If the search-filter does not exist in either configuration then return false. if name not in static_search_config and name not in varying_search_config: - return False + # If the field is not a tag or the query's start period is earlier than the + # date when the new field was introduced then we can not apply the + # optimization. + # + # TODO(cmanallen): Remove date condition after 90 days (~12/17/2024). + if name in aggregate_search_config or started_at < datetime( + 2024, 9, 17, tzinfo=timezone.utc + ): + return False + else: + has_seen_varying_field = True + continue if name in varying_search_config: # If a varying field has been seen before then we can't use a row-based sub-query. We diff --git a/src/sentry/replays/usecases/query/errors.py b/src/sentry/replays/usecases/query/errors.py new file mode 100644 index 0000000000000..16d2c5121d7e6 --- /dev/null +++ b/src/sentry/replays/usecases/query/errors.py @@ -0,0 +1,4 @@ +class RetryAggregated(Exception): + """Raised when a query can only be executed by an aggregate.""" + + ... 
diff --git a/src/sentry/replays/usecases/query/fields.py b/src/sentry/replays/usecases/query/fields.py index 804079ab99300..a494e90e0b483 100644 --- a/src/sentry/replays/usecases/query/fields.py +++ b/src/sentry/replays/usecases/query/fields.py @@ -9,8 +9,8 @@ from sentry.api.event_search import SearchFilter from sentry.replays.lib.new_query.errors import OperatorNotSupported from sentry.replays.lib.new_query.parsers import parse_str -from sentry.replays.usecases.query.conditions import SumOfTagScalar from sentry.replays.usecases.query.conditions.base import ComputedBase +from sentry.replays.usecases.query.conditions.tags import SumOfTagAggregate, TagScalar T = TypeVar("T") @@ -95,9 +95,9 @@ def _apply_scalar(self, operator: str, value: T) -> Condition: class TagField: - def __init__(self) -> None: + def __init__(self, query: type[SumOfTagAggregate] | type[TagScalar]) -> None: self.parse = parse_str - self.query = SumOfTagScalar + self.query = query def apply(self, search_filter: SearchFilter) -> Condition: """Apply a search operation against any named expression. 
diff --git a/tests/sentry/replays/test_organization_replay_index.py b/tests/sentry/replays/test_organization_replay_index.py index 9300426f027b2..65eff73a5c08d 100644 --- a/tests/sentry/replays/test_organization_replay_index.py +++ b/tests/sentry/replays/test_organization_replay_index.py @@ -1762,6 +1762,52 @@ def test_query_scalar_optimization_multiple_varying(self): response_data = response.json() assert len(response_data["data"]) == 1 + def test_query_scalar_optimization_varying_with_tags(self): + project = self.create_project(teams=[self.team]) + + replay1_id = uuid.uuid4().hex + seq1_timestamp = datetime.datetime.now() - datetime.timedelta(seconds=22) + seq2_timestamp = datetime.datetime.now() - datetime.timedelta(seconds=5) + + self.store_replays( + mock_replay(seq1_timestamp, project.id, replay1_id, tags={"something": "else"}) + ) + self.store_replays(mock_replay(seq2_timestamp, project.id, replay1_id)) + + with self.feature(self.features): + # EQ and IN supported. + response = self.client.get(self.url + "?field=id&query=something:else&statsPeriod=1d") + assert response.status_code == 200 + assert response.headers["X-Data-Source"] == "scalar-subquery" + + response = self.client.get( + self.url + "?field=id&query=something:else,other&statsPeriod=1d" + ) + assert response.status_code == 200 + assert response.headers["X-Data-Source"] == "scalar-subquery" + + # Not operators are not supported. + response = self.client.get(self.url + "?field=id&query=!something:else&statsPeriod=1d") + assert response.status_code == 200 + assert response.headers["X-Data-Source"] == "aggregated-subquery" + + response = self.client.get( + self.url + "?field=id&query=!something:else,other&statsPeriod=1d" + ) + assert response.status_code == 200 + assert response.headers["X-Data-Source"] == "aggregated-subquery" + + # Match not supported. 
+ response = self.client.get(self.url + "?field=id&query=something:*else*&statsPeriod=1d") + assert response.status_code == 200 + assert response.headers["X-Data-Source"] == "aggregated-subquery" + + response = self.client.get( + self.url + "?field=id&query=!something:*else*&statsPeriod=1d" + ) + assert response.status_code == 200 + assert response.headers["X-Data-Source"] == "aggregated-subquery" + def test_get_replays_missing_segment_0(self): """Test fetching replays when the 0th segment is missing.""" project = self.create_project(teams=[self.team])