Skip to content

Commit

Permalink
empty download (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK committed Feb 15, 2021
1 parent 977815f commit ab2120f
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 76 deletions.
4 changes: 4 additions & 0 deletions include/aws/s3/private/s3_auto_ranged_get.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

/* Tags stored on each request created by the auto-ranged get meta request, identifying what kind of get it is. */
enum aws_s3_auto_ranged_get_request_type {
/* Get for a single part of the object; the request message is built with a range for that part. */
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART,
/* Get issued without a range, created after a ranged get failed because the object is empty. */
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART_WITHOUT_RANGE,
};

struct aws_s3_auto_ranged_get {
Expand All @@ -26,6 +27,9 @@ struct aws_s3_auto_ranged_get {

size_t total_object_size;

uint32_t get_without_range : 1;
uint32_t get_without_range_sent : 1;
uint32_t get_without_range_completed : 1;
} synced_data;
};

Expand Down
4 changes: 3 additions & 1 deletion include/aws/s3/private/s3_request_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

#include <inttypes.h>
#include <stdbool.h>
#include <stddef.h>

struct aws_allocator;
Expand All @@ -30,7 +31,8 @@ struct aws_http_message *aws_s3_get_object_message_new(
struct aws_allocator *allocator,
struct aws_http_message *base_message,
uint32_t part_number,
size_t part_size);
size_t part_size,
bool has_range);

/* Create an HTTP request for an S3 Put Object request, using the original request as a basis. Creates and assigns a
* body stream using the passed in buffer. If multipart is not needed, part number and upload_id can be 0 and NULL,
Expand Down
223 changes: 150 additions & 73 deletions source/s3_auto_ranged_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#endif

/* NOTE(review): presumably an upper bound on concurrently outstanding requests; its use is not visible here - confirm. */
const uint32_t s_conservative_max_requests_in_flight = 8;
/* Content-Type value that identifies an XML response body (compared case-insensitively by
 * s_check_empty_file_download_error). */
const struct aws_byte_cursor g_application_xml_value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("application/xml");
/* Name of the top-level XML tag in a failed response body whose value is checked against "0" to detect an
 * empty object. */
const struct aws_byte_cursor g_object_size_value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("ActualObjectSize");

static void s_s3_meta_request_auto_ranged_get_destroy(struct aws_s3_meta_request *meta_request);

Expand Down Expand Up @@ -100,6 +102,31 @@ static void s_s3_meta_request_auto_ranged_get_destroy(struct aws_s3_meta_request
aws_mem_release(meta_request->allocator, auto_ranged_get);
}

/* Check the finish result of meta request, in case of the request failed because of downloading an empty file */
static bool s_check_empty_file_download_error(struct aws_s3_request *failed_request) {
struct aws_http_headers *failed_headers = failed_request->send_data.response_headers;
struct aws_byte_buf failed_body = failed_request->send_data.response_body;
if (failed_headers && failed_body.capacity > 0) {
struct aws_byte_cursor content_type;
AWS_ZERO_STRUCT(content_type);
if (!aws_http_headers_get(failed_headers, g_content_type_header_name, &content_type)) {
/* Content type found */
if (aws_byte_cursor_eq_ignore_case(&content_type, &g_application_xml_value)) {
/* XML response */
struct aws_byte_cursor body_cursor = aws_byte_cursor_from_buf(&failed_body);
struct aws_string *size =
get_top_level_xml_tag_value(failed_request->allocator, &g_object_size_value, &body_cursor);
bool check_size = aws_string_eq_c_str(size, "0");
aws_string_destroy(size);
if (check_size) {
return true;
}
}
}
}
return false;
}

static bool s_s3_auto_ranged_get_update(
struct aws_s3_meta_request *meta_request,
uint32_t flags,
Expand Down Expand Up @@ -161,6 +188,28 @@ static bool s_s3_auto_ranged_get_update(
goto has_work_remaining;
}

if (auto_ranged_get->synced_data.get_without_range) {
if (auto_ranged_get->synced_data.get_without_range_sent) {
if (auto_ranged_get->synced_data.get_without_range_completed) {
goto no_work_remaining;
} else {
goto has_work_remaining;
}
}
if (out_request == NULL) {
goto has_work_remaining;
}

request = aws_s3_request_new(
meta_request,
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART_WITHOUT_RANGE,
1,
AWS_S3_REQUEST_DESC_RECORD_RESPONSE_HEADERS);

auto_ranged_get->synced_data.get_without_range_sent = true;
goto has_work_remaining;
}

/* If we have gotten a response for the first request, then the total number of parts for the object is now
* known. Continue sending parts until the total number of parts is reached.*/
if (auto_ranged_get->synced_data.num_parts_requested < auto_ranged_get->synced_data.total_num_parts) {
Expand Down Expand Up @@ -190,6 +239,15 @@ static bool s_s3_auto_ranged_get_update(
goto has_work_remaining;
}

if (auto_ranged_get->synced_data.get_without_range) {
if (auto_ranged_get->synced_data.get_without_range_sent &&
!auto_ranged_get->synced_data.get_without_range_completed) {
goto has_work_remaining;
} else {
goto no_work_remaining;
}
}

/* If some parts are still being delivered to the caller, then wait for those to finish. */
if (meta_request->synced_data.num_parts_delivery_completed <
meta_request->synced_data.num_parts_delivery_sent) {
Expand Down Expand Up @@ -249,11 +307,13 @@ static int s_s3_auto_ranged_get_prepare_request(

struct aws_http_message *message = NULL;

AWS_ASSERT(request->request_tag == AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART);

/* Generate a new ranged get request based on the original message. */
message = aws_s3_get_object_message_new(
meta_request->allocator, meta_request->initial_request_message, request->part_number, meta_request->part_size);
meta_request->allocator,
meta_request->initial_request_message,
request->part_number,
meta_request->part_size,
request->request_tag != AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART_WITHOUT_RANGE);

if (message == NULL) {
AWS_LOGF_ERROR(
Expand Down Expand Up @@ -294,66 +354,67 @@ static void s_s3_auto_ranged_get_request_finished(

struct aws_s3_auto_ranged_get *auto_ranged_get = meta_request->impl;

AWS_ASSERT(request->request_tag == AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART);

uint32_t num_parts = 0;

if (error_code == AWS_ERROR_SUCCESS && request->part_number == 1) {
struct aws_byte_cursor content_range_header_value;

if (aws_http_headers_get(
request->send_data.response_headers, g_content_range_header_name, &content_range_header_value)) {
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Could not find content range header for request %p",
(void *)meta_request,
(void *)request);

error_code = AWS_ERROR_S3_MISSING_CONTENT_RANGE_HEADER;
goto error_encountered;
}

uint64_t range_start = 0;
uint64_t range_end = 0;
uint64_t total_object_size = 0;
if (request->request_tag == AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART) {
struct aws_byte_cursor content_range_header_value;

if (aws_http_headers_get(
request->send_data.response_headers, g_content_range_header_name, &content_range_header_value)) {
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Could not find content range header for request %p",
(void *)meta_request,
(void *)request);

error_code = AWS_ERROR_S3_MISSING_CONTENT_RANGE_HEADER;
goto error_encountered;
}

/* The memory the byte cursor refers to should be valid, but if it's referring to a buffer that was
* previously used, the null terminating character may not be where we expect. We copy to a string to
* ensure that our null terminating character placement corresponds with the length. */
struct aws_string *content_range_header_value_str =
aws_string_new_from_cursor(meta_request->allocator, &content_range_header_value);

/* Format of header is: "bytes StartByte-EndByte/TotalObjectSize" */
sscanf(
(const char *)content_range_header_value_str->bytes,
"bytes %" PRIu64 "-%" PRIu64 "/%" PRIu64,
&range_start,
&range_end,
&total_object_size);

aws_string_destroy(content_range_header_value_str);
content_range_header_value_str = NULL;

if (total_object_size == 0) {
AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "id=%p Get Object has invalid content range.", (void *)meta_request);
error_code = AWS_ERROR_S3_MISSING_CONTENT_RANGE_HEADER;
goto error_encountered;
}
uint64_t range_start = 0;
uint64_t range_end = 0;

/* The memory the byte cursor refers to should be valid, but if it's referring to a buffer that was
* previously used, the null terminating character may not be where we expect. We copy to a string to
* ensure that our null terminating character placement corresponds with the length. */
struct aws_string *content_range_header_value_str =
aws_string_new_from_cursor(meta_request->allocator, &content_range_header_value);

/* Format of header is: "bytes StartByte-EndByte/TotalObjectSize" */
sscanf(
(const char *)content_range_header_value_str->bytes,
"bytes %" PRIu64 "-%" PRIu64 "/%" PRIu64,
&range_start,
&range_end,
&total_object_size);

aws_string_destroy(content_range_header_value_str);
content_range_header_value_str = NULL;

if (total_object_size == 0) {
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST, "id=%p Get Object has invalid content range.", (void *)meta_request);
error_code = AWS_ERROR_S3_MISSING_CONTENT_RANGE_HEADER;
goto error_encountered;
}

num_parts = (uint32_t)(total_object_size / meta_request->part_size);
num_parts = (uint32_t)(total_object_size / meta_request->part_size);

if (total_object_size % meta_request->part_size) {
++num_parts;
}
if (total_object_size % meta_request->part_size) {
++num_parts;
}

AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
"id=%p Object being requested is %" PRIu64 " bytes which will have %d parts based off of a %" PRIu64
" part size.",
(void *)meta_request,
total_object_size,
num_parts,
(uint64_t)meta_request->part_size);
AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
"id=%p Object being requested is %" PRIu64 " bytes which will have %d parts based off of a %" PRIu64
" part size.",
(void *)meta_request,
total_object_size,
num_parts,
(uint64_t)meta_request->part_size);
}

if (meta_request->headers_callback != NULL) {
struct aws_http_headers *response_headers = aws_http_headers_new(meta_request->allocator);
Expand Down Expand Up @@ -381,28 +442,44 @@ static void s_s3_auto_ranged_get_request_finished(

aws_s3_meta_request_lock_synced_data(meta_request);

++auto_ranged_get->synced_data.num_parts_completed;
if (request->request_tag == AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART) {
++auto_ranged_get->synced_data.num_parts_completed;

if (error_code == AWS_ERROR_SUCCESS) {
++auto_ranged_get->synced_data.num_parts_successful;
if (error_code == AWS_ERROR_SUCCESS) {
++auto_ranged_get->synced_data.num_parts_successful;

if (request->part_number == 1) {
AWS_ASSERT(num_parts > 0);
auto_ranged_get->synced_data.total_num_parts = num_parts;
}
if (request->part_number == 1) {
AWS_ASSERT(num_parts > 0);
auto_ranged_get->synced_data.total_num_parts = num_parts;
}

aws_s3_meta_request_stream_response_body_synced(meta_request, request);
aws_s3_meta_request_stream_response_body_synced(meta_request, request);

AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
"id=%p: %d out of %d parts have completed.",
(void *)meta_request,
(auto_ranged_get->synced_data.num_parts_successful + auto_ranged_get->synced_data.num_parts_failed),
auto_ranged_get->synced_data.total_num_parts);
} else {
++auto_ranged_get->synced_data.num_parts_failed;

aws_s3_meta_request_set_fail_synced(meta_request, request, error_code);
AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
"id=%p: %d out of %d parts have completed.",
(void *)meta_request,
(auto_ranged_get->synced_data.num_parts_successful + auto_ranged_get->synced_data.num_parts_failed),
auto_ranged_get->synced_data.total_num_parts);
} else {
++auto_ranged_get->synced_data.num_parts_failed;
if (s_check_empty_file_download_error(request)) {
AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
"id=%p Getting an empty file, create a new request without range header to fetch the empty "
"file",
(void *)meta_request);
auto_ranged_get->synced_data.get_without_range = true;
} else {
aws_s3_meta_request_set_fail_synced(meta_request, request, error_code);
}
}
} else if (request->request_tag == AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART_WITHOUT_RANGE) {
AWS_LOGF_DEBUG(AWS_LS_S3_META_REQUEST, "id=%p Get empty file completed", (void *)meta_request);
auto_ranged_get->synced_data.get_without_range_completed = true;
if (error_code != AWS_ERROR_SUCCESS) {
aws_s3_meta_request_set_fail_synced(meta_request, request, error_code);
}
}

aws_s3_meta_request_unlock_synced_data(meta_request);
Expand Down
5 changes: 3 additions & 2 deletions source/s3_request_messages.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ struct aws_http_message *aws_s3_get_object_message_new(
struct aws_allocator *allocator,
struct aws_http_message *base_message,
uint32_t part_number,
size_t part_size) {
size_t part_size,
bool has_range) {
AWS_PRECONDITION(allocator);
AWS_PRECONDITION(base_message);

Expand All @@ -44,7 +45,7 @@ struct aws_http_message *aws_s3_get_object_message_new(
return NULL;
}

if (part_number > 0) {
if (part_number > 0 && has_range) {
if (s_s3_message_util_add_content_range_header(part_number - 1, part_size, message)) {
goto error_clean_up;
}
Expand Down
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ add_net_test_case(test_s3_get_object_tls_disabled)
add_net_test_case(test_s3_get_object_tls_enabled)
add_net_test_case(test_s3_get_object_tls_default)
add_net_test_case(test_s3_get_object_less_than_part_size)
add_net_test_case(test_s3_get_object_empty_object)
add_net_test_case(test_s3_get_object_multiple)
add_net_test_case(test_s3_get_object_sse_kms)
add_net_test_case(test_s3_get_object_sse_aes256)
Expand All @@ -39,7 +40,9 @@ add_net_test_case(test_s3_put_object_tls_disabled)
add_net_test_case(test_s3_put_object_tls_enabled)
add_net_test_case(test_s3_put_object_tls_default)
add_net_test_case(test_s3_multipart_put_object_with_acl)
add_net_test_case(test_s3_put_object_multiple)
add_net_test_case(test_s3_put_object_less_than_part_size)
add_net_test_case(test_s3_put_object_empty_object)
add_net_test_case(test_s3_put_object_with_part_remainder)
add_net_test_case(test_s3_put_object_sse_kms)
add_net_test_case(test_s3_put_object_sse_kms_multipart)
Expand Down
Loading

0 comments on commit ab2120f

Please sign in to comment.