Skip to content

Commit

Permalink
[aws-cpp-sdk-s3-crt]: use CommonRunTime to support CopyObject
Browse files Browse the repository at this point in the history
Provide `CopyObject` support based on awslabs/aws-c-s3#284

In contrast to the existing `CopyObject` function, objects larger than 5GB are
supported, and multipart copies are automatically handled in parallel.
  • Loading branch information
grrtrr authored and jmklix committed Aug 11, 2023
1 parent 2193a58 commit d122ecd
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,11 +403,7 @@ namespace Aws
/**
* An Async wrapper for CopyObject that queues the request into a thread executor and triggers associated callback when operation has finished.
*/
template<typename CopyObjectRequestT = Model::CopyObjectRequest>
void CopyObjectAsync(const CopyObjectRequestT& request, const CopyObjectResponseReceivedHandler& handler, const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context = nullptr) const
{
return SubmitAsync(&S3CrtClient::CopyObject, request, handler, context);
}
virtual void CopyObjectAsync(const Model::CopyObjectRequest& request, const CopyObjectResponseReceivedHandler& handler, const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context = nullptr) const;

/**
* <p>Creates a new S3 bucket. To create a bucket, you must register with Amazon S3
Expand Down Expand Up @@ -5587,6 +5583,7 @@ namespace Aws
const S3CrtClient *s3CrtClient;
GetObjectResponseReceivedHandler getResponseHandler;
PutObjectResponseReceivedHandler putResponseHandler;
CopyObjectResponseReceivedHandler copyResponseHandler;
std::shared_ptr<const Aws::Client::AsyncCallerContext> asyncCallerContext;
const Aws::AmazonWebServiceRequest *originalRequest;
std::shared_ptr<Aws::Http::HttpRequest> request;
Expand Down
87 changes: 76 additions & 11 deletions generated/src/aws-cpp-sdk-s3-crt/source/S3CrtClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,13 +390,13 @@ static void S3CrtRequestFinishCallback(struct aws_s3_meta_request *meta_request,
{
AWS_UNREFERENCED_PARAM(meta_request);
auto *userData = static_cast<S3CrtClient::CrtRequestCallbackUserData*>(user_data);

if (meta_request_result->error_code != AWS_ERROR_SUCCESS && meta_request_result->response_status == 0) {
/* client side error */
userData->response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION);
Aws::StringStream ss;
ss << "crtCode: " << meta_request_result->error_code
<< ", " << aws_error_name(meta_request_result->error_code)
ss << "crtCode: " << meta_request_result->error_code
<< ", " << aws_error_name(meta_request_result->error_code)
<< " - " << aws_error_str(meta_request_result->error_code);
userData->response->SetClientErrorMessage(ss.str());
userData->response->SetResponseCode(HttpResponseCode::REQUEST_NOT_MADE);
Expand Down Expand Up @@ -761,29 +761,94 @@ CompleteMultipartUploadOutcome S3CrtClient::CompleteMultipartUpload(const Comple
return CompleteMultipartUploadOutcome(MakeRequest(request, endpointResolutionOutcome.GetResult(), Aws::Http::HttpMethod::HTTP_POST));
}

CopyObjectOutcome S3CrtClient::CopyObject(const CopyObjectRequest& request) const
static void CopyObjectRequestShutdownCallback(void *user_data)
{
AWS_OPERATION_GUARD(CopyObject);
AWS_OPERATION_CHECK_PTR(m_endpointProvider, CopyObject, CoreErrors, CoreErrors::ENDPOINT_RESOLUTION_FAILURE);
auto *userData = static_cast<S3CrtClient::CrtRequestCallbackUserData*>(user_data);
S3Crt::Model::CopyObjectOutcome outcome(userData->s3CrtClient->GenerateXmlOutcome(userData->response));

userData->copyResponseHandler(userData->s3CrtClient, *(reinterpret_cast<const CopyObjectRequest*>(userData->originalRequest)), std::move(outcome), userData->asyncCallerContext);
Aws::Delete(userData);
}

void S3CrtClient::CopyObjectAsync(const Model::CopyObjectRequest& request, const CopyObjectResponseReceivedHandler& handler, const std::shared_ptr<const Aws::Client::AsyncCallerContext>& handlerContext) const
{
if (!m_endpointProvider) {
return handler(this, request, CopyObjectOutcome(Aws::Client::AWSError<S3CrtErrors>(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", "Endpoint provider is not initialized", false)), handlerContext);
}
if (!request.BucketHasBeenSet())
{
AWS_LOGSTREAM_ERROR("CopyObject", "Required field: Bucket, is not set");
return CopyObjectOutcome(Aws::Client::AWSError<S3CrtErrors>(S3CrtErrors::MISSING_PARAMETER, "MISSING_PARAMETER", "Missing required field [Bucket]", false));
return handler(this, request, CopyObjectOutcome(Aws::Client::AWSError<S3CrtErrors>(S3CrtErrors::MISSING_PARAMETER, "MISSING_PARAMETER", "Missing required field [Bucket]", false)), handlerContext);
}
if (!request.CopySourceHasBeenSet())
{
AWS_LOGSTREAM_ERROR("CopyObject", "Required field: CopySource, is not set");
return CopyObjectOutcome(Aws::Client::AWSError<S3CrtErrors>(S3CrtErrors::MISSING_PARAMETER, "MISSING_PARAMETER", "Missing required field [CopySource]", false));
return handler(this, request, CopyObjectOutcome(Aws::Client::AWSError<S3CrtErrors>(S3CrtErrors::MISSING_PARAMETER, "MISSING_PARAMETER", "Missing required field [CopySource]", false)), handlerContext);
}
if (!request.KeyHasBeenSet())
{
AWS_LOGSTREAM_ERROR("CopyObject", "Required field: Key, is not set");
return CopyObjectOutcome(Aws::Client::AWSError<S3CrtErrors>(S3CrtErrors::MISSING_PARAMETER, "MISSING_PARAMETER", "Missing required field [Key]", false));
return handler(this, request, CopyObjectOutcome(Aws::Client::AWSError<S3CrtErrors>(S3CrtErrors::MISSING_PARAMETER, "MISSING_PARAMETER", "Missing required field [Key]", false)), handlerContext);
}

ResolveEndpointOutcome endpointResolutionOutcome = m_endpointProvider->ResolveEndpoint(request.GetEndpointContextParams());
AWS_OPERATION_CHECK_SUCCESS(endpointResolutionOutcome, CopyObject, CoreErrors, CoreErrors::ENDPOINT_RESOLUTION_FAILURE, endpointResolutionOutcome.GetError().GetMessage());
if (!endpointResolutionOutcome.IsSuccess()) {
handler(this, request, CopyObjectOutcome(Aws::Client::AWSError<CoreErrors>(
CoreErrors::ENDPOINT_RESOLUTION_FAILURE, "ENDPOINT_RESOLUTION_FAILURE", endpointResolutionOutcome.GetError().GetMessage(), false)), handlerContext);
return;
}
endpointResolutionOutcome.GetResult().AddPathSegments(request.GetKey());
return CopyObjectOutcome(MakeRequest(request, endpointResolutionOutcome.GetResult(), Aws::Http::HttpMethod::HTTP_PUT));

CrtRequestCallbackUserData *userData = Aws::New<CrtRequestCallbackUserData>(ALLOCATION_TAG);
aws_s3_meta_request_options options;
AWS_ZERO_STRUCT(options);
aws_uri endpoint;
AWS_ZERO_STRUCT(endpoint);
options.endpoint = &endpoint;
std::unique_ptr<aws_uri, void(*)(aws_uri*)> endpointCleanup { options.endpoint, &aws_uri_clean_up };

userData->copyResponseHandler = handler;
userData->asyncCallerContext = handlerContext;
InitCommonCrtRequestOption(userData, &options, &request, endpointResolutionOutcome.GetResult().GetURI(), Aws::Http::HttpMethod::HTTP_PUT);
options.shutdown_callback = CopyObjectRequestShutdownCallback;
options.type = AWS_S3_META_REQUEST_TYPE_COPY_OBJECT;

struct aws_signing_config_aws signing_config_override = m_s3CrtSigningConfig;
if (endpointResolutionOutcome.GetResult().GetAttributes() && endpointResolutionOutcome.GetResult().GetAttributes()->authScheme.GetSigningRegion()) {
signing_config_override.region = Aws::Crt::ByteCursorFromCString(endpointResolutionOutcome.GetResult().GetAttributes()->authScheme.GetSigningRegion()->c_str());
}
if (endpointResolutionOutcome.GetResult().GetAttributes() && endpointResolutionOutcome.GetResult().GetAttributes()->authScheme.GetSigningRegionSet()) {
signing_config_override.region = Aws::Crt::ByteCursorFromCString(endpointResolutionOutcome.GetResult().GetAttributes()->authScheme.GetSigningRegionSet()->c_str());
}
if (endpointResolutionOutcome.GetResult().GetAttributes() && endpointResolutionOutcome.GetResult().GetAttributes()->authScheme.GetSigningName()) {
signing_config_override.service = Aws::Crt::ByteCursorFromCString(endpointResolutionOutcome.GetResult().GetAttributes()->authScheme.GetSigningName()->c_str());
}
options.signing_config = &signing_config_override;

std::shared_ptr<Aws::Crt::Http::HttpRequest> crtHttpRequest = userData->request->ToCrtHttpRequest();
options.message= crtHttpRequest->GetUnderlyingMessage();
userData->crtHttpRequest = crtHttpRequest;

if (aws_s3_client_make_meta_request(m_s3CrtClient, &options) == nullptr)
{
return handler(this, request, CopyObjectOutcome(Aws::Client::AWSError<S3CrtErrors>(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", "Unable to create s3 meta request", false)), handlerContext);
}
}

CopyObjectOutcome S3CrtClient::CopyObject(const CopyObjectRequest& request) const
{
AWS_OPERATION_GUARD(CopyObject);
Aws::Utils::Threading::Semaphore sem(0, 1);
CopyObjectOutcome res;

auto handler = CopyObjectResponseReceivedHandler{[&](const S3CrtClient*, const CopyObjectRequest&, CopyObjectOutcome outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext> &) {
res = std::move(outcome);
sem.ReleaseAll();
}};

S3CrtClient::CopyObjectAsync(request, handler, nullptr);
sem.WaitOne();
return res;
}

CreateBucketOutcome S3CrtClient::CreateBucket(const CreateBucketRequest& request) const
Expand Down

0 comments on commit d122ecd

Please sign in to comment.