-
Notifications
You must be signed in to change notification settings - Fork 45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement query profiling #542
Changes from 35 commits
7f46c52
e66c57f
1f1e8e1
1af61d1
ac11cd2
730411b
5823dbc
aa9f334
5cdba98
0f971c6
387b01b
ddec61d
cca1f00
cd97827
96d6d52
3cf9d8a
4c24b84
73ad241
686e722
4ebf0de
f2de107
0abea2f
d4f4475
fc6187e
eced67d
730e7cb
ab8d7a2
e02b20d
02afe92
5dcf063
5cf1f88
51ffa9e
c6df706
7d2e597
e9e72b6
083e604
58c3c57
c77f299
7d802e3
5a644ab
b2a9a77
e67ca72
4319ac7
175c76a
c5d8a76
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,15 +23,11 @@ | |
from google.cloud.datastore import helpers | ||
from google.cloud.datastore.query import _pb_from_query | ||
|
||
from google.cloud.datastore.query_profile import ExplainMetrics | ||
from google.cloud.datastore.query_profile import QueryExplainError | ||
|
||
_NOT_FINISHED = query_pb2.QueryResultBatch.MoreResultsType.NOT_FINISHED | ||
_NO_MORE_RESULTS = query_pb2.QueryResultBatch.MoreResultsType.NO_MORE_RESULTS | ||
|
||
_FINISHED = ( | ||
_NO_MORE_RESULTS, | ||
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_LIMIT, | ||
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_CURSOR, | ||
) | ||
from google.cloud.datastore.query import _NOT_FINISHED | ||
from google.cloud.datastore.query import _FINISHED | ||
|
||
|
||
class BaseAggregation(ABC): | ||
|
@@ -159,16 +155,25 @@ class AggregationQuery(object): | |
|
||
:type query: :class:`google.cloud.datastore.query.Query` | ||
:param query: The query used for aggregations. | ||
|
||
:type explain_options: :class:`~google.cloud.datastore.ExplainOptions` | ||
:param explain_options: (Optional) Options to enable query profiling for | ||
this query. When set, explain_metrics will be available on the iterator | ||
returned by query.fetch(). | ||
If not passed, will use value from given query. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
client, | ||
query, | ||
explain_options=None, | ||
): | ||
self._client = client | ||
self._nested_query = query | ||
self._aggregations = [] | ||
# fallback to query._explain_options if not set | ||
self._explain_options = explain_options or query._explain_options | ||
|
||
@property | ||
def project(self): | ||
|
@@ -391,6 +396,7 @@ def __init__( | |
self._read_time = read_time | ||
self._limit = limit | ||
# The attributes below will change over the life of the iterator. | ||
self._explain_metrics = None | ||
self._more_results = True | ||
|
||
def _build_protobuf(self): | ||
|
@@ -441,7 +447,6 @@ def _next_page(self): | |
if not self._more_results: | ||
return None | ||
|
||
query_pb = self._build_protobuf() | ||
transaction_id, new_transaction_options = helpers.get_transaction_options( | ||
self.client.current_transaction | ||
) | ||
|
@@ -466,38 +471,67 @@ def _next_page(self): | |
"project_id": self._aggregation_query.project, | ||
"partition_id": partition_id, | ||
"read_options": read_options, | ||
"aggregation_query": query_pb, | ||
"aggregation_query": self._build_protobuf(), | ||
} | ||
if self._aggregation_query._explain_options: | ||
request[ | ||
"explain_options" | ||
] = self._aggregation_query._explain_options._to_dict() | ||
helpers.set_database_id_to_request(request, self.client.database) | ||
response_pb = self.client._datastore_api.run_aggregation_query( | ||
request=request, | ||
**kwargs, | ||
) | ||
|
||
while response_pb.batch.more_results == _NOT_FINISHED: | ||
# We haven't finished processing. A likely reason is we haven't | ||
# skipped all of the results yet. Don't return any results. | ||
# Instead, rerun query, adjusting offsets. Datastore doesn't process | ||
# more than 1000 skipped results in a query. | ||
old_query_pb = query_pb | ||
query_pb = query_pb2.AggregationQuery() | ||
query_pb._pb.CopyFrom(old_query_pb._pb) # copy for testability | ||
|
||
request = { | ||
"project_id": self._aggregation_query.project, | ||
"partition_id": partition_id, | ||
"read_options": read_options, | ||
"aggregation_query": query_pb, | ||
} | ||
helpers.set_database_id_to_request(request, self.client.database) | ||
response_pb = None | ||
|
||
while response_pb is None or response_pb.batch.more_results == _NOT_FINISHED: | ||
if response_pb is not None: | ||
# We haven't finished processing. A likely reason is we haven't | ||
# skipped all of the results yet. Don't return any results. | ||
# Instead, rerun query, adjusting offsets. Datastore doesn't process | ||
# more than 1000 skipped results in a query. | ||
new_query_pb = query_pb2.AggregationQuery() | ||
new_query_pb._pb.CopyFrom( | ||
request["aggregation_query"]._pb | ||
) # copy for testability | ||
request["aggregation_query"] = new_query_pb | ||
|
||
response_pb = self.client._datastore_api.run_aggregation_query( | ||
request=request, | ||
**kwargs, | ||
request=request.copy(), **kwargs | ||
) | ||
# capture explain metrics if present in response | ||
# should only be present in last response, and only if explain_options was set | ||
if response_pb.explain_metrics: | ||
self._explain_metrics = ExplainMetrics._from_pb( | ||
response_pb.explain_metrics | ||
) | ||
|
||
item_pbs = self._process_query_results(response_pb) | ||
return page_iterator.Page(self, item_pbs, self.item_to_value) | ||
|
||
@property | ||
def explain_metrics(self) -> ExplainMetrics: | ||
""" | ||
Get the metrics associated with the query execution. | ||
Metrics are only available when explain_options is set on the query. If | ||
ExplainOptions.analyze is False, only plan_summary is available. If it is | ||
True, execution_stats is also available. | ||
|
||
:rtype: :class:`~google.cloud.datastore.query_profile.ExplainMetrics` | ||
:returns: The metrics associated with the query execution. | ||
:raises: :class:`~google.cloud.datastore.query_profile.QueryExplainError` | ||
if explain_metrics is not available on the query. | ||
""" | ||
if self._explain_metrics is not None: | ||
return self._explain_metrics | ||
elif self._aggregation_query._explain_options is None: | ||
raise QueryExplainError("explain_options not set on query.") | ||
elif self._aggregation_query._explain_options.analyze is False: | ||
# we need to run the query to get the explain_metrics | ||
self._next_page() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this safe? (Are the results being returned later, or thrown out?) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We don't have any results when analyze=True, so this is safe. Added a comment. |
||
if self._explain_metrics is not None: | ||
return self._explain_metrics | ||
raise QueryExplainError( | ||
"explain_metrics not available until query is complete." | ||
) | ||
|
||
|
||
# pylint: disable=unused-argument | ||
def _item_to_aggregation_result(iterator, pb): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,20 +13,21 @@ | |
# limitations under the License. | ||
|
||
"""Create / interact with Google Cloud Datastore queries.""" | ||
|
||
import base64 | ||
import warnings | ||
|
||
|
||
from google.api_core import page_iterator | ||
from google.cloud._helpers import _ensure_tuple_or_list | ||
|
||
|
||
from google.cloud.datastore_v1.types import entity as entity_pb2 | ||
from google.cloud.datastore_v1.types import query as query_pb2 | ||
from google.cloud.datastore import helpers | ||
from google.cloud.datastore.key import Key | ||
|
||
|
||
from google.cloud.datastore.query_profile import ExplainMetrics | ||
from google.cloud.datastore.query_profile import QueryExplainError | ||
|
||
import abc | ||
from abc import ABC | ||
|
||
|
@@ -38,6 +39,7 @@ | |
_NO_MORE_RESULTS, | ||
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_LIMIT, | ||
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_CURSOR, | ||
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_TYPE_UNSPECIFIED, # received when explain_options(analyze=False) | ||
) | ||
|
||
KEY_PROPERTY_NAME = "__key__" | ||
|
@@ -176,6 +178,11 @@ class Query(object): | |
:type distinct_on: sequence of string | ||
:param distinct_on: field names used to group query results. | ||
|
||
:type explain_options: :class:`~google.cloud.datastore.ExplainOptions` | ||
:param explain_options: (Optional) Options to enable query profiling for | ||
this query. When set, explain_metrics will be available on the iterator | ||
returned by query.fetch(). | ||
|
||
:raises: ValueError if ``project`` is not passed and no implicit | ||
default is set. | ||
""" | ||
|
@@ -203,6 +210,7 @@ def __init__( | |
projection=(), | ||
order=(), | ||
distinct_on=(), | ||
explain_options=None, | ||
): | ||
self._client = client | ||
self._kind = kind | ||
|
@@ -221,6 +229,7 @@ def __init__( | |
else: | ||
self._namespace = None | ||
|
||
self._explain_options = explain_options | ||
self._ancestor = ancestor | ||
self._filters = [] | ||
|
||
|
@@ -704,6 +713,7 @@ def __init__( | |
self._timeout = timeout | ||
self._read_time = read_time | ||
# The attributes below will change over the life of the iterator. | ||
self._explain_metrics = None | ||
self._more_results = True | ||
self._skipped_results = 0 | ||
|
||
|
@@ -777,7 +787,6 @@ def _next_page(self): | |
if not self._more_results: | ||
return None | ||
|
||
query_pb = self._build_protobuf() | ||
new_transaction_options = None | ||
transaction_id, new_transaction_options = helpers.get_transaction_options( | ||
self.client.current_transaction | ||
|
@@ -804,46 +813,69 @@ def _next_page(self): | |
"project_id": self._query.project, | ||
"partition_id": partition_id, | ||
"read_options": read_options, | ||
"query": query_pb, | ||
"query": self._build_protobuf(), | ||
} | ||
if self._query._explain_options: | ||
request["explain_options"] = self._query._explain_options._to_dict() | ||
|
||
helpers.set_database_id_to_request(request, self.client.database) | ||
|
||
response_pb = self.client._datastore_api.run_query( | ||
request=request, | ||
**kwargs, | ||
) | ||
response_pb = None | ||
|
||
while ( | ||
while response_pb is None or ( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The new while loop seems mostly the same as the previous one. Just for my education, could you explain how they are different and why we need to make the change here? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Adding […inline code lost in extraction…]. The other change was to get rid of the […text truncated in extraction…] |
||
response_pb.batch.more_results == _NOT_FINISHED | ||
and response_pb.batch.skipped_results < query_pb.offset | ||
and response_pb.batch.skipped_results < request["query"].offset | ||
): | ||
# We haven't finished processing. A likely reason is we haven't | ||
# skipped all of the results yet. Don't return any results. | ||
# Instead, rerun query, adjusting offsets. Datastore doesn't process | ||
# more than 1000 skipped results in a query. | ||
old_query_pb = query_pb | ||
query_pb = query_pb2.Query() | ||
query_pb._pb.CopyFrom(old_query_pb._pb) # copy for testability | ||
query_pb.start_cursor = response_pb.batch.skipped_cursor | ||
query_pb.offset -= response_pb.batch.skipped_results | ||
|
||
request = { | ||
"project_id": self._query.project, | ||
"partition_id": partition_id, | ||
"read_options": read_options, | ||
"query": query_pb, | ||
} | ||
helpers.set_database_id_to_request(request, self.client.database) | ||
if response_pb is not None: | ||
# We haven't finished processing. A likely reason is we haven't | ||
# skipped all of the results yet. Don't return any results. | ||
# Instead, rerun query, adjusting offsets. Datastore doesn't process | ||
# more than 1000 skipped results in a query. | ||
new_query_pb = query_pb2.Query() | ||
new_query_pb._pb.CopyFrom(request["query"]._pb) # copy for testability | ||
new_query_pb.start_cursor = response_pb.batch.skipped_cursor | ||
new_query_pb.offset -= response_pb.batch.skipped_results | ||
request["query"] = new_query_pb | ||
|
||
response_pb = self.client._datastore_api.run_query( | ||
request=request, | ||
**kwargs, | ||
request=request.copy(), **kwargs | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is [`request.copy()` necessary here? — inline code lost in extraction] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, IIRC it's just for testability. We want to be able to make assertions about the arguments passed in to each call, which doesn't work if the same request instance is used for each request |
||
) | ||
# capture explain metrics if present in response | ||
# should only be present in last response, and only if explain_options was set | ||
if response_pb and response_pb.explain_metrics: | ||
self._explain_metrics = ExplainMetrics._from_pb( | ||
response_pb.explain_metrics | ||
) | ||
|
||
entity_pbs = self._process_query_results(response_pb) | ||
return page_iterator.Page(self, entity_pbs, self.item_to_value) | ||
|
||
@property | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there any difference between this and the one in google/cloud/datastore/aggregation.py? Can the implementation be just in one place, and called from both locations? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Unfortunately there's a lot of duplicated code between the two classes currently :( It's a bit complicated, because these query/aggregation classes (although very similar methods) have no superclass. And the names of some internal variables are different, so the implementation's not an exact match either. I'm open to maybe moving some of this logic into a helper in query_profile.py, and calling it from both places? Let me know what you think. But the more long-term solution is probably to do a larger-scale refactor. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A larger-scale refactor to fix all duplications is preferred. |
||
def explain_metrics(self) -> ExplainMetrics: | ||
""" | ||
Get the metrics associated with the query execution. | ||
Metrics are only available when explain_options is set on the query. If | ||
ExplainOptions.analyze is False, only plan_summary is available. If it is | ||
True, execution_stats is also available. | ||
|
||
:rtype: :class:`~google.cloud.datastore.query_profile.ExplainMetrics` | ||
:returns: The metrics associated with the query execution. | ||
:raises: :class:`~google.cloud.datastore.query_profile.QueryExplainError` | ||
if explain_metrics is not available on the query. | ||
""" | ||
if self._explain_metrics is not None: | ||
return self._explain_metrics | ||
elif self._query._explain_options is None: | ||
raise QueryExplainError("explain_options not set on query.") | ||
elif self._query._explain_options.analyze is False: | ||
# we need to run the query to get the explain_metrics | ||
self._next_page() | ||
if self._explain_metrics is not None: | ||
return self._explain_metrics | ||
raise QueryExplainError( | ||
"explain_metrics not available until query is complete." | ||
) | ||
|
||
|
||
def _pb_from_query(query): | ||
"""Convert a Query instance to the corresponding protobuf. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Just for my understanding, why is this changing?
There was a problem hiding this comment.
Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
There was a lot of duplicated code in the original implementation (see the duplicated request dictionaries). I did some refactoring of these methods so that building the request and calling `run_aggregation_query` only happens in one place. And I also ended up removing the `query_pb` variable, since I found it a little confusing tracking the state between `query_pb`, `old_query_pb`, and `request["query"]`.