From de36fee8fe7ab02f10987877ae94a805bf440c1f Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Wed, 29 Nov 2023 15:11:35 -0800 Subject: [PATCH] Bypass for CreateSession reqeust (#384) --- source/s3_client.c | 78 +++++--- source/s3express_credentials_provider.c | 1 + tests/CMakeLists.txt | 13 +- tests/s3_data_plane_tests.c | 3 +- .../s3_mock_server_s3express_provider_test.c | 5 +- ...ient_test.c => s3_s3express_client_test.c} | 174 +++++++++++++++++- 6 files changed, 234 insertions(+), 40 deletions(-) rename tests/{s3_mock_server_s3express_client_test.c => s3_s3express_client_test.c} (70%) diff --git a/source/s3_client.c b/source/s3_client.c index cba9cf047..6d3f36adf 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -1606,6 +1606,54 @@ static void s_s3_client_prepare_callback_queue_request( int error_code, void *user_data); +static bool s_s3_client_should_update_meta_request( + struct aws_s3_client *client, + struct aws_s3_meta_request *meta_request, + uint32_t num_requests_in_flight, + const uint32_t max_requests_in_flight, + const uint32_t max_requests_prepare) { + + /* CreateSession has high priority to bypass the checks. */ + if (meta_request->type == AWS_S3_META_REQUEST_TYPE_DEFAULT) { + struct aws_s3_meta_request_default *meta_request_default = meta_request->impl; + if (aws_string_eq_c_str(meta_request_default->operation_name, "CreateSession")) { + return true; + } + } + + /** + * If number of being-prepared + already-prepared-and-queued requests is more than the max that can + * be in the preparation stage. + * Or total number of requests tracked by the client is more than the max tracked ("in flight") + * requests. + * + * We cannot create more requests for this meta request. + */ + if ((client->threaded_data.num_requests_being_prepared + client->threaded_data.request_queue_size) >= + max_requests_prepare) { + return false; + } + if (num_requests_in_flight >= max_requests_in_flight) { + return false; + } + + /* If this particular endpoint doesn't have any known addresses yet, then we don't want to go full speed in + * ramping up requests just yet. If there is already enough in the queue for one address (even if those + * aren't for this particular endpoint) we skip over this meta request for now. */ + struct aws_s3_endpoint *endpoint = meta_request->endpoint; + AWS_ASSERT(endpoint != NULL); + AWS_ASSERT(client->vtable->get_host_address_count); + size_t num_known_vips = client->vtable->get_host_address_count( + client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A); + if (num_known_vips == 0 && (client->threaded_data.num_requests_being_prepared + + client->threaded_data.request_queue_size) >= g_max_num_connections_per_vip) { + return false; + } + + /* Nothing blocks the meta request to create more requests */ + return true; +} + void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) { AWS_PRECONDITION(client); @@ -1628,37 +1676,21 @@ void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) { for (uint32_t pass_index = 0; pass_index < num_passes; ++pass_index) { - /* While: - * * Number of being-prepared + already-prepared-and-queued requests is less than the max that can be in the - * preparation stage. - * * Total number of requests tracked by the client is less than the max tracked ("in flight") requests. - * * There are meta requests to get requests from. - * - * Then update meta requests to get new requests that can then be prepared (reading from any streams, signing, - * etc.) for sending. + /** + * Iterate through the meta requests to update meta requests and get new requests that can then be prepared ++ * (reading from any streams, signing, etc.) for sending. */ - while ((client->threaded_data.num_requests_being_prepared + client->threaded_data.request_queue_size) < - max_requests_prepare && - num_requests_in_flight < max_requests_in_flight && - !aws_linked_list_empty(&client->threaded_data.meta_requests)) { + while (!aws_linked_list_empty(&client->threaded_data.meta_requests)) { struct aws_linked_list_node *meta_request_node = aws_linked_list_begin(&client->threaded_data.meta_requests); struct aws_s3_meta_request *meta_request = AWS_CONTAINER_OF(meta_request_node, struct aws_s3_meta_request, client_process_work_threaded_data); - struct aws_s3_endpoint *endpoint = meta_request->endpoint; - AWS_ASSERT(endpoint != NULL); - - AWS_ASSERT(client->vtable->get_host_address_count); - size_t num_known_vips = client->vtable->get_host_address_count( - client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A); + if (!s_s3_client_should_update_meta_request( + client, meta_request, num_requests_in_flight, max_requests_in_flight, max_requests_prepare)) { - /* If this particular endpoint doesn't have any known addresses yet, then we don't want to go full speed in - * ramping up requests just yet. If there is already enough in the queue for one address (even if those - * aren't for this particular endpoint) we skip over this meta request for now. */ - if (num_known_vips == 0 && (client->threaded_data.num_requests_being_prepared + - client->threaded_data.request_queue_size) >= g_max_num_connections_per_vip) { + /* Move the meta request to be processed from next loop. */ aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node); aws_linked_list_push_back( &meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node); diff --git a/source/s3express_credentials_provider.c b/source/s3express_credentials_provider.c index 98f3b4a0d..1dbf9b969 100644 --- a/source/s3express_credentials_provider.c +++ b/source/s3express_credentials_provider.c @@ -506,6 +506,7 @@ static struct aws_s3express_session_creator *s_session_creator_new( /* Override endpoint only for tests. */ .endpoint = impl->mock_test.endpoint_override ? impl->mock_test.endpoint_override : NULL, .user_data = session_creator, + .operation_name = aws_byte_cursor_from_c_str("CreateSession"), }; session_creator->synced_data.meta_request = aws_s3_client_make_meta_request(impl->client, &options); AWS_FATAL_ASSERT(session_creator->synced_data.meta_request); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 19123aa46..39bfb81f5 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -294,11 +294,14 @@ if(ENABLE_MOCK_SERVER_TESTS) add_net_test_case(request_time_too_skewed_mock_server) endif() -add_net_test_case(s3express_provider_long_run_real_server) -add_net_test_case(s3express_client_put_test_small_real_server) -add_net_test_case(s3express_client_put_test_large_real_server) -add_net_test_case(s3express_client_put_long_running_test_real_server) -add_net_test_case(s3express_client_get_test_real_server) +add_net_test_case(s3express_provider_long_running_session_refresh) + +add_net_test_case(s3express_client_put_object) +add_net_test_case(s3express_client_put_object_multipart) +add_net_test_case(s3express_client_put_object_multipart_multiple) +add_net_test_case(s3express_client_put_object_long_running_session_refresh) +add_net_test_case(s3express_client_get_object) +add_net_test_case(s3express_client_get_object_multiple) add_net_test_case(meta_request_auto_ranged_get_new_error_handling) add_net_test_case(meta_request_auto_ranged_put_new_error_handling) diff --git a/tests/s3_data_plane_tests.c b/tests/s3_data_plane_tests.c index 67db4a858..e879a9557 100644 --- a/tests/s3_data_plane_tests.c +++ b/tests/s3_data_plane_tests.c @@ -1711,7 +1711,8 @@ static int s_test_s3_multipart_put_object_with_acl(struct aws_allocator *allocat static int s_test_s3_put_object_multiple_helper(struct aws_allocator *allocator, bool file_on_disk) { -#define NUM_REQUESTS 5 + enum s_numbers { NUM_REQUESTS = 5 }; + struct aws_s3_meta_request *meta_requests[NUM_REQUESTS]; struct aws_s3_meta_request_test_results meta_request_test_results[NUM_REQUESTS]; struct aws_http_message *messages[NUM_REQUESTS]; diff --git a/tests/s3_mock_server_s3express_provider_test.c b/tests/s3_mock_server_s3express_provider_test.c index ee20557e1..693c9fdf8 100644 --- a/tests/s3_mock_server_s3express_provider_test.c +++ b/tests/s3_mock_server_s3express_provider_test.c @@ -547,7 +547,6 @@ TEST_CASE(s3express_provider_stress_mock_server) { /* Stress about under load, keep hitting 10 hosts */ for (size_t i = 0; i < num_requests; i++) { - /* code */ char key_buffer[128] = ""; snprintf(key_buffer, sizeof(key_buffer), "test-%zu", (size_t)(i % 10)); struct aws_credentials_properties_s3express property = { @@ -562,7 +561,6 @@ TEST_CASE(s3express_provider_stress_mock_server) { /* Stress about over load, keep hitting different hosts */ s_s3express_tester.credentials_callbacks_received = 0; for (size_t i = 0; i < num_requests; i++) { - /* code */ char key_buffer[128] = ""; snprintf(key_buffer, sizeof(key_buffer), "test-%zu", i); struct aws_credentials_properties_s3express property = { @@ -583,7 +581,7 @@ TEST_CASE(s3express_provider_stress_mock_server) { return AWS_OP_SUCCESS; } -TEST_CASE(s3express_provider_long_run_real_server) { +TEST_CASE(s3express_provider_long_running_session_refresh) { (void)ctx; struct aws_s3_tester tester; @@ -637,7 +635,6 @@ TEST_CASE(s3express_provider_long_run_real_server) { } /** * We should have more than 2 different creds. - * Server can return a credentials that expires less than 5 mins. **/ ASSERT_TRUE(s_s3express_tester.number_of_credentials >= 2); diff --git a/tests/s3_mock_server_s3express_client_test.c b/tests/s3_s3express_client_test.c similarity index 70% rename from tests/s3_mock_server_s3express_client_test.c rename to tests/s3_s3express_client_test.c index 14019e045..cad5b81d2 100644 --- a/tests/s3_mock_server_s3express_client_test.c +++ b/tests/s3_s3express_client_test.c @@ -221,7 +221,7 @@ static int s_s3express_put_object_request( return AWS_OP_SUCCESS; } -static int s_s3express_client_put_test_real_server_helper(struct aws_allocator *allocator, size_t content_length) { +static int s_s3express_client_put_test_helper(struct aws_allocator *allocator, size_t content_length) { struct aws_s3_tester tester; ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); @@ -256,14 +256,104 @@ static int s_s3express_client_put_test_real_server_helper(struct aws_allocator * return AWS_OP_SUCCESS; } -TEST_CASE(s3express_client_put_test_small_real_server) { +TEST_CASE(s3express_client_put_object) { (void)ctx; - return s_s3express_client_put_test_real_server_helper(allocator, MB_TO_BYTES(1)); + return s_s3express_client_put_test_helper(allocator, MB_TO_BYTES(1)); } -TEST_CASE(s3express_client_put_test_large_real_server) { +TEST_CASE(s3express_client_put_object_multipart) { (void)ctx; - return s_s3express_client_put_test_real_server_helper(allocator, MB_TO_BYTES(100)); + return s_s3express_client_put_test_helper(allocator, MB_TO_BYTES(100)); +} + +TEST_CASE(s3express_client_put_object_multipart_multiple) { + (void)ctx; + + enum s_numbers { NUM_REQUESTS = 100 }; + + struct aws_s3_meta_request *meta_requests[NUM_REQUESTS]; + struct aws_s3_meta_request_test_results meta_request_test_results[NUM_REQUESTS]; + struct aws_input_stream *input_streams[NUM_REQUESTS]; + + struct aws_s3_tester tester; + ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); + + struct aws_byte_cursor region_cursor = aws_byte_cursor_from_c_str("us-east-1"); + + char endpoint[] = "crts-east1--use1-az4--x-s3.s3express-use1-az4.us-east-1.amazonaws.com"; + struct aws_byte_cursor host_cursor = aws_byte_cursor_from_c_str(endpoint); + struct aws_byte_cursor key_cursor = aws_byte_cursor_from_c_str("/crt-test"); + + struct aws_byte_cursor west2_region_cursor = aws_byte_cursor_from_c_str("us-west-2"); + char west2_endpoint[] = "crts-west2--usw2-az1--x-s3.s3express-usw2-az1.us-west-2.amazonaws.com"; + struct aws_byte_cursor west2_host_cursor = aws_byte_cursor_from_c_str(west2_endpoint); + + struct aws_s3_client_config client_config = { + .part_size = MB_TO_BYTES(5), + .enable_s3express = true, + .region = region_cursor, + }; + + ASSERT_SUCCESS(aws_s3_tester_bind_client(&tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_SIGNING)); + + struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config); + + for (size_t i = 0; i < NUM_REQUESTS; ++i) { + input_streams[i] = aws_s3_test_input_stream_new(allocator, MB_TO_BYTES(10)); + + struct aws_byte_cursor request_region = region_cursor; + struct aws_byte_cursor request_host = host_cursor; + if (i % 2 == 0) { + /* Make half of request to east1 and rest half to west2 */ + request_region = west2_region_cursor; + request_host = west2_host_cursor; + } + + struct aws_http_message *message = aws_s3_test_put_object_request_new( + allocator, &request_host, key_cursor, g_test_body_content_type, input_streams[i], 0); + + struct aws_s3_meta_request_options options; + AWS_ZERO_STRUCT(options); + options.type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT; + options.message = message; + struct aws_signing_config_aws s3express_signing_config = { + .algorithm = AWS_SIGNING_ALGORITHM_V4_S3EXPRESS, + .service = g_s3express_service_name, + .region = request_region, + }; + options.signing_config = &s3express_signing_config; + aws_s3_meta_request_test_results_init(&meta_request_test_results[i], allocator); + + ASSERT_SUCCESS(aws_s3_tester_bind_meta_request(&tester, &options, &meta_request_test_results[i])); + + meta_requests[i] = aws_s3_client_make_meta_request(client, &options); + ASSERT_TRUE(meta_requests[i] != NULL); + aws_http_message_release(message); + } + /* Wait for the request to finish. */ + aws_s3_tester_wait_for_meta_request_finish(&tester); + aws_s3_tester_lock_synced_data(&tester); + ASSERT_TRUE(tester.synced_data.finish_error_code == AWS_ERROR_SUCCESS); + aws_s3_tester_unlock_synced_data(&tester); + + for (size_t i = 0; i < NUM_REQUESTS; ++i) { + meta_requests[i] = aws_s3_meta_request_release(meta_requests[i]); + } + + aws_s3_tester_wait_for_meta_request_shutdown(&tester); + + for (size_t i = 0; i < NUM_REQUESTS; ++i) { + aws_s3_tester_validate_put_object_results(&meta_request_test_results[i], 0); + aws_s3_meta_request_test_results_clean_up(&meta_request_test_results[i]); + } + + for (size_t i = 0; i < NUM_REQUESTS; ++i) { + aws_input_stream_release(input_streams[i]); + } + + aws_s3_client_release(client); + aws_s3_tester_clean_up(&tester); + return AWS_OP_SUCCESS; } void s_meta_request_finished_overhead( @@ -300,7 +390,7 @@ struct aws_s3express_credentials_provider *s_s3express_provider_mock_factory( } /* Long running test to make sure our refresh works properly */ -TEST_CASE(s3express_client_put_long_running_test_real_server) { +TEST_CASE(s3express_client_put_object_long_running_session_refresh) { (void)ctx; struct aws_s3_tester tester; @@ -375,7 +465,7 @@ TEST_CASE(s3express_client_put_long_running_test_real_server) { return AWS_OP_SUCCESS; } -TEST_CASE(s3express_client_get_test_real_server) { +TEST_CASE(s3express_client_get_object) { (void)ctx; struct aws_s3_tester tester; @@ -429,3 +519,73 @@ TEST_CASE(s3express_client_get_test_real_server) { aws_s3_tester_clean_up(&tester); return AWS_OP_SUCCESS; } + +TEST_CASE(s3express_client_get_object_multiple) { + (void)ctx; + + struct aws_s3_meta_request *meta_requests[100]; + struct aws_s3_meta_request_test_results meta_request_test_results[100]; + size_t num_meta_requests = AWS_ARRAY_SIZE(meta_requests); + + struct aws_s3_tester tester; + ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); + + struct aws_byte_cursor region_cursor = aws_byte_cursor_from_c_str("us-east-1"); + + char endpoint[] = "crts-east1--use1-az4--x-s3.s3express-use1-az4.us-east-1.amazonaws.com"; + struct aws_byte_cursor host_cursor = aws_byte_cursor_from_c_str(endpoint); + struct aws_byte_cursor key_cursor = aws_byte_cursor_from_c_str("/crt-download-10MB"); + + struct aws_s3_client_config client_config = { + .part_size = MB_TO_BYTES(5), + .enable_s3express = true, + .region = region_cursor, + }; + + ASSERT_SUCCESS(aws_s3_tester_bind_client(&tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_SIGNING)); + + struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config); + + for (size_t i = 0; i < num_meta_requests; ++i) { + + struct aws_http_message *message = aws_s3_test_get_object_request_new(allocator, host_cursor, key_cursor); + + struct aws_s3_meta_request_options options; + AWS_ZERO_STRUCT(options); + options.type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT; + options.message = message; + struct aws_signing_config_aws s3express_signing_config = { + .algorithm = AWS_SIGNING_ALGORITHM_V4_S3EXPRESS, + .service = g_s3express_service_name, + }; + options.signing_config = &s3express_signing_config; + aws_s3_meta_request_test_results_init(&meta_request_test_results[i], allocator); + + ASSERT_SUCCESS(aws_s3_tester_bind_meta_request(&tester, &options, &meta_request_test_results[i])); + + meta_requests[i] = aws_s3_client_make_meta_request(client, &options); + ASSERT_TRUE(meta_requests[i] != NULL); + + aws_http_message_release(message); + } + /* Wait for the request to finish. */ + aws_s3_tester_wait_for_meta_request_finish(&tester); + aws_s3_tester_lock_synced_data(&tester); + ASSERT_TRUE(tester.synced_data.finish_error_code == AWS_ERROR_SUCCESS); + aws_s3_tester_unlock_synced_data(&tester); + + for (size_t i = 0; i < num_meta_requests; ++i) { + meta_requests[i] = aws_s3_meta_request_release(meta_requests[i]); + } + + aws_s3_tester_wait_for_meta_request_shutdown(&tester); + + for (size_t i = 0; i < num_meta_requests; ++i) { + aws_s3_tester_validate_get_object_results(&meta_request_test_results[i], 0); + aws_s3_meta_request_test_results_clean_up(&meta_request_test_results[i]); + } + + aws_s3_client_release(client); + aws_s3_tester_clean_up(&tester); + return AWS_OP_SUCCESS; +}