Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage] Use FluxUtil for reliable download. #22080

Merged
merged 7 commits into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob-batch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob-changefeed/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob-cryptography/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob-nio/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.core.http.RequestConditions;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.http.rest.StreamResponse;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
Expand All @@ -22,12 +23,12 @@
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceVersion;
import com.azure.storage.blob.HttpGetterInfo;
import com.azure.storage.blob.ProgressReporter;
import com.azure.storage.blob.implementation.AzureBlobStorageImpl;
import com.azure.storage.blob.implementation.AzureBlobStorageImplBuilder;
import com.azure.storage.blob.implementation.models.BlobTag;
import com.azure.storage.blob.implementation.models.BlobTags;
import com.azure.storage.blob.implementation.models.BlobsDownloadHeaders;
import com.azure.storage.blob.implementation.models.BlobsGetAccountInfoHeaders;
import com.azure.storage.blob.implementation.models.BlobsGetPropertiesHeaders;
import com.azure.storage.blob.implementation.models.BlobsStartCopyFromURLHeaders;
Expand Down Expand Up @@ -99,6 +100,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -120,6 +122,7 @@
public class BlobAsyncClientBase {

private final ClientLogger logger = new ClientLogger(BlobAsyncClientBase.class);
private static final Duration TIMEOUT_VALUE = Duration.ofSeconds(60);

protected final AzureBlobStorageImpl azureBlobStorage;
private final String snapshot;
Expand Down Expand Up @@ -1002,32 +1005,78 @@ public Mono<BlobDownloadContentAsyncResponse> downloadContentWithResponse(

Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,
BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) {
return downloadHelper(range, options, requestConditions, getRangeContentMd5, context)
.map(response -> new BlobDownloadAsyncResponse(response.getRequest(), response.getStatusCode(),
response.getHeaders(), response.getValue(), response.getDeserializedHeaders()));
}

private Mono<ReliableDownload> downloadHelper(BlobRange range, DownloadRetryOptions options,
BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) {
range = range == null ? new BlobRange(0) : range;
BlobRange finalRange = range == null ? new BlobRange(0) : range;
Boolean getMD5 = getRangeContentMd5 ? getRangeContentMd5 : null;
requestConditions = requestConditions == null ? new BlobRequestConditions() : requestConditions;
HttpGetterInfo info = new HttpGetterInfo()
.setOffset(range.getOffset())
.setCount(range.getCount())
.setETag(requestConditions.getIfMatch());
BlobRequestConditions finalRequestConditions =
requestConditions == null ? new BlobRequestConditions() : requestConditions;
DownloadRetryOptions finalOptions = (options == null) ? new DownloadRetryOptions() : options;

return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), getMD5, context)
.map(response -> {
String eTag = ModelHelper.getETag(response.getHeaders());
BlobsDownloadHeaders blobsDownloadHeaders =
ModelHelper.transformBlobDownloadHeaders(response.getHeaders());
BlobDownloadHeaders blobDownloadHeaders = ModelHelper.populateBlobDownloadHeaders(
blobsDownloadHeaders, ModelHelper.getErrorCode(response.getHeaders()));

/*
If the customer did not specify a count, they are reading to the end of the blob. Extract this value
from the response for better book keeping towards the end.
*/
long finalCount;
if (finalRange.getCount() == null) {
long blobLength = BlobAsyncClientBase.getBlobLength(blobDownloadHeaders);
finalCount = blobLength - finalRange.getOffset();
} else {
finalCount = finalRange.getCount();
}

Flux<ByteBuffer> bufferFlux = FluxUtil.createRetriableDownloadFlux(
() -> response.getValue().timeout(TIMEOUT_VALUE),
(throwable, offset) -> {
if (!(throwable instanceof IOException || throwable instanceof TimeoutException)) {
return Flux.error(throwable);
}

long newCount = finalCount - (offset - finalRange.getOffset());

/*
It is possible that the network stream will throw an error after emitting all data but before
completing. Issuing a retry at this stage would leave the download in a bad state with incorrect count
and offset values. Because we have read the intended amount of data, we can ignore the error at the end
of the stream.
*/
if (newCount == 0) {
logger.warning("Exception encountered in ReliableDownload after all data read from the network but "
+ "but before stream signaled completion. Returning success as all data was downloaded. "
+ "Exception message: " + throwable.getMessage());
return Flux.empty();
}

try {
return downloadRange(
new BlobRange(offset, newCount), finalRequestConditions, eTag, getMD5, context)
.flatMapMany(r -> r.getValue().timeout(TIMEOUT_VALUE));
} catch (Exception e) {
return Flux.error(e);
}
},
finalOptions.getMaxRetryRequests(),
finalRange.getOffset()
).switchIfEmpty(Flux.just(ByteBuffer.wrap(new byte[0])));

return new BlobDownloadAsyncResponse(response.getRequest(), response.getStatusCode(),
response.getHeaders(), bufferFlux, blobDownloadHeaders);
});
}

private Mono<StreamResponse> downloadRange(BlobRange range, BlobRequestConditions requestConditions,
String eTag, Boolean getMD5, Context context) {
return azureBlobStorage.getBlobs().downloadWithResponseAsync(containerName, blobName, snapshot, versionId, null,
range.toHeaderValue(), requestConditions.getLeaseId(), getMD5, null, requestConditions.getIfModifiedSince(),
requestConditions.getIfUnmodifiedSince(), requestConditions.getIfMatch(),
requestConditions.getIfUnmodifiedSince(), eTag,
requestConditions.getIfNoneMatch(), requestConditions.getTagsConditions(), null,
customerProvidedKey, context)
.map(response -> {
info.setETag(ModelHelper.getETag(response.getHeaders()));
return new ReliableDownload(response, options, info, updatedInfo ->
downloadHelper(new BlobRange(updatedInfo.getOffset(), updatedInfo.getCount()), options,
new BlobRequestConditions().setIfMatch(info.getETag()), false, context));
});
customerProvidedKey, context);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* This class automatically retries failed reads from a blob download stream.
Expand All @@ -30,7 +32,9 @@
* will be resumed from the point where the download failed. This allows for the download to be consumed as one
* continuous stream.
* </p>
* @deprecated use {@link com.azure.core.util.FluxUtil#createRetriableDownloadFlux(Supplier, BiFunction, int)} instead.
*/
@Deprecated
final class ReliableDownload {
private final ClientLogger logger = new ClientLogger(ReliableDownload.class);

Expand Down
Loading