From 14a1909963cfa41208f4e25b82b7c84c5e02452f Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 18 Sep 2023 18:38:13 -0700 Subject: [PATCH] feat: add gccl-gcs-cmd field to X-Goog-API-Client header for Transfer Manager calls (#1119) --- google/cloud/storage/_helpers.py | 23 +- google/cloud/storage/blob.py | 519 +++++++++++++++++++--- google/cloud/storage/client.py | 64 +-- google/cloud/storage/transfer_manager.py | 42 +- tests/unit/test__http.py | 3 +- tests/unit/test_blob.py | 541 ++++++++++++----------- tests/unit/test_client.py | 21 +- tests/unit/test_transfer_manager.py | 73 +-- 8 files changed, 860 insertions(+), 426 deletions(-) diff --git a/google/cloud/storage/_helpers.py b/google/cloud/storage/_helpers.py index 29968a9aa..77a9dffd0 100644 --- a/google/cloud/storage/_helpers.py +++ b/google/cloud/storage/_helpers.py @@ -599,19 +599,32 @@ def _get_default_headers( user_agent, content_type="application/json; charset=UTF-8", x_upload_content_type=None, + command=None, ): """Get the headers for a request. - Args: - user_agent (str): The user-agent for requests. - Returns: - Dict: The headers to be used for the request. + :type user_agent: str + :param user_agent: The user-agent for requests. + + :type command: str + :param command: + (Optional) Information about which interface for the operation was + used, to be included in the X-Goog-API-Client header. Please leave + as None unless otherwise directed. + + :rtype: dict + :returns: The headers to be used for the request. 
""" + x_goog_api_client = f"{user_agent} {_get_invocation_id()}" + + if command: + x_goog_api_client += f" gccl-gcs-cmd/{command}" + return { "Accept": "application/json", "Accept-Encoding": "gzip, deflate", "User-Agent": user_agent, - "X-Goog-API-Client": f"{user_agent} {_get_invocation_id()}", + "X-Goog-API-Client": x_goog_api_client, "content-type": content_type, "x-upload-content-type": x_upload_content_type or content_type, } diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 4c493485f..ece758dbc 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -904,7 +904,7 @@ def _do_download( ): """Perform a download without any error handling. - This is intended to be called by :meth:`download_to_file` so it can + This is intended to be called by :meth:`_prep_and_do_download` so it can be wrapped with error handling / remapping. :type transport: @@ -957,7 +957,7 @@ def _do_download( This private method does not accept ConditionalRetryPolicy values because the information necessary to evaluate the policy is instead - evaluated in client.download_blob_to_file(). + evaluated in blob._prep_and_do_download(). See the retry.py source code and docstrings in this package (google.cloud.storage.retry) for information on retry types and how @@ -1124,11 +1124,10 @@ def download_to_file( :raises: :class:`google.cloud.exceptions.NotFound` """ - client = self._require_client(client) - client.download_blob_to_file( - self, - file_obj=file_obj, + self._prep_and_do_download( + file_obj, + client=client, start=start, end=end, raw_download=raw_download, @@ -1143,6 +1142,33 @@ def download_to_file( retry=retry, ) + def _handle_filename_and_download(self, filename, *args, **kwargs): + """Download the contents of this blob into a named file. + + :type filename: str + :param filename: A filename to be passed to ``open``. + + For *args and **kwargs, refer to the documentation for download_to_filename() for more information. 
+ """ + + try: + with open(filename, "wb") as file_obj: + self._prep_and_do_download( + file_obj, + *args, + **kwargs, + ) + + except resumable_media.DataCorruption: + # Delete the corrupt downloaded file. + os.remove(filename) + raise + + updated = self.updated + if updated is not None: + mtime = updated.timestamp() + os.utime(file_obj.name, (mtime, mtime)) + def download_to_filename( self, filename, @@ -1250,34 +1276,23 @@ def download_to_filename( :raises: :class:`google.cloud.exceptions.NotFound` """ - client = self._require_client(client) - try: - with open(filename, "wb") as file_obj: - client.download_blob_to_file( - self, - file_obj, - start=start, - end=end, - raw_download=raw_download, - if_etag_match=if_etag_match, - if_etag_not_match=if_etag_not_match, - if_generation_match=if_generation_match, - if_generation_not_match=if_generation_not_match, - if_metageneration_match=if_metageneration_match, - if_metageneration_not_match=if_metageneration_not_match, - timeout=timeout, - checksum=checksum, - retry=retry, - ) - except resumable_media.DataCorruption: - # Delete the corrupt downloaded file. 
- os.remove(filename) - raise - updated = self.updated - if updated is not None: - mtime = updated.timestamp() - os.utime(file_obj.name, (mtime, mtime)) + self._handle_filename_and_download( + filename, + client=client, + start=start, + end=end, + raw_download=raw_download, + if_etag_match=if_etag_match, + if_etag_not_match=if_etag_not_match, + if_generation_match=if_generation_match, + if_generation_not_match=if_generation_not_match, + if_metageneration_match=if_metageneration_match, + if_metageneration_not_match=if_metageneration_not_match, + timeout=timeout, + checksum=checksum, + retry=retry, + ) def download_as_bytes( self, @@ -1382,11 +1397,12 @@ def download_as_bytes( :raises: :class:`google.cloud.exceptions.NotFound` """ - client = self._require_client(client) + string_buffer = BytesIO() - client.download_blob_to_file( - self, + + self._prep_and_do_download( string_buffer, + client=client, start=start, end=end, raw_download=raw_download, @@ -1697,7 +1713,7 @@ def _get_writable_metadata(self): return object_metadata - def _get_upload_arguments(self, client, content_type, filename=None): + def _get_upload_arguments(self, client, content_type, filename=None, command=None): """Get required arguments for performing an upload. The content type returned will be determined in order of precedence: @@ -1709,6 +1725,12 @@ def _get_upload_arguments(self, client, content_type, filename=None): :type content_type: str :param content_type: Type of content being uploaded (or :data:`None`). + :type command: str + :param command: + (Optional) Information about which interface for upload was used, + to be included in the X-Goog-API-Client header. Please leave as None + unless otherwise directed. 
+ :rtype: tuple :returns: A triple of @@ -1718,7 +1740,9 @@ def _get_upload_arguments(self, client, content_type, filename=None): """ content_type = self._get_content_type(content_type, filename=filename) headers = { - **_get_default_headers(client._connection.user_agent, content_type), + **_get_default_headers( + client._connection.user_agent, content_type, command=command + ), **_get_encryption_headers(self._encryption_key), } object_metadata = self._get_writable_metadata() @@ -1739,6 +1763,7 @@ def _do_multipart_upload( timeout=_DEFAULT_TIMEOUT, checksum=None, retry=None, + command=None, ): """Perform a multipart upload. @@ -1822,6 +1847,12 @@ def _do_multipart_upload( (google.cloud.storage.retry) for information on retry types and how to configure them. + :type command: str + :param command: + (Optional) Information about which interface for upload was used, + to be included in the X-Goog-API-Client header. Please leave as None + unless otherwise directed. + :rtype: :class:`~requests.Response` :returns: The "200 OK" response object returned after the multipart upload request. @@ -1840,7 +1871,7 @@ def _do_multipart_upload( transport = self._get_transport(client) if "metadata" in self._properties and "metadata" not in self._changes: self._changes.add("metadata") - info = self._get_upload_arguments(client, content_type) + info = self._get_upload_arguments(client, content_type, command=command) headers, object_metadata, content_type = info hostname = _get_host_name(client._connection) @@ -1910,6 +1941,7 @@ def _initiate_resumable_upload( timeout=_DEFAULT_TIMEOUT, checksum=None, retry=None, + command=None, ): """Initiate a resumable upload. @@ -2008,6 +2040,12 @@ def _initiate_resumable_upload( (google.cloud.storage.retry) for information on retry types and how to configure them. + :type command: str + :param command: + (Optional) Information about which interface for upload was used, + to be included in the X-Goog-API-Client header. 
Please leave as None + unless otherwise directed. + :rtype: tuple :returns: Pair of @@ -2025,7 +2063,7 @@ def _initiate_resumable_upload( transport = self._get_transport(client) if "metadata" in self._properties and "metadata" not in self._changes: self._changes.add("metadata") - info = self._get_upload_arguments(client, content_type) + info = self._get_upload_arguments(client, content_type, command=command) headers, object_metadata, content_type = info if extra_headers is not None: headers.update(extra_headers) @@ -2103,6 +2141,7 @@ def _do_resumable_upload( timeout=_DEFAULT_TIMEOUT, checksum=None, retry=None, + command=None, ): """Perform a resumable upload. @@ -2191,6 +2230,12 @@ def _do_resumable_upload( (google.cloud.storage.retry) for information on retry types and how to configure them. + :type command: str + :param command: + (Optional) Information about which interface for upload was used, + to be included in the X-Goog-API-Client header. Please leave as None + unless otherwise directed. + :rtype: :class:`~requests.Response` :returns: The "200 OK" response object returned after the final chunk is uploaded. @@ -2209,6 +2254,7 @@ def _do_resumable_upload( timeout=timeout, checksum=checksum, retry=retry, + command=command, ) while not upload.finished: try: @@ -2234,6 +2280,7 @@ def _do_upload( timeout=_DEFAULT_TIMEOUT, checksum=None, retry=None, + command=None, ): """Determine an upload strategy and then perform the upload. @@ -2333,6 +2380,12 @@ def _do_upload( configuration changes for Retry objects such as delays and deadlines are respected. + :type command: str + :param command: + (Optional) Information about which interface for upload was used, + to be included in the X-Goog-API-Client header. Please leave as None + unless otherwise directed. + :rtype: dict :returns: The parsed JSON from the "200 OK" response. 
This will be the **only** response in the multipart case and it will be the @@ -2366,6 +2419,7 @@ def _do_upload( timeout=timeout, checksum=checksum, retry=retry, + command=command, ) else: response = self._do_resumable_upload( @@ -2382,11 +2436,12 @@ def _do_upload( timeout=timeout, checksum=checksum, retry=retry, + command=command, ) return response.json() - def upload_from_file( + def _prep_and_do_upload( self, file_obj, rewind=False, @@ -2402,6 +2457,7 @@ def upload_from_file( timeout=_DEFAULT_TIMEOUT, checksum=None, retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, + command=None, ): """Upload the contents of this blob from a file-like object. @@ -2522,6 +2578,12 @@ def upload_from_file( configuration changes for Retry objects such as delays and deadlines are respected. + :type command: str + :param command: + (Optional) Information about which interface for upload was used, + to be included in the X-Goog-API-Client header. Please leave as None + unless otherwise directed. + :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the upload response returns an error status. """ @@ -2551,11 +2613,192 @@ def upload_from_file( timeout=timeout, checksum=checksum, retry=retry, + command=command, ) self._set_properties(created_json) except resumable_media.InvalidResponse as exc: _raise_from_invalid_response(exc) + def upload_from_file( + self, + file_obj, + rewind=False, + size=None, + content_type=None, + num_retries=None, + client=None, + predefined_acl=None, + if_generation_match=None, + if_generation_not_match=None, + if_metageneration_match=None, + if_metageneration_not_match=None, + timeout=_DEFAULT_TIMEOUT, + checksum=None, + retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, + ): + """Upload the contents of this blob from a file-like object. 
+ + The content type of the upload will be determined in order + of precedence: + + - The value passed in to this method (if not :data:`None`) + - The value stored on the current blob + - The default value ('application/octet-stream') + + .. note:: + The effect of uploading to an existing blob depends on the + "versioning" and "lifecycle" policies defined on the blob's + bucket. In the absence of those policies, upload will + overwrite any existing contents. + + See the [`object versioning`](https://cloud.google.com/storage/docs/object-versioning) + and [`lifecycle`](https://cloud.google.com/storage/docs/lifecycle) + API documents for details. + + If the size of the data to be uploaded exceeds 8 MB a resumable media + request will be used, otherwise the content and the metadata will be + uploaded in a single multipart upload request. + + For more fine-grained control over the upload process, check out + [`google-resumable-media`](https://googleapis.dev/python/google-resumable-media/latest/index.html). + + If :attr:`user_project` is set on the bucket, bills the API request + to that project. + + :type file_obj: file + :param file_obj: A file handle opened in binary mode for reading. + + :type rewind: bool + :param rewind: + If True, seek to the beginning of the file handle before writing + the file to Cloud Storage. + + :type size: int + :param size: + The number of bytes to be uploaded (which will be read from + ``file_obj``). If not provided, the upload will be concluded once + ``file_obj`` is exhausted. + + :type content_type: str + :param content_type: (Optional) Type of content being uploaded. + + :type num_retries: int + :param num_retries: + Number of upload retries. By default, only uploads with + if_generation_match set will be retried, as uploads without the + argument are not guaranteed to be idempotent. Setting num_retries + will override this default behavior and guarantee retries even when + if_generation_match is not set.
(Deprecated: This argument + will be removed in a future release.) + + :type client: :class:`~google.cloud.storage.client.Client` + :param client: + (Optional) The client to use. If not passed, falls back to the + ``client`` stored on the blob's bucket. + + :type predefined_acl: str + :param predefined_acl: (Optional) Predefined access control list + + :type if_generation_match: long + :param if_generation_match: + (Optional) See :ref:`using-if-generation-match` + + :type if_generation_not_match: long + :param if_generation_not_match: + (Optional) See :ref:`using-if-generation-not-match` + + :type if_metageneration_match: long + :param if_metageneration_match: + (Optional) See :ref:`using-if-metageneration-match` + + :type if_metageneration_not_match: long + :param if_metageneration_not_match: + (Optional) See :ref:`using-if-metageneration-not-match` + + :type timeout: float or tuple + :param timeout: + (Optional) The amount of time, in seconds, to wait + for the server response. See: :ref:`configuring_timeouts` + + :type checksum: str + :param checksum: + (Optional) The type of checksum to compute to verify + the integrity of the object. If the upload is completed in a single + request, the checksum will be entirely precomputed and the remote + server will handle verification and error handling. If the upload + is too large and must be transmitted in multiple requests, the + checksum will be incrementally computed and the client will handle + verification and error handling, raising + google.resumable_media.common.DataCorruption on a mismatch and + attempting to delete the corrupted file. Supported values are + "md5", "crc32c" and None. The default is None. + + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. 
A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_generation_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + + :raises: :class:`~google.cloud.exceptions.GoogleCloudError` + if the upload response returns an error status. + """ + self._prep_and_do_upload( + file_obj, + rewind=rewind, + size=size, + content_type=content_type, + num_retries=num_retries, + client=client, + predefined_acl=predefined_acl, + if_generation_match=if_generation_match, + if_generation_not_match=if_generation_not_match, + if_metageneration_match=if_metageneration_match, + if_metageneration_not_match=if_metageneration_not_match, + timeout=timeout, + checksum=checksum, + retry=retry, + ) + + def _handle_filename_and_upload(self, filename, content_type=None, *args, **kwargs): + """Upload this blob's contents from the content of a named file. + + :type filename: str + :param filename: The path to the file. + + :type content_type: str + :param content_type: (Optional) Type of content being uploaded. + + For *args and **kwargs, refer to the documentation for upload_from_filename() for more information. 
+ """ + + content_type = self._get_content_type(content_type, filename=filename) + + with open(filename, "rb") as file_obj: + total_bytes = os.fstat(file_obj.fileno()).st_size + self._prep_and_do_upload( + file_obj, + content_type=content_type, + size=total_bytes, + *args, + **kwargs, + ) + def upload_from_filename( self, filename, @@ -2677,25 +2920,21 @@ def upload_from_filename( configuration changes for Retry objects such as delays and deadlines are respected. """ - content_type = self._get_content_type(content_type, filename=filename) - with open(filename, "rb") as file_obj: - total_bytes = os.fstat(file_obj.fileno()).st_size - self.upload_from_file( - file_obj, - content_type=content_type, - num_retries=num_retries, - client=client, - size=total_bytes, - predefined_acl=predefined_acl, - if_generation_match=if_generation_match, - if_generation_not_match=if_generation_not_match, - if_metageneration_match=if_metageneration_match, - if_metageneration_not_match=if_metageneration_not_match, - timeout=timeout, - checksum=checksum, - retry=retry, - ) + self._handle_filename_and_upload( + filename, + content_type=content_type, + num_retries=num_retries, + client=client, + predefined_acl=predefined_acl, + if_generation_match=if_generation_match, + if_generation_not_match=if_generation_not_match, + if_metageneration_match=if_metageneration_match, + if_metageneration_not_match=if_metageneration_not_match, + timeout=timeout, + checksum=checksum, + retry=retry, + ) def upload_from_string( self, @@ -3936,6 +4175,168 @@ def open( :rtype: str or ``NoneType`` """ + def _prep_and_do_download( + self, + file_obj, + client=None, + start=None, + end=None, + raw_download=False, + if_etag_match=None, + if_etag_not_match=None, + if_generation_match=None, + if_generation_not_match=None, + if_metageneration_match=None, + if_metageneration_not_match=None, + timeout=_DEFAULT_TIMEOUT, + checksum="md5", + retry=DEFAULT_RETRY, + command=None, + ): + """Download the contents of a blob object 
into a file-like object. + + See https://cloud.google.com/storage/docs/downloading-objects + + If :attr:`user_project` is set on the bucket, bills the API request + to that project. + + :type file_obj: file + :param file_obj: A file handle to which to write the blob's data. + + :type client: :class:`~google.cloud.storage.client.Client` + :param client: + (Optional) The client to use. If not passed, falls back to the + ``client`` stored on the blob's bucket. + + :type start: int + :param start: (Optional) The first byte in a range to be downloaded. + + :type end: int + :param end: (Optional) The last byte in a range to be downloaded. + + :type raw_download: bool + :param raw_download: + (Optional) If true, download the object without any expansion. + + :type if_etag_match: Union[str, Set[str]] + :param if_etag_match: + (Optional) See :ref:`using-if-etag-match` + + :type if_etag_not_match: Union[str, Set[str]] + :param if_etag_not_match: + (Optional) See :ref:`using-if-etag-not-match` + + :type if_generation_match: long + :param if_generation_match: + (Optional) See :ref:`using-if-generation-match` + + :type if_generation_not_match: long + :param if_generation_not_match: + (Optional) See :ref:`using-if-generation-not-match` + + :type if_metageneration_match: long + :param if_metageneration_match: + (Optional) See :ref:`using-if-metageneration-match` + + :type if_metageneration_not_match: long + :param if_metageneration_not_match: + (Optional) See :ref:`using-if-metageneration-not-match` + + :type timeout: float or tuple + :param timeout: + (Optional) The amount of time, in seconds, to wait + for the server response. See: :ref:`configuring_timeouts` + + :type checksum: str + :param checksum: + (Optional) The type of checksum to compute to verify the integrity + of the object. The response headers must contain a checksum of the + requested type. 
If the headers lack an appropriate checksum (for + instance in the case of transcoded or ranged downloads where the + remote service does not know the correct checksum, including + downloads where chunk_size is set) an INFO-level log will be + emitted. Supported values are "md5", "crc32c" and None. The default + is "md5". + + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + + :type command: str + :param command: + (Optional) Information about which interface for download was used, + to be included in the X-Goog-API-Client header. Please leave as None + unless otherwise directed. + """ + # Handle ConditionalRetryPolicy. + if isinstance(retry, ConditionalRetryPolicy): + # Conditional retries are designed for non-media calls, which change + # arguments into query_params dictionaries. Media operations work + # differently, so here we make a "fake" query_params to feed to the + # ConditionalRetryPolicy. 
+ query_params = { + "ifGenerationMatch": if_generation_match, + "ifMetagenerationMatch": if_metageneration_match, + } + retry = retry.get_retry_policy_if_conditions_met(query_params=query_params) + + client = self._require_client(client) + + download_url = self._get_download_url( + client, + if_generation_match=if_generation_match, + if_generation_not_match=if_generation_not_match, + if_metageneration_match=if_metageneration_match, + if_metageneration_not_match=if_metageneration_not_match, + ) + headers = _get_encryption_headers(self._encryption_key) + headers["accept-encoding"] = "gzip" + _add_etag_match_headers( + headers, + if_etag_match=if_etag_match, + if_etag_not_match=if_etag_not_match, + ) + headers = { + **_get_default_headers(client._connection.user_agent, command=command), + **headers, + } + + transport = client._http + + try: + self._do_download( + transport, + file_obj, + download_url, + headers, + start, + end, + raw_download, + timeout=timeout, + checksum=checksum, + retry=retry, + ) + except resumable_media.InvalidResponse as exc: + _raise_from_invalid_response(exc) + @property def component_count(self): """Number of underlying components that make up this object. 
diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index bec5da9a3..e6391f5fb 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -25,18 +25,16 @@ from google.auth.credentials import AnonymousCredentials -from google import resumable_media - from google.api_core import page_iterator from google.cloud._helpers import _LocalStack, _NOW from google.cloud.client import ClientWithProject from google.cloud.exceptions import NotFound -from google.cloud.storage._helpers import _get_default_headers + from google.cloud.storage._helpers import _get_environ_project from google.cloud.storage._helpers import _get_storage_host from google.cloud.storage._helpers import _DEFAULT_STORAGE_HOST from google.cloud.storage._helpers import _bucket_bound_hostname_url -from google.cloud.storage._helpers import _add_etag_match_headers + from google.cloud.storage._http import Connection from google.cloud.storage._signing import ( get_expiration_seconds_v4, @@ -46,17 +44,12 @@ ) from google.cloud.storage.batch import Batch from google.cloud.storage.bucket import Bucket, _item_to_blob, _blobs_page_start -from google.cloud.storage.blob import ( - Blob, - _get_encryption_headers, - _raise_from_invalid_response, -) +from google.cloud.storage.blob import Blob from google.cloud.storage.hmac_key import HMACKeyMetadata from google.cloud.storage.acl import BucketACL from google.cloud.storage.acl import DefaultObjectACL from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.retry import DEFAULT_RETRY -from google.cloud.storage.retry import ConditionalRetryPolicy _marker = object() @@ -1064,52 +1057,25 @@ def download_blob_to_file( are respected. """ - # Handle ConditionalRetryPolicy. - if isinstance(retry, ConditionalRetryPolicy): - # Conditional retries are designed for non-media calls, which change - # arguments into query_params dictionaries. 
Media operations work - # differently, so here we make a "fake" query_params to feed to the - # ConditionalRetryPolicy. - query_params = { - "ifGenerationMatch": if_generation_match, - "ifMetagenerationMatch": if_metageneration_match, - } - retry = retry.get_retry_policy_if_conditions_met(query_params=query_params) - if not isinstance(blob_or_uri, Blob): blob_or_uri = Blob.from_string(blob_or_uri) - download_url = blob_or_uri._get_download_url( - self, + + blob_or_uri._prep_and_do_download( + file_obj, + client=self, + start=start, + end=end, + raw_download=raw_download, + if_etag_match=if_etag_match, + if_etag_not_match=if_etag_not_match, if_generation_match=if_generation_match, if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, + timeout=timeout, + checksum=checksum, + retry=retry, ) - headers = _get_encryption_headers(blob_or_uri._encryption_key) - headers["accept-encoding"] = "gzip" - _add_etag_match_headers( - headers, - if_etag_match=if_etag_match, - if_etag_not_match=if_etag_not_match, - ) - headers = {**_get_default_headers(self._connection.user_agent), **headers} - - transport = self._http - try: - blob_or_uri._do_download( - transport, - file_obj, - download_url, - headers, - start, - end, - raw_download, - timeout=timeout, - checksum=checksum, - retry=retry, - ) - except resumable_media.InvalidResponse as exc: - _raise_from_invalid_response(exc) def list_blobs( self, diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 5cb9b6c46..b213d9e79 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -102,8 +102,7 @@ def upload_many( :type file_blob_pairs: List(Tuple(IOBase or str, 'google.cloud.storage.blob.Blob')) :param file_blob_pairs: A list of tuples of a file or filename and a blob. 
Each file will be - uploaded to the corresponding blob by using blob.upload_from_file() or - blob.upload_from_filename() as appropriate. + uploaded to the corresponding blob by using APIs identical to blob.upload_from_file() or blob.upload_from_filename() as appropriate. File handlers are only supported if worker_type is set to THREAD. If worker_type is set to PROCESS, please use filenames only. @@ -120,8 +119,7 @@ def upload_many( :param upload_kwargs: A dictionary of keyword arguments to pass to the upload method. Refer to the documentation for blob.upload_from_file() or - blob.upload_from_filename() for more information. The dict is directly - passed into the upload methods and is not validated by this function. + blob.upload_from_filename() for more information. The dict is directly passed into the upload methods and is not validated by this function. :type threads: int :param threads: @@ -192,10 +190,13 @@ def upload_many( """ if upload_kwargs is None: upload_kwargs = {} + if skip_if_exists: upload_kwargs = upload_kwargs.copy() upload_kwargs["if_generation_match"] = 0 + upload_kwargs["command"] = "tm.upload_many" + pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type) with pool_class(max_workers=max_workers) as executor: @@ -212,9 +213,9 @@ def upload_many( executor.submit( _call_method_on_maybe_pickled_blob, _pickle_client(blob) if needs_pickling else blob, - "upload_from_filename" + "_handle_filename_and_upload" if isinstance(path_or_file, str) - else "upload_from_file", + else "_prep_and_do_upload", path_or_file, **upload_kwargs, ) @@ -256,12 +257,9 @@ def download_many( :type blob_file_pairs: List(Tuple('google.cloud.storage.blob.Blob', IOBase or str)) :param blob_file_pairs: - A list of tuples of blob and a file or filename. Each blob will be - downloaded to the corresponding blob by using blob.download_to_file() or - blob.download_to_filename() as appropriate. + A list of tuples of blob and a file or filename. 
Each blob will be downloaded to the corresponding file by using APIs identical to blob.download_to_file() or blob.download_to_filename() as appropriate. - Note that blob.download_to_filename() does not delete the destination - file if the download fails. + Note that blob.download_to_filename() does not delete the destination file if the download fails. File handlers are only supported if worker_type is set to THREAD. If worker_type is set to PROCESS, please use filenames only. @@ -269,9 +267,7 @@ def download_many( :type download_kwargs: dict :param download_kwargs: A dictionary of keyword arguments to pass to the download method. Refer - to the documentation for blob.download_to_file() or - blob.download_to_filename() for more information. The dict is directly - passed into the download methods and is not validated by this function. + to the documentation for blob.download_to_file() or blob.download_to_filename() for more information. The dict is directly passed into the download methods and is not validated by this function. :type threads: int :param threads: @@ -341,6 +337,8 @@ def download_many( if download_kwargs is None: download_kwargs = {} + download_kwargs["command"] = "tm.download_many" + pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type) with pool_class(max_workers=max_workers) as executor: @@ -357,9 +355,9 @@ def download_many( executor.submit( _call_method_on_maybe_pickled_blob, _pickle_client(blob) if needs_pickling else blob, - "download_to_filename" + "_handle_filename_and_download" if isinstance(path_or_file, str) - else "download_to_file", + else "_prep_and_do_download", path_or_file, **download_kwargs, ) @@ -467,8 +465,7 @@ def upload_many_from_filenames( :param upload_kwargs: A dictionary of keyword arguments to pass to the upload method. Refer to the documentation for blob.upload_from_file() or - blob.upload_from_filename() for more information.
The dict is directly - passed into the upload methods and is not validated by this function. + blob.upload_from_filename() for more information. The dict is directly passed into the upload methods and is not validated by this function. :type threads: int :param threads: @@ -767,8 +764,7 @@ def download_chunks_concurrently( :param download_kwargs: A dictionary of keyword arguments to pass to the download method. Refer to the documentation for blob.download_to_file() or - blob.download_to_filename() for more information. The dict is directly - passed into the download methods and is not validated by this function. + blob.download_to_filename() for more information. The dict is directly passed into the download methods and is not validated by this function. Keyword arguments "start" and "end" which are not supported and will cause a ValueError if present. @@ -821,6 +817,8 @@ def download_chunks_concurrently( "Download arguments 'start' and 'end' are not supported by download_chunks_concurrently." ) + download_kwargs["command"] = "tm.download_sharded" + # We must know the size and the generation of the blob. 
if not blob.size or not blob.generation: blob.reload() @@ -981,7 +979,7 @@ def upload_chunks_concurrently( ) base_headers, object_metadata, content_type = blob._get_upload_arguments( - client, content_type, filename=filename + client, content_type, filename=filename, command="tm.upload_sharded" ) headers = {**base_headers, **_headers_from_metadata(object_metadata)} @@ -1113,7 +1111,7 @@ def _download_and_write_chunk_in_place( filename, "rb+" ) as f: # Open in mixed read/write mode to avoid truncating or appending f.seek(start) - return blob.download_to_file(f, start=start, end=end, **download_kwargs) + return blob._prep_and_do_download(f, start=start, end=end, **download_kwargs) def _call_method_on_maybe_pickled_blob( diff --git a/tests/unit/test__http.py b/tests/unit/test__http.py index 9e7bf216b..e64ae0bab 100644 --- a/tests/unit/test__http.py +++ b/tests/unit/test__http.py @@ -18,7 +18,8 @@ import mock from google.cloud.storage import _helpers -from tests.unit.test__helpers import GCCL_INVOCATION_TEST_CONST + +GCCL_INVOCATION_TEST_CONST = "gccl-invocation-id/test-invocation-123" class TestConnection(unittest.TestCase): diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index 638db9f4e..a8d024176 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -1411,33 +1411,35 @@ def test_download_to_file_with_failure(self): blob_name = "blob-name" client = self._make_client() - client.download_blob_to_file.side_effect = NotFound("testing") bucket = _Bucket(client) blob = self._make_one(blob_name, bucket=bucket) file_obj = io.BytesIO() - with self.assertRaises(NotFound): - blob.download_to_file(file_obj) + with mock.patch.object(blob, "_prep_and_do_download"): + blob._prep_and_do_download.side_effect = NotFound("testing") - self.assertEqual(file_obj.tell(), 0) + with self.assertRaises(NotFound): + blob.download_to_file(file_obj) - expected_timeout = self._get_default_timeout() - client.download_blob_to_file.assert_called_once_with( - blob, - 
file_obj, - start=None, - end=None, - if_etag_match=None, - if_etag_not_match=None, - if_generation_match=None, - if_generation_not_match=None, - if_metageneration_match=None, - if_metageneration_not_match=None, - raw_download=False, - timeout=expected_timeout, - checksum="md5", - retry=DEFAULT_RETRY, - ) + self.assertEqual(file_obj.tell(), 0) + + expected_timeout = self._get_default_timeout() + blob._prep_and_do_download.assert_called_once_with( + file_obj, + client=None, + start=None, + end=None, + if_etag_match=None, + if_etag_not_match=None, + if_generation_match=None, + if_generation_not_match=None, + if_metageneration_match=None, + if_metageneration_not_match=None, + raw_download=False, + timeout=expected_timeout, + checksum="md5", + retry=DEFAULT_RETRY, + ) def test_download_to_file_wo_media_link(self): blob_name = "blob-name" @@ -1446,28 +1448,29 @@ def test_download_to_file_wo_media_link(self): blob = self._make_one(blob_name, bucket=bucket) file_obj = io.BytesIO() - blob.download_to_file(file_obj) + with mock.patch.object(blob, "_prep_and_do_download"): + blob.download_to_file(file_obj) - # Make sure the media link is still unknown. - self.assertIsNone(blob.media_link) + # Make sure the media link is still unknown. 
+ self.assertIsNone(blob.media_link) - expected_timeout = self._get_default_timeout() - client.download_blob_to_file.assert_called_once_with( - blob, - file_obj, - start=None, - end=None, - if_etag_match=None, - if_etag_not_match=None, - if_generation_match=None, - if_generation_not_match=None, - if_metageneration_match=None, - if_metageneration_not_match=None, - raw_download=False, - timeout=expected_timeout, - checksum="md5", - retry=DEFAULT_RETRY, - ) + expected_timeout = self._get_default_timeout() + blob._prep_and_do_download.assert_called_once_with( + file_obj, + client=None, + start=None, + end=None, + if_etag_match=None, + if_etag_not_match=None, + if_generation_match=None, + if_generation_not_match=None, + if_metageneration_match=None, + if_metageneration_not_match=None, + raw_download=False, + timeout=expected_timeout, + checksum="md5", + retry=DEFAULT_RETRY, + ) def test_download_to_file_w_etag_match(self): etag = "kittens" @@ -1475,25 +1478,26 @@ def test_download_to_file_w_etag_match(self): blob = self._make_one("blob-name", bucket=_Bucket(client)) file_obj = io.BytesIO() - blob.download_to_file(file_obj, if_etag_not_match=etag) + with mock.patch.object(blob, "_prep_and_do_download"): + blob.download_to_file(file_obj, if_etag_not_match=etag) - expected_timeout = self._get_default_timeout() - client.download_blob_to_file.assert_called_once_with( - blob, - file_obj, - start=None, - end=None, - if_etag_match=None, - if_etag_not_match=etag, - if_generation_match=None, - if_generation_not_match=None, - if_metageneration_match=None, - if_metageneration_not_match=None, - raw_download=False, - timeout=expected_timeout, - checksum="md5", - retry=DEFAULT_RETRY, - ) + expected_timeout = self._get_default_timeout() + blob._prep_and_do_download.assert_called_once_with( + file_obj, + client=None, + start=None, + end=None, + if_etag_match=None, + if_etag_not_match=etag, + if_generation_match=None, + if_generation_not_match=None, + if_metageneration_match=None, + 
if_metageneration_not_match=None, + raw_download=False, + timeout=expected_timeout, + checksum="md5", + retry=DEFAULT_RETRY, + ) def test_download_to_file_w_generation_match(self): generation_number = 6 @@ -1501,25 +1505,26 @@ def test_download_to_file_w_generation_match(self): blob = self._make_one("blob-name", bucket=_Bucket(client)) file_obj = io.BytesIO() - blob.download_to_file(file_obj, if_generation_not_match=generation_number) + with mock.patch.object(blob, "_prep_and_do_download"): + blob.download_to_file(file_obj, if_generation_not_match=generation_number) - expected_timeout = self._get_default_timeout() - client.download_blob_to_file.assert_called_once_with( - blob, - file_obj, - start=None, - end=None, - if_etag_match=None, - if_etag_not_match=None, - if_generation_match=None, - if_generation_not_match=generation_number, - if_metageneration_match=None, - if_metageneration_not_match=None, - raw_download=False, - timeout=expected_timeout, - checksum="md5", - retry=DEFAULT_RETRY, - ) + expected_timeout = self._get_default_timeout() + blob._prep_and_do_download.assert_called_once_with( + file_obj, + client=None, + start=None, + end=None, + if_etag_match=None, + if_etag_not_match=None, + if_generation_match=None, + if_generation_not_match=generation_number, + if_metageneration_match=None, + if_metageneration_not_match=None, + raw_download=False, + timeout=expected_timeout, + checksum="md5", + retry=DEFAULT_RETRY, + ) def _download_to_file_helper( self, use_chunks, raw_download, timeout=None, **extra_kwargs @@ -1544,28 +1549,30 @@ def _download_to_file_helper( extra_kwargs.update(timeout_kwarg) file_obj = io.BytesIO() - if raw_download: - blob.download_to_file(file_obj, raw_download=True, **extra_kwargs) - else: - blob.download_to_file(file_obj, **extra_kwargs) - expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) - client.download_blob_to_file.assert_called_once_with( - blob, - file_obj, - start=None, - end=None, - if_etag_match=None, - 
if_etag_not_match=None, - if_generation_match=None, - if_generation_not_match=None, - if_metageneration_match=None, - if_metageneration_not_match=None, - raw_download=raw_download, - timeout=expected_timeout, - checksum="md5", - retry=expected_retry, - ) + with mock.patch.object(blob, "_prep_and_do_download"): + if raw_download: + blob.download_to_file(file_obj, raw_download=True, **extra_kwargs) + else: + blob.download_to_file(file_obj, **extra_kwargs) + + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + blob._prep_and_do_download.assert_called_once_with( + file_obj, + client=None, + start=None, + end=None, + if_etag_match=None, + if_etag_not_match=None, + if_generation_match=None, + if_generation_not_match=None, + if_metageneration_match=None, + if_metageneration_not_match=None, + raw_download=raw_download, + timeout=expected_timeout, + checksum="md5", + retry=expected_retry, + ) def test_download_to_file_wo_chunks_wo_raw(self): self._download_to_file_helper(use_chunks=False, raw_download=False) @@ -1602,48 +1609,51 @@ def _download_to_filename_helper( blob = self._make_one(blob_name, bucket=bucket, properties=properties) - with _NamedTemporaryFile() as temp: - if timeout is None: - blob.download_to_filename( - temp.name, raw_download=raw_download, **extra_kwargs - ) - else: - blob.download_to_filename( - temp.name, - raw_download=raw_download, - timeout=timeout, - **extra_kwargs, - ) - - if updated is None: - self.assertIsNone(blob.updated) - else: - mtime = os.path.getmtime(temp.name) - updated_time = blob.updated.timestamp() - self.assertEqual(mtime, updated_time) - - expected_timeout = self._get_default_timeout() if timeout is None else timeout + with mock.patch.object(blob, "_prep_and_do_download"): + with _NamedTemporaryFile() as temp: + if timeout is None: + blob.download_to_filename( + temp.name, raw_download=raw_download, **extra_kwargs + ) + else: + blob.download_to_filename( + temp.name, + raw_download=raw_download, + timeout=timeout, + 
**extra_kwargs, + ) + + if updated is None: + self.assertIsNone(blob.updated) + else: + mtime = os.path.getmtime(temp.name) + updated_time = blob.updated.timestamp() + self.assertEqual(mtime, updated_time) + + expected_timeout = ( + self._get_default_timeout() if timeout is None else timeout + ) - expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) - client.download_blob_to_file.assert_called_once_with( - blob, - mock.ANY, - start=None, - end=None, - if_etag_match=None, - if_etag_not_match=None, - if_generation_match=None, - if_generation_not_match=None, - if_metageneration_match=None, - if_metageneration_not_match=None, - raw_download=raw_download, - timeout=expected_timeout, - checksum="md5", - retry=expected_retry, - ) - stream = client.download_blob_to_file.mock_calls[0].args[1] - self.assertEqual(stream.name, temp.name) + blob._prep_and_do_download.assert_called_once_with( + mock.ANY, + client=None, + start=None, + end=None, + raw_download=raw_download, + if_etag_match=None, + if_etag_not_match=None, + if_generation_match=None, + if_generation_not_match=None, + if_metageneration_match=None, + if_metageneration_not_match=None, + timeout=expected_timeout, + checksum="md5", + retry=expected_retry, + ) + stream = blob._prep_and_do_download.mock_calls[0].args[0] + self.assertEqual(stream.name, temp.name) def test_download_to_filename_w_updated_wo_raw(self): updated = "2014-12-06T13:13:50.690Z" @@ -1677,28 +1687,29 @@ def test_download_to_filename_w_etag_match(self): client = self._make_client() blob = self._make_one("blob-name", bucket=_Bucket(client)) - with _NamedTemporaryFile() as temp: - blob.download_to_filename(temp.name, if_etag_match=etag) + with mock.patch.object(blob, "_prep_and_do_download"): + with _NamedTemporaryFile() as temp: + blob.download_to_filename(temp.name, if_etag_match=etag) - expected_timeout = self._get_default_timeout() - 
client.download_blob_to_file.assert_called_once_with( - blob, - mock.ANY, - start=None, - end=None, - if_etag_match=etag, - if_etag_not_match=None, - if_generation_match=None, - if_generation_not_match=None, - if_metageneration_match=None, - if_metageneration_not_match=None, - raw_download=False, - timeout=expected_timeout, - checksum="md5", - retry=DEFAULT_RETRY, - ) - stream = client.download_blob_to_file.mock_calls[0].args[1] - self.assertEqual(stream.name, temp.name) + expected_timeout = self._get_default_timeout() + blob._prep_and_do_download.assert_called_once_with( + mock.ANY, + client=None, + start=None, + end=None, + if_etag_match=etag, + if_etag_not_match=None, + if_generation_match=None, + if_generation_not_match=None, + if_metageneration_match=None, + if_metageneration_not_match=None, + raw_download=False, + timeout=expected_timeout, + checksum="md5", + retry=DEFAULT_RETRY, + ) + stream = blob._prep_and_do_download.mock_calls[0].args[0] + self.assertEqual(stream.name, temp.name) def test_download_to_filename_w_generation_match(self): from google.cloud._testing import _NamedTemporaryFile @@ -1707,28 +1718,31 @@ def test_download_to_filename_w_generation_match(self): client = self._make_client() blob = self._make_one("blob-name", bucket=_Bucket(client)) - with _NamedTemporaryFile() as temp: - blob.download_to_filename(temp.name, if_generation_match=generation_number) + with mock.patch.object(blob, "_prep_and_do_download"): + with _NamedTemporaryFile() as temp: + blob.download_to_filename( + temp.name, if_generation_match=generation_number + ) - expected_timeout = self._get_default_timeout() - client.download_blob_to_file.assert_called_once_with( - blob, - mock.ANY, - start=None, - end=None, - if_etag_match=None, - if_etag_not_match=None, - if_generation_match=generation_number, - if_generation_not_match=None, - if_metageneration_match=None, - if_metageneration_not_match=None, - raw_download=False, - timeout=expected_timeout, - checksum="md5", - 
retry=DEFAULT_RETRY, - ) - stream = client.download_blob_to_file.mock_calls[0].args[1] - self.assertEqual(stream.name, temp.name) + expected_timeout = self._get_default_timeout() + blob._prep_and_do_download.assert_called_once_with( + mock.ANY, + client=None, + start=None, + end=None, + if_etag_match=None, + if_etag_not_match=None, + if_generation_match=generation_number, + if_generation_not_match=None, + if_metageneration_match=None, + if_metageneration_not_match=None, + raw_download=False, + timeout=expected_timeout, + checksum="md5", + retry=DEFAULT_RETRY, + ) + stream = blob._prep_and_do_download.mock_calls[0].args[0] + self.assertEqual(stream.name, temp.name) def test_download_to_filename_corrupted(self): from google.resumable_media import DataCorruption @@ -1737,40 +1751,42 @@ def test_download_to_filename_corrupted(self): client = self._make_client() bucket = _Bucket(client) blob = self._make_one(blob_name, bucket=bucket) - client.download_blob_to_file.side_effect = DataCorruption("testing") - # Try to download into a temporary file (don't use - # `_NamedTemporaryFile` it will try to remove after the file is - # already removed) - filehandle, filename = tempfile.mkstemp() - os.close(filehandle) - self.assertTrue(os.path.exists(filename)) + with mock.patch.object(blob, "_prep_and_do_download"): + blob._prep_and_do_download.side_effect = DataCorruption("testing") - with self.assertRaises(DataCorruption): - blob.download_to_filename(filename) + # Try to download into a temporary file (don't use + # `_NamedTemporaryFile` it will try to remove after the file is + # already removed) + filehandle, filename = tempfile.mkstemp() + os.close(filehandle) + self.assertTrue(os.path.exists(filename)) - # Make sure the file was cleaned up. 
- self.assertFalse(os.path.exists(filename)) + with self.assertRaises(DataCorruption): + blob.download_to_filename(filename) - expected_timeout = self._get_default_timeout() - client.download_blob_to_file.assert_called_once_with( - blob, - mock.ANY, - start=None, - end=None, - if_etag_match=None, - if_etag_not_match=None, - if_generation_match=None, - if_generation_not_match=None, - if_metageneration_match=None, - if_metageneration_not_match=None, - raw_download=False, - timeout=expected_timeout, - checksum="md5", - retry=DEFAULT_RETRY, - ) - stream = client.download_blob_to_file.mock_calls[0].args[1] - self.assertEqual(stream.name, filename) + # Make sure the file was cleaned up. + self.assertFalse(os.path.exists(filename)) + + expected_timeout = self._get_default_timeout() + blob._prep_and_do_download.assert_called_once_with( + mock.ANY, + client=None, + start=None, + end=None, + if_etag_match=None, + if_etag_not_match=None, + if_generation_match=None, + if_generation_not_match=None, + if_metageneration_match=None, + if_metageneration_not_match=None, + raw_download=False, + timeout=expected_timeout, + checksum="md5", + retry=DEFAULT_RETRY, + ) + stream = blob._prep_and_do_download.mock_calls[0].args[0] + self.assertEqual(stream.name, filename) def _download_as_bytes_helper(self, raw_download, timeout=None, **extra_kwargs): blob_name = "blob-name" @@ -1778,36 +1794,39 @@ def _download_as_bytes_helper(self, raw_download, timeout=None, **extra_kwargs): bucket = _Bucket(client) blob = self._make_one(blob_name, bucket=bucket) - if timeout is None: - expected_timeout = self._get_default_timeout() - fetched = blob.download_as_bytes(raw_download=raw_download, **extra_kwargs) - else: - expected_timeout = timeout - fetched = blob.download_as_bytes( - raw_download=raw_download, timeout=timeout, **extra_kwargs - ) - self.assertEqual(fetched, b"") + with mock.patch.object(blob, "_prep_and_do_download"): + if timeout is None: + expected_timeout = self._get_default_timeout() + 
fetched = blob.download_as_bytes( + raw_download=raw_download, **extra_kwargs + ) + else: + expected_timeout = timeout + fetched = blob.download_as_bytes( + raw_download=raw_download, timeout=timeout, **extra_kwargs + ) + self.assertEqual(fetched, b"") - expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) - client.download_blob_to_file.assert_called_once_with( - blob, - mock.ANY, - start=None, - end=None, - if_etag_match=None, - if_etag_not_match=None, - if_generation_match=None, - if_generation_not_match=None, - if_metageneration_match=None, - if_metageneration_not_match=None, - raw_download=raw_download, - timeout=expected_timeout, - checksum="md5", - retry=expected_retry, - ) - stream = client.download_blob_to_file.mock_calls[0].args[1] - self.assertIsInstance(stream, io.BytesIO) + blob._prep_and_do_download.assert_called_once_with( + mock.ANY, + client=None, + start=None, + end=None, + raw_download=raw_download, + if_etag_match=None, + if_etag_not_match=None, + if_generation_match=None, + if_generation_not_match=None, + if_metageneration_match=None, + if_metageneration_not_match=None, + timeout=expected_timeout, + checksum="md5", + retry=expected_retry, + ) + stream = blob._prep_and_do_download.mock_calls[0].args[0] + self.assertIsInstance(stream, io.BytesIO) def test_download_as_bytes_w_custom_timeout(self): self._download_as_bytes_helper(raw_download=False, timeout=9.58) @@ -1820,14 +1839,14 @@ def test_download_as_bytes_w_etag_match(self): blob = self._make_one( "blob-name", bucket=_Bucket(client), properties={"mediaLink": MEDIA_LINK} ) - client.download_blob_to_file = mock.Mock() + blob._prep_and_do_download = mock.Mock() fetched = blob.download_as_bytes(if_etag_match=ETAG) self.assertEqual(fetched, b"") - client.download_blob_to_file.assert_called_once_with( - blob, + blob._prep_and_do_download.assert_called_once_with( mock.ANY, + client=None, start=None, end=None, raw_download=False, @@ 
-1850,14 +1869,14 @@ def test_download_as_bytes_w_generation_match(self): blob = self._make_one( "blob-name", bucket=_Bucket(client), properties={"mediaLink": MEDIA_LINK} ) - client.download_blob_to_file = mock.Mock() + blob._prep_and_do_download = mock.Mock() fetched = blob.download_as_bytes(if_generation_match=GENERATION_NUMBER) self.assertEqual(fetched, b"") - client.download_blob_to_file.assert_called_once_with( - blob, + blob._prep_and_do_download.assert_called_once_with( mock.ANY, + client=None, start=None, end=None, raw_download=False, @@ -2087,14 +2106,14 @@ def test_download_as_string(self, mock_warn): blob = self._make_one( "blob-name", bucket=_Bucket(client), properties={"mediaLink": MEDIA_LINK} ) - client.download_blob_to_file = mock.Mock() + blob._prep_and_do_download = mock.Mock() fetched = blob.download_as_string() self.assertEqual(fetched, b"") - client.download_blob_to_file.assert_called_once_with( - blob, + blob._prep_and_do_download.assert_called_once_with( mock.ANY, + client=None, start=None, end=None, raw_download=False, @@ -2125,14 +2144,14 @@ def test_download_as_string_no_retry(self, mock_warn): blob = self._make_one( "blob-name", bucket=_Bucket(client), properties={"mediaLink": MEDIA_LINK} ) - client.download_blob_to_file = mock.Mock() + blob._prep_and_do_download = mock.Mock() fetched = blob.download_as_string(retry=None) self.assertEqual(fetched, b"") - client.download_blob_to_file.assert_called_once_with( - blob, + blob._prep_and_do_download.assert_called_once_with( mock.ANY, + client=None, start=None, end=None, raw_download=False, @@ -2232,11 +2251,12 @@ def test__get_upload_arguments(self): blob = self._make_one(name, bucket=None, encryption_key=key) blob.content_disposition = "inline" + COMMAND = "tm.upload_many" content_type = "image/jpeg" with patch.object( _helpers, "_get_invocation_id", return_value=GCCL_INVOCATION_TEST_CONST ): - info = blob._get_upload_arguments(client, content_type) + info = blob._get_upload_arguments(client, 
content_type, command=COMMAND) headers, object_metadata, new_content_type = info header_key_value = "W3BYd0AscEBAQWZCZnJSM3gtMmIyU0NIUiwuP1l3Uk8=" @@ -2245,11 +2265,17 @@ def test__get_upload_arguments(self): _helpers, "_get_invocation_id", return_value=GCCL_INVOCATION_TEST_CONST ): expected_headers = { - **_get_default_headers(client._connection.user_agent, content_type), + **_get_default_headers( + client._connection.user_agent, content_type, command=COMMAND + ), "X-Goog-Encryption-Algorithm": "AES256", "X-Goog-Encryption-Key": header_key_value, "X-Goog-Encryption-Key-Sha256": header_key_hash_value, } + self.assertEqual( + headers["X-Goog-API-Client"], + f"{client._connection.user_agent} {GCCL_INVOCATION_TEST_CONST} gccl-gcs-cmd/{COMMAND}", + ) self.assertEqual(headers, expected_headers) expected_metadata = { "contentDisposition": blob.content_disposition, @@ -3165,6 +3191,7 @@ def _do_upload_helper( timeout=expected_timeout, checksum=None, retry=retry, + command=None, ) blob._do_resumable_upload.assert_not_called() else: @@ -3183,6 +3210,7 @@ def _do_upload_helper( timeout=expected_timeout, checksum=None, retry=retry, + command=None, ) def test__do_upload_uses_multipart(self): @@ -3275,6 +3303,7 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): timeout=expected_timeout, checksum=None, retry=retry, + command=None, ) return stream @@ -3366,7 +3395,13 @@ def _do_upload_mock_call_helper( if not retry: retry = DEFAULT_RETRY_IF_GENERATION_SPECIFIED if not num_retries else None self.assertEqual( - kwargs, {"timeout": expected_timeout, "checksum": None, "retry": retry} + kwargs, + { + "timeout": expected_timeout, + "checksum": None, + "retry": retry, + "command": None, + }, ) return pos_args[1] diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 31f7e3988..277610696 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -1639,9 +1639,16 @@ def test_create_bucket_w_name_only(self): _target_object=bucket, ) + 
@staticmethod + def _make_blob(*args, **kw): + from google.cloud.storage.blob import Blob + + blob = Blob(*args, **kw) + + return blob + def test_download_blob_to_file_with_failure(self): from google.resumable_media import InvalidResponse - from google.cloud.storage.blob import Blob from google.cloud.storage.constants import _DEFAULT_TIMEOUT project = "PROJECT" @@ -1652,7 +1659,7 @@ def test_download_blob_to_file_with_failure(self): grmp_response = InvalidResponse(raw_response) credentials = _make_credentials(project=project) client = self._make_one(credentials=credentials) - blob = mock.create_autospec(Blob) + blob = self._make_blob(name="blob_name", bucket=None) blob._encryption_key = None blob._get_download_url = mock.Mock() blob._do_download = mock.Mock() @@ -1689,7 +1696,7 @@ def test_download_blob_to_file_with_uri(self): project = "PROJECT" credentials = _make_credentials(project=project) client = self._make_one(project=project, credentials=credentials) - blob = mock.Mock() + blob = self._make_blob(name="blob_name", bucket=None) file_obj = io.BytesIO() blob._encryption_key = None blob._get_download_url = mock.Mock() @@ -1787,13 +1794,12 @@ def test_download_blob_to_file_w_conditional_retry_fail(self): def _download_blob_to_file_helper( self, use_chunks, raw_download, expect_condition_fail=False, **extra_kwargs ): - from google.cloud.storage.blob import Blob from google.cloud.storage.constants import _DEFAULT_TIMEOUT project = "PROJECT" credentials = _make_credentials(project=project) client = self._make_one(credentials=credentials) - blob = mock.create_autospec(Blob) + blob = self._make_blob(name="blob_name", bucket=None) blob._encryption_key = None blob._get_download_url = mock.Mock() if use_chunks: @@ -1863,14 +1869,13 @@ def test_download_blob_to_file_w_chunks_w_raw(self): self._download_blob_to_file_helper(use_chunks=True, raw_download=True) def test_download_blob_have_different_uuid(self): - from google.cloud.storage.blob import Blob - project = 
"PROJECT" credentials = _make_credentials(project=project) client = self._make_one(credentials=credentials) - blob = mock.create_autospec(Blob) + blob = self._make_blob(name="blob_name", bucket=None) blob._encryption_key = None blob._do_download = mock.Mock() + blob._get_download_url = mock.Mock() file_obj = io.BytesIO() client.download_blob_to_file(blob, file_obj) client.download_blob_to_file(blob, file_obj) diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index f1d760043..eb2a5711e 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -37,6 +37,14 @@ HOSTNAME = "https://example.com" URL = "https://example.com/bucket/blob" USER_AGENT = "agent" +EXPECTED_UPLOAD_KWARGS = { + "command": "tm.upload_many", + **UPLOAD_KWARGS, +} +EXPECTED_DOWNLOAD_KWARGS = { + "command": "tm.download_many", + **DOWNLOAD_KWARGS, +} # Used in subprocesses only, so excluded from coverage @@ -44,9 +52,9 @@ def _validate_blob_token_in_subprocess( maybe_pickled_blob, method_name, path_or_file, **kwargs ): # pragma: NO COVER assert pickle.loads(maybe_pickled_blob) == BLOB_TOKEN_STRING - assert method_name.endswith("filename") + assert "filename" in method_name assert path_or_file.startswith("file") - assert kwargs == UPLOAD_KWARGS or kwargs == DOWNLOAD_KWARGS + assert kwargs == EXPECTED_UPLOAD_KWARGS or kwargs == EXPECTED_DOWNLOAD_KWARGS return FAKE_RESULT @@ -55,10 +63,11 @@ def test_upload_many_with_filenames(): ("file_a.txt", mock.Mock(spec=Blob)), ("file_b.txt", mock.Mock(spec=Blob)), ] - EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} + expected_upload_kwargs = EXPECTED_UPLOAD_KWARGS.copy() + expected_upload_kwargs["if_generation_match"] = 0 for _, blob_mock in FILE_BLOB_PAIRS: - blob_mock.upload_from_filename.return_value = FAKE_RESULT + blob_mock._handle_filename_and_upload.return_value = FAKE_RESULT results = transfer_manager.upload_many( FILE_BLOB_PAIRS, @@ -67,8 +76,8 @@ def 
test_upload_many_with_filenames(): worker_type=transfer_manager.THREAD, ) for (filename, mock_blob) in FILE_BLOB_PAIRS: - mock_blob.upload_from_filename.assert_any_call( - filename, **EXPECTED_UPLOAD_KWARGS + mock_blob._handle_filename_and_upload.assert_any_call( + filename, **expected_upload_kwargs ) for result in results: assert result == FAKE_RESULT @@ -79,10 +88,11 @@ def test_upload_many_with_file_objs(): (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), ] - EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} + expected_upload_kwargs = EXPECTED_UPLOAD_KWARGS.copy() + expected_upload_kwargs["if_generation_match"] = 0 for _, blob_mock in FILE_BLOB_PAIRS: - blob_mock.upload_from_file.return_value = FAKE_RESULT + blob_mock._prep_and_do_upload.return_value = FAKE_RESULT results = transfer_manager.upload_many( FILE_BLOB_PAIRS, @@ -91,7 +101,7 @@ def test_upload_many_with_file_objs(): worker_type=transfer_manager.THREAD, ) for (file, mock_blob) in FILE_BLOB_PAIRS: - mock_blob.upload_from_file.assert_any_call(file, **EXPECTED_UPLOAD_KWARGS) + mock_blob._prep_and_do_upload.assert_any_call(file, **expected_upload_kwargs) for result in results: assert result == FAKE_RESULT @@ -157,7 +167,7 @@ def test_upload_many_suppresses_exceptions(): ("file_b.txt", mock.Mock(spec=Blob)), ] for _, mock_blob in FILE_BLOB_PAIRS: - mock_blob.upload_from_filename.side_effect = ConnectionError() + mock_blob._handle_filename_and_upload.side_effect = ConnectionError() results = transfer_manager.upload_many( FILE_BLOB_PAIRS, worker_type=transfer_manager.THREAD @@ -172,7 +182,7 @@ def test_upload_many_raises_exceptions(): ("file_b.txt", mock.Mock(spec=Blob)), ] for _, mock_blob in FILE_BLOB_PAIRS: - mock_blob.upload_from_filename.side_effect = ConnectionError() + mock_blob._handle_filename_and_upload.side_effect = ConnectionError() with pytest.raises(ConnectionError): transfer_manager.upload_many( @@ -186,8 +196,8 @@ def 
test_upload_many_suppresses_412_with_skip_if_exists(): ("file_b.txt", mock.Mock(spec=Blob)), ] for _, mock_blob in FILE_BLOB_PAIRS: - mock_blob.upload_from_filename.side_effect = exceptions.PreconditionFailed( - "412" + mock_blob._handle_filename_and_upload.side_effect = ( + exceptions.PreconditionFailed("412") ) results = transfer_manager.upload_many( FILE_BLOB_PAIRS, @@ -246,7 +256,7 @@ def test_download_many_with_filenames(): ] for blob_mock, _ in BLOB_FILE_PAIRS: - blob_mock.download_to_filename.return_value = FAKE_RESULT + blob_mock._handle_filename_and_download.return_value = FAKE_RESULT results = transfer_manager.download_many( BLOB_FILE_PAIRS, @@ -254,7 +264,9 @@ def test_download_many_with_filenames(): worker_type=transfer_manager.THREAD, ) for (mock_blob, file) in BLOB_FILE_PAIRS: - mock_blob.download_to_filename.assert_any_call(file, **DOWNLOAD_KWARGS) + mock_blob._handle_filename_and_download.assert_any_call( + file, **EXPECTED_DOWNLOAD_KWARGS + ) for result in results: assert result == FAKE_RESULT @@ -266,7 +278,7 @@ def test_download_many_with_file_objs(): ] for blob_mock, _ in BLOB_FILE_PAIRS: - blob_mock.download_to_file.return_value = FAKE_RESULT + blob_mock._prep_and_do_download.return_value = FAKE_RESULT results = transfer_manager.download_many( BLOB_FILE_PAIRS, @@ -274,7 +286,7 @@ def test_download_many_with_file_objs(): worker_type=transfer_manager.THREAD, ) for (mock_blob, file) in BLOB_FILE_PAIRS: - mock_blob.download_to_file.assert_any_call(file, **DOWNLOAD_KWARGS) + mock_blob._prep_and_do_download.assert_any_call(file, **DOWNLOAD_KWARGS) for result in results: assert result == FAKE_RESULT @@ -305,7 +317,7 @@ def test_download_many_suppresses_exceptions(): (mock.Mock(spec=Blob), "file_b.txt"), ] for mock_blob, _ in BLOB_FILE_PAIRS: - mock_blob.download_to_filename.side_effect = ConnectionError() + mock_blob._handle_filename_and_download.side_effect = ConnectionError() results = transfer_manager.download_many( BLOB_FILE_PAIRS, 
worker_type=transfer_manager.THREAD @@ -320,7 +332,7 @@ def test_download_many_raises_exceptions(): (mock.Mock(spec=Blob), "file_b.txt"), ] for mock_blob, _ in BLOB_FILE_PAIRS: - mock_blob.download_to_filename.side_effect = ConnectionError() + mock_blob._handle_filename_and_download.side_effect = ConnectionError() with pytest.raises(ConnectionError): transfer_manager.download_many( @@ -531,7 +543,10 @@ def test_download_chunks_concurrently(): MULTIPLE = 4 blob_mock.size = CHUNK_SIZE * MULTIPLE - blob_mock.download_to_filename.return_value = FAKE_RESULT + expected_download_kwargs = EXPECTED_DOWNLOAD_KWARGS.copy() + expected_download_kwargs["command"] = "tm.download_sharded" + + blob_mock._handle_filename_and_download.return_value = FAKE_RESULT with mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()): result = transfer_manager.download_chunks_concurrently( @@ -542,13 +557,13 @@ def test_download_chunks_concurrently(): worker_type=transfer_manager.THREAD, ) for x in range(MULTIPLE): - blob_mock.download_to_file.assert_any_call( + blob_mock._prep_and_do_download.assert_any_call( mock.ANY, - **DOWNLOAD_KWARGS, + **expected_download_kwargs, start=x * CHUNK_SIZE, - end=((x + 1) * CHUNK_SIZE) - 1 + end=((x + 1) * CHUNK_SIZE) - 1, ) - assert blob_mock.download_to_file.call_count == 4 + assert blob_mock._prep_and_do_download.call_count == 4 assert result is None @@ -754,7 +769,7 @@ def test_upload_chunks_concurrently_with_metadata_and_encryption(): "Accept": "application/json", "Accept-Encoding": "gzip, deflate", "User-Agent": "agent", - "X-Goog-API-Client": "agent gccl-invocation-id/{}".format(invocation_id), + "X-Goog-API-Client": f"agent gccl-invocation-id/{invocation_id} gccl-gcs-cmd/tm.upload_sharded", "content-type": FAKE_CONTENT_TYPE, "x-upload-content-type": FAKE_CONTENT_TYPE, "X-Goog-Encryption-Algorithm": "AES256", @@ -801,7 +816,7 @@ def reload(self): self.size = self._size_after_reload self.generation = self._generation_after_reload - def 
download_to_file(self, *args, **kwargs): + def _prep_and_do_download(self, *args, **kwargs): return "SUCCESS" @@ -924,14 +939,14 @@ def test__reduce_client(): def test__call_method_on_maybe_pickled_blob(): blob = mock.Mock(spec=Blob) - blob.download_to_file.return_value = "SUCCESS" + blob._prep_and_do_download.return_value = "SUCCESS" result = transfer_manager._call_method_on_maybe_pickled_blob( - blob, "download_to_file" + blob, "_prep_and_do_download" ) assert result == "SUCCESS" pickled_blob = pickle.dumps(_PickleableMockBlob()) result = transfer_manager._call_method_on_maybe_pickled_blob( - pickled_blob, "download_to_file" + pickled_blob, "_prep_and_do_download" ) assert result == "SUCCESS"