diff --git a/include/aws/s3/private/s3_auto_ranged_put.h b/include/aws/s3/private/s3_auto_ranged_put.h
index 42b5a02f9..916ddc4eb 100644
--- a/include/aws/s3/private/s3_auto_ranged_put.h
+++ b/include/aws/s3/private/s3_auto_ranged_put.h
@@ -29,6 +29,7 @@ struct aws_s3_auto_ranged_put {
     struct aws_s3_meta_request_resume_token *resume_token;
 
     uint64_t content_length;
+    bool has_content_length;
 
     /* Only meant for use in the update function, which is never called concurrently. */
     struct {
@@ -53,30 +54,39 @@ struct aws_s3_auto_ranged_put {
         uint32_t num_parts_read_from_stream;
     } prepare_data;
 
-    /*
-     * Very similar to the etag_list used in complete_multipart_upload to create the XML payload. Each part will set the
-     * corresponding index to it's checksum result, so while the list is shared across threads each index will only be
-     * accessed once to initialize by the corresponding part number, and then again during the complete multipart upload
-     * request which will only be invoked after all other parts/threads have completed.
-     */
-    struct aws_byte_buf *encoded_checksum_list;
-
     /* Members to only be used when the mutex in the base type is locked. */
    struct {
         /* Array list of `struct aws_string *`. */
         struct aws_array_list etag_list;
 
+        /* Very similar to the etag_list used in complete_multipart_upload to create the XML payload. Each part will
+         * set the corresponding index to its checksum result. */
+        struct aws_array_list encoded_checksum_list;
+
         struct aws_s3_paginated_operation *list_parts_operation;
         struct aws_string *list_parts_continuation_token;
 
+        /* Note: the total number of parts is known only if the content length is known;
+           otherwise it is a running total of the number of parts read from the stream. */
         uint32_t total_num_parts;
+        /* Number of parts we've started work on. */
         uint32_t num_parts_sent;
+        /* Number of "sent" parts we've finished reading the body for
+         * (does not include skipped parts in the case of pause/resume). */
+        uint32_t num_parts_read;
         uint32_t num_parts_completed;
        uint32_t num_parts_successful;
         uint32_t num_parts_failed;
+        /* When the content length is not known, requests are scheduled
+         * optimistically; this counts how many of those scheduled requests
+         * turned out to have no work to do. */
+        uint32_t num_parts_noop;
 
         struct aws_http_headers *needed_response_headers;
 
+        /* Whether the body stream is exhausted. */
+        bool is_body_stream_at_end;
+
         int list_parts_error_code;
         int create_multipart_upload_error_code;
         int complete_multipart_upload_error_code;
@@ -102,12 +112,16 @@ struct aws_s3_auto_ranged_put {
 
 AWS_EXTERN_C_BEGIN
 
-/* Creates a new auto-ranged put meta request. This will do a multipart upload in parallel when appropriate. */
+/* Creates a new auto-ranged put meta request.
+ * This will do a multipart upload in parallel when appropriate.
+ * Note: if has_content_length is false, content_length and num_parts are ignored.
+ */
 AWS_S3_API
 struct aws_s3_meta_request *aws_s3_meta_request_auto_ranged_put_new(
     struct aws_allocator *allocator,
     struct aws_s3_client *client,
     size_t part_size,
+    bool has_content_length,
     uint64_t content_length,
     uint32_t num_parts,
     const struct aws_s3_meta_request_options *options);
diff --git a/include/aws/s3/private/s3_copy_object.h b/include/aws/s3/private/s3_copy_object.h
index a839e1fcc..c9864698b 100644
--- a/include/aws/s3/private/s3_copy_object.h
+++ b/include/aws/s3/private/s3_copy_object.h
@@ -22,7 +22,7 @@ enum aws_s3_copy_object_request_tag {
 struct aws_s3_copy_object {
     struct aws_s3_meta_request base;
 
-    /* Useable after the Create Multipart Upload request succeeds.
*/ + /* Usable after the Create Multipart Upload request succeeds. */ struct aws_string *upload_id; /* Only meant for use in the update function, which is never called concurrently. */ diff --git a/include/aws/s3/private/s3_meta_request_impl.h b/include/aws/s3/private/s3_meta_request_impl.h index 9edd65415..206f9e027 100644 --- a/include/aws/s3/private/s3_meta_request_impl.h +++ b/include/aws/s3/private/s3_meta_request_impl.h @@ -22,7 +22,6 @@ struct aws_s3_client; struct aws_s3_connection; struct aws_s3_meta_request; struct aws_s3_request; -struct aws_s3_request_options; struct aws_http_headers; struct aws_http_make_request_options; struct aws_retry_strategy; @@ -118,7 +117,7 @@ struct aws_s3_meta_request { struct aws_s3_endpoint *endpoint; - /* Event loop to schedule IO work related on, ie, reading from streams, streaming parts back to the caller, etc.. + /* Event loop to schedule IO work related on, ie, reading from streams, streaming parts back to the caller, etc... * After the meta request is finished, this will be reset along with the client reference.*/ struct aws_event_loop *io_event_loop; @@ -154,8 +153,8 @@ struct aws_s3_meta_request { /* The sum of initial_read_window, plus all window_increment() calls. This number never goes down. */ uint64_t read_window_running_total; - /* The next expected streaming part number needed to continue streaming part bodies. (For example, this will - * initially be 1 for part 1, and after that part is received, it will be 2, then 3, etc.. */ + /* The next expected streaming part number needed to continue streaming part bodies. (For example, this will + * initially be 1 for part 1, and after that part is received, it will be 2, then 3, etc.. )*/ uint32_t next_streaming_part; /* Number of parts scheduled for delivery. */ @@ -199,7 +198,7 @@ struct aws_s3_meta_request { /* checksum found in either a default get request, or in the initial head request of a multipart get */ struct aws_byte_buf meta_request_level_response_header_checksum; - /* running checksum of all of the parts of a default get, or ranged get meta request*/ + /* running checksum of all the parts of a default get, or ranged get meta request*/ struct aws_s3_checksum *meta_request_level_running_response_sum; }; @@ -289,7 +288,7 @@ void aws_s3_meta_request_finished_request( /* Called to place the request in the meta request's priority queue for streaming back to the caller. Once all requests * with a part number less than the given request has been received, the given request and the previous requests will - * scheduled for streaming. */ + * be scheduled for streaming. */ AWS_S3_API void aws_s3_meta_request_stream_response_body_synced( struct aws_s3_meta_request *meta_request, @@ -300,6 +299,8 @@ void aws_s3_meta_request_stream_response_body_synced( AWS_S3_API int aws_s3_meta_request_read_body(struct aws_s3_meta_request *meta_request, struct aws_byte_buf *buffer); +bool aws_s3_meta_request_body_has_no_more_data(const struct aws_s3_meta_request *meta_request); + /* Set the meta request finish result as failed. This is meant to be called sometime before aws_s3_meta_request_finish. * Subsequent calls to this function or to aws_s3_meta_request_set_success_synced will not overwrite the end result of * the meta request. 
 */
diff --git a/include/aws/s3/private/s3_request.h b/include/aws/s3/private/s3_request.h
index e33707d7a..8c42430c0 100644
--- a/include/aws/s3/private/s3_request.h
+++ b/include/aws/s3/private/s3_request.h
@@ -143,7 +143,7 @@ struct aws_s3_request {
     bool checksum_match;
 
     /* Tag that defines what the built request will actually consist of. This is meant to be space for an enum defined
-     * by the derived type. Request tags do not necessarily map 1:1 with actual S3 API requests. For example, they can
+     * by the derived type. Request tags do not necessarily map 1:1 with actual S3 API requests. (For example, they can
      * be more contextual, like "first part" instead of just "part".) */
     /* TODO: this should be a union type to make it clear that this could be one of two enums for puts, and gets. */
@@ -190,6 +190,16 @@ struct aws_s3_request {
 
     /* When true, this request is intended to find out the object size. This is currently only used by auto_range_get. */
     uint32_t discovers_object_size : 1;
+
+    /* When true, this request does not represent a useful HTTP request and
+     * must not be sent; however, the client must still invoke the corresponding
+     * finished callback for it. Such requests can occur when a request is
+     * optimistically created during update but cannot be prepared, e.g. when a
+     * put has no content length: requests are scheduled as usual to ensure
+     * fairness against other requests, which can also produce requests for
+     * data past the end of the stream (such requests use this flag to
+     * indicate that they should not be sent). */
+    uint32_t is_noop : 1;
 };
 
 AWS_EXTERN_C_BEGIN
diff --git a/include/aws/s3/private/s3_request_messages.h b/include/aws/s3/private/s3_request_messages.h
index 5903ed75e..c4fa65f96 100644
--- a/include/aws/s3/private/s3_request_messages.h
+++ b/include/aws/s3/private/s3_request_messages.h
@@ -127,7 +127,7 @@ struct aws_http_message *aws_s3_complete_multipart_message_new(
     struct aws_byte_buf *body_buffer,
     const struct aws_string *upload_id,
     const struct aws_array_list *etags,
-    struct aws_byte_buf *checksums,
+    const struct aws_array_list *checksums,
     enum aws_s3_checksum_algorithm algorithm);
 
 AWS_S3_API
diff --git a/include/aws/s3/private/s3_util.h b/include/aws/s3/private/s3_util.h
index 86c518755..a97ba2171 100644
--- a/include/aws/s3/private/s3_util.h
+++ b/include/aws/s3/private/s3_util.h
@@ -29,7 +29,6 @@
 struct aws_allocator;
 struct aws_http_stream;
 struct aws_http_headers;
 struct aws_http_message;
-struct aws_event_loop;
 
 enum aws_s3_response_status {
     AWS_S3_RESPONSE_STATUS_SUCCESS = 200,
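For context on how the new is_noop flag above is consumed: a request flagged as a noop still gets its finished callback, but is never queued for sending. A minimal sketch of that pattern, matching the s_s3_client_prepare_callback_queue_request change later in this diff (the helper names finish_and_release and queue_for_sending are illustrative, not library functions):

    static void on_request_prepared(struct aws_s3_client *client, struct aws_s3_request *request, int error_code) {
        if (error_code != AWS_ERROR_SUCCESS || request->is_noop) {
            /* A noop request is "finished" immediately; no HTTP request goes on the wire. */
            finish_and_release(client, request, error_code);
            return;
        }
        queue_for_sending(client, request); /* normal path */
    }
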
diff --git a/include/aws/s3/s3_client.h b/include/aws/s3/s3_client.h
index d08f25f22..e9b761760 100644
--- a/include/aws/s3/s3_client.h
+++ b/include/aws/s3/s3_client.h
@@ -54,6 +54,16 @@ enum aws_s3_meta_request_type {
     /**
      * The PutObject request will be split into MultiPart uploads that are executed in parallel
      * to improve throughput, when possible.
+     * Note: put object supports both known and unknown body lengths. The client
+     * relies on the Content-Length header to determine the length of the body.
+     * Requests with unknown content length are always sent using multipart
+     * upload, regardless of the final number of parts, and have the following limitations:
+     * - the multipart threshold is ignored and all requests are made through MPU,
+     *   even if they only need one part
+     * - pause/resume is not supported
+     * - the meta request will fail with an error if a checksum header is provided (due to
+     *   the general limitation that checksums are not usable if the meta request is
+     *   getting split)
      */
     AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
 
@@ -421,9 +431,6 @@ struct aws_s3_meta_request_options {
 
     /**
      * Invoked to report progress of the meta request execution.
-     * Currently, the progress callback is invoked only for the CopyObject meta request type.
-     * TODO: support this callback for all the types of meta requests
-     * See `aws_s3_meta_request_progress_fn`
      */
     aws_s3_meta_request_progress_fn *progress_callback;
 
@@ -491,7 +498,7 @@ struct aws_s3_meta_request_result {
      * uploaded as a multipart object.
      *
      * If the object to get is multipart object, the part checksum MAY be validated if the part size to get matches the
-     * part size uploaded. In that case, if any part mismatch the checksum received, the meta request will failed with
+     * part size uploaded. In that case, if any part mismatch the checksum received, the meta request will fail with
      * checksum mismatch. However, even if the parts checksum were validated, this will NOT be set to true, as the
      * checksum for the whole meta request was NOT validated.
      **/
diff --git a/source/s3.c b/source/s3.c
index e2d821b70..76737b852 100644
--- a/source/s3.c
+++ b/source/s3.c
@@ -36,7 +36,7 @@ static struct aws_error_info s_errors[] = {
     AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_RESUME_FAILED, "Resuming request failed"),
     AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_OBJECT_MODIFIED, "The object modifed during download."),
     AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_NON_RECOVERABLE_ASYNC_ERROR, "Async error received from S3 and not recoverable from retry."),
-    AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE, "The metric data is not available, the requests ends before the metric happens."),
+    AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE, "The metric data is not available, the requests ends before the metric happens.")
 };
 /* clang-format on */
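To make the unknown-content-length behavior documented above concrete, here is a rough usage sketch (it assumes an already-configured client and body stream; bucket, key, and other setup are placeholders, and error handling is omitted). Omitting the Content-Length header is what routes the request down the new multipart path:

    struct aws_http_message *message = aws_http_message_new_request(allocator);
    aws_http_message_set_request_method(message, aws_byte_cursor_from_c_str("PUT"));
    aws_http_message_set_request_path(message, aws_byte_cursor_from_c_str("/my-bucket/my-key"));
    /* No Content-Length header: the client uploads via MPU and detects end-of-stream. */
    aws_http_message_set_body_stream(message, body_stream);

    struct aws_s3_meta_request_options options = {
        .type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
        .message = message,
    };
    struct aws_s3_meta_request *meta_request = aws_s3_client_make_meta_request(client, &options);
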
diff --git a/source/s3_auto_ranged_get.c b/source/s3_auto_ranged_get.c
index 21429bbcc..3f90578f2 100644
--- a/source/s3_auto_ranged_get.c
+++ b/source/s3_auto_ranged_get.c
@@ -11,11 +11,6 @@
 #include
 #include
 
-#ifdef _MSC_VER
-/* sscanf warning (not currently scanning for strings) */
-# pragma warning(disable : 4996)
-#endif
-
 const uint32_t s_conservative_max_requests_in_flight = 8;
 const struct aws_byte_cursor g_application_xml_value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("application/xml");
 const struct aws_byte_cursor g_object_size_value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("ActualObjectSize");
@@ -142,7 +137,7 @@ static bool s_s3_auto_ranged_get_update(
             /* auto-ranged-gets make use of body streaming, which will hold onto response bodies if parts earlier in
              * the file haven't arrived yet. This can potentially create a lot of backed up requests, causing us to
              * hit our global request limit. To help mitigate this, when the "conservative" flag is passed in, we
-             * only allow the total amount of requests being sent/streamed to be inside of a set limit. */
+             * only allow the total amount of requests being sent/streamed to be inside a set limit. */
             if (num_requests_in_flight > s_conservative_max_requests_in_flight) {
                 goto has_work_remaining;
             }
@@ -315,7 +310,7 @@ static bool s_s3_auto_ranged_get_update(
         aws_s3_meta_request_set_success_synced(meta_request, s_s3_auto_ranged_get_success_status(meta_request));
         if (auto_ranged_get->synced_data.num_parts_checksum_validated ==
             auto_ranged_get->synced_data.num_parts_requested) {
-            /* If we have validated the checksum for every parts, we set the meta request level checksum validation
+            /* If we have validated the checksum for every part, we set the meta request level checksum validation
              * result.*/
             meta_request->synced_data.finish_result.did_validate = true;
             meta_request->synced_data.finish_result.validation_algorithm = auto_ranged_get->validation_algorithm;
@@ -476,7 +471,7 @@ static int s_discover_object_range_and_content_length(
     }
 
     /* if the inital message had a ranged header, there should also be a Content-Range header that specifies the
-     * object range and total object size. Otherwise the size and range should be equal to the
+     * object range and total object size. Otherwise, the size and range should be equal to the
      * total_content_length. */
     if (!auto_ranged_get->initial_message_has_range_header) {
         object_range_end = total_content_length - 1;
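The ListParts handling in s_process_part_info below has to cope with the fact that aws_array_list_set_at can grow the list while leaving the intermediate slots uninitialized. The backfill pattern the patch applies, reduced to a standalone sketch (the list element type here is struct aws_byte_buf, as in the patch; an upward loop is shown since it has no underflow concern):

    /* Ensure slots [old_length, part_index] are zero-initialized before use;
     * slots skipped over by set_at would otherwise hold garbage. */
    struct aws_byte_buf empty_buf = {0};
    for (size_t i = aws_array_list_length(list); i <= part_index; ++i) {
        aws_array_list_set_at(list, &empty_buf, i);
    }
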
diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c
index 9bf407082..496c8fef2 100644
--- a/source/s3_auto_ranged_put.c
+++ b/source/s3_auto_ranged_put.c
@@ -15,6 +15,19 @@
 static const struct aws_byte_cursor s_upload_id = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("UploadId");
 static const size_t s_complete_multipart_upload_init_body_size_bytes = 512;
 static const size_t s_abort_multipart_upload_init_body_size_bytes = 512;
 
+/* For a body of unknown length, we no longer know the number of parts. To avoid
+ * resizing the etag/checksum arrays too often, those arrays start out with the
+ * capacity specified by the constant below. Note: the constant was picked
+ * arbitrarily, to balance the number of allocations against memory usage. It
+ * might change in the future. */
+static const uint32_t s_unknown_length_default_num_parts = 32;
+
+/* For a body of unknown length, we don't know how many parts we'll end up with.
+ * We optimistically schedule upload requests, but cap the number of requests
+ * being prepared at the number below, to avoid unknown-length requests
+ * dominating the queue.
+ * TODO: this value needs further benchmarking. */
+static const uint32_t s_unknown_length_max_optimistic_prepared_parts = 5;
 
 static const struct aws_byte_cursor s_create_multipart_upload_copy_headers[] = {
     AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("x-amz-server-side-encryption-customer-algorithm"),
@@ -73,11 +86,28 @@ static bool s_process_part_info(const struct aws_s3_part_info *info, void *user_
             break;
     }
 
+    /* Parts might be out of order or have gaps between them. The array list allows
+     * setting elements beyond the current length, but the values between the old
+     * length and the new index are uninitialized. As a workaround, when processing
+     * an element, make sure to initialize any newly exposed elements to zeroed values. */
+    size_t current_num_parts = aws_array_list_length(&auto_ranged_put->synced_data.etag_list);
+    if (info->part_number > current_num_parts) {
+        struct aws_byte_buf empty_buf = {0};
+        struct aws_string *null_etag = NULL;
+
+        /* Note: using 1-based part numbers here to avoid underflow of an
+           unsigned number in case part 1 was not completed but other parts were. */
+        for (size_t part_num = info->part_number; part_num > current_num_parts; --part_num) {
+            aws_array_list_set_at(&auto_ranged_put->synced_data.encoded_checksum_list, &empty_buf, part_num - 1);
+            aws_array_list_set_at(&auto_ranged_put->synced_data.etag_list, &null_etag, part_num - 1);
+        }
+    }
+
     if (checksum_cur) {
-        aws_byte_buf_init_copy_from_cursor(
-            &auto_ranged_put->encoded_checksum_list[info->part_number - 1],
-            auto_ranged_put->base.allocator,
-            *checksum_cur);
+        struct aws_byte_buf checksum_buf;
+        aws_byte_buf_init_copy_from_cursor(&checksum_buf, auto_ranged_put->base.allocator, *checksum_cur);
+        aws_array_list_set_at(
+            &auto_ranged_put->synced_data.encoded_checksum_list, &checksum_buf, info->part_number - 1);
     }
 
     aws_array_list_set_at(&auto_ranged_put->synced_data.etag_list, &etag, info->part_number - 1);
@@ -168,6 +198,8 @@ static int s_try_init_resume_state_from_persisted_data(
         return AWS_OP_SUCCESS;
     }
 
+    AWS_FATAL_ASSERT(auto_ranged_put->has_content_length);
+
     struct aws_byte_cursor request_path;
     if (aws_http_message_get_request_path(auto_ranged_put->base.initial_request_message, &request_path)) {
         AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "Could not load persisted state. Request path could not be read.");
@@ -176,6 +208,7 @@ static int s_try_init_resume_state_from_persisted_data(
 
     auto_ranged_put->synced_data.num_parts_sent = 0;
     auto_ranged_put->synced_data.num_parts_completed = 0;
+    auto_ranged_put->synced_data.num_parts_noop = 0;
     auto_ranged_put->synced_data.create_multipart_upload_sent = true;
     auto_ranged_put->synced_data.create_multipart_upload_completed = true;
     auto_ranged_put->upload_id = aws_string_clone_or_reuse(allocator, resume_token->multipart_upload_id);
@@ -191,10 +224,10 @@ static int s_try_init_resume_state_from_persisted_data(
     struct aws_http_headers *needed_response_headers = aws_http_headers_new(allocator);
     const size_t copy_header_count = AWS_ARRAY_SIZE(s_create_multipart_upload_copy_headers);
-    struct aws_http_headers *initial_headers =
+    const struct aws_http_headers *initial_headers =
         aws_http_message_get_headers(auto_ranged_put->base.initial_request_message);
 
-    /* Copy headers that would have been used for create multi part from initial message, since create will never be
+    /* Copy headers that would have been used for create multipart from initial message, since create will never be
      * called in this flow */
     for (size_t header_index = 0; header_index < copy_header_count; ++header_index) {
         const struct aws_byte_cursor *header_name = &s_create_multipart_upload_copy_headers[header_index];
@@ -228,6 +261,7 @@ struct aws_s3_meta_request *aws_s3_meta_request_auto_ranged_put_new(
     struct aws_allocator *allocator,
     struct aws_s3_client *client,
     size_t part_size,
+    bool has_content_length,
     uint64_t content_length,
     uint32_t num_parts,
     const struct aws_s3_meta_request_options *options) {
@@ -260,8 +294,9 @@ struct aws_s3_meta_request *aws_s3_meta_request_auto_ranged_put_new(
         return NULL;
     }
 
-    auto_ranged_put->content_length = content_length;
-    auto_ranged_put->synced_data.total_num_parts = num_parts;
+    auto_ranged_put->has_content_length = has_content_length;
+    auto_ranged_put->content_length = has_content_length ? content_length : 0;
+    auto_ranged_put->synced_data.total_num_parts = has_content_length ?
num_parts : 0; auto_ranged_put->upload_id = NULL; auto_ranged_put->resume_token = options->resume_token; @@ -269,11 +304,14 @@ struct aws_s3_meta_request *aws_s3_meta_request_auto_ranged_put_new( auto_ranged_put->threaded_update_data.next_part_number = 1; auto_ranged_put->prepare_data.num_parts_read_from_stream = 0; + auto_ranged_put->synced_data.is_body_stream_at_end = false; + + uint32_t initial_num_parts = auto_ranged_put->has_content_length ? num_parts : s_unknown_length_default_num_parts; - struct aws_string **etag_c_array = aws_mem_calloc(allocator, sizeof(struct aws_string *), num_parts); - aws_array_list_init_static( - &auto_ranged_put->synced_data.etag_list, etag_c_array, num_parts, sizeof(struct aws_string *)); - auto_ranged_put->encoded_checksum_list = aws_mem_calloc(allocator, sizeof(struct aws_byte_buf), num_parts); + aws_array_list_init_dynamic( + &auto_ranged_put->synced_data.etag_list, allocator, initial_num_parts, sizeof(struct aws_string *)); + aws_array_list_init_dynamic( + &auto_ranged_put->synced_data.encoded_checksum_list, allocator, initial_num_parts, sizeof(struct aws_byte_buf)); if (s_try_init_resume_state_from_persisted_data(allocator, auto_ranged_put, options->resume_token)) { goto error_clean_up; @@ -303,7 +341,8 @@ static void s_s3_meta_request_auto_ranged_put_destroy(struct aws_s3_meta_request aws_s3_paginated_operation_release(auto_ranged_put->synced_data.list_parts_operation); - for (size_t etag_index = 0; etag_index < auto_ranged_put->synced_data.total_num_parts; ++etag_index) { + for (size_t etag_index = 0; etag_index < aws_array_list_length(&auto_ranged_put->synced_data.etag_list); + ++etag_index) { struct aws_string *etag = NULL; aws_array_list_get_at(&auto_ranged_put->synced_data.etag_list, &etag, etag_index); @@ -312,16 +351,38 @@ static void s_s3_meta_request_auto_ranged_put_destroy(struct aws_s3_meta_request aws_string_destroy(auto_ranged_put->synced_data.list_parts_continuation_token); - for (size_t checksum_index = 0; checksum_index < auto_ranged_put->synced_data.total_num_parts; ++checksum_index) { - aws_byte_buf_clean_up(&auto_ranged_put->encoded_checksum_list[checksum_index]); + for (size_t checksum_index = 0; + checksum_index < aws_array_list_length(&auto_ranged_put->synced_data.encoded_checksum_list); + ++checksum_index) { + + struct aws_byte_buf checksum_buf; + aws_array_list_get_at(&auto_ranged_put->synced_data.encoded_checksum_list, &checksum_buf, checksum_index); + + aws_byte_buf_clean_up(&checksum_buf); } - aws_mem_release(meta_request->allocator, auto_ranged_put->synced_data.etag_list.data); - aws_mem_release(meta_request->allocator, auto_ranged_put->encoded_checksum_list); aws_array_list_clean_up(&auto_ranged_put->synced_data.etag_list); + aws_array_list_clean_up(&auto_ranged_put->synced_data.encoded_checksum_list); aws_http_headers_release(auto_ranged_put->synced_data.needed_response_headers); aws_mem_release(meta_request->allocator, auto_ranged_put); } +/* Check flags and corresponding conditions to see if any more parts can be + * scheduled during this pass. 
*/ +static bool s_should_skip_scheduling_more_parts_based_on_flags( + const struct aws_s3_auto_ranged_put *auto_ranged_put, + uint32_t flags) { + if ((flags & AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE) != 0) { + uint32_t num_parts_in_flight = + (auto_ranged_put->synced_data.num_parts_sent - auto_ranged_put->synced_data.num_parts_completed); + + /* Because uploads must read from their streams serially, we try to limit the amount of in flight + * requests for a given multipart upload if we can. */ + return num_parts_in_flight > 0; + } + + return false; +} + static bool s_s3_auto_ranged_put_update( struct aws_s3_meta_request *meta_request, uint32_t flags, @@ -339,7 +400,7 @@ static bool s_s3_auto_ranged_put_update( aws_s3_meta_request_lock_synced_data(meta_request); if (!aws_s3_meta_request_has_finish_result_synced(meta_request)) { - /* If resuming and list part has not be sent, do it now. */ + /* If resuming and list part has not been sent, do it now. */ if (!auto_ranged_put->synced_data.list_parts_state.started) { request = aws_s3_request_new( meta_request, @@ -388,8 +449,9 @@ static bool s_s3_auto_ranged_put_update( goto has_work_remaining; } - /* If we haven't sent all of the parts yet, then set up to send a new part now. */ - if (auto_ranged_put->synced_data.num_parts_sent < auto_ranged_put->synced_data.total_num_parts) { + bool should_create_next_part_request = false; + if (auto_ranged_put->has_content_length && + (auto_ranged_put->synced_data.num_parts_sent < auto_ranged_put->synced_data.total_num_parts)) { /* Check if the etag/checksum list has the result already */ int part_index = auto_ranged_put->threaded_update_data.next_part_number - 1; @@ -399,7 +461,8 @@ static bool s_s3_auto_ranged_put_update( struct aws_string *etag = NULL; if (!aws_array_list_get_at(&auto_ranged_put->synced_data.etag_list, &etag, etag_index) && etag) { - /* part already downloaded, skip it here and prepare will take care of adjusting the buffer */ + /* part already downloaded, skip it here and prepare will take care of adjusting the buffer + */ ++auto_ranged_put->threaded_update_data.next_part_number; } else { @@ -413,18 +476,32 @@ static bool s_s3_auto_ranged_put_update( auto_ranged_put->threaded_update_data.next_part_number <= auto_ranged_put->synced_data.total_num_parts); - if ((flags & AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE) != 0) { - uint32_t num_parts_in_flight = - (auto_ranged_put->synced_data.num_parts_sent - - auto_ranged_put->synced_data.num_parts_completed); + if (s_should_skip_scheduling_more_parts_based_on_flags(auto_ranged_put, flags)) { + goto has_work_remaining; + } - /* Because uploads must read from their streams serially, we try to limit the amount of in flight - * requests for a given multipart upload if we can. */ - if (num_parts_in_flight > 0) { - goto has_work_remaining; - } + should_create_next_part_request = true; + + } else if (!auto_ranged_put->has_content_length && !auto_ranged_put->synced_data.is_body_stream_at_end) { + + if (s_should_skip_scheduling_more_parts_based_on_flags(auto_ranged_put, flags)) { + goto has_work_remaining; + } + + uint32_t num_parts_not_read = + (auto_ranged_put->synced_data.num_parts_sent - auto_ranged_put->synced_data.num_parts_read); + + /* Because uploads must read from their streams serially, we try to limit the amount of in flight + * requests for a given multipart upload if we can. 
*/ + if (num_parts_not_read >= s_unknown_length_max_optimistic_prepared_parts) { + goto has_work_remaining; } + should_create_next_part_request = true; + } + + if (should_create_next_part_request) { + /* Allocate a request for another part. */ request = aws_s3_request_new( meta_request, @@ -447,10 +524,17 @@ static bool s_s3_auto_ranged_put_update( goto has_work_remaining; } - /* There is one more request to send after all of the parts (the complete-multipart-upload) but it can't be - * done until all of the parts have been completed.*/ - if (auto_ranged_put->synced_data.num_parts_completed != auto_ranged_put->synced_data.total_num_parts) { - goto has_work_remaining; + /* There is one more request to send after all the parts (the complete-multipart-upload) but it can't be + * done until all the parts have been completed.*/ + if (auto_ranged_put->has_content_length) { + if (auto_ranged_put->synced_data.num_parts_completed != auto_ranged_put->synced_data.total_num_parts) { + goto has_work_remaining; + } + } else { + if ((!auto_ranged_put->synced_data.is_body_stream_at_end) || + auto_ranged_put->synced_data.num_parts_completed != auto_ranged_put->synced_data.num_parts_sent) { + goto has_work_remaining; + } } /* If the complete-multipart-upload request hasn't been set yet, then send it now. */ @@ -568,14 +652,14 @@ static bool s_s3_auto_ranged_put_update( * Basically returns either part size or if content is not equally divisible into parts, the size of the remaining last * part. */ -static size_t s_compute_request_body_size(struct aws_s3_meta_request *meta_request, uint32_t part_number) { +static size_t s_compute_request_body_size(const struct aws_s3_meta_request *meta_request, uint32_t part_number) { AWS_PRECONDITION(meta_request); - struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl; + const struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl; size_t request_body_size = meta_request->part_size; /* Last part--adjust size to match remaining content length. 
 */
-    if (part_number == auto_ranged_put->synced_data.total_num_parts) {
+    if (auto_ranged_put->has_content_length && part_number == auto_ranged_put->synced_data.total_num_parts) {
         size_t content_remainder = (size_t)(auto_ranged_put->content_length % (uint64_t)meta_request->part_size);
 
         if (content_remainder > 0) {
@@ -664,7 +748,7 @@ static int s_skip_parts_from_stream(
     AWS_PRECONDITION(meta_request);
     AWS_PRECONDITION(num_parts_read_from_stream <= skip_until_part_number);
 
-    struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
+    const struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
 
     AWS_PRECONDITION(skip_until_part_number <= auto_ranged_put->synced_data.total_num_parts);
 
@@ -706,13 +790,15 @@ static int s_skip_parts_from_stream(
             goto on_done;
         }
 
+        struct aws_byte_buf checksum_buf;
+        aws_array_list_get_at(&auto_ranged_put->synced_data.encoded_checksum_list, &checksum_buf, part_index);
+
         // compare skipped checksum to previously uploaded checksum
-        if (auto_ranged_put->encoded_checksum_list[part_index].len > 0 &&
-            s_verify_part_matches_checksum(
-                meta_request->allocator,
-                temp_body_buf,
-                meta_request->checksum_config.checksum_algorithm,
-                auto_ranged_put->encoded_checksum_list[part_index])) {
+        if (checksum_buf.len > 0 && s_verify_part_matches_checksum(
+                                        meta_request->allocator,
+                                        temp_body_buf,
+                                        meta_request->checksum_config.checksum_algorithm,
+                                        checksum_buf)) {
             return_status = AWS_OP_ERR;
             goto on_done;
         }
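The prepare hunk that follows drives part creation off the new aws_s3_meta_request_body_has_no_more_data helper. Stripped of locking and logging, the decision it implements is roughly the following (a simplified sketch, not the literal code):

    /* Read up to one part; an empty read means the stream ended on a part
     * boundary, so the optimistically scheduled request becomes a noop. */
    bool read_data = false;
    if (!aws_s3_meta_request_body_has_no_more_data(meta_request)) {
        aws_byte_buf_init(&request->request_body, meta_request->allocator, request_body_size);
        if (aws_s3_meta_request_read_body(meta_request, &request->request_body)) {
            return AWS_OP_ERR;
        }
        read_data = request->request_body.len != 0;
    }
    request->is_noop = !read_data;
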
@@ -800,23 +886,102 @@ static int s_s3_auto_ranged_put_prepare_request(
             size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number);
 
             if (request->num_times_prepared == 0) {
-                if (s_skip_parts_from_stream(
-                        meta_request,
-                        auto_ranged_put->prepare_data.num_parts_read_from_stream,
-                        request->part_number - 1)) {
-                    goto message_create_failed;
+                if (auto_ranged_put->has_content_length) {
+                    if (s_skip_parts_from_stream(
+                            meta_request,
+                            auto_ranged_put->prepare_data.num_parts_read_from_stream,
+                            request->part_number - 1)) {
+                        goto message_create_failed;
+                    }
+                }
+
+                if (!auto_ranged_put->has_content_length) {
+                    ++auto_ranged_put->synced_data.total_num_parts;
                 }
-                auto_ranged_put->prepare_data.num_parts_read_from_stream = request->part_number - 1;
 
-                aws_byte_buf_init(&request->request_body, meta_request->allocator, request_body_size);
+                /* Some streams might require an additional read to realize that
+                 * there is no more data to read. So let's try to read, and mark
+                 * the part as a no-op if we cannot read any more. */
+                bool successfully_read_data_from_stream = false;
+                if (!aws_s3_meta_request_body_has_no_more_data(meta_request)) {
+                    aws_byte_buf_init(&request->request_body, meta_request->allocator, request_body_size);
+                    if (aws_s3_meta_request_read_body(meta_request, &request->request_body)) {
+                        goto message_create_failed;
+                    }
 
-                if (aws_s3_meta_request_read_body(meta_request, &request->request_body)) {
-                    goto message_create_failed;
+                    successfully_read_data_from_stream = request->request_body.len != 0;
+                }
+
+                if (successfully_read_data_from_stream) {
+
+                    auto_ranged_put->prepare_data.num_parts_read_from_stream = request->part_number;
+
+                    /* BEGIN CRITICAL SECTION */
+                    {
+                        aws_s3_meta_request_lock_synced_data(meta_request);
+
+                        ++auto_ranged_put->synced_data.num_parts_read;
+
+                        auto_ranged_put->synced_data.is_body_stream_at_end =
+                            aws_s3_meta_request_body_has_no_more_data(meta_request);
+
+                        /* Reset the values for the corresponding checksum and etag.
+                         * Note: during the resume flow this might cause the values to be
+                         * reset twice (if we are preparing a part in between
+                         * previously completed parts). */
+                        struct aws_byte_buf checksum_buf = {0};
+                        aws_array_list_set_at(
+                            &auto_ranged_put->synced_data.encoded_checksum_list,
+                            &checksum_buf,
+                            request->part_number - 1);
+
+                        struct aws_string *null_etag = NULL;
+                        aws_array_list_set_at(
+                            &auto_ranged_put->synced_data.etag_list, &null_etag, request->part_number - 1);
+
+                        aws_s3_meta_request_unlock_synced_data(meta_request);
+                    }
+                    /* END CRITICAL SECTION */
+                } else {
+                    request->is_noop = true;
+
+                    /* BEGIN CRITICAL SECTION */
+                    {
+                        aws_s3_meta_request_lock_synced_data(meta_request);
+
+                        ++auto_ranged_put->synced_data.num_parts_read;
+                        auto_ranged_put->synced_data.is_body_stream_at_end = true;
+
+                        aws_s3_meta_request_unlock_synced_data(meta_request);
+                    }
+                    /* END CRITICAL SECTION */
+
+                    AWS_LOGF_DEBUG(
+                        AWS_LS_S3_META_REQUEST,
+                        "id=%p UploadPart with part num %u for Multi-part Upload, with ID:%s "
+                        "is noop due to encountering end of stream",
+                        (void *)meta_request,
+                        request->part_number,
+                        aws_string_c_str(auto_ranged_put->upload_id));
                 }
-                ++auto_ranged_put->prepare_data.num_parts_read_from_stream;
             }
-            /* Clean up the buffer in case of it's initialized before and retry happens. */
-            aws_byte_buf_clean_up(&auto_ranged_put->encoded_checksum_list[request->part_number - 1]);
+
+            AWS_LOGF_DEBUG(
+                AWS_LS_S3_META_REQUEST,
+                "id=%p UploadPart for Multi-part Upload, with ID:%s",
+                (void *)meta_request,
+                aws_string_c_str(auto_ranged_put->upload_id));
+
+            struct aws_byte_buf *checksum_buf = NULL;
+            if (!request->is_noop) {
+                aws_array_list_get_at_ptr(
+                    &auto_ranged_put->synced_data.encoded_checksum_list,
+                    (void **)&checksum_buf,
+                    request->part_number - 1);
+                /* Clean up the buffer in case it was initialized before and a retry happened. */
+                aws_byte_buf_clean_up(checksum_buf);
+            }
+
             /* Create a new put-object message to upload a part. */
             message = aws_s3_upload_part_message_new(
                 meta_request->allocator,
@@ -826,20 +991,41 @@ static int s_s3_auto_ranged_put_prepare_request(
                 auto_ranged_put->upload_id,
                 meta_request->should_compute_content_md5,
                 &meta_request->checksum_config,
-                &auto_ranged_put->encoded_checksum_list[request->part_number - 1]);
+                checksum_buf);
+
             break;
         }
         case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_COMPLETE_MULTIPART_UPLOAD: {
+            /* Note: CompleteMultipartUpload fails if no parts are provided. We could
+             * work around that by uploading an empty part, at the cost of
+             * complicating the flow logic for dealing with noop parts, but that
+             * arguably adds a lot of complexity for little benefit.
+             * Pre-buffering parts to determine whether an MPU is needed will
+             * resolve this issue.
+             */
+            if (!auto_ranged_put->has_content_length && auto_ranged_put->prepare_data.num_parts_read_from_stream == 0) {
+                AWS_LOGF_ERROR(
+                    AWS_LS_S3_META_REQUEST,
+                    "id=%p 0-byte meta requests without a Content-Length header are currently not supported. Set the "
+                    "Content-Length header to 0 to upload an empty object",
+                    (void *)meta_request);
+
+                aws_raise_error(AWS_ERROR_UNSUPPORTED_OPERATION);
+                goto message_create_failed;
+            }
+
             if (request->num_times_prepared == 0) {
 
                 /* Corner case of last part being previously uploaded during resume.
* Read it from input stream and potentially verify checksum */ - if (s_skip_parts_from_stream( - meta_request, - auto_ranged_put->prepare_data.num_parts_read_from_stream, - auto_ranged_put->synced_data.total_num_parts)) { - goto message_create_failed; + if (auto_ranged_put->has_content_length) { + if (s_skip_parts_from_stream( + meta_request, + auto_ranged_put->prepare_data.num_parts_read_from_stream, + auto_ranged_put->synced_data.total_num_parts)) { + goto message_create_failed; + } } auto_ranged_put->prepare_data.num_parts_read_from_stream = auto_ranged_put->synced_data.total_num_parts; @@ -865,7 +1051,7 @@ static int s_s3_auto_ranged_put_prepare_request( &request->request_body, auto_ranged_put->upload_id, &auto_ranged_put->synced_data.etag_list, - auto_ranged_put->encoded_checksum_list, + &auto_ranged_put->synced_data.encoded_checksum_list, meta_request->checksum_config.checksum_algorithm); aws_s3_meta_request_unlock_synced_data(meta_request); @@ -930,7 +1116,7 @@ static void s_s3_auto_ranged_put_send_request_finish( struct aws_http_stream *stream, int error_code) { - struct aws_s3_request *request = connection->request; + const struct aws_s3_request *request = connection->request; AWS_PRECONDITION(request); /* Request tag is different from different type of meta requests */ @@ -1102,42 +1288,50 @@ static void s_s3_auto_ranged_put_request_finished( AWS_FATAL_ASSERT(part_number > 0); size_t part_index = part_number - 1; struct aws_string *etag = NULL; + bool request_is_noop = request->is_noop != 0; - if (error_code == AWS_ERROR_SUCCESS) { - /* Find the ETag header if it exists and cache it. */ - struct aws_byte_cursor etag_within_quotes; + if (!request_is_noop) { + if (error_code == AWS_ERROR_SUCCESS) { + /* Find the ETag header if it exists and cache it. */ + struct aws_byte_cursor etag_within_quotes; - AWS_ASSERT(request->send_data.response_headers); + AWS_ASSERT(request->send_data.response_headers); - if (aws_http_headers_get( - request->send_data.response_headers, g_etag_header_name, &etag_within_quotes) != - AWS_OP_SUCCESS) { - AWS_LOGF_ERROR( - AWS_LS_S3_META_REQUEST, - "id=%p Could not find ETag header for request %p", - (void *)meta_request, - (void *)request); + if (aws_http_headers_get( + request->send_data.response_headers, g_etag_header_name, &etag_within_quotes) != + AWS_OP_SUCCESS) { + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "id=%p Could not find ETag header for request %p", + (void *)meta_request, + (void *)request); - error_code = AWS_ERROR_S3_MISSING_ETAG; - } else { - /* The ETag value arrives in quotes, but we don't want it in quotes when we send it back up - * later, so just get rid of the quotes now. */ - etag = aws_strip_quotes(meta_request->allocator, etag_within_quotes); + error_code = AWS_ERROR_S3_MISSING_ETAG; + } else { + /* The ETag value arrives in quotes, but we don't want it in quotes when we send it back up + * later, so just get rid of the quotes now. 
*/ + etag = aws_strip_quotes(meta_request->allocator, etag_within_quotes); + } + } + if (error_code == AWS_ERROR_SUCCESS && meta_request->progress_callback != NULL) { + struct aws_s3_meta_request_progress progress = { + .bytes_transferred = meta_request->part_size, + .content_length = auto_ranged_put->content_length, + }; + meta_request->progress_callback(meta_request, &progress, meta_request->user_data); } } - if (error_code == AWS_ERROR_SUCCESS && meta_request->progress_callback != NULL) { - struct aws_s3_meta_request_progress progress = { - .bytes_transferred = meta_request->part_size, - .content_length = auto_ranged_put->content_length, - }; - meta_request->progress_callback(meta_request, &progress, meta_request->user_data); - } + /* BEGIN CRITICAL SECTION */ { aws_s3_meta_request_lock_synced_data(meta_request); ++auto_ranged_put->synced_data.num_parts_completed; + if (request_is_noop) { + ++auto_ranged_put->synced_data.num_parts_noop; + } + AWS_LOGF_DEBUG( AWS_LS_S3_META_REQUEST, "id=%p: %d out of %d parts have completed.", @@ -1145,18 +1339,20 @@ static void s_s3_auto_ranged_put_request_finished( auto_ranged_put->synced_data.num_parts_completed, auto_ranged_put->synced_data.total_num_parts); - if (error_code == AWS_ERROR_SUCCESS) { - AWS_ASSERT(etag != NULL); + if (!request_is_noop) { + if (error_code == AWS_ERROR_SUCCESS) { + AWS_ASSERT(etag != NULL); - ++auto_ranged_put->synced_data.num_parts_successful; + ++auto_ranged_put->synced_data.num_parts_successful; - /* ETags need to be associated with their part number, so we keep the etag indices consistent with - * part numbers. This means we may have to add padding to the list in the case that parts finish out - * of order. */ - aws_array_list_set_at(&auto_ranged_put->synced_data.etag_list, &etag, part_index); - } else { - ++auto_ranged_put->synced_data.num_parts_failed; - aws_s3_meta_request_set_fail_synced(meta_request, request, error_code); + /* ETags need to be associated with their part number, so we keep the etag indices consistent + * with part numbers. This means we may have to add padding to the list in the case that parts + * finish out of order. 
*/ + aws_array_list_set_at(&auto_ranged_put->synced_data.etag_list, &etag, part_index); + } else { + ++auto_ranged_put->synced_data.num_parts_failed; + aws_s3_meta_request_set_fail_synced(meta_request, request, error_code); + } } aws_s3_meta_request_unlock_synced_data(meta_request); @@ -1263,9 +1459,15 @@ static int s_s3_auto_ranged_put_pause( *out_resume_token = NULL; + struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl; + if (!auto_ranged_put->has_content_length) { + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, "id=%p: Failed to pause request with unknown content length", (void *)meta_request); + return aws_raise_error(AWS_ERROR_UNSUPPORTED_OPERATION); + } + /* lock */ aws_s3_meta_request_lock_synced_data(meta_request); - struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl; AWS_LOGF_DEBUG( AWS_LS_S3_META_REQUEST, diff --git a/source/s3_checksums.c b/source/s3_checksums.c index c16288535..193886831 100644 --- a/source/s3_checksums.c +++ b/source/s3_checksums.c @@ -2,7 +2,6 @@ #include "aws/s3/private/s3_util.h" #include #include -#include #define AWS_CRC32_LEN 4 #define AWS_CRC32C_LEN 4 diff --git a/source/s3_client.c b/source/s3_client.c index 268073b89..1a89a795f 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -33,7 +32,6 @@ #include #include -#include #include #include @@ -937,12 +935,12 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default( AWS_PRECONDITION(client); AWS_PRECONDITION(options); - struct aws_http_headers *initial_message_headers = aws_http_message_get_headers(options->message); + const struct aws_http_headers *initial_message_headers = aws_http_message_get_headers(options->message); AWS_ASSERT(initial_message_headers); uint64_t content_length = 0; struct aws_byte_cursor content_length_cursor; - bool content_length_header_found = false; + bool content_length_found = false; if (!aws_http_headers_get(initial_message_headers, g_content_length_header_name, &content_length_cursor)) { if (aws_byte_cursor_utf8_parse_u64(content_length_cursor, &content_length)) { @@ -953,7 +951,7 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default( aws_raise_error(AWS_ERROR_S3_INVALID_CONTENT_LENGTH_HEADER); return NULL; } - content_length_header_found = true; + content_length_found = true; } /* Call the appropriate meta-request new function. */ @@ -971,15 +969,7 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default( } case AWS_S3_META_REQUEST_TYPE_PUT_OBJECT: { - if (!content_length_header_found) { - AWS_LOGF_ERROR( - AWS_LS_S3_META_REQUEST, - "Could not create auto-ranged-put meta request; there is no Content-Length header present."); - aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); - return NULL; - } - - struct aws_input_stream *input_stream = aws_http_message_get_body_stream(options->message); + const struct aws_input_stream *input_stream = aws_http_message_get_body_stream(options->message); if ((input_stream == NULL) && (options->send_filepath.len == 0)) { AWS_LOGF_ERROR( @@ -1018,7 +1008,8 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default( } uint64_t multipart_upload_threshold = client->multipart_upload_threshold == 0 ? 
client_part_size : client->multipart_upload_threshold; - if (content_length <= multipart_upload_threshold) { + + if (content_length_found && content_length <= multipart_upload_threshold) { return aws_s3_meta_request_default_new( client->allocator, client, @@ -1028,31 +1019,40 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default( options); } else { if (aws_s3_message_util_check_checksum_header(options->message)) { - /* The checksum header has been set and the request will be splitted. We fail the request */ + /* The checksum header has been set and the request will be split. We fail the request */ AWS_LOGF_ERROR( AWS_LS_S3_META_REQUEST, "Could not create auto-ranged-put meta request; checksum headers has been set for " "auto-ranged-put that will be split. Pre-calculated checksums are only supported for " - "single " - "part upload."); + "single part upload."); aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); return NULL; } } - size_t part_size; - uint32_t num_parts; - if (aws_s3_calculate_optimal_mpu_part_size_and_num_parts( - content_length, client_part_size, client_max_part_size, &part_size, &num_parts)) { - return NULL; + size_t part_size = client_part_size; + uint32_t num_parts = 0; + if (content_length_found) { + if (aws_s3_calculate_optimal_mpu_part_size_and_num_parts( + content_length, client_part_size, client_max_part_size, &part_size, &num_parts)) { + return NULL; + } } return aws_s3_meta_request_auto_ranged_put_new( - client->allocator, client, part_size, content_length, num_parts, options); - } else { - /* dont pass part size and total num parts. constructor will pick it up from token */ + client->allocator, client, part_size, content_length_found, content_length, num_parts, options); + } else { /* else using resume token */ + if (!content_length_found) { + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "Could not create auto-ranged-put resume meta request; content_length must be specified."); + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + + /* don't pass part size and total num parts. constructor will pick it up from token */ return aws_s3_meta_request_auto_ranged_put_new( - client->allocator, client, 0, content_length, 0, options); + client->allocator, client, 0, true, content_length, 0, options); } } case AWS_S3_META_REQUEST_TYPE_COPY_OBJECT: { @@ -1460,7 +1460,10 @@ static void s_s3_client_prepare_callback_queue_request( struct aws_s3_client *client = user_data; AWS_PRECONDITION(client); - if (error_code != AWS_ERROR_SUCCESS) { + bool request_is_noop = false; + + if (error_code != AWS_ERROR_SUCCESS || request->is_noop) { + request_is_noop = request->is_noop != 0; s_s3_client_meta_request_finished_request(client, meta_request, request, error_code); aws_s3_request_release(request); @@ -1472,7 +1475,9 @@ static void s_s3_client_prepare_callback_queue_request( aws_s3_client_lock_synced_data(client); if (error_code == AWS_ERROR_SUCCESS) { - aws_linked_list_push_back(&client->synced_data.prepared_requests, &request->node); + if (!request_is_noop) { + aws_linked_list_push_back(&client->synced_data.prepared_requests, &request->node); + } } else { ++client->synced_data.num_failed_prepare_requests; } @@ -1720,7 +1725,7 @@ void aws_s3_client_notify_connection_finished( request->send_data.metrics->crt_info_metrics.error_code = error_code; } - /* If we're trying to setup a retry... */ + /* If we're trying to set up a retry... 
*/ if (finish_code == AWS_S3_CONNECTION_FINISH_CODE_RETRY) { if (connection->retry_token == NULL) { diff --git a/source/s3_default_meta_request.c b/source/s3_default_meta_request.c index 5bc39a731..c2de89be8 100644 --- a/source/s3_default_meta_request.c +++ b/source/s3_default_meta_request.c @@ -6,11 +6,6 @@ #include #include -#ifdef _MSC_VER -/* sscanf warning (not currently scanning for strings) */ -# pragma warning(disable : 4996) -#endif - static void s_s3_meta_request_default_destroy(struct aws_s3_meta_request *meta_request); static bool s_s3_meta_request_default_update( diff --git a/source/s3_endpoint.c b/source/s3_endpoint.c index 74075ccb8..f2023523d 100644 --- a/source/s3_endpoint.c +++ b/source/s3_endpoint.c @@ -3,35 +3,24 @@ * SPDX-License-Identifier: Apache-2.0. */ -#include "aws/s3/private/s3_auto_ranged_get.h" -#include "aws/s3/private/s3_auto_ranged_put.h" #include "aws/s3/private/s3_client_impl.h" -#include "aws/s3/private/s3_default_meta_request.h" #include "aws/s3/private/s3_meta_request_impl.h" #include "aws/s3/private/s3_util.h" #include #include -#include -#include #include -#include #include -#include #include #include -#include #include #include #include -#include #include -#include #include #include #include -#include static const uint32_t s_connection_timeout_ms = 3000; static const uint16_t s_http_port = 80; diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index ba3b9ed5a..5ff206f6b 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -111,7 +111,7 @@ static int s_meta_request_get_response_headers_checksum_callback( } } -/* warning this might get screwed up with retrys/restarts */ +/* warning this might get screwed up with retries/restarts */ static int s_meta_request_get_response_body_checksum_callback( struct aws_s3_meta_request *meta_request, const struct aws_byte_cursor *body, @@ -459,11 +459,11 @@ static void s_s3_meta_request_destroy(void *user_data) { } static int s_s3_request_priority_queue_pred(const void *a, const void *b) { - const struct aws_s3_request **request_a = (const struct aws_s3_request **)a; + const struct aws_s3_request *const *request_a = a; AWS_PRECONDITION(request_a); AWS_PRECONDITION(*request_a); - const struct aws_s3_request **request_b = (const struct aws_s3_request **)b; + const struct aws_s3_request *const *request_b = b; AWS_PRECONDITION(request_b); AWS_PRECONDITION(*request_b); @@ -590,7 +590,7 @@ static void s_s3_meta_request_prepare_request_task(struct aws_task *task, void * struct aws_s3_meta_request *meta_request = request->meta_request; AWS_PRECONDITION(meta_request); - struct aws_s3_meta_request_vtable *vtable = meta_request->vtable; + const struct aws_s3_meta_request_vtable *vtable = meta_request->vtable; AWS_PRECONDITION(vtable); /* Client owns this event loop group. A cancel should not be possible. 
*/ @@ -917,7 +917,7 @@ static void s_get_part_response_headers_checksum_helper( } } -/* warning this might get screwed up with retrys/restarts */ +/* warning this might get screwed up with retries/restarts */ static void s_get_part_response_body_checksum_helper( struct aws_s3_checksum *running_response_sum, const struct aws_byte_cursor *body) { @@ -1607,6 +1607,21 @@ int aws_s3_meta_request_read_body(struct aws_s3_meta_request *meta_request, stru return AWS_OP_SUCCESS; } +bool aws_s3_meta_request_body_has_no_more_data(const struct aws_s3_meta_request *meta_request) { + AWS_PRECONDITION(meta_request); + + struct aws_input_stream *initial_body_stream = + aws_http_message_get_body_stream(meta_request->initial_request_message); + AWS_FATAL_ASSERT(initial_body_stream); + + struct aws_stream_status status; + if (aws_input_stream_get_status(initial_body_stream, &status)) { + return true; + } + + return status.is_end_of_stream; +} + void aws_s3_meta_request_result_setup( struct aws_s3_meta_request *meta_request, struct aws_s3_meta_request_result *result, diff --git a/source/s3_request_messages.c b/source/s3_request_messages.c index e6fec0291..248a6794d 100644 --- a/source/s3_request_messages.c +++ b/source/s3_request_messages.c @@ -5,8 +5,6 @@ #include "aws/s3/private/s3_request_messages.h" #include "aws/s3/private/s3_checksums.h" -#include "aws/s3/private/s3_client_impl.h" -#include "aws/s3/private/s3_meta_request_impl.h" #include "aws/s3/private/s3_util.h" #include #include @@ -14,8 +12,6 @@ #include #include #include -#include -#include #include const struct aws_byte_cursor g_s3_create_multipart_upload_excluded_headers[] = { @@ -340,7 +336,7 @@ struct aws_http_message *aws_s3_upload_part_message_new( if (should_compute_content_md5) { if (!checksum_config || checksum_config->location == AWS_SCL_NONE) { - /* MD5 will be skiped if flexible checksum used */ + /* MD5 will be skipped if flexible checksum used */ if (aws_s3_message_util_add_content_md5_header(allocator, buffer, message)) { goto error_clean_up; } @@ -554,7 +550,7 @@ struct aws_http_message *aws_s3_complete_multipart_message_new( struct aws_byte_buf *body_buffer, const struct aws_string *upload_id, const struct aws_array_list *etags, - struct aws_byte_buf *checksums, + const struct aws_array_list *checksums, enum aws_s3_checksum_algorithm algorithm) { AWS_PRECONDITION(allocator); AWS_PRECONDITION(base_message); @@ -601,7 +597,7 @@ struct aws_http_message *aws_s3_complete_multipart_message_new( goto error_clean_up; } - /* Create XML payload with all of the etags of finished parts */ + /* Create XML payload with all the etags of finished parts */ { aws_byte_buf_reset(body_buffer, false); @@ -643,8 +639,13 @@ struct aws_http_message *aws_s3_complete_multipart_message_new( if (aws_byte_buf_append_dynamic(body_buffer, &s_close_part_number_tag)) { goto error_clean_up; } + if (mpu_algorithm_checksum_name) { - struct aws_byte_cursor checksum = aws_byte_cursor_from_buf(&checksums[etag_index]); + struct aws_byte_buf checksum_buf; + + aws_array_list_get_at(checksums, &checksum_buf, etag_index); + + struct aws_byte_cursor checksum = aws_byte_cursor_from_buf(&checksum_buf); if (aws_byte_buf_append_dynamic(body_buffer, &s_open_start_bracket)) { goto error_clean_up; diff --git a/source/s3_util.c b/source/s3_util.c index 47a990c37..f2b9bf0de 100644 --- a/source/s3_util.c +++ b/source/s3_util.c @@ -9,7 +9,6 @@ #include #include #include -#include #include #include @@ -340,7 +339,7 @@ void aws_s3_add_user_agent_header(struct aws_allocator *allocator, 
struct aws_http_message *message) {
     AWS_PRECONDITION(allocator);
     AWS_PRECONDITION(message);
 
-    const struct aws_byte_cursor space_delimeter = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(" ");
+    const struct aws_byte_cursor space_delimiter = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(" ");
     const struct aws_byte_cursor forward_slash = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("/");
 
     const size_t user_agent_product_version_length =
@@ -356,16 +355,16 @@ void aws_s3_add_user_agent_header(struct aws_allocator *allocator, struct aws_http_message *message) {
     AWS_ZERO_STRUCT(user_agent_buffer);
 
     if (aws_http_headers_get(headers, g_user_agent_header_name, &current_user_agent_header) == AWS_OP_SUCCESS) {
-        /* If the header was found, then create a buffer with the total size we'll need, and append the curent user
+        /* If the header was found, then create a buffer with the total size we'll need, and append the current user
          * agent header with a trailing space. */
         aws_byte_buf_init(
             &user_agent_buffer,
             allocator,
-            current_user_agent_header.len + space_delimeter.len + user_agent_product_version_length);
+            current_user_agent_header.len + space_delimiter.len + user_agent_product_version_length);
 
         aws_byte_buf_append_dynamic(&user_agent_buffer, &current_user_agent_header);
-        aws_byte_buf_append_dynamic(&user_agent_buffer, &space_delimeter);
+        aws_byte_buf_append_dynamic(&user_agent_buffer, &space_delimiter);
     } else {
         AWS_ASSERT(aws_last_error() == AWS_ERROR_HTTP_HEADER_NOT_FOUND);
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 8c3cd4edc..ea99e7643 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -106,6 +106,9 @@
 add_net_test_case(test_s3_upload_part_message_without_content_md5)
 add_net_test_case(test_s3_create_multipart_upload_message_with_content_md5)
 add_net_test_case(test_s3_complete_multipart_message_with_content_md5)
 add_net_test_case(test_s3_put_object_double_slashes)
+add_net_test_case(test_s3_put_object_no_content_length)
+add_net_test_case(test_s3_put_object_single_part_no_content_length)
+add_net_test_case(test_s3_put_object_zero_size_no_content_length)
 
 if(ENABLE_MRAP_TESTS)
     add_net_test_case(test_s3_get_object_less_than_part_size_mrap)
diff --git a/tests/s3_checksums_test_case_helper.h b/tests/s3_checksums_test_case_helper.h
index d6d36df36..bf2432785 100644
--- a/tests/s3_checksums_test_case_helper.h
+++ b/tests/s3_checksums_test_case_helper.h
@@ -3,9 +3,11 @@
  * SPDX-License-Identifier: Apache-2.0.
*/ #include "aws/s3/private/s3_checksums.h" +#include -typedef struct aws_s3_checksum *( - aws_checksum_new_fn)(struct aws_allocator *allocator, enum aws_s3_checksum_algorithm algorithm); +typedef struct aws_s3_checksum *aws_checksum_new_fn( + struct aws_allocator *allocator, + enum aws_s3_checksum_algorithm algorithm); static inline int s_verify_checksum_test_case( struct aws_allocator *allocator, diff --git a/tests/s3_data_plane_tests.c b/tests/s3_data_plane_tests.c index 1dc90cc41..15e7ba102 100644 --- a/tests/s3_data_plane_tests.c +++ b/tests/s3_data_plane_tests.c @@ -2117,6 +2117,133 @@ static int s_test_s3_put_object_empty_object(struct aws_allocator *allocator, vo return 0; } +AWS_TEST_CASE(test_s3_put_object_no_content_length, s_test_s3_put_object_no_content_length) +static int s_test_s3_put_object_no_content_length(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_s3_tester tester; + ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); + + struct aws_s3_client_config client_config = { + .part_size = MB_TO_BYTES(8), + }; + + ASSERT_SUCCESS(aws_s3_tester_bind_client( + &tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_REGION | AWS_S3_TESTER_BIND_CLIENT_SIGNING)); + + struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config); + + ASSERT_TRUE(client != NULL); + + struct aws_s3_tester_meta_request_options put_options = { + .allocator = allocator, + .meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT, + .client = client, + .put_options = + { + .object_size_mb = 19, + .skip_content_length = true, + }, + }; + struct aws_s3_meta_request_test_results meta_request_test_results; + aws_s3_meta_request_test_results_init(&meta_request_test_results, allocator); + + ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, &meta_request_test_results)); + aws_s3_meta_request_test_results_clean_up(&meta_request_test_results); + + aws_s3_client_release(client); + + aws_s3_tester_clean_up(&tester); + + return 0; +} + +AWS_TEST_CASE(test_s3_put_object_single_part_no_content_length, s_test_s3_put_object_single_part_no_content_length) +static int s_test_s3_put_object_single_part_no_content_length(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_s3_tester tester; + ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); + + struct aws_s3_client_config client_config = { + .part_size = MB_TO_BYTES(8), + }; + + ASSERT_SUCCESS(aws_s3_tester_bind_client( + &tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_REGION | AWS_S3_TESTER_BIND_CLIENT_SIGNING)); + + struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config); + + ASSERT_TRUE(client != NULL); + + struct aws_s3_tester_meta_request_options put_options = { + .allocator = allocator, + .meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT, + .client = client, + .put_options = + { + .object_size_mb = 5, + .skip_content_length = true, + }, + }; + struct aws_s3_meta_request_test_results meta_request_test_results; + aws_s3_meta_request_test_results_init(&meta_request_test_results, allocator); + + ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, &meta_request_test_results)); + aws_s3_meta_request_test_results_clean_up(&meta_request_test_results); + + aws_s3_client_release(client); + + aws_s3_tester_clean_up(&tester); + + return 0; +} + +AWS_TEST_CASE(test_s3_put_object_zero_size_no_content_length, s_test_s3_put_object_zero_size_no_content_length) +static int s_test_s3_put_object_zero_size_no_content_length(struct 
aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_s3_tester tester; + ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); + + struct aws_s3_client_config client_config = { + .part_size = MB_TO_BYTES(8), + }; + + ASSERT_SUCCESS(aws_s3_tester_bind_client( + &tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_REGION | AWS_S3_TESTER_BIND_CLIENT_SIGNING)); + + struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config); + + ASSERT_TRUE(client != NULL); + + struct aws_s3_tester_meta_request_options put_options = { + .allocator = allocator, + .meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT, + .client = client, + .validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE, + .put_options = + { + .object_size_mb = 0, + .skip_content_length = true, + }, + }; + struct aws_s3_meta_request_test_results meta_request_test_results; + aws_s3_meta_request_test_results_init(&meta_request_test_results, allocator); + + ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, &meta_request_test_results)); + + ASSERT_INT_EQUALS(meta_request_test_results.finished_error_code, AWS_ERROR_UNSUPPORTED_OPERATION); + + aws_s3_meta_request_test_results_clean_up(&meta_request_test_results); + + aws_s3_client_release(client); + + aws_s3_tester_clean_up(&tester); + + return 0; +} + AWS_TEST_CASE(test_s3_put_object_sse_kms, s_test_s3_put_object_sse_kms) static int s_test_s3_put_object_sse_kms(struct aws_allocator *allocator, void *ctx) { (void)ctx; diff --git a/tests/s3_meta_request_test.c b/tests/s3_meta_request_test.c index c2c703316..03213a7b2 100644 --- a/tests/s3_meta_request_test.c +++ b/tests/s3_meta_request_test.c @@ -73,7 +73,7 @@ TEST_CASE(meta_request_auto_ranged_put_new_error_handling) { .type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT, }; struct aws_s3_meta_request *meta_request = - aws_s3_meta_request_auto_ranged_put_new(allocator, client, SIZE_MAX, MB_TO_BYTES(10), 2, &options); + aws_s3_meta_request_auto_ranged_put_new(allocator, client, SIZE_MAX, true, MB_TO_BYTES(10), 2, &options); ASSERT_NULL(meta_request); @@ -82,7 +82,7 @@ TEST_CASE(meta_request_auto_ranged_put_new_error_handling) { token->part_size = 1; /* Less than g_s3_min_upload_part_size */ options.resume_token = token; meta_request = - aws_s3_meta_request_auto_ranged_put_new(allocator, client, MB_TO_BYTES(8), MB_TO_BYTES(10), 2, &options); + aws_s3_meta_request_auto_ranged_put_new(allocator, client, MB_TO_BYTES(8), true, MB_TO_BYTES(10), 2, &options); ASSERT_NULL(meta_request); aws_s3_meta_request_resume_token_release(token); @@ -100,7 +100,7 @@ TEST_CASE(meta_request_auto_ranged_put_new_error_handling) { ASSERT_UINT_EQUALS(aws_s3_meta_request_resume_token_total_num_parts(token), token_options.total_num_parts); ASSERT_UINT_EQUALS(aws_s3_meta_request_resume_token_num_parts_completed(token), token_options.num_parts_completed); meta_request = - aws_s3_meta_request_auto_ranged_put_new(allocator, client, MB_TO_BYTES(8), MB_TO_BYTES(10), 2, &options); + aws_s3_meta_request_auto_ranged_put_new(allocator, client, MB_TO_BYTES(8), true, MB_TO_BYTES(10), 2, &options); ASSERT_NULL(meta_request); diff --git a/tests/s3_request_messages_tests.c b/tests/s3_request_messages_tests.c index db2d3dc22..02a0c009a 100644 --- a/tests/s3_request_messages_tests.c +++ b/tests/s3_request_messages_tests.c @@ -241,12 +241,12 @@ static int s_test_http_headers_match( for (size_t i = 0; i < excluded_message0_headers_count; ++i) { const struct aws_byte_cursor *excluded_header_name = &excluded_message0_headers[i]; - 
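The hunks above show how the constructor change lands at call sites: the new bool slots in between part_size and content_length. For a caller that wants the unknown-length behavior, the call would look roughly like the sketch below. This is an illustration, not a hunk from this patch, and it assumes — as the flag's name suggests — that content_length and num_parts carry no meaningful value when the flag is false:

    /* Hypothetical call site for an unknown-length upload (illustrative only). */
    struct aws_s3_meta_request *meta_request = aws_s3_meta_request_auto_ranged_put_new(
        allocator,
        client,
        MB_TO_BYTES(8), /* part_size */
        false,          /* has_content_length: stream length not known up front */
        0,              /* content_length: assumed unused when the flag is false */
        0,              /* num_parts: likewise assumed unused */
        &options);
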
diff --git a/tests/s3_request_messages_tests.c b/tests/s3_request_messages_tests.c
index db2d3dc22..02a0c009a 100644
--- a/tests/s3_request_messages_tests.c
+++ b/tests/s3_request_messages_tests.c
@@ -241,12 +241,12 @@ static int s_test_http_headers_match(
     for (size_t i = 0; i < excluded_message0_headers_count; ++i) {
         const struct aws_byte_cursor *excluded_header_name = &excluded_message0_headers[i];

-        bool header_existance_is_valid = false;
+        bool header_existence_is_valid = false;

-        /* If the heaer is in the exception list, it's okay for message1 to have. (It may have been re-added.) */
+        /* If the header is in the exception list, it's okay for message1 to have. (It may have been re-added.) */
         for (size_t j = 0; j < message1_header_exceptions_count; ++j) {
             if (aws_byte_cursor_eq(excluded_header_name, &message1_header_exceptions[j])) {
-                header_existance_is_valid = true;
+                header_existence_is_valid = true;
                 break;
             }
         }
@@ -256,10 +256,10 @@ static int s_test_http_headers_match(
         AWS_ZERO_STRUCT(message1_header_value);
         int result = aws_http_headers_get(message1_headers, *excluded_header_name, &message1_header_value);

-        if (header_existance_is_valid) {
+        if (header_existence_is_valid) {

-            /* If this header is allowed to exist in message1, then we don't need to assert on its existance or
-             * non-existance. But we do want to erase it from the expected_message0_headers, since its value may be
+            /* If this header is allowed to exist in message1, then we don't need to assert on its existence or
+             * non-existence. But we do want to erase it from the expected_message0_headers, since its value may be
              * different from that in message0. */
             if (result == AWS_OP_SUCCESS) {
                 ASSERT_SUCCESS(aws_http_headers_erase(expected_message0_headers, *excluded_header_name));
diff --git a/tests/s3_test_input_stream.c b/tests/s3_test_input_stream.c
index 5fb949617..1d4e5e020 100644
--- a/tests/s3_test_input_stream.c
+++ b/tests/s3_test_input_stream.c
@@ -31,12 +31,7 @@ static int s_aws_s3_test_input_stream_read(
     struct aws_s3_test_input_stream_impl *test_input_stream =
         AWS_CONTAINER_OF(stream, struct aws_s3_test_input_stream_impl, base);

-    if (dest->capacity > (test_input_stream->length - test_input_stream->position)) {
-        aws_raise_error(AWS_IO_STREAM_READ_FAILED);
-        return AWS_OP_ERR;
-    }
-
-    while (dest->len < dest->capacity) {
+    while (dest->len < dest->capacity && test_input_stream->position < test_input_stream->length) {
         size_t buffer_pos = test_input_stream->position % test_string->len;

         struct aws_byte_cursor source_byte_cursor = {
@@ -44,6 +39,11 @@
             .ptr = test_string->ptr + buffer_pos,
         };

+        size_t remaining_in_stream = test_input_stream->length - test_input_stream->position;
+        if (remaining_in_stream < source_byte_cursor.len) {
+            source_byte_cursor.len = remaining_in_stream;
+        }
+
         size_t remaining_in_buffer = dest->capacity - dest->len;

         if (remaining_in_buffer < source_byte_cursor.len) {
@@ -51,7 +51,6 @@
         }

         aws_byte_buf_append(dest, &source_byte_cursor);
-        buffer_pos += source_byte_cursor.len;
         test_input_stream->position += source_byte_cursor.len;
     }
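The s3_test_input_stream.c change above replaces a hard error (failing any read whose destination buffer is larger than the bytes left in the stream) with ordinary partial-read semantics: copy until the destination fills or the stream is exhausted, clamping each chunk to both limits. The same loop shape, pulled out as a self-contained sketch with simplified types (names here are illustrative, not from the patch):

    #include <stddef.h>
    #include <string.h>

    /* Serve up to dest_cap bytes of a repeating-pattern stream of total size
     * 'length'; '*pos' tracks bytes already served. Returns bytes written,
     * which may be short of dest_cap once the stream runs out. */
    static size_t s_read_repeating_pattern(
        const char *pattern,
        size_t pattern_len,
        size_t length,
        size_t *pos,
        char *dest,
        size_t dest_cap) {

        size_t written = 0;
        while (written < dest_cap && *pos < length) {
            size_t buffer_pos = *pos % pattern_len;
            size_t chunk = pattern_len - buffer_pos; /* bytes left in this repetition */

            size_t remaining_in_stream = length - *pos;
            if (remaining_in_stream < chunk) {
                chunk = remaining_in_stream; /* don't read past end of stream */
            }

            size_t remaining_in_buffer = dest_cap - written;
            if (remaining_in_buffer < chunk) {
                chunk = remaining_in_buffer; /* don't overfill the destination */
            }

            memcpy(dest + written, pattern + buffer_pos, chunk);
            written += chunk;
            *pos += chunk;
        }
        return written;
    }

Note that the removed `buffer_pos += source_byte_cursor.len;` was already dead in the old loop body: buffer_pos is recomputed from the stream position at the top of each iteration, so advancing the local copy had no effect.
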
diff --git a/tests/s3_tester.c b/tests/s3_tester.c
index b109e9fe3..db9d40e15 100644
--- a/tests/s3_tester.c
+++ b/tests/s3_tester.c
@@ -1457,6 +1457,11 @@ int aws_s3_tester_send_meta_request_with_options(
             headers, g_content_length_header_name, aws_byte_cursor_from_c_str(content_length_buffer));
     }

+    if (options->put_options.skip_content_length) {
+        struct aws_http_headers *headers = aws_http_message_get_headers(message);
+        aws_http_headers_erase(headers, g_content_length_header_name);
+    }
+
     if (options->put_options.invalid_request) {
         /* make a invalid request */
         aws_http_message_set_request_path(message, aws_byte_cursor_from_c_str("invalid_path"));
diff --git a/tests/s3_tester.h b/tests/s3_tester.h
index 6ffe27ab7..5d4afa1ce 100644
--- a/tests/s3_tester.h
+++ b/tests/s3_tester.h
@@ -187,6 +187,7 @@ struct aws_s3_tester_meta_request_options {
         struct aws_s3_meta_request_resume_token *resume_token;
         /* manually overwrite the content length for some invalid input stream */
         size_t content_length;
+        bool skip_content_length;

         struct aws_byte_cursor content_encoding;
     } put_options;
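With skip_content_length wired through the tester (the header is erased from the request message just before send, after any explicit content_length override has been applied), a test opts into the unknown-length path declaratively. Condensed from the data-plane tests added above:

    struct aws_s3_tester_meta_request_options put_options = {
        .allocator = allocator,
        .meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
        .client = client,
        .put_options =
            {
                .object_size_mb = 19,        /* > part_size, so the multipart path is exercised */
                .skip_content_length = true, /* tester strips Content-Length before sending */
            },
    };

Because the erase runs after the tester's own Content-Length handling, skip_content_length composes with the existing content_length override rather than conflicting with it.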