diff --git a/include/aws/s3/private/s3_auto_ranged_put.h b/include/aws/s3/private/s3_auto_ranged_put.h
index 04e1703fc..b376506f7 100644
--- a/include/aws/s3/private/s3_auto_ranged_put.h
+++ b/include/aws/s3/private/s3_auto_ranged_put.h
@@ -73,10 +73,8 @@ struct aws_s3_auto_ranged_put {
         struct aws_string *list_parts_continuation_token;
 
         /* Number of parts we've started work on */
-        uint32_t num_parts_sent;
-        /* Number of "sent" parts we've finished reading the body for
-         * (does not include skipped parts in the case of pause/resume) */
-        uint32_t num_parts_read;
+        uint32_t num_parts_started;
+        /* Number of parts we've started, and we have no more work to do */
         uint32_t num_parts_completed;
         uint32_t num_parts_successful;
         uint32_t num_parts_failed;
@@ -85,6 +83,11 @@ struct aws_s3_auto_ranged_put {
          * work to do*/
         uint32_t num_parts_noop;
 
+        /* Number of parts we've started, but that are not done reading from the stream yet.
+         * Though reads are serial (only 1 part can be reading from the stream at a time)
+         * we may queue up more to minimize delays between each read. */
+        uint32_t num_parts_pending_read;
+
         struct aws_http_headers *needed_response_headers;
 
         /* Whether body stream is exhausted. */
diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c
index a8b3109b6..925a77d87 100644
--- a/source/s3_auto_ranged_put.c
+++ b/source/s3_auto_ranged_put.c
@@ -24,12 +24,19 @@ static const size_t s_abort_multipart_upload_init_body_size_bytes = 512;
  */
 static const uint32_t s_unknown_length_default_num_parts = 32;
 
-/* For unknown length body we dont know how many parts we'll end up with.
- * We optimistically schedule upload requests, but cap the number of requests
- * being prepared by bellow number to avoid unknown length request dominating the
- * queue.
- * TODO: this value needs further benchmarking.*/
-static const uint32_t s_unknown_length_max_optimistic_prepared_parts = 5;
+/* Max number of parts (per meta-request) that can be "started, but not done reading from the stream".
+ * Though reads are serial (only 1 part can be reading from the stream at a time)
+ * we may queue up more to minimize delays between each read.
+ *
+ * If this number is too low, there could be an avoidable delay between each read
+ * (meta-request is ready for more work, but the client hasn't run update and given it more work yet).
+ *
+ * If this number is too high, early meta-requests could hog all the "work tokens"
+ * (1st meta-request has a queue of 100 "work tokens" that it needs to read
+ * the stream for, while later meta-requests do nothing, waiting for work tokens).
+ *
+ * TODO: this value needs further benchmarking. */
+static const uint32_t s_max_parts_pending_read = 5;
 
 static const struct aws_byte_cursor s_create_multipart_upload_copy_headers[] = {
     AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("x-amz-server-side-encryption-customer-algorithm"),
@@ -258,7 +265,7 @@ static int s_try_init_resume_state_from_persisted_data(
         return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
     }
 
-    auto_ranged_put->synced_data.num_parts_sent = 0;
+    auto_ranged_put->synced_data.num_parts_started = 0;
     auto_ranged_put->synced_data.num_parts_completed = 0;
     auto_ranged_put->synced_data.num_parts_noop = 0;
     auto_ranged_put->synced_data.create_multipart_upload_sent = true;
@@ -441,22 +448,20 @@ static bool s_should_skip_scheduling_more_parts_based_on_flags(
     const struct aws_s3_auto_ranged_put *auto_ranged_put,
     uint32_t flags) {
 
-    uint32_t num_parts_in_flight =
-        (auto_ranged_put->synced_data.num_parts_sent - auto_ranged_put->synced_data.num_parts_completed);
-
-    /* If the stream is actually async, only allow 1 part in flight at a time.
-     * We need to wait for async read() to complete before calling it again */
+    /* If the stream is actually async, only allow 1 pending-read.
+     * We need to wait for async read() to complete before calling it again. */
     if (auto_ranged_put->base.request_body_async_stream != NULL) {
-        return num_parts_in_flight > 0;
+        return auto_ranged_put->synced_data.num_parts_pending_read > 0;
     }
 
+    /* If this is the conservative pass, only allow 1 pending-read.
+     * Reads are serial anyway, so queuing up a whole bunch isn't necessarily a speedup. */
     if ((flags & AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE) != 0) {
-        /* Because uploads must read from their streams serially, we try to limit the amount of in flight
-         * requests for a given multipart upload if we can. */
-        return num_parts_in_flight > 0;
+        return auto_ranged_put->synced_data.num_parts_pending_read > 0;
     }
 
-    return false;
+    /* In all other cases, cap the number of pending-reads to something reasonable */
+    return auto_ranged_put->synced_data.num_parts_pending_read >= s_max_parts_pending_read;
 }
 
 static bool s_s3_auto_ranged_put_update(
@@ -526,8 +531,8 @@ static bool s_s3_auto_ranged_put_update(
                 }
 
                 bool should_create_next_part_request = false;
-                if (auto_ranged_put->has_content_length &&
-                    (auto_ranged_put->synced_data.num_parts_sent < auto_ranged_put->total_num_parts_from_content_length)) {
+                if (auto_ranged_put->has_content_length && (auto_ranged_put->synced_data.num_parts_started <
+                                                            auto_ranged_put->total_num_parts_from_content_length)) {
 
                     /* Check if the etag/checksum list has the result already */
                     int part_index = auto_ranged_put->threaded_update_data.next_part_number - 1;
@@ -563,15 +568,6 @@ static bool s_s3_auto_ranged_put_update(
                         goto has_work_remaining;
                     }
 
-                    uint32_t num_parts_not_read =
-                        (auto_ranged_put->synced_data.num_parts_sent - auto_ranged_put->synced_data.num_parts_read);
-
-                    /* Because uploads must read from their streams serially, we try to limit the amount of in flight
-                     * requests for a given multipart upload if we can. */
-                    if (num_parts_not_read >= s_unknown_length_max_optimistic_prepared_parts) {
-                        goto has_work_remaining;
-                    }
-
                     should_create_next_part_request = true;
                 }
 
@@ -587,7 +583,8 @@ static bool s_s3_auto_ranged_put_update(
                 request->part_number = auto_ranged_put->threaded_update_data.next_part_number;
 
                 ++auto_ranged_put->threaded_update_data.next_part_number;
-                ++auto_ranged_put->synced_data.num_parts_sent;
+                ++auto_ranged_put->synced_data.num_parts_started;
+                ++auto_ranged_put->synced_data.num_parts_pending_read;
 
                 AWS_LOGF_DEBUG(
                     AWS_LS_S3_META_REQUEST,
@@ -608,7 +605,8 @@ static bool s_s3_auto_ranged_put_update(
                 }
             } else {
                 if ((!auto_ranged_put->synced_data.is_body_stream_at_end) ||
-                    auto_ranged_put->synced_data.num_parts_completed != auto_ranged_put->synced_data.num_parts_sent) {
+                    auto_ranged_put->synced_data.num_parts_completed !=
+                        auto_ranged_put->synced_data.num_parts_started) {
                     goto has_work_remaining;
                 }
             }
@@ -646,7 +644,7 @@ static bool s_s3_auto_ranged_put_update(
 
         /* If the number of parts completed is less than the number of parts sent, then we need to wait until all of
          * those parts are done sending before aborting. */
-        if (auto_ranged_put->synced_data.num_parts_completed < auto_ranged_put->synced_data.num_parts_sent) {
+        if (auto_ranged_put->synced_data.num_parts_completed < auto_ranged_put->synced_data.num_parts_started) {
             goto has_work_remaining;
         }
 
@@ -922,20 +920,19 @@ static void s_skip_parts_from_stream_loop(void *user_data) {
         if (error_code != AWS_ERROR_SUCCESS) {
             AWS_LOGF_ERROR(
                 AWS_LS_S3_META_REQUEST,
-                "id=%p: Failed to resume upload. Input stream read error %d (%s)",
+                "id=%p: Failed resuming upload, error reading request body %d (%s)",
                 (void *)meta_request,
                 error_code,
                 aws_error_str(error_code));
-            error_code = AWS_ERROR_S3_RESUME_FAILED;
             goto on_done;
         }
 
         if (temp_body_buf->len < temp_body_buf->capacity) {
             AWS_LOGF_ERROR(
                 AWS_LS_S3_META_REQUEST,
-                "id=%p: Failed to resume upload. Input steam ended prematurely.",
+                "id=%p: Failed resuming upload, request body is smaller than the 'Content-Length' header said it would be.",
                 (void *)meta_request);
-            error_code = AWS_ERROR_S3_RESUME_FAILED;
+            error_code = AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH;
             goto on_done;
         }
 
@@ -1177,9 +1174,7 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) {
     {
         aws_s3_meta_request_lock_synced_data(meta_request);
 
-        /* TODO: should we signal the client to run update again?
-         * since we just bumped up this number we use to throttle work? */
-        ++auto_ranged_put->synced_data.num_parts_read;
+        --auto_ranged_put->synced_data.num_parts_pending_read;
 
         auto_ranged_put->synced_data.is_body_stream_at_end = is_body_stream_at_end;
 
@@ -1202,6 +1197,14 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) {
     }
     /* END CRITICAL SECTION */
 
+    /* We throttle the number of parts that can be "pending read"
+     * (e.g. only 1 at a time if reading from an async-stream).
+     * Now that this read is complete, poke the client to see if it can give us more work.
+     *
+     * Poking now gives a measurable speedup (1%) for async streaming,
+     * vs waiting until all the part-prep steps are complete (still need to sign, etc.). */
+    aws_s3_client_schedule_process_work(meta_request->client);
+
 on_done:
     s_s3_prepare_upload_part_finish(part_prep, error_code);
 }
@@ -1536,7 +1539,7 @@ static void s_s3_auto_ranged_put_request_finished(
                     aws_array_list_get_at(&auto_ranged_put->synced_data.etag_list, &etag, etag_index);
                     if (etag != NULL) {
                         /* Update the number of parts sent/completed previously */
-                        ++auto_ranged_put->synced_data.num_parts_sent;
+                        ++auto_ranged_put->synced_data.num_parts_started;
                         ++auto_ranged_put->synced_data.num_parts_completed;
                     }
                 }
diff --git a/tests/s3_cancel_tests.c b/tests/s3_cancel_tests.c
index a78177948..5f16a1f4a 100644
--- a/tests/s3_cancel_tests.c
+++ b/tests/s3_cancel_tests.c
@@ -62,7 +62,7 @@ static bool s_s3_meta_request_update_cancel_test(
             break;
         case S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED:
             call_cancel = auto_ranged_put->synced_data.num_parts_completed == 1;
-            block_update = !call_cancel && auto_ranged_put->synced_data.num_parts_sent == 1;
+            block_update = !call_cancel && auto_ranged_put->synced_data.num_parts_started == 1;
             break;
         case S3_UPDATE_CANCEL_TYPE_MPU_ALL_PARTS_COMPLETED:
             call_cancel = auto_ranged_put->synced_data.num_parts_completed ==
diff --git a/tests/s3_data_plane_tests.c b/tests/s3_data_plane_tests.c
index ae38ff498..3ed8c3565 100644
--- a/tests/s3_data_plane_tests.c
+++ b/tests/s3_data_plane_tests.c
@@ -5940,6 +5940,13 @@ static int s_test_s3_put_pause_resume_helper(
     struct aws_s3_client_config client_config;
     AWS_ZERO_STRUCT(client_config);
 
+    if (resume_state == NULL) {
+        /* If we're going to cancel this operation, limit the client to 1 HTTP connection.
+         * That way, we don't end up "cancelling" while all the parts actually
+         * succeed anyway on other connections. */
+        client_config.max_active_connections_override = 1;
+    }
+
     ASSERT_SUCCESS(aws_s3_tester_bind_client(
         tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_REGION | AWS_S3_TESTER_BIND_CLIENT_SIGNING));
 
@@ -6353,7 +6360,7 @@ static int s_test_s3_put_pause_resume_invalid_resume_stream(struct aws_allocator *allocator, void *ctx) {
         resume_upload_stream,
         persistable_state,
         AWS_SCA_CRC32,
-        AWS_ERROR_S3_RESUME_FAILED,
+        AWS_IO_STREAM_READ_FAILED,
         0));
 
     bytes_uploaded = aws_atomic_load_int(&test_data.total_bytes_uploaded);
@@ -6436,7 +6443,7 @@ static int s_test_s3_put_pause_resume_invalid_content_length(struct aws_allocator *allocator, void *ctx) {
         resume_upload_stream,
         persistable_state,
         AWS_SCA_CRC32,
-        AWS_ERROR_S3_RESUME_FAILED,
+        AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH,
         0));
 
     bytes_uploaded = aws_atomic_load_int(&test_data.total_bytes_uploaded);
diff --git a/tests/s3_tester.c b/tests/s3_tester.c
index fdac15652..3500bd0e5 100644
--- a/tests/s3_tester.c
+++ b/tests/s3_tester.c
@@ -22,6 +22,7 @@
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
 #include 
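For reviewers who want the new scheduling rule in isolation: below is a minimal, standalone C sketch of the pending-read throttle this diff introduces. It borrows nothing from aws-c-s3 itself; struct throttle_state, should_skip_scheduling_more_parts(), and MAX_PARTS_PENDING_READ are hypothetical stand-ins mirroring synced_data.num_parts_pending_read, s_should_skip_scheduling_more_parts_based_on_flags(), and s_max_parts_pending_read.

#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Stand-in for s_max_parts_pending_read from the diff. */
#define MAX_PARTS_PENDING_READ 5u

/* Hypothetical stand-in for the bits of synced_data this logic touches. */
struct throttle_state {
    uint32_t num_parts_pending_read; /* parts started, body not fully read yet */
    bool stream_is_async;            /* async read() can't be re-entered while one is outstanding */
};

/* Same decision rule as s_should_skip_scheduling_more_parts_based_on_flags():
 * returns true when no more parts should be scheduled right now. */
static bool should_skip_scheduling_more_parts(const struct throttle_state *state, bool conservative_pass) {
    if (state->stream_is_async) {
        /* Only 1 pending-read: wait for async read() to complete before calling it again. */
        return state->num_parts_pending_read > 0;
    }
    if (conservative_pass) {
        /* Reads are serial anyway, so the conservative pass queues at most 1. */
        return state->num_parts_pending_read > 0;
    }
    /* Otherwise, cap pending-reads so one upload can't hog all the "work tokens". */
    return state->num_parts_pending_read >= MAX_PARTS_PENDING_READ;
}

int main(void) {
    struct throttle_state state = {.num_parts_pending_read = 0, .stream_is_async = false};

    /* Schedule parts until the throttle pushes back (stops at 5 here). */
    while (!should_skip_scheduling_more_parts(&state, false /*conservative_pass*/)) {
        ++state.num_parts_pending_read; /* part started; its read is now pending */
        printf("scheduled part, pending reads = %" PRIu32 "\n", state.num_parts_pending_read);
    }

    /* A read completes (the --num_parts_pending_read in the diff); a slot frees up. */
    --state.num_parts_pending_read;
    printf("may schedule another part: %s\n",
           should_skip_scheduling_more_parts(&state, false) ? "no" : "yes");
    return 0;
}

The counter is bumped where update() hands out a part and decremented in the read-done callback, so "pending read" spans exactly the window the throttle needs to bound; the poke via aws_s3_client_schedule_process_work() in the diff is what turns a freed slot into new work promptly.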