Skip to content

Commit

Permalink
Speed up async streaming. (#313)
Browse files Browse the repository at this point in the history
- Fix bug where async-streaming prevented a meta-request from doing ANY parallelism 😅.
    -  We accidentally limited "number of parts in flight" when we meant to limit "number of parts pending read"
- Reduce gap between sequential reads by running client->update() as soon as the read completes, rather than waiting until after signing is complete.
- Rename ~num_parts_sent~ -> `num_parts_started`, for clarity
- Rename (rework) ~num_parts_read~ -> `num_parts_pending_read`.
    - Note this slightly changes the meaning as well, this number goes up and down, and it's never more than 1 for async-streaming.
    - This fixes a bug when resuming an upload. We'd forgot to increment the old `num_parts_read` along with `num_parts_started` and `num_parts_completed` which threw off the math and lead to the upload stalling. It seemed simpler and more obvious to just track the number we care about, rather than have so many different numbers ticking up, from many different locations, and their meaning is weird when skipping is involved.
- If a stream-reading failure occurs during "resume from pause", reveal the actual error code. Stop covering it with AWS_ERROR_S3_RESUME_FAILED.
  • Loading branch information
graebm committed Jun 14, 2023
1 parent 02e12e2 commit 9263f9b
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 46 deletions.
11 changes: 7 additions & 4 deletions include/aws/s3/private/s3_auto_ranged_put.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,8 @@ struct aws_s3_auto_ranged_put {
struct aws_string *list_parts_continuation_token;

/* Number of parts we've started work on */
uint32_t num_parts_sent;
/* Number of "sent" parts we've finished reading the body for
* (does not include skipped parts in the case of pause/resume) */
uint32_t num_parts_read;
uint32_t num_parts_started;
/* Number of parts we've started, and we have no more work to do */
uint32_t num_parts_completed;
uint32_t num_parts_successful;
uint32_t num_parts_failed;
Expand All @@ -85,6 +83,11 @@ struct aws_s3_auto_ranged_put {
* work to do*/
uint32_t num_parts_noop;

/* Number of parts we've started, but they're not done reading from stream yet.
* Though reads are serial (only 1 part can be reading from stream at a time)
* we may queue up more to minimize delays between each read. */
uint32_t num_parts_pending_read;

struct aws_http_headers *needed_response_headers;

/* Whether body stream is exhausted. */
Expand Down
81 changes: 42 additions & 39 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@ static const size_t s_abort_multipart_upload_init_body_size_bytes = 512;
*/
static const uint32_t s_unknown_length_default_num_parts = 32;

/* For unknown length body we dont know how many parts we'll end up with.
* We optimistically schedule upload requests, but cap the number of requests
* being prepared by bellow number to avoid unknown length request dominating the
* queue.
* TODO: this value needs further benchmarking.*/
static const uint32_t s_unknown_length_max_optimistic_prepared_parts = 5;
/* Max number of parts (per meta-request) that can be: "started, but not done reading from stream".
* Though reads are serial (only 1 part can be reading from stream at a time)
* we may queue up more to minimize delays between each read.
*
* If this number is too low, there could be an avoidable delay between each read
* (meta-request ready for more work, but client hasn't run update and given it more work yet)
*
* If this number is too high, early meta-requests could hog all the "work tokens"
* (1st meta-request as queue of 100 "work tokens" that it needs to read
* the stream for, while later meta-requests are doing nothing waiting for work tokens)
*
* TODO: this value needs further benchmarking. */
static const uint32_t s_max_parts_pending_read = 5;

static const struct aws_byte_cursor s_create_multipart_upload_copy_headers[] = {
AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("x-amz-server-side-encryption-customer-algorithm"),
Expand Down Expand Up @@ -258,7 +265,7 @@ static int s_try_init_resume_state_from_persisted_data(
return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
}

auto_ranged_put->synced_data.num_parts_sent = 0;
auto_ranged_put->synced_data.num_parts_started = 0;
auto_ranged_put->synced_data.num_parts_completed = 0;
auto_ranged_put->synced_data.num_parts_noop = 0;
auto_ranged_put->synced_data.create_multipart_upload_sent = true;
Expand Down Expand Up @@ -441,22 +448,20 @@ static bool s_should_skip_scheduling_more_parts_based_on_flags(
const struct aws_s3_auto_ranged_put *auto_ranged_put,
uint32_t flags) {

uint32_t num_parts_in_flight =
(auto_ranged_put->synced_data.num_parts_sent - auto_ranged_put->synced_data.num_parts_completed);

/* If the stream is actually async, only allow 1 part in flight at a time.
* We need to wait for async read() to complete before calling it again */
/* If the stream is actually async, only allow 1 pending-read.
* We need to wait for async read() to complete before calling it again. */
if (auto_ranged_put->base.request_body_async_stream != NULL) {
return num_parts_in_flight > 0;
return auto_ranged_put->synced_data.num_parts_pending_read > 0;
}

/* If this is the conservative pass, only allow 1 pending-read.
* Reads are serial anyway, so queuing up a whole bunch isn't necessarily a speedup. */
if ((flags & AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE) != 0) {
/* Because uploads must read from their streams serially, we try to limit the amount of in flight
* requests for a given multipart upload if we can. */
return num_parts_in_flight > 0;
return auto_ranged_put->synced_data.num_parts_pending_read > 0;
}

return false;
/* In all other cases, cap the number of pending-reads to something reasonable */
return auto_ranged_put->synced_data.num_parts_pending_read >= s_max_parts_pending_read;
}

static bool s_s3_auto_ranged_put_update(
Expand Down Expand Up @@ -526,8 +531,8 @@ static bool s_s3_auto_ranged_put_update(
}

bool should_create_next_part_request = false;
if (auto_ranged_put->has_content_length &&
(auto_ranged_put->synced_data.num_parts_sent < auto_ranged_put->total_num_parts_from_content_length)) {
if (auto_ranged_put->has_content_length && (auto_ranged_put->synced_data.num_parts_started <
auto_ranged_put->total_num_parts_from_content_length)) {

/* Check if the etag/checksum list has the result already */
int part_index = auto_ranged_put->threaded_update_data.next_part_number - 1;
Expand Down Expand Up @@ -563,15 +568,6 @@ static bool s_s3_auto_ranged_put_update(
goto has_work_remaining;
}

uint32_t num_parts_not_read =
(auto_ranged_put->synced_data.num_parts_sent - auto_ranged_put->synced_data.num_parts_read);

/* Because uploads must read from their streams serially, we try to limit the amount of in flight
* requests for a given multipart upload if we can. */
if (num_parts_not_read >= s_unknown_length_max_optimistic_prepared_parts) {
goto has_work_remaining;
}

should_create_next_part_request = true;
}

Expand All @@ -587,7 +583,8 @@ static bool s_s3_auto_ranged_put_update(
request->part_number = auto_ranged_put->threaded_update_data.next_part_number;

++auto_ranged_put->threaded_update_data.next_part_number;
++auto_ranged_put->synced_data.num_parts_sent;
++auto_ranged_put->synced_data.num_parts_started;
++auto_ranged_put->synced_data.num_parts_pending_read;

AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
Expand All @@ -608,7 +605,8 @@ static bool s_s3_auto_ranged_put_update(
}
} else {
if ((!auto_ranged_put->synced_data.is_body_stream_at_end) ||
auto_ranged_put->synced_data.num_parts_completed != auto_ranged_put->synced_data.num_parts_sent) {
auto_ranged_put->synced_data.num_parts_completed !=
auto_ranged_put->synced_data.num_parts_started) {
goto has_work_remaining;
}
}
Expand Down Expand Up @@ -646,7 +644,7 @@ static bool s_s3_auto_ranged_put_update(

/* If the number of parts completed is less than the number of parts sent, then we need to wait until all of
* those parts are done sending before aborting. */
if (auto_ranged_put->synced_data.num_parts_completed < auto_ranged_put->synced_data.num_parts_sent) {
if (auto_ranged_put->synced_data.num_parts_completed < auto_ranged_put->synced_data.num_parts_started) {
goto has_work_remaining;
}

Expand Down Expand Up @@ -922,20 +920,19 @@ static void s_skip_parts_from_stream_loop(void *user_data) {
if (error_code != AWS_ERROR_SUCCESS) {
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p: Failed to resume upload. Input stream read error %d (%s)",
"id=%p: Failed resuming upload, error reading request body %d (%s)",
(void *)meta_request,
error_code,
aws_error_str(error_code));
error_code = AWS_ERROR_S3_RESUME_FAILED;
goto on_done;
}

if (temp_body_buf->len < temp_body_buf->capacity) {
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p: Failed to resume upload. Input steam ended prematurely.",
"id=%p: Failed resuming upload, request body smaller than 'Content-Length' header said it would be.",
(void *)meta_request);
error_code = AWS_ERROR_S3_RESUME_FAILED;
error_code = AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH;
goto on_done;
}

Expand Down Expand Up @@ -1177,9 +1174,7 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) {
{
aws_s3_meta_request_lock_synced_data(meta_request);

/* TODO: should we signal the client to run update again?
* since we just bumped up this number we use to throttle work? */
++auto_ranged_put->synced_data.num_parts_read;
--auto_ranged_put->synced_data.num_parts_pending_read;

auto_ranged_put->synced_data.is_body_stream_at_end = is_body_stream_at_end;

Expand All @@ -1202,6 +1197,14 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) {
}
/* END CRITICAL SECTION */

/* We throttle the number of parts that can be "pending read"
* (e.g. only 1 at a time if reading from async-stream).
* Now that read is complete, poke the client to see if it can give us more work.
*
* Poking now gives measurable speedup (1%) for async streaming,
* vs waiting until all the part-prep steps are complete (still need to sign, etc) */
aws_s3_client_schedule_process_work(meta_request->client);

on_done:
s_s3_prepare_upload_part_finish(part_prep, error_code);
}
Expand Down Expand Up @@ -1536,7 +1539,7 @@ static void s_s3_auto_ranged_put_request_finished(
aws_array_list_get_at(&auto_ranged_put->synced_data.etag_list, &etag, etag_index);
if (etag != NULL) {
/* Update the number of parts sent/completed previously */
++auto_ranged_put->synced_data.num_parts_sent;
++auto_ranged_put->synced_data.num_parts_started;
++auto_ranged_put->synced_data.num_parts_completed;
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/s3_cancel_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ static bool s_s3_meta_request_update_cancel_test(
break;
case S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED:
call_cancel = auto_ranged_put->synced_data.num_parts_completed == 1;
block_update = !call_cancel && auto_ranged_put->synced_data.num_parts_sent == 1;
block_update = !call_cancel && auto_ranged_put->synced_data.num_parts_started == 1;
break;
case S3_UPDATE_CANCEL_TYPE_MPU_ALL_PARTS_COMPLETED:
call_cancel = auto_ranged_put->synced_data.num_parts_completed ==
Expand Down
11 changes: 9 additions & 2 deletions tests/s3_data_plane_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -5940,6 +5940,13 @@ static int s_test_s3_put_pause_resume_helper(
struct aws_s3_client_config client_config;
AWS_ZERO_STRUCT(client_config);

if (resume_state == NULL) {
/* If we're going to cancel this operation, limit the client to 1 HTTP connection.
* That way, we don't end up "cancelling" but all the parts actually
* succeed anyway on other connections */
client_config.max_active_connections_override = 1;
}

ASSERT_SUCCESS(aws_s3_tester_bind_client(
tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_REGION | AWS_S3_TESTER_BIND_CLIENT_SIGNING));

Expand Down Expand Up @@ -6353,7 +6360,7 @@ static int s_test_s3_put_pause_resume_invalid_resume_stream(struct aws_allocator
resume_upload_stream,
persistable_state,
AWS_SCA_CRC32,
AWS_ERROR_S3_RESUME_FAILED,
AWS_IO_STREAM_READ_FAILED,
0));

bytes_uploaded = aws_atomic_load_int(&test_data.total_bytes_uploaded);
Expand Down Expand Up @@ -6436,7 +6443,7 @@ static int s_test_s3_put_pause_resume_invalid_content_length(struct aws_allocato
resume_upload_stream,
persistable_state,
AWS_SCA_CRC32,
AWS_ERROR_S3_RESUME_FAILED,
AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH,
0));

bytes_uploaded = aws_atomic_load_int(&test_data.total_bytes_uploaded);
Expand Down
1 change: 1 addition & 0 deletions tests/s3_tester.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <aws/io/tls_channel_handler.h>
#include <aws/testing/aws_test_harness.h>
#include <aws/testing/stream_tester.h>
#include <ctype.h>
#include <inttypes.h>
#include <stdlib.h>
#include <time.h>
Expand Down

0 comments on commit 9263f9b

Please sign in to comment.