Skip to content

Commit

Permalink
feat(replays): Add optimized tag search (#77595)
Browse files Browse the repository at this point in the history
Tags search is expensive from a bytes and rows scanned perspective. This
optimization makes it so that single tag searches can be run quickly and
efficiently.

Depends on: getsentry/snuba#6308 and
getsentry/snuba#6309
Related: #76289
  • Loading branch information
cmanallen committed Sep 20, 2024
1 parent 378e765 commit 3f1ca71
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 25 deletions.
5 changes: 4 additions & 1 deletion src/sentry/replays/scripts/delete_replays.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import contextlib
import logging
from collections.abc import Sequence
from datetime import datetime
from datetime import datetime, timezone

from sentry.api.event_search import SearchFilter, parse_search_query
from sentry.models.organization import Organization
Expand All @@ -28,6 +28,9 @@ def delete_replays(
search_filters = translate_cli_tags_param_to_snuba_tag_param(tags)
offset = 0

start_utc = start_utc.replace(tzinfo=timezone.utc)
end_utc = end_utc.replace(tzinfo=timezone.utc)

has_more = True
while has_more:
response = query_replays_collection_paginated(
Expand Down
20 changes: 15 additions & 5 deletions src/sentry/replays/usecases/query/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from sentry.models.organization import Organization
from sentry.replays.lib.new_query.errors import CouldNotParseValue, OperatorNotSupported
from sentry.replays.lib.new_query.fields import ColumnField, ExpressionField, FieldProtocol
from sentry.replays.usecases.query.errors import RetryAggregated
from sentry.replays.usecases.query.fields import ComputedField, TagField
from sentry.utils.snuba import RateLimitExceeded, raw_snql_query

Expand Down Expand Up @@ -339,9 +340,9 @@ def _query_using_scalar_strategy(
period_start: datetime,
period_stop: datetime,
):
if not can_scalar_search_subquery(search_filters) or not sort_is_scalar_compatible(
sort or DEFAULT_SORT_FIELD
):
can_scalar_search = can_scalar_search_subquery(search_filters, period_start)
can_scalar_sort = sort_is_scalar_compatible(sort or DEFAULT_SORT_FIELD)
if not can_scalar_search or not can_scalar_sort:
return _query_using_aggregated_strategy(
search_filters,
sort,
Expand All @@ -357,8 +358,17 @@ def _query_using_scalar_strategy(
# To fix this issue remove the ability to search against "varying" columns and apply a
# "segment_id = 0" condition to the WHERE clause.

where = handle_search_filters(scalar_search_config, search_filters)
orderby = handle_ordering(agg_sort_config, sort or "-" + DEFAULT_SORT_FIELD)
try:
where = handle_search_filters(scalar_search_config, search_filters)
orderby = handle_ordering(agg_sort_config, sort or "-" + DEFAULT_SORT_FIELD)
except RetryAggregated:
return _query_using_aggregated_strategy(
search_filters,
sort,
project_ids,
period_start,
period_stop,
)

query = Query(
match=Entity("replays"),
Expand Down
6 changes: 3 additions & 3 deletions src/sentry/replays/usecases/query/conditions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
"SumOfRageClickSelectorComposite",
"SumOfStringArray",
"SumOfStringScalar",
"SumOfTagScalar",
"SumOfTagAggregate",
"SumOfUUIDArray",
"TagScalar",
"TagAggregate",
]


Expand All @@ -44,4 +44,4 @@
SumOfDeadClickSelectorComposite,
SumOfRageClickSelectorComposite,
)
from .tags import SumOfTagScalar, TagScalar
from .tags import SumOfTagAggregate, TagAggregate
54 changes: 45 additions & 9 deletions src/sentry/replays/usecases/query/conditions/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,47 @@
from snuba_sdk.expressions import Expression

from sentry.replays.lib.new_query.conditions import GenericBase
from sentry.replays.lib.new_query.utils import contains, does_not_contain
from sentry.replays.lib.new_query.utils import (
contains,
does_not_contain,
translate_condition_to_function,
)
from sentry.replays.usecases.query.errors import RetryAggregated


class TagScalar(GenericBase):
"""Tag scalar condition class."""
@staticmethod
def visit_eq(expression_name: str, value: str) -> Condition:
hashed_needle = Function("cityHash64", parameters=[f"{expression_name}={value}"])
expression = Function("has", parameters=[Column("_tags_hash_map"), hashed_needle])
return Condition(expression, Op.EQ, 1)

@staticmethod
def visit_in(expression_name: str, value: list[str]) -> Condition:
expressions = [
translate_condition_to_function(TagScalar.visit_eq(expression_name, v)) for v in value
]
return Condition(Function("or", parameters=expressions), Op.EQ, 1)

@staticmethod
def visit_neq(expression_name: str, value: str) -> Condition:
raise RetryAggregated

@staticmethod
def visit_not_in(expression_name: str, value: list[str]) -> Condition:
raise RetryAggregated

@staticmethod
def visit_match(expression_name: str, value: str) -> Condition:
raise RetryAggregated

@staticmethod
def visit_not_match(expression_name: str, value: str) -> Condition:
raise RetryAggregated


class TagAggregate(GenericBase):
"""Tag aggregate condition class."""

@staticmethod
def visit_eq(expression_name: str, value: str) -> Condition:
Expand All @@ -35,30 +71,30 @@ def visit_not_match(expression_name: str, value: str) -> Condition:
return Condition(_match_key_value_wildcard(expression_name, value), Op.EQ, 0)


class SumOfTagScalar(GenericBase):
class SumOfTagAggregate(GenericBase):
@staticmethod
def visit_eq(expression: Expression, value: str) -> Condition:
return contains(TagScalar.visit_eq(expression, value))
return contains(TagAggregate.visit_eq(expression, value))

@staticmethod
def visit_neq(expression: Expression, value: str) -> Condition:
return does_not_contain(TagScalar.visit_eq(expression, value))
return does_not_contain(TagAggregate.visit_eq(expression, value))

@staticmethod
def visit_match(expression: Expression, value: str) -> Condition:
return contains(TagScalar.visit_match(expression, value))
return contains(TagAggregate.visit_match(expression, value))

@staticmethod
def visit_not_match(expression: Expression, value: str) -> Condition:
return does_not_contain(TagScalar.visit_match(expression, value))
return does_not_contain(TagAggregate.visit_match(expression, value))

@staticmethod
def visit_in(expression: Expression, value: list[str]) -> Condition:
return contains(TagScalar.visit_in(expression, value))
return contains(TagAggregate.visit_in(expression, value))

@staticmethod
def visit_not_in(expression: Expression, value: list[str]) -> Condition:
return does_not_contain(TagScalar.visit_in(expression, value))
return does_not_contain(TagAggregate.visit_in(expression, value))


def _match_key_value_exact(key: str, value: str) -> Function:
Expand Down
3 changes: 2 additions & 1 deletion src/sentry/replays/usecases/query/configs/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
)
from sentry.replays.usecases.query.conditions.aggregate import SumOfUUIDScalar
from sentry.replays.usecases.query.conditions.event_ids import SumOfErrorIdScalar, SumOfInfoIdScalar
from sentry.replays.usecases.query.conditions.tags import SumOfTagAggregate
from sentry.replays.usecases.query.fields import ComputedField, TagField


Expand Down Expand Up @@ -154,4 +155,4 @@ def array_string_field(column_name: str) -> StringColumnField:

# Field-names which could not be found in the set are tag-keys and will, by default, look for
# the `*` key to find their search instructions. If this is not defined an error is returned.
search_config["*"] = TagField()
search_config["*"] = TagField(query=SumOfTagAggregate)
23 changes: 20 additions & 3 deletions src/sentry/replays/usecases/query/configs/scalar.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Scalar query filtering configuration module."""

from __future__ import annotations

from collections.abc import Sequence
from datetime import datetime, timezone

from sentry.api.event_search import ParenExpression, SearchFilter
from sentry.replays.lib.new_query.conditions import (
Expand All @@ -20,7 +22,9 @@
RageClickSelectorComposite,
)
from sentry.replays.usecases.query.conditions.event_ids import ErrorIdScalar
from sentry.replays.usecases.query.fields import ComputedField
from sentry.replays.usecases.query.conditions.tags import TagScalar
from sentry.replays.usecases.query.configs.aggregate import search_config as aggregate_search_config
from sentry.replays.usecases.query.fields import ComputedField, TagField


def string_field(column_name: str) -> StringColumnField:
Expand Down Expand Up @@ -71,6 +75,7 @@ def string_field(column_name: str) -> StringColumnField:
varying_search_config["trace"] = varying_search_config["trace_ids"]
varying_search_config["url"] = varying_search_config["urls"]
varying_search_config["user.ip"] = varying_search_config["user.ip_address"]
varying_search_config["*"] = TagField(query=TagScalar)


# Click Search Config
Expand Down Expand Up @@ -98,6 +103,7 @@ def string_field(column_name: str) -> StringColumnField:

def can_scalar_search_subquery(
search_filters: Sequence[ParenExpression | SearchFilter | str],
started_at: datetime,
) -> bool:
"""Return "True" if a scalar event search can be performed."""
has_seen_varying_field = False
Expand All @@ -109,15 +115,26 @@ def can_scalar_search_subquery(
# ParenExpressions are recursive. So we recursively call our own function and return early
# if any of the fields fail.
elif isinstance(search_filter, ParenExpression):
is_ok = can_scalar_search_subquery(search_filter.children)
is_ok = can_scalar_search_subquery(search_filter.children, started_at)
if not is_ok:
return False
else:
name = search_filter.key.name

# If the search-filter does not exist in either configuration then return false.
if name not in static_search_config and name not in varying_search_config:
return False
# If the field is not a tag or the query's start period is greater than the
# period when the new field was introduced then we can not apply the
# optimization.
#
# TODO(cmanallen): Remove date condition after 90 days (~12/17/2024).
if name in aggregate_search_config or started_at < datetime(
2024, 9, 17, tzinfo=timezone.utc
):
return False
else:
has_seen_varying_field = True
continue

if name in varying_search_config:
# If a varying field has been seen before then we can't use a row-based sub-query. We
Expand Down
4 changes: 4 additions & 0 deletions src/sentry/replays/usecases/query/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
class RetryAggregated(Exception):
"""Raised when a query can only be executed by an aggregate."""

...
6 changes: 3 additions & 3 deletions src/sentry/replays/usecases/query/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from sentry.api.event_search import SearchFilter
from sentry.replays.lib.new_query.errors import OperatorNotSupported
from sentry.replays.lib.new_query.parsers import parse_str
from sentry.replays.usecases.query.conditions import SumOfTagScalar
from sentry.replays.usecases.query.conditions.base import ComputedBase
from sentry.replays.usecases.query.conditions.tags import SumOfTagAggregate, TagScalar

T = TypeVar("T")

Expand Down Expand Up @@ -95,9 +95,9 @@ def _apply_scalar(self, operator: str, value: T) -> Condition:


class TagField:
def __init__(self) -> None:
def __init__(self, query: type[SumOfTagAggregate] | type[TagScalar]) -> None:
self.parse = parse_str
self.query = SumOfTagScalar
self.query = query

def apply(self, search_filter: SearchFilter) -> Condition:
"""Apply a search operation against any named expression.
Expand Down
46 changes: 46 additions & 0 deletions tests/sentry/replays/test_organization_replay_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -1762,6 +1762,52 @@ def test_query_scalar_optimization_multiple_varying(self):
response_data = response.json()
assert len(response_data["data"]) == 1

def test_query_scalar_optimization_varying_with_tags(self):
project = self.create_project(teams=[self.team])

replay1_id = uuid.uuid4().hex
seq1_timestamp = datetime.datetime.now() - datetime.timedelta(seconds=22)
seq2_timestamp = datetime.datetime.now() - datetime.timedelta(seconds=5)

self.store_replays(
mock_replay(seq1_timestamp, project.id, replay1_id, tags={"something": "else"})
)
self.store_replays(mock_replay(seq2_timestamp, project.id, replay1_id))

with self.feature(self.features):
# EQ and IN supported.
response = self.client.get(self.url + "?field=id&query=something:else&statsPeriod=1d")
assert response.status_code == 200
assert response.headers["X-Data-Source"] == "scalar-subquery"

response = self.client.get(
self.url + "?field=id&query=something:else,other&statsPeriod=1d"
)
assert response.status_code == 200
assert response.headers["X-Data-Source"] == "scalar-subquery"

# Not operators are not supported.
response = self.client.get(self.url + "?field=id&query=!something:else&statsPeriod=1d")
assert response.status_code == 200
assert response.headers["X-Data-Source"] == "aggregated-subquery"

response = self.client.get(
self.url + "?field=id&query=!something:else,other&statsPeriod=1d"
)
assert response.status_code == 200
assert response.headers["X-Data-Source"] == "aggregated-subquery"

# Match not supported.
response = self.client.get(self.url + "?field=id&query=something:*else*&statsPeriod=1d")
assert response.status_code == 200
assert response.headers["X-Data-Source"] == "aggregated-subquery"

response = self.client.get(
self.url + "?field=id&query=!something:*else*&statsPeriod=1d"
)
assert response.status_code == 200
assert response.headers["X-Data-Source"] == "aggregated-subquery"

def test_get_replays_missing_segment_0(self):
"""Test fetching replays when the 0th segment is missing."""
project = self.create_project(teams=[self.team])
Expand Down

0 comments on commit 3f1ca71

Please sign in to comment.