Skip to content

Commit

Permalink
feat: handle interrupted downloads with decompressive transcoding (#346)
Browse files Browse the repository at this point in the history
* feat: handle interrupted downloads with decompressive transcoding

* add test

* check content-encoding response header

Co-authored-by: Anthonios Partheniou <partheniou@google.com>
  • Loading branch information
cojenco and parthea committed Sep 21, 2022
1 parent b7597d5 commit f4d26b7
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 0 deletions.
20 changes: 20 additions & 0 deletions google/resumable_media/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

RANGE_HEADER = "range"
CONTENT_RANGE_HEADER = "content-range"
CONTENT_ENCODING_HEADER = "content-encoding"

_SLOW_CRC32C_WARNING = (
"Currently using crcmod in pure python form. This is a slow "
Expand All @@ -40,6 +41,8 @@
)
_GENERATION_HEADER = "x-goog-generation"
_HASH_HEADER = "x-goog-hash"
_STORED_CONTENT_ENCODING_HEADER = "x-goog-stored-content-encoding"

_MISSING_CHECKSUM = """\
No {checksum_type} checksum was returned from the service while downloading {}
(which happens for composite objects), so client-side content integrity
Expand Down Expand Up @@ -369,6 +372,23 @@ def add_query_parameters(media_url, query_params):
return urlunsplit((scheme, netloc, path, query, frag))


def _is_decompressive_transcoding(response, get_headers):
"""Returns True if the object was served decompressed. This happens when the
"x-goog-stored-content-encoding" header is "gzip" and "content-encoding" header
is not "gzip". See more at: https://cloud.google.com/storage/docs/transcoding#transcoding_and_gzip
Args:
response (~requests.Response): The HTTP response object.
get_headers (callable: response->dict): returns response headers.
Returns:
bool: Returns True if decompressive transcoding has occurred; otherwise, False.
"""
headers = get_headers(response)
return (
headers.get(_STORED_CONTENT_ENCODING_HEADER) == "gzip"
and headers.get(CONTENT_ENCODING_HEADER) != "gzip"
)


class _DoNothingHash(object):
"""Do-nothing hash object.
Expand Down
29 changes: 29 additions & 0 deletions google/resumable_media/requests/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@
{}
"""

_STREAM_SEEK_ERROR = """\
Incomplete download for:
{}
Error writing to stream while handling a gzip-compressed file download.
Please restart the download.
"""


class Download(_request_helpers.RequestsMixin, _download.Download):
"""Helper to manage downloading a resource from a Google API.
Expand Down Expand Up @@ -206,7 +213,18 @@ def retriable_request():

self._process_response(result)

# With decompressive transcoding, GCS serves back the whole file regardless of the range request,
# thus we reset the stream position to the start of the stream.
# See: https://cloud.google.com/storage/docs/transcoding#range
if self._stream is not None:
if _helpers._is_decompressive_transcoding(result, self._get_headers):
try:
self._stream.seek(0)
except Exception as exc:
msg = _STREAM_SEEK_ERROR.format(url)
raise Exception(msg) from exc
self._bytes_downloaded = 0

self._write_to_stream(result)

return result
Expand Down Expand Up @@ -379,7 +397,18 @@ def retriable_request():

self._process_response(result)

# With decompressive transcoding, GCS serves back the whole file regardless of the range request,
# thus we reset the stream position to the start of the stream.
# See: https://cloud.google.com/storage/docs/transcoding#range
if self._stream is not None:
if _helpers._is_decompressive_transcoding(result, self._get_headers):
try:
self._stream.seek(0)
except Exception as exc:
msg = _STREAM_SEEK_ERROR.format(url)
raise Exception(msg) from exc
self._bytes_downloaded = 0

self._write_to_stream(result)

return result
Expand Down
20 changes: 20 additions & 0 deletions tests/system/requests/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,26 @@ def test_download_to_stream(self, add_files, authorized_transport):
assert stream.getvalue() == actual_contents
check_tombstoned(download, authorized_transport)

def test_download_gzip_w_stored_content_headers(
self, add_files, authorized_transport
):
# Retrieve the gzip compressed file
info = ALL_FILES[-1]
actual_contents = self._get_contents(info)
blob_name = get_blob_name(info)

# Create the actual download object.
media_url = utils.DOWNLOAD_URL_TEMPLATE.format(blob_name=blob_name)
stream = io.BytesIO()
download = self._make_one(media_url, stream=stream)
# Consume the resource.
response = download.consume(authorized_transport)
assert response.status_code == http.client.OK
assert response.headers.get(_helpers._STORED_CONTENT_ENCODING_HEADER) == "gzip"
assert response.headers.get("X-Goog-Stored-Content-Length") is not None
assert stream.getvalue() == actual_contents
check_tombstoned(download, authorized_transport)

def test_extra_headers(self, authorized_transport, secret_file):
blob_name, data, headers = secret_file
# Create the actual download object.
Expand Down
82 changes: 82 additions & 0 deletions tests/unit/requests/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,45 @@ def test_consume_w_bytes_downloaded(self):
range_bytes = "bytes={:d}-{:d}".format(offset, end)
assert download._headers["range"] == range_bytes

def test_consume_gzip_reset_stream_w_bytes_downloaded(self):
stream = io.BytesIO()
chunks = (b"up down ", b"charlie ", b"brown")
end = 65536

download = download_mod.Download(
EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5"
)
transport = mock.Mock(spec=["request"])

# Mock a decompressive transcoding retry operation with bytes already downloaded in the stream
headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"}
transport.request.return_value = _mock_response(chunks=chunks, headers=headers)
offset = 16
download._bytes_downloaded = offset
download.consume(transport)

assert stream.getvalue() == b"".join(chunks)
assert download._bytes_downloaded == len(b"".join(chunks))

def test_consume_gzip_reset_stream_error(self):
stream = io.BytesIO()
chunks = (b"up down ", b"charlie ", b"brown")
end = 65536

download = download_mod.Download(
EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5"
)
transport = mock.Mock(spec=["request"])

# Mock a stream seek error while resuming a decompressive transcoding download
stream.seek = mock.Mock(side_effect=OSError("mock stream seek error"))
headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"}
transport.request.return_value = _mock_response(chunks=chunks, headers=headers)
offset = 16
download._bytes_downloaded = offset
with pytest.raises(Exception):
download.consume(transport)


class TestRawDownload(object):
def test__write_to_stream_no_hash_check(self):
Expand Down Expand Up @@ -772,6 +811,49 @@ def test_consume_w_bytes_downloaded(self):
range_bytes = "bytes={:d}-{:d}".format(offset, end)
assert download._headers["range"] == range_bytes

def test_consume_gzip_reset_stream_w_bytes_downloaded(self):
stream = io.BytesIO()
chunks = (b"up down ", b"charlie ", b"brown")
end = 65536

download = download_mod.RawDownload(
EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5"
)
transport = mock.Mock(spec=["request"])

# Mock a decompressive transcoding retry operation with bytes already downloaded in the stream
headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"}
transport.request.return_value = _mock_raw_response(
chunks=chunks, headers=headers
)
offset = 16
download._bytes_downloaded = offset
download.consume(transport)

assert stream.getvalue() == b"".join(chunks)
assert download._bytes_downloaded == len(b"".join(chunks))

def test_consume_gzip_reset_stream_error(self):
stream = io.BytesIO()
chunks = (b"up down ", b"charlie ", b"brown")
end = 65536

download = download_mod.RawDownload(
EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5"
)
transport = mock.Mock(spec=["request"])

# Mock a stream seek error while resuming a decompressive transcoding download
stream.seek = mock.Mock(side_effect=OSError("mock stream seek error"))
headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"}
transport.request.return_value = _mock_raw_response(
chunks=chunks, headers=headers
)
offset = 16
download._bytes_downloaded = offset
with pytest.raises(Exception):
download.consume(transport)


class TestChunkedDownload(object):
@staticmethod
Expand Down
25 changes: 25 additions & 0 deletions tests/unit/test__helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,31 @@ def test_header_value(self):
assert generation_header == self.GENERATION_VALUE


class Test__is_decompressive_transcoding(object):
def test_empty_value(self):
headers = {}
response = _mock_response(headers=headers)
assert _helpers._is_decompressive_transcoding(response, _get_headers) is False

def test_gzip_in_headers(self):
headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"}
response = _mock_response(headers=headers)
assert _helpers._is_decompressive_transcoding(response, _get_headers) is True

def test_gzip_not_in_headers(self):
headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "identity"}
response = _mock_response(headers=headers)
assert _helpers._is_decompressive_transcoding(response, _get_headers) is False

def test_gzip_w_content_encoding_in_headers(self):
headers = {
_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip",
_helpers.CONTENT_ENCODING_HEADER: "gzip",
}
response = _mock_response(headers=headers)
assert _helpers._is_decompressive_transcoding(response, _get_headers) is False


class Test__get_generation_from_url(object):

GENERATION_VALUE = 1641590104888641
Expand Down

0 comments on commit f4d26b7

Please sign in to comment.