
Commit

Merge branch 'main' into renovate/all
daniel-sanche committed Aug 7, 2024
2 parents 081a69e + 1500f70 commit 4974f65
Showing 19 changed files with 1,359 additions and 115 deletions.
12 changes: 11 additions & 1 deletion google/cloud/datastore/__init__.py
@@ -61,6 +61,16 @@
from google.cloud.datastore.entity import Entity
from google.cloud.datastore.key import Key
from google.cloud.datastore.query import Query
from google.cloud.datastore.query_profile import ExplainOptions
from google.cloud.datastore.transaction import Transaction

__all__ = ["__version__", "Batch", "Client", "Entity", "Key", "Query", "Transaction"]
__all__ = [
"__version__",
"Batch",
"Client",
"Entity",
"Key",
"Query",
"ExplainOptions",
"Transaction",
]
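
ExplainOptions is now re-exported from the package root, so query profiling can be enabled straight from the top-level import. A minimal sketch of how the new export might be used; the project and kind names below are placeholders, not part of this commit:

from google.cloud import datastore

# Placeholder project and kind, for illustration only.
client = datastore.Client(project="my-project")

# explain_options is the new keyword accepted by Query (see query.py below);
# analyze=True asks Datastore to also execute the query and report stats.
query = client.query(
    kind="Task",
    explain_options=datastore.ExplainOptions(analyze=True),
)

iterator = query.fetch()
results = list(iterator)

# plan_summary is available whenever explain_options is set;
# execution_stats additionally requires analyze=True.
metrics = iterator.explain_metrics
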
99 changes: 67 additions & 32 deletions google/cloud/datastore/aggregation.py
@@ -23,15 +23,11 @@
from google.cloud.datastore import helpers
from google.cloud.datastore.query import _pb_from_query

from google.cloud.datastore.query_profile import ExplainMetrics
from google.cloud.datastore.query_profile import QueryExplainError

_NOT_FINISHED = query_pb2.QueryResultBatch.MoreResultsType.NOT_FINISHED
_NO_MORE_RESULTS = query_pb2.QueryResultBatch.MoreResultsType.NO_MORE_RESULTS

_FINISHED = (
_NO_MORE_RESULTS,
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_LIMIT,
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_CURSOR,
)
from google.cloud.datastore.query import _NOT_FINISHED
from google.cloud.datastore.query import _FINISHED


class BaseAggregation(ABC):
@@ -159,16 +155,25 @@ class AggregationQuery(object):
:type query: :class:`google.cloud.datastore.query.Query`
:param query: The query used for aggregations.
:type explain_options: :class:`~google.cloud.datastore.ExplainOptions`
:param explain_options: (Optional) Options to enable query profiling for
this query. When set, explain_metrics will be available on the iterator
returned by query.fetch().
If not passed, will use value from given query.
"""

def __init__(
self,
client,
query,
explain_options=None,
):
self._client = client
self._nested_query = query
self._aggregations = []
# fallback to query._explain_options if not set
self._explain_options = explain_options or query._explain_options

@property
def project(self):
@@ -391,6 +396,7 @@ def __init__(
self._read_time = read_time
self._limit = limit
# The attributes below will change over the life of the iterator.
self._explain_metrics = None
self._more_results = True

def _build_protobuf(self):
@@ -441,7 +447,6 @@ def _next_page(self):
if not self._more_results:
return None

query_pb = self._build_protobuf()
transaction_id, new_transaction_options = helpers.get_transaction_options(
self.client.current_transaction
)
@@ -466,38 +471,68 @@ def _next_page(self):
"project_id": self._aggregation_query.project,
"partition_id": partition_id,
"read_options": read_options,
"aggregation_query": query_pb,
"aggregation_query": self._build_protobuf(),
}
if self._aggregation_query._explain_options:
request[
"explain_options"
] = self._aggregation_query._explain_options._to_dict()
helpers.set_database_id_to_request(request, self.client.database)
response_pb = self.client._datastore_api.run_aggregation_query(
request=request,
**kwargs,
)

while response_pb.batch.more_results == _NOT_FINISHED:
# We haven't finished processing. A likely reason is we haven't
# skipped all of the results yet. Don't return any results.
# Instead, rerun query, adjusting offsets. Datastore doesn't process
# more than 1000 skipped results in a query.
old_query_pb = query_pb
query_pb = query_pb2.AggregationQuery()
query_pb._pb.CopyFrom(old_query_pb._pb) # copy for testability

request = {
"project_id": self._aggregation_query.project,
"partition_id": partition_id,
"read_options": read_options,
"aggregation_query": query_pb,
}
helpers.set_database_id_to_request(request, self.client.database)
response_pb = None

while response_pb is None or response_pb.batch.more_results == _NOT_FINISHED:
if response_pb is not None:
# We haven't finished processing. A likely reason is we haven't
# skipped all of the results yet. Don't return any results.
# Instead, rerun query, adjusting offsets. Datastore doesn't process
# more than 1000 skipped results in a query.
new_query_pb = query_pb2.AggregationQuery()
new_query_pb._pb.CopyFrom(
request["aggregation_query"]._pb
) # copy for testability
request["aggregation_query"] = new_query_pb

response_pb = self.client._datastore_api.run_aggregation_query(
request=request,
**kwargs,
request=request.copy(), **kwargs
)
# capture explain metrics if present in response
# should only be present in last response, and only if explain_options was set
if response_pb.explain_metrics:
self._explain_metrics = ExplainMetrics._from_pb(
response_pb.explain_metrics
)

item_pbs = self._process_query_results(response_pb)
return page_iterator.Page(self, item_pbs, self.item_to_value)

@property
def explain_metrics(self) -> ExplainMetrics:
"""
Get the metrics associated with the query execution.
Metrics are only available when explain_options is set on the query. If
ExplainOptions.analyze is False, only plan_summary is available. If it is
True, execution_stats is also available.
:rtype: :class:`~google.cloud.datastore.query_profile.ExplainMetrics`
:returns: The metrics associated with the query execution.
:raises: :class:`~google.cloud.datastore.query_profile.QueryExplainError`
if explain_metrics is not available on the query.
"""
if self._explain_metrics is not None:
return self._explain_metrics
elif self._aggregation_query._explain_options is None:
raise QueryExplainError("explain_options not set on query.")
elif self._aggregation_query._explain_options.analyze is False:
# we need to run the query to get the explain_metrics
# analyze=False only returns explain_metrics, no results
self._next_page()
if self._explain_metrics is not None:
return self._explain_metrics
raise QueryExplainError(
"explain_metrics not available until query is complete."
)


# pylint: disable=unused-argument
def _item_to_aggregation_result(iterator, pb):
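
The iterator-level explain_metrics property and the explain_options fallback shown above combine as follows; a hedged sketch reusing the placeholder project and kind from the earlier example, and assuming the existing count() aggregation helper:

from google.cloud import datastore

client = datastore.Client(project="my-project")  # placeholder project

# explain_options set on the underlying query is inherited by the
# aggregation query through the new fallback in AggregationQuery.__init__.
query = client.query(
    kind="Task",  # placeholder kind
    explain_options=datastore.ExplainOptions(analyze=True),
)

agg_query = client.aggregation_query(query)
agg_query.count(alias="total")

iterator = agg_query.fetch()
results = list(iterator)

# With analyze=True the aggregation runs normally, and execution statistics
# are captured from the final response alongside the plan summary.
metrics = iterator.explain_metrics
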
4 changes: 2 additions & 2 deletions google/cloud/datastore/client.py
@@ -875,7 +875,7 @@ def do_something_with(entity):
kwargs["namespace"] = self.namespace
return Query(self, **kwargs)

def aggregation_query(self, query):
def aggregation_query(self, query, **kwargs):
"""Proxy to :class:`google.cloud.datastore.aggregation.AggregationQuery`.
Using aggregation_query to count over a query:
@@ -953,7 +953,7 @@ def do_something_with(entity):
:rtype: :class:`~google.cloud.datastore.aggregation.AggregationQuery`
:returns: An AggregationQuery object.
"""
return AggregationQuery(self, query)
return AggregationQuery(self, query, **kwargs)

def reserve_ids_sequential(self, complete_key, num_ids, retry=None, timeout=None):
"""Reserve a list of IDs sequentially from a complete key.
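
Because aggregation_query now forwards arbitrary keyword arguments, explain_options can be supplied either through the client helper or on the query itself. A brief sketch of the two equivalent call paths, under the same placeholder names:

from google.cloud import datastore

client = datastore.Client(project="my-project")  # placeholder project
query = client.query(kind="Task")  # placeholder kind

# Passed through the client helper via **kwargs...
agg_direct = client.aggregation_query(
    query, explain_options=datastore.ExplainOptions()
)

# ...or set on the query, which AggregationQuery falls back to when the
# explain_options argument is omitted.
profiled_query = client.query(
    kind="Task", explain_options=datastore.ExplainOptions()
)
agg_inherited = client.aggregation_query(profiled_query)
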
93 changes: 63 additions & 30 deletions google/cloud/datastore/query.py
@@ -13,20 +13,21 @@
# limitations under the License.

"""Create / interact with Google Cloud Datastore queries."""

import base64
import warnings


from google.api_core import page_iterator
from google.cloud._helpers import _ensure_tuple_or_list


from google.cloud.datastore_v1.types import entity as entity_pb2
from google.cloud.datastore_v1.types import query as query_pb2
from google.cloud.datastore import helpers
from google.cloud.datastore.key import Key


from google.cloud.datastore.query_profile import ExplainMetrics
from google.cloud.datastore.query_profile import QueryExplainError

import abc
from abc import ABC

@@ -38,6 +39,7 @@
_NO_MORE_RESULTS,
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_LIMIT,
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_CURSOR,
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_TYPE_UNSPECIFIED, # received when explain_options(analyze=False)
)

KEY_PROPERTY_NAME = "__key__"
@@ -176,6 +178,11 @@ class Query(object):
:type distinct_on: sequence of string
:param distinct_on: field names used to group query results.
:type explain_options: :class:`~google.cloud.datastore.ExplainOptions`
:param explain_options: (Optional) Options to enable query profiling for
this query. When set, explain_metrics will be available on the iterator
returned by query.fetch().
:raises: ValueError if ``project`` is not passed and no implicit
default is set.
"""
@@ -203,6 +210,7 @@ def __init__(
projection=(),
order=(),
distinct_on=(),
explain_options=None,
):
self._client = client
self._kind = kind
@@ -221,6 +229,7 @@
else:
self._namespace = None

self._explain_options = explain_options
self._ancestor = ancestor
self._filters = []

@@ -704,6 +713,7 @@ def __init__(
self._timeout = timeout
self._read_time = read_time
# The attributes below will change over the life of the iterator.
self._explain_metrics = None
self._more_results = True
self._skipped_results = 0

@@ -777,7 +787,6 @@ def _next_page(self):
if not self._more_results:
return None

query_pb = self._build_protobuf()
new_transaction_options = None
transaction_id, new_transaction_options = helpers.get_transaction_options(
self.client.current_transaction
@@ -804,46 +813,70 @@
"project_id": self._query.project,
"partition_id": partition_id,
"read_options": read_options,
"query": query_pb,
"query": self._build_protobuf(),
}
if self._query._explain_options:
request["explain_options"] = self._query._explain_options._to_dict()

helpers.set_database_id_to_request(request, self.client.database)

response_pb = self.client._datastore_api.run_query(
request=request,
**kwargs,
)
response_pb = None

while (
while response_pb is None or (
response_pb.batch.more_results == _NOT_FINISHED
and response_pb.batch.skipped_results < query_pb.offset
and response_pb.batch.skipped_results < request["query"].offset
):
# We haven't finished processing. A likely reason is we haven't
# skipped all of the results yet. Don't return any results.
# Instead, rerun query, adjusting offsets. Datastore doesn't process
# more than 1000 skipped results in a query.
old_query_pb = query_pb
query_pb = query_pb2.Query()
query_pb._pb.CopyFrom(old_query_pb._pb) # copy for testability
query_pb.start_cursor = response_pb.batch.skipped_cursor
query_pb.offset -= response_pb.batch.skipped_results

request = {
"project_id": self._query.project,
"partition_id": partition_id,
"read_options": read_options,
"query": query_pb,
}
helpers.set_database_id_to_request(request, self.client.database)
if response_pb is not None:
# We haven't finished processing. A likely reason is we haven't
# skipped all of the results yet. Don't return any results.
# Instead, rerun query, adjusting offsets. Datastore doesn't process
# more than 1000 skipped results in a query.
new_query_pb = query_pb2.Query()
new_query_pb._pb.CopyFrom(request["query"]._pb) # copy for testability
new_query_pb.start_cursor = response_pb.batch.end_cursor
new_query_pb.offset -= response_pb.batch.skipped_results
request["query"] = new_query_pb

response_pb = self.client._datastore_api.run_query(
request=request,
**kwargs,
request=request.copy(), **kwargs
)
# capture explain metrics if present in response
# should only be present in last response, and only if explain_options was set
if response_pb and response_pb.explain_metrics:
self._explain_metrics = ExplainMetrics._from_pb(
response_pb.explain_metrics
)

entity_pbs = self._process_query_results(response_pb)
return page_iterator.Page(self, entity_pbs, self.item_to_value)

@property
def explain_metrics(self) -> ExplainMetrics:
"""
Get the metrics associated with the query execution.
Metrics are only available when explain_options is set on the query. If
ExplainOptions.analyze is False, only plan_summary is available. If it is
True, execution_stats is also available.
:rtype: :class:`~google.cloud.datastore.query_profile.ExplainMetrics`
:returns: The metrics associated with the query execution.
:raises: :class:`~google.cloud.datastore.query_profile.QueryExplainError`
if explain_metrics is not available on the query.
"""
if self._explain_metrics is not None:
return self._explain_metrics
elif self._query._explain_options is None:
raise QueryExplainError("explain_options not set on query.")
elif self._query._explain_options.analyze is False:
# we need to run the query to get the explain_metrics
# analyze=False only returns explain_metrics, no results
self._next_page()
if self._explain_metrics is not None:
return self._explain_metrics
raise QueryExplainError(
"explain_metrics not available until query is complete."
)


def _pb_from_query(query):
"""Convert a Query instance to the corresponding protobuf.
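
On the plain query path the property behaves the same way: with analyze=False no entities are returned and accessing explain_metrics triggers a single plan-only run, while with analyze=True the metrics become available after the iterator has been consumed. A sketch under the same placeholder assumptions; the plan_summary and execution_stats attribute names come from the docstring above:

from google.cloud import datastore

client = datastore.Client(project="my-project")  # placeholder project

# Plan-only profiling: analyze=False returns no entities, so reading
# explain_metrics runs the query once just to collect the plan summary.
plan_query = client.query(
    kind="Task", explain_options=datastore.ExplainOptions(analyze=False)
)
plan_iterator = plan_query.fetch()
print(plan_iterator.explain_metrics.plan_summary)

# Full profiling: entities come back as usual, and execution_stats is
# populated once the final response has been processed.
full_query = client.query(
    kind="Task", explain_options=datastore.ExplainOptions(analyze=True)
)
full_iterator = full_query.fetch()
entities = list(full_iterator)
print(full_iterator.explain_metrics.execution_stats)
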