From 469beaf6a8f1b0b2cf2790f5a1f9282bfdb2f45c Mon Sep 17 00:00:00 2001 From: Kamil Sobol Date: Fri, 4 Jun 2021 09:20:02 -0700 Subject: [PATCH 1/7] first draft. --- sdk/storage/azure-storage-blob-batch/pom.xml | 2 +- .../azure-storage-blob-changefeed/pom.xml | 2 +- .../azure-storage-blob-cryptography/pom.xml | 2 +- sdk/storage/azure-storage-blob-nio/pom.xml | 2 +- sdk/storage/azure-storage-blob/pom.xml | 2 +- .../blob/specialized/BlobAsyncClientBase.java | 65 +++++++++++++------ .../blob/specialized/ReliableDownload.java | 4 ++ sdk/storage/azure-storage-common/pom.xml | 2 +- .../azure-storage-file-datalake/pom.xml | 2 +- sdk/storage/azure-storage-file-share/pom.xml | 2 +- .../azure-storage-internal-avro/pom.xml | 2 +- sdk/storage/azure-storage-queue/pom.xml | 2 +- 12 files changed, 58 insertions(+), 31 deletions(-) diff --git a/sdk/storage/azure-storage-blob-batch/pom.xml b/sdk/storage/azure-storage-blob-batch/pom.xml index 251ae27e25480..e6e14a06114ff 100644 --- a/sdk/storage/azure-storage-blob-batch/pom.xml +++ b/sdk/storage/azure-storage-blob-batch/pom.xml @@ -55,7 +55,7 @@ com.azure azure-core - 1.16.0 + 1.17.0-beta.1 com.azure diff --git a/sdk/storage/azure-storage-blob-changefeed/pom.xml b/sdk/storage/azure-storage-blob-changefeed/pom.xml index 11ecec1ac3408..1582739b31513 100644 --- a/sdk/storage/azure-storage-blob-changefeed/pom.xml +++ b/sdk/storage/azure-storage-blob-changefeed/pom.xml @@ -55,7 +55,7 @@ com.azure azure-core - 1.16.0 + 1.17.0-beta.1 com.azure diff --git a/sdk/storage/azure-storage-blob-cryptography/pom.xml b/sdk/storage/azure-storage-blob-cryptography/pom.xml index 95825cfc323aa..622979d0e9012 100644 --- a/sdk/storage/azure-storage-blob-cryptography/pom.xml +++ b/sdk/storage/azure-storage-blob-cryptography/pom.xml @@ -41,7 +41,7 @@ com.azure azure-core - 1.16.0 + 1.17.0-beta.1 com.azure diff --git a/sdk/storage/azure-storage-blob-nio/pom.xml b/sdk/storage/azure-storage-blob-nio/pom.xml index 2e67e0d33ceb3..e3610092b4c3c 100644 --- a/sdk/storage/azure-storage-blob-nio/pom.xml +++ b/sdk/storage/azure-storage-blob-nio/pom.xml @@ -54,7 +54,7 @@ com.azure azure-core - 1.16.0 + 1.17.0-beta.1 com.azure diff --git a/sdk/storage/azure-storage-blob/pom.xml b/sdk/storage/azure-storage-blob/pom.xml index e82cd56ae7fd1..2a7518c01e113 100644 --- a/sdk/storage/azure-storage-blob/pom.xml +++ b/sdk/storage/azure-storage-blob/pom.xml @@ -55,7 +55,7 @@ com.azure azure-core - 1.16.0 + 1.17.0-beta.1 com.azure diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java index a4c4902d264d2..e978f07bd96cf 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java @@ -10,6 +10,7 @@ import com.azure.core.http.RequestConditions; import com.azure.core.http.rest.Response; import com.azure.core.http.rest.SimpleResponse; +import com.azure.core.http.rest.StreamResponse; import com.azure.core.util.BinaryData; import com.azure.core.util.Context; import com.azure.core.util.CoreUtils; @@ -28,6 +29,7 @@ import com.azure.storage.blob.implementation.AzureBlobStorageImplBuilder; import com.azure.storage.blob.implementation.models.BlobTag; import com.azure.storage.blob.implementation.models.BlobTags; +import com.azure.storage.blob.implementation.models.BlobsDownloadHeaders; import com.azure.storage.blob.implementation.models.BlobsGetAccountInfoHeaders; import com.azure.storage.blob.implementation.models.BlobsGetPropertiesHeaders; import com.azure.storage.blob.implementation.models.BlobsStartCopyFromURLHeaders; @@ -99,6 +101,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -120,6 +123,7 @@ public class BlobAsyncClientBase { private final ClientLogger logger = new ClientLogger(BlobAsyncClientBase.class); + private static final Duration TIMEOUT_VALUE = Duration.ofSeconds(60); protected final AzureBlobStorageImpl azureBlobStorage; private final String snapshot; @@ -1002,32 +1006,51 @@ public Mono downloadContentWithResponse( Mono downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options, BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) { - return downloadHelper(range, options, requestConditions, getRangeContentMd5, context) - .map(response -> new BlobDownloadAsyncResponse(response.getRequest(), response.getStatusCode(), - response.getHeaders(), response.getValue(), response.getDeserializedHeaders())); - } - - private Mono downloadHelper(BlobRange range, DownloadRetryOptions options, - BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) { - range = range == null ? new BlobRange(0) : range; + BlobRange finalRange = range == null ? new BlobRange(0) : range; Boolean getMD5 = getRangeContentMd5 ? getRangeContentMd5 : null; - requestConditions = requestConditions == null ? new BlobRequestConditions() : requestConditions; - HttpGetterInfo info = new HttpGetterInfo() - .setOffset(range.getOffset()) - .setCount(range.getCount()) - .setETag(requestConditions.getIfMatch()); + return downloadRange(finalRange, requestConditions, requestConditions.getIfMatch(), getMD5, context) + .map(response -> { + String eTag = ModelHelper.getETag(response.getHeaders()); + BlobsDownloadHeaders blobsDownloadHeaders = + ModelHelper.transformBlobDownloadHeaders(response.getHeaders()); + BlobDownloadHeaders blobDownloadHeaders = ModelHelper.populateBlobDownloadHeaders( + blobsDownloadHeaders, ModelHelper.getErrorCode(response.getHeaders())); + + Flux bufferFlux = FluxUtil.createRetriableDownloadFlux( + () -> response.getValue().timeout(TIMEOUT_VALUE), + (throwable, offset) -> { + if (!(throwable instanceof IOException || throwable instanceof TimeoutException)) { + return Flux.error(throwable); + } + + Long newCount = finalRange.getCount(); + if (newCount != null) { + newCount -= (offset - finalRange.getOffset()); + } + try { + return downloadRange(new BlobRange(offset, newCount), requestConditions, eTag, getMD5, context) + .flatMapMany(StreamResponse::getValue); + } catch (Exception e) { + return Flux.error(e); + } + }, + options.getMaxRetryRequests(), + finalRange.getOffset() + ); + + return new BlobDownloadAsyncResponse(response.getRequest(), response.getStatusCode(), + response.getHeaders(), bufferFlux, blobDownloadHeaders); + }); + } + + private Mono downloadRange(BlobRange range, BlobRequestConditions requestConditions, + String eTag, Boolean getMD5, Context context) { return azureBlobStorage.getBlobs().downloadWithResponseAsync(containerName, blobName, snapshot, versionId, null, range.toHeaderValue(), requestConditions.getLeaseId(), getMD5, null, requestConditions.getIfModifiedSince(), - requestConditions.getIfUnmodifiedSince(), requestConditions.getIfMatch(), + requestConditions.getIfUnmodifiedSince(), eTag, requestConditions.getIfNoneMatch(), requestConditions.getTagsConditions(), null, - customerProvidedKey, context) - .map(response -> { - info.setETag(ModelHelper.getETag(response.getHeaders())); - return new ReliableDownload(response, options, info, updatedInfo -> - downloadHelper(new BlobRange(updatedInfo.getOffset(), updatedInfo.getCount()), options, - new BlobRequestConditions().setIfMatch(info.getETag()), false, context)); - }); + customerProvidedKey, context); } /** diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/ReliableDownload.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/ReliableDownload.java index e4beb945fa2b0..1c30ea56f84ce 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/ReliableDownload.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/ReliableDownload.java @@ -20,7 +20,9 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.concurrent.TimeoutException; +import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Supplier; /** * This class automatically retries failed reads from a blob download stream. @@ -30,7 +32,9 @@ * will be resumed from the point where the download failed. This allows for the download to be consumed as one * continuous stream. *

+ * @deprecated use {@link com.azure.core.util.FluxUtil#createRetriableDownloadFlux(Supplier, BiFunction, int)} instead. */ +@Deprecated final class ReliableDownload { private final ClientLogger logger = new ClientLogger(ReliableDownload.class); diff --git a/sdk/storage/azure-storage-common/pom.xml b/sdk/storage/azure-storage-common/pom.xml index 964b85ba9228d..b336eac75dc6f 100644 --- a/sdk/storage/azure-storage-common/pom.xml +++ b/sdk/storage/azure-storage-common/pom.xml @@ -41,7 +41,7 @@ com.azure azure-core - 1.16.0 + 1.17.0-beta.1 com.azure diff --git a/sdk/storage/azure-storage-file-datalake/pom.xml b/sdk/storage/azure-storage-file-datalake/pom.xml index 2b61a30f0a8bd..627718c5e434c 100644 --- a/sdk/storage/azure-storage-file-datalake/pom.xml +++ b/sdk/storage/azure-storage-file-datalake/pom.xml @@ -55,7 +55,7 @@ com.azure azure-core - 1.16.0 + 1.17.0-beta.1 com.azure diff --git a/sdk/storage/azure-storage-file-share/pom.xml b/sdk/storage/azure-storage-file-share/pom.xml index 7316ccc3d0293..417510e786f9e 100644 --- a/sdk/storage/azure-storage-file-share/pom.xml +++ b/sdk/storage/azure-storage-file-share/pom.xml @@ -36,7 +36,7 @@ com.azure azure-core - 1.16.0 + 1.17.0-beta.1 com.azure diff --git a/sdk/storage/azure-storage-internal-avro/pom.xml b/sdk/storage/azure-storage-internal-avro/pom.xml index 76807a0dd76da..1db7025544b60 100644 --- a/sdk/storage/azure-storage-internal-avro/pom.xml +++ b/sdk/storage/azure-storage-internal-avro/pom.xml @@ -41,7 +41,7 @@ com.azure azure-core - 1.16.0 + 1.17.0-beta.1 com.azure diff --git a/sdk/storage/azure-storage-queue/pom.xml b/sdk/storage/azure-storage-queue/pom.xml index 944c56f45c0ca..a90ac142b1ebb 100644 --- a/sdk/storage/azure-storage-queue/pom.xml +++ b/sdk/storage/azure-storage-queue/pom.xml @@ -36,7 +36,7 @@ com.azure azure-core - 1.16.0 + 1.17.0-beta.1 com.azure From edf51d7986f33e029c24afe3c9219dfcac3a96bc Mon Sep 17 00:00:00 2001 From: Kamil Sobol Date: Fri, 4 Jun 2021 09:24:58 -0700 Subject: [PATCH 2/7] indent. --- .../azure/storage/blob/specialized/BlobAsyncClientBase.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java index e978f07bd96cf..681f3c4bde184 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java @@ -1029,8 +1029,9 @@ Mono downloadStreamWithResponse(BlobRange range, Down newCount -= (offset - finalRange.getOffset()); } try { - return downloadRange(new BlobRange(offset, newCount), requestConditions, eTag, getMD5, context) - .flatMapMany(StreamResponse::getValue); + return downloadRange( + new BlobRange(offset, newCount), requestConditions, eTag, getMD5, context) + .flatMapMany(StreamResponse::getValue); } catch (Exception e) { return Flux.error(e); } From eed33d724ff297157a997c0aeb751b77ce0226dc Mon Sep 17 00:00:00 2001 From: Kamil Sobol Date: Fri, 4 Jun 2021 09:32:02 -0700 Subject: [PATCH 3/7] npes. --- .../storage/blob/specialized/BlobAsyncClientBase.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java index 681f3c4bde184..919c6d2b5ca88 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java @@ -1008,8 +1008,11 @@ Mono downloadStreamWithResponse(BlobRange range, Down BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) { BlobRange finalRange = range == null ? new BlobRange(0) : range; Boolean getMD5 = getRangeContentMd5 ? getRangeContentMd5 : null; + BlobRequestConditions finalRequestConditions = + requestConditions == null ? new BlobRequestConditions() : requestConditions; + DownloadRetryOptions finalOptions = (options == null) ? new DownloadRetryOptions() : options; - return downloadRange(finalRange, requestConditions, requestConditions.getIfMatch(), getMD5, context) + return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), getMD5, context) .map(response -> { String eTag = ModelHelper.getETag(response.getHeaders()); BlobsDownloadHeaders blobsDownloadHeaders = @@ -1030,13 +1033,13 @@ Mono downloadStreamWithResponse(BlobRange range, Down } try { return downloadRange( - new BlobRange(offset, newCount), requestConditions, eTag, getMD5, context) + new BlobRange(offset, newCount), finalRequestConditions, eTag, getMD5, context) .flatMapMany(StreamResponse::getValue); } catch (Exception e) { return Flux.error(e); } }, - options.getMaxRetryRequests(), + finalOptions.getMaxRetryRequests(), finalRange.getOffset() ); From c500bdaa32ff45411c6b3d007fa3c03731553071 Mon Sep 17 00:00:00 2001 From: Kamil Sobol Date: Fri, 4 Jun 2021 10:10:20 -0700 Subject: [PATCH 4/7] timeout. --- .../com/azure/storage/blob/specialized/BlobAsyncClientBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java index 919c6d2b5ca88..e38da745cb1c2 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java @@ -1034,7 +1034,7 @@ Mono downloadStreamWithResponse(BlobRange range, Down try { return downloadRange( new BlobRange(offset, newCount), finalRequestConditions, eTag, getMD5, context) - .flatMapMany(StreamResponse::getValue); + .flatMapMany(r -> r.getValue().timeout(TIMEOUT_VALUE)); } catch (Exception e) { return Flux.error(e); } From 2d1d292c644ff3d5d36a37224115eb0772af2b70 Mon Sep 17 00:00:00 2001 From: Kamil Sobol Date: Fri, 4 Jun 2021 10:43:06 -0700 Subject: [PATCH 5/7] fix empty case. --- .../com/azure/storage/blob/specialized/BlobAsyncClientBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java index e38da745cb1c2..40910037625a6 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java @@ -1041,7 +1041,7 @@ Mono downloadStreamWithResponse(BlobRange range, Down }, finalOptions.getMaxRetryRequests(), finalRange.getOffset() - ); + ).switchIfEmpty(Flux.just(ByteBuffer.wrap(new byte[0]))); return new BlobDownloadAsyncResponse(response.getRequest(), response.getStatusCode(), response.getHeaders(), bufferFlux, blobDownloadHeaders); From 21f27adbd11f1e2d79c03be26eb973df35e1fa29 Mon Sep 17 00:00:00 2001 From: Kamil Sobol Date: Fri, 4 Jun 2021 11:17:07 -0700 Subject: [PATCH 6/7] checkstyle --- .../com/azure/storage/blob/specialized/BlobAsyncClientBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java index 40910037625a6..e6661c1c78d75 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java @@ -23,7 +23,6 @@ import com.azure.storage.blob.BlobContainerClientBuilder; import com.azure.storage.blob.BlobServiceAsyncClient; import com.azure.storage.blob.BlobServiceVersion; -import com.azure.storage.blob.HttpGetterInfo; import com.azure.storage.blob.ProgressReporter; import com.azure.storage.blob.implementation.AzureBlobStorageImpl; import com.azure.storage.blob.implementation.AzureBlobStorageImplBuilder; From fd401b46507559dec10741d813a1e33b969a8649 Mon Sep 17 00:00:00 2001 From: Kamil Sobol Date: Fri, 4 Jun 2021 12:55:29 -0700 Subject: [PATCH 7/7] tests --- .../blob/specialized/BlobAsyncClientBase.java | 29 ++- .../specialized/DownloadResponseMockFlux.java | 201 +++++++++--------- .../specialized/DownloadResponseTest.groovy | 83 ++------ 3 files changed, 144 insertions(+), 169 deletions(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java index e6661c1c78d75..a66ec7984c104 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java @@ -1019,6 +1019,18 @@ Mono downloadStreamWithResponse(BlobRange range, Down BlobDownloadHeaders blobDownloadHeaders = ModelHelper.populateBlobDownloadHeaders( blobsDownloadHeaders, ModelHelper.getErrorCode(response.getHeaders())); + /* + If the customer did not specify a count, they are reading to the end of the blob. Extract this value + from the response for better book keeping towards the end. + */ + long finalCount; + if (finalRange.getCount() == null) { + long blobLength = BlobAsyncClientBase.getBlobLength(blobDownloadHeaders); + finalCount = blobLength - finalRange.getOffset(); + } else { + finalCount = finalRange.getCount(); + } + Flux bufferFlux = FluxUtil.createRetriableDownloadFlux( () -> response.getValue().timeout(TIMEOUT_VALUE), (throwable, offset) -> { @@ -1026,10 +1038,21 @@ Mono downloadStreamWithResponse(BlobRange range, Down return Flux.error(throwable); } - Long newCount = finalRange.getCount(); - if (newCount != null) { - newCount -= (offset - finalRange.getOffset()); + long newCount = finalCount - (offset - finalRange.getOffset()); + + /* + It is possible that the network stream will throw an error after emitting all data but before + completing. Issuing a retry at this stage would leave the download in a bad state with incorrect count + and offset values. Because we have read the intended amount of data, we can ignore the error at the end + of the stream. + */ + if (newCount == 0) { + logger.warning("Exception encountered in ReliableDownload after all data read from the network but " + + "but before stream signaled completion. Returning success as all data was downloaded. " + + "Exception message: " + throwable.getMessage()); + return Flux.empty(); } + try { return downloadRange( new BlobRange(offset, newCount), finalRequestConditions, eTag, getMD5, context) diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseMockFlux.java b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseMockFlux.java index c308e78077d25..541127c9a851d 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseMockFlux.java +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseMockFlux.java @@ -3,11 +3,12 @@ package com.azure.storage.blob.specialized; +import com.azure.core.http.HttpHeader; import com.azure.core.http.HttpHeaders; import com.azure.core.http.HttpResponse; -import com.azure.core.http.rest.StreamResponse; +import com.azure.core.http.policy.HttpPipelinePolicy; +import com.azure.core.test.http.MockHttpResponse; import com.azure.storage.blob.APISpec; -import com.azure.storage.blob.HttpGetterInfo; import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.blob.models.DownloadRetryOptions; import reactor.core.publisher.Flux; @@ -26,7 +27,6 @@ class DownloadResponseMockFlux { static final int DR_TEST_SCENARIO_MAX_RETRIES_EXCEEDED = 3; // We appropriate honor max retries static final int DR_TEST_SCENARIO_NON_RETRYABLE_ERROR = 4; // We will not retry on a non-retryable error static final int DR_TEST_SCENARIO_ERROR_GETTER_MIDDLE = 6; // Throwing an error from the getter - static final int DR_TEST_SCENARIO_INFO_TEST = 8; // Initial info values are honored static final int DR_TEST_SCENARIO_NO_MULTIPLE_SUBSCRIPTION = 9; // We do not subscribe to the same stream twice static final int DR_TEST_SCENARIO_TIMEOUT = 10; // ReliableDownload with timeout after not receiving items for 60s static final int DR_TEST_SCENARIO_ERROR_AFTER_ALL_DATA = 11; // Don't actually issue another retry if we've read all the data and the source failed at the end @@ -35,7 +35,6 @@ class DownloadResponseMockFlux { private final ByteBuffer scenarioData; private int tryNumber; - private HttpGetterInfo info; private DownloadRetryOptions options; private boolean subscribed = false; // Only used for multiple subscription test. @@ -54,7 +53,6 @@ class DownloadResponseMockFlux { case DR_TEST_SCENARIO_MAX_RETRIES_EXCEEDED: case DR_TEST_SCENARIO_NON_RETRYABLE_ERROR: case DR_TEST_SCENARIO_ERROR_GETTER_MIDDLE: - case DR_TEST_SCENARIO_INFO_TEST: case DR_TEST_SCENARIO_TIMEOUT: this.scenarioData = apiSpec.getRandomData(1024); break; @@ -66,12 +64,11 @@ class DownloadResponseMockFlux { /* For internal construction on NO_MULTIPLE_SUBSCRIPTION test */ - DownloadResponseMockFlux(int scenario, int tryNumber, ByteBuffer scenarioData, HttpGetterInfo info, + DownloadResponseMockFlux(int scenario, int tryNumber, ByteBuffer scenarioData, DownloadRetryOptions options) { this.scenario = scenario; this.tryNumber = tryNumber; this.scenarioData = scenarioData; - this.info = info; this.options = options; } @@ -83,12 +80,7 @@ int getTryNumber() { return this.tryNumber; } - DownloadResponseMockFlux setOptions(DownloadRetryOptions options) { - this.options = options; - return this; - } - - private Flux getDownloadStream() { + private Flux getDownloadStream(long offset, Long count) { switch (this.scenario) { case DR_TEST_SCENARIO_SUCCESSFUL_ONE_CHUNK: return Flux.just(scenarioData.duplicate()); @@ -112,8 +104,8 @@ private Flux getDownloadStream() { case DR_TEST_SCENARIO_SUCCESSFUL_STREAM_FAILURES: if (this.tryNumber <= 3) { // tryNumber is 1 indexed, so we have to sub 1. - if (this.info.getOffset() != (this.tryNumber - 1) * 256 - || this.info.getCount() != this.scenarioData.remaining() - (this.tryNumber - 1) * 256) { + if (offset != (this.tryNumber - 1) * 256 + || count != this.scenarioData.remaining() - (this.tryNumber - 1) * 256) { return Flux.error(new IllegalArgumentException("Info values are incorrect.")); } @@ -128,8 +120,8 @@ private Flux getDownloadStream() { return dataStream.concatWith(Flux.error(e)); } - if (this.info.getOffset() != (this.tryNumber - 1) * 256 - || this.info.getCount() != this.scenarioData.remaining() - (this.tryNumber - 1) * 256) { + if (offset != (this.tryNumber - 1) * 256 + || count != this.scenarioData.remaining() - (this.tryNumber - 1) * 256) { return Flux.error(new IllegalArgumentException("Info values are incorrect.")); } ByteBuffer toSend = this.scenarioData.duplicate(); @@ -156,18 +148,6 @@ private Flux getDownloadStream() { ? Flux.error(new IOException()) : Flux.error(new IllegalArgumentException("Retried after getter error.")); - case DR_TEST_SCENARIO_INFO_TEST: - switch (this.tryNumber) { - case 1: // Test the value of info when getting the initial response. - case 2: // Test the value of info when getting an intermediate response. - return Flux.error(new IOException()); - case 3: - // All calls to getter checked. Exit. This test does not check for data. - return Flux.empty(); - default: - return Flux.error(new IllegalArgumentException("Invalid try number.")); - } - case DR_TEST_SCENARIO_TIMEOUT: return Flux.just(scenarioData.duplicate()).delayElements(Duration.ofSeconds(61)); @@ -176,79 +156,98 @@ private Flux getDownloadStream() { } } - Mono getter(HttpGetterInfo info) { - this.tryNumber++; - this.info = info; - long contentUpperBound = info.getCount() == null - ? this.scenarioData.remaining() - 1 : info.getOffset() + info.getCount() - 1; - StreamResponse rawResponse = new StreamResponse(null, 200, new HttpHeaders().put("Content-Range", String.format("%d-%d/%d", - info.getOffset(), contentUpperBound, this.scenarioData.remaining())), this.getDownloadStream()); - ReliableDownload response = new ReliableDownload(rawResponse, options, info, this::getter); - - switch (this.scenario) { - case DR_TEST_SCENARIO_ERROR_GETTER_MIDDLE: - switch (this.tryNumber) { - case 1: - return Mono.just(response); - case 2: - /* - This validates that we don't retry in the getter even if it's a retryable error from the - service. - */ - throw new BlobStorageException("Message", new HttpResponse(null) { - @Override - public int getStatusCode() { - return 500; - } - - @Override - public String getHeaderValue(String s) { - return null; - } - - @Override - public HttpHeaders getHeaders() { - return null; - } - - @Override - public Flux getBody() { - return null; - } - - @Override - public Mono getBodyAsByteArray() { - return null; - } - - @Override - public Mono getBodyAsString() { - return null; - } - - @Override - public Mono getBodyAsString(Charset charset) { - return null; - } - }, null); - default: - throw new IllegalArgumentException("Retried after error in getter"); + HttpPipelinePolicy asPolicy() { + return (context, next) -> { + tryNumber++; + HttpHeader rangeHeader = context.getHttpRequest().getHeaders().get("x-ms-range"); + String eTag = context.getHttpRequest().getHeaders().getValue("if-match"); + long offset = 0; + Long count = null; + if (rangeHeader != null) { + String[] ranges = rangeHeader.getValue().replace("bytes=", "").split("-"); + offset = Long.parseLong(ranges[0]); + if (ranges.length > 1) { + count = Long.parseLong(ranges[1]) - offset + 1; } - case DR_TEST_SCENARIO_INFO_TEST: - // We also test that the info is updated in DR_TEST_SCENARIO_SUCCESSFUL_STREAM_FAILURES. - if (info.getCount() != 10 || info.getOffset() != 20 || !info.getETag().equals("etag")) { - throw new IllegalArgumentException("Info values incorrect"); + } + long finalOffset = offset; + Long finalCount = count; + + MockHttpResponse response = new MockHttpResponse(null, 200) { + @Override + public Flux getBody() { + return getDownloadStream(finalOffset, finalCount); } - return Mono.just(response); - case DR_TEST_SCENARIO_NO_MULTIPLE_SUBSCRIPTION: - // Construct a new flux each time to mimic getting a new download stream. - DownloadResponseMockFlux nextFlux = new DownloadResponseMockFlux(this.scenario, this.tryNumber, - this.scenarioData, this.info, this.options); - rawResponse = new StreamResponse(null, 200, new HttpHeaders(), nextFlux.getDownloadStream()); - response = new ReliableDownload(rawResponse, options, info, this::getter); - return Mono.just(response); - default: - return Mono.just(response); - } + }; + long contentUpperBound = finalCount == null + ? this.scenarioData.remaining() - 1 : finalOffset + finalCount - 1; + response.addHeader("Content-Range", String.format("%d-%d/%d", + finalOffset, contentUpperBound, this.scenarioData.remaining())); + + switch (scenario) { + case DR_TEST_SCENARIO_ERROR_GETTER_MIDDLE: + switch (tryNumber) { + case 1: + return Mono.just(response); + case 2: + /* + This validates that we don't retry in the getter even if it's a retryable error from the + service. + */ + throw new BlobStorageException("Message", new HttpResponse(null) { + @Override + public int getStatusCode() { + return 500; + } + + @Override + public String getHeaderValue(String s) { + return null; + } + + @Override + public HttpHeaders getHeaders() { + return null; + } + + @Override + public Flux getBody() { + return null; + } + + @Override + public Mono getBodyAsByteArray() { + return null; + } + + @Override + public Mono getBodyAsString() { + return null; + } + + @Override + public Mono getBodyAsString(Charset charset) { + return null; + } + }, null); + default: + throw new IllegalArgumentException("Retried after error in getter"); + } + case DR_TEST_SCENARIO_NO_MULTIPLE_SUBSCRIPTION: + // Construct a new flux each time to mimic getting a new download stream. + // Construct a new flux each time to mimic getting a new download stream. + DownloadResponseMockFlux nextFlux = new DownloadResponseMockFlux(this.scenario, this.tryNumber, + this.scenarioData, this.options); + MockHttpResponse newResponse = new MockHttpResponse(null, 200) { + @Override + public Flux getBody() { + return nextFlux.getDownloadStream(finalOffset, finalCount); + } + }; + return Mono.just(newResponse); + default: + return Mono.just(response); + } + }; } } diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseTest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseTest.groovy index cba9469e2eda3..4ee065002eb1c 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseTest.groovy +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseTest.groovy @@ -12,6 +12,8 @@ import com.azure.core.http.policy.HttpPipelinePolicy import com.azure.core.util.FluxUtil import com.azure.storage.blob.APISpec import com.azure.storage.blob.HttpGetterInfo +import com.azure.storage.blob.models.BlobRange +import com.azure.storage.blob.models.BlobRequestConditions import com.azure.storage.blob.models.BlobStorageException import com.azure.storage.blob.models.DownloadRetryOptions import reactor.core.Exceptions @@ -73,15 +75,15 @@ class DownloadResponseTest extends APISpec { setup: DownloadResponseMockFlux flux = new DownloadResponseMockFlux(scenario, this) - HttpGetterInfo info = new HttpGetterInfo() - .setOffset(0) - .setCount(setCount ? flux.getScenarioData().remaining() : null) - .setETag("etag") - DownloadRetryOptions options = new DownloadRetryOptions().setMaxRetryRequests(5) + def bsc = getServiceClientBuilder(env.primaryAccount.credential, primaryBlobServiceClient.getAccountUrl(), flux.asPolicy()).buildAsyncClient() + def cc = bsc.getBlobContainerAsyncClient(containerName) + def bu = cc.getBlobAsyncClient(bu.getBlobName()).getBlockBlobAsyncClient() + BlobRange range = setCount ? new BlobRange(0, flux.getScenarioData().remaining()) : new BlobRange(0); + when: - ReliableDownload response = flux.setOptions(options).getter(info).block() + def response = bu.downloadStreamWithResponse(range, options, null, false).block() then: FluxUtil.collectBytesInByteBufferStream(response.getValue()).block() == flux.getScenarioData().array() @@ -103,10 +105,13 @@ class DownloadResponseTest extends APISpec { setup: DownloadResponseMockFlux flux = new DownloadResponseMockFlux(scenario, this) DownloadRetryOptions options = new DownloadRetryOptions().setMaxRetryRequests(5) - HttpGetterInfo info = new HttpGetterInfo().setETag("etag") + + def bsc = getServiceClientBuilder(env.primaryAccount.credential, primaryBlobServiceClient.getAccountUrl(), flux.asPolicy()).buildAsyncClient() + def cc = bsc.getBlobContainerAsyncClient(containerName) + def bu = cc.getBlobAsyncClient(bu.getBlobName()).getBlockBlobAsyncClient() when: - ReliableDownload response = flux.setOptions(options).getter(info).block() + def response = bu.downloadStreamWithResponse(null, options, null, false).block() response.getValue().blockFirst() then: @@ -124,71 +129,19 @@ class DownloadResponseTest extends APISpec { DownloadResponseMockFlux.DR_TEST_SCENARIO_ERROR_GETTER_MIDDLE | BlobStorageException | 2 } - @Unroll - def "Info null IA"() { - setup: - DownloadResponseMockFlux flux = new DownloadResponseMockFlux(DownloadResponseMockFlux.DR_TEST_SCENARIO_SUCCESSFUL_ONE_CHUNK, this) - def info = null - - when: - new ReliableDownload(null, null, info, { HttpGetterInfo newInfo -> flux.getter(newInfo) }) - - then: - thrown(NullPointerException) - } - - def "Options IA"() { - when: - new DownloadRetryOptions().setMaxRetryRequests(-1) - - then: - thrown(IllegalArgumentException) - } - - def "Getter IA"() { - when: - new ReliableDownload(null, null, new HttpGetterInfo().setETag("etag"), null) - - then: - thrown(NullPointerException) - } - - def "Info"() { - setup: - DownloadResponseMockFlux flux = new DownloadResponseMockFlux(DownloadResponseMockFlux.DR_TEST_SCENARIO_INFO_TEST, this) - HttpGetterInfo info = new HttpGetterInfo() - .setOffset(20) - .setCount(10) - .setETag("etag") - - DownloadRetryOptions options = new DownloadRetryOptions().setMaxRetryRequests(5) - - when: - ReliableDownload response = flux.setOptions(options).getter(info).block() - response.getValue().blockFirst() - - then: - flux.getTryNumber() == 3 - } - - def "Info count IA"() { - when: - new HttpGetterInfo().setCount(-1) - - then: - thrown(IllegalArgumentException) - } - @Unroll def "Timeout"() { setup: DownloadResponseMockFlux flux = new DownloadResponseMockFlux(DownloadResponseMockFlux.DR_TEST_SCENARIO_TIMEOUT, this) DownloadRetryOptions options = new DownloadRetryOptions().setMaxRetryRequests(retryCount) - HttpGetterInfo info = new HttpGetterInfo().setETag("etag") + + def bsc = getServiceClientBuilder(env.primaryAccount.credential, primaryBlobServiceClient.getAccountUrl(), flux.asPolicy()).buildAsyncClient() + def cc = bsc.getBlobContainerAsyncClient(containerName) + def bu = cc.getBlobAsyncClient(bu.getBlobName()).getBlockBlobAsyncClient() when: - def bufferMono = flux.setOptions(options).getter(info) + def bufferMono = bu.downloadStreamWithResponse(null, options, null, false) .flatMapMany({ it.getValue() }) then: