Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding basic FeedRanges API #17570

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
21e25e7
Initial draft of FeedRange artifacts
FabianMeiswinkel Nov 4, 2020
6e69300
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 4, 2020
40ef415
Iterating on FeedRange Apis
FabianMeiswinkel Nov 5, 2020
54bed2e
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 5, 2020
58f98e7
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 11, 2020
46662e0
Adding public surface area
FabianMeiswinkel Nov 12, 2020
03617a4
Adding FeedRange unit tests
FabianMeiswinkel Nov 12, 2020
b7de6b3
Adding test FeedRangePKRangeId_GetEffectiveRangesAsync_Refresh
FabianMeiswinkel Nov 13, 2020
4095bc5
Adding test FeedRangePKRangeId_GetEffectiveRangesAsync_Null
FabianMeiswinkel Nov 13, 2020
8e864be
Adding test feedRangeEPK_getPartitionKeyRangesAsync
FabianMeiswinkel Nov 13, 2020
4ebd7cb
Adding test feedRangePK_getPartitionKeyRangesAsync
FabianMeiswinkel Nov 13, 2020
2e6eb79
Adding test feedRangePKRangeId_getPartitionKeyRangesAsync
FabianMeiswinkel Nov 13, 2020
dc4c66c
Adding request visitor unit tests
FabianMeiswinkel Nov 13, 2020
0ef3170
Finishing FeedRange tests
FabianMeiswinkel Nov 13, 2020
ae9dd96
Cleanup and prettifying
FabianMeiswinkel Nov 13, 2020
4811e50
Prettifying feed range tests
FabianMeiswinkel Nov 13, 2020
6515ff5
Fixes and new test for Conatiner.getFeedRanges()
FabianMeiswinkel Nov 13, 2020
30e162e
Addressing some SpotBug violations
FabianMeiswinkel Nov 13, 2020
b08488a
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 16, 2020
9a4be27
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 17, 2020
8ac208a
Reacting to code review feedback
FabianMeiswinkel Nov 17, 2020
ee60826
Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/impleme…
FabianMeiswinkel Nov 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
Expand Down Expand Up @@ -1124,4 +1125,15 @@ private Mono<ThroughputResponse> replaceThroughputInternal(Mono<CosmosContainerR
ItemDeserializer getItemDeserializer() {
return getDatabase().getDocClientWrapper().getItemDeserializer();
}

/**
* Obtains a list of {@link FeedRange} that can be used to parallelize Feed
* operations.
*
* @return An unmodifiable list of {@link FeedRange}
*/
@Beta(Beta.SinceVersion.V4_9_0)
public Mono<List<FeedRange>> getFeedRanges() {
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
return this.getDatabase().getDocClientWrapper().getFeedRanges(getLink());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
Expand Down Expand Up @@ -556,4 +557,23 @@ private <T> CosmosPagedIterable<T> getCosmosPagedIterable(CosmosPagedFlux<T> cos
return UtilBridgeInternal.createCosmosPagedIterable(cosmosPagedFlux);
}

/**
* Obtains a list of {@link FeedRange} that can be used to parallelize Feed
* operations.
*
* @return An unmodifiable list of {@link FeedRange}
*/
@Beta(Beta.SinceVersion.V4_9_0)
public List<FeedRange> getFeedRanges() {
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
try {
return asyncContainer.getFeedRanges().block();
} catch (Exception ex) {
final Throwable throwable = Exceptions.unwrap(ex);
if (throwable instanceof CosmosException) {
throw (CosmosException) throwable;
} else {
throw ex;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
Expand Down Expand Up @@ -668,6 +669,14 @@ Flux<FeedResponse<Document>> queryDocumentChangeFeed(String collectionLink,
*/
Flux<FeedResponse<PartitionKeyRange>> readPartitionKeyRanges(String collectionLink, CosmosQueryRequestOptions options);

/**
* Gets the feed ranges of a container.
*
* @param collectionLink the link to the parent document collection.
* @return a {@link List} of @{link FeedRange} containing the feed ranges of a container.
*/
Mono<List<FeedRange>> getFeedRanges(String collectionLink);

/**
* Creates a stored procedure.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@

import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList;
import com.azure.cosmos.implementation.directconnectivity.WebExceptionUtility;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.ThrottlingRetryOptions;
import io.netty.handler.timeout.ReadTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ public static final class Properties {
public static final String KeyWrapMetadataValue = "value";
public static final String EncryptedInfo = "_ei";

// Feed Ranges
public static final String RANGE = "Range";
public static final String FEED_RANGE_PARTITION_KEY = "PartitionKey";
public static final String FEED_RANGE_PARTITION_KEY_RANGE_ID = "PKRangeId";

}

public static final class UrlEncodingInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ public static class Versions {
public static class StatusCodes {
public static final int OK = 200;
public static final int NOT_MODIFIED = 304;
// Success
public static final int MINIMUM_SUCCESS_STATUSCODE = 200;
public static final int MAXIMUM_SUCCESS_STATUSCODE = 299;
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
// Client error
public static final int MINIMUM_STATUSCODE_AS_ERROR_GATEWAY = 400;
public static final int BADREQUEST = 400;
Expand Down Expand Up @@ -323,5 +326,6 @@ public static class SubStatusCodes {
public static class HeaderValues {
public static final String NO_CACHE = "no-cache";
public static final String PREFER_RETURN_MINIMAL = "return=minimal";
public static final String IF_NONE_MATCH_ALL = "*";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,62 +43,4 @@ public interface IRetryPolicy {
void addStatusAndSubStatusCode(Integer index, int statusCode, int subStatusCode);

List<int[]> getStatusAndSubStatusCodes();

class ShouldRetryResult {
/// <summary>
/// How long to wait before next retry. 0 indicates retry immediately.
/// </summary>
public final Duration backOffTime;
public final Exception exception;
public boolean shouldRetry;
public final Quadruple<Boolean, Boolean, Duration, Integer> policyArg;

private ShouldRetryResult(Duration dur, Exception e, boolean shouldRetry,
Quadruple<Boolean, Boolean, Duration, Integer> policyArg) {
this.backOffTime = dur;
this.exception = e;
this.shouldRetry = shouldRetry;
this.policyArg = policyArg;
}

public static ShouldRetryResult retryAfter(Duration dur) {
Utils.checkNotNullOrThrow(dur, "duration", "cannot be null");
return new ShouldRetryResult(dur, null, true, null);
}

public static ShouldRetryResult retryAfter(Duration dur,
Quadruple<Boolean, Boolean, Duration, Integer> policyArg) {
Utils.checkNotNullOrThrow(dur, "duration", "cannot be null");
return new ShouldRetryResult(dur, null, true, policyArg);
}

public static ShouldRetryResult error(Exception e) {
Utils.checkNotNullOrThrow(e, "exception", "cannot be null");
return new ShouldRetryResult(null, e, false, null);
}

public static ShouldRetryResult noRetry() {
return new ShouldRetryResult(null, null, false, null);
}

public static ShouldRetryResult noRetry(Quadruple<Boolean, Boolean, Duration, Integer> policyArg) {
return new ShouldRetryResult(
null,
null,
false,
policyArg);
}

public void throwIfDoneTrying(Exception capturedException) throws Exception {
if (this.shouldRetry) {
return;
}

if (this.exception == null) {
throw capturedException;
} else {
throw this.exception;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,18 @@ public String getMinInclusive() {
return super.getString("minInclusive");
}

public void setMinInclusive(String minInclusive) {
public PartitionKeyRange setMinInclusive(String minInclusive) {
BridgeInternal.setProperty(this, "minInclusive", minInclusive);
return this;
}

public String getMaxExclusive() {
return super.getString("maxExclusive");
}

public void setMaxExclusive(String maxExclusive) {
public PartitionKeyRange setMaxExclusive(String maxExclusive) {
BridgeInternal.setProperty(this, "maxExclusive", maxExclusive);
return this;
}

public Range<String> toRange() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ static Function<Flux<Throwable>, Flux<Long>> toRetryWhenFunc(IRetryPolicy policy
return Flux.error(t);
}
policy.captureStartTimeIfNotSet();
Flux<IRetryPolicy.ShouldRetryResult> shouldRetryResultFlux = policy.shouldRetry(e).flux();
Flux<ShouldRetryResult> shouldRetryResultFlux = policy.shouldRetry(e).flux();
return shouldRetryResultFlux.flatMap(s -> {
CosmosException clientException = Utils.as(e, CosmosException.class);
if(clientException != null) {
Expand Down Expand Up @@ -76,7 +76,7 @@ public static <T> Function<Throwable, Mono<T>> toRetryWithAlternateFunc(Function
return Mono.error(throwable);
}
retryPolicy.captureStartTimeIfNotSet();
Mono<IRetryPolicy.ShouldRetryResult> shouldRetryResultFlux = retryPolicy.shouldRetry(e);
Mono<ShouldRetryResult> shouldRetryResultFlux = retryPolicy.shouldRetry(e);
return shouldRetryResultFlux.flatMap(shouldRetryResult -> {
CosmosException clientException = Utils.as(e, CosmosException.class);
if(clientException != null) {
Expand Down Expand Up @@ -140,7 +140,7 @@ private static <T> Mono<T> recursiveFunc(
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> callbackMethod,
IRetryPolicy retryPolicy,
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> inBackoffAlternateCallbackMethod,
IRetryPolicy.ShouldRetryResult shouldRetryResult,
ShouldRetryResult shouldRetryResult,
Duration minBackoffForInBackoffCallback,
RxDocumentServiceRequest rxDocumentServiceRequest,
AddressSelector addressSelector) {
Expand All @@ -153,7 +153,7 @@ private static <T> Function<Throwable, Mono<T>> recursiveWithAlternateFunc(
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> callbackMethod,
IRetryPolicy retryPolicy,
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> inBackoffAlternateCallbackMethod,
IRetryPolicy.ShouldRetryResult shouldRetryResult,
ShouldRetryResult shouldRetryResult,
StopWatch stopwatch,
Duration minBackoffForInBackoffCallback,
RxDocumentServiceRequest rxDocumentServiceRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList;
import com.azure.cosmos.implementation.batch.BatchResponseParser;
import com.azure.cosmos.implementation.batch.ServerBatchRequest;
import com.azure.cosmos.implementation.batch.SinglePartitionKeyServerBatchRequest;
Expand All @@ -26,6 +27,7 @@
import com.azure.cosmos.implementation.directconnectivity.ServerStoreModel;
import com.azure.cosmos.implementation.directconnectivity.StoreClient;
import com.azure.cosmos.implementation.directconnectivity.StoreClientFactory;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpClientConfig;
import com.azure.cosmos.implementation.http.HttpHeaders;
Expand All @@ -40,8 +42,10 @@
import com.azure.cosmos.implementation.routing.PartitionKeyAndResourceTokenPair;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
Expand Down Expand Up @@ -94,6 +98,9 @@
public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorizationTokenProvider, CpuListener, DiagnosticsClientContext {
private static final AtomicInteger activeClientsCnt = new AtomicInteger(0);
private static final AtomicInteger clientIdGenerator = new AtomicInteger(0);
private static final Range<String> RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES = new Range<String>(
PartitionKeyInternalHelper.MinimumInclusiveEffectivePartitionKey,
PartitionKeyInternalHelper.MaximumExclusiveEffectivePartitionKey, true, false);

private static final String DUMMY_SQL_QUERY = "this is dummy and only used in creating " +
"ParallelDocumentQueryExecutioncontext, but not used";
Expand Down Expand Up @@ -3634,4 +3641,56 @@ private static SqlQuerySpec createLogicalPartitionScanQuerySpec(

return new SqlQuerySpec(queryStringBuilder.toString(), parameters);
}

@Override
public Mono<List<FeedRange>> getFeedRanges(String collectionLink) {

if (StringUtils.isEmpty(collectionLink)) {
throw new IllegalArgumentException("collectionLink");
}

RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
this,
OperationType.Query,
ResourceType.Document,
collectionLink,
null); // This should not go to backend
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(null,
request);

return collectionObs.flatMap(documentCollectionResourceResponse -> {
final DocumentCollection collection = documentCollectionResourceResponse.v;
if (collection == null) {
throw new IllegalStateException("Collection cannot be null");
}

Mono<Utils.ValueHolder<List<PartitionKeyRange>>> valueHolderMono = partitionKeyRangeCache
.tryGetOverlappingRangesAsync(
BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics),
collection.getResourceId(), RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES, true, null);

return valueHolderMono.map(partitionKeyRangeListResponse -> {
return toFeedRanges(partitionKeyRangeListResponse);
});
});
}

private static List<FeedRange> toFeedRanges(
Utils.ValueHolder<List<PartitionKeyRange>> partitionKeyRangeListValueHolder) {
final List<PartitionKeyRange> partitionKeyRangeList = partitionKeyRangeListValueHolder.v;
if (partitionKeyRangeList == null) {
throw new IllegalStateException("PartitionKeyRange list cannot be null");
}

List<FeedRange> feedRanges = new ArrayList<FeedRange>(partitionKeyRangeList.size());
partitionKeyRangeList.forEach(pkRange -> {
feedRanges.add(toFeedRange(pkRange));
});

return feedRanges;
}

private static FeedRange toFeedRange(PartitionKeyRange pkRange) {
return new FeedRangePartitionKeyRangeImpl(pkRange.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,20 @@ public void dispose() {
this.isDisposed = true;
}

/**
* Gets the request properties.
*
* @return the request properties.
*/
public Map<String, Object> getPropertiesOrThrow() {
if (this.properties == null) {
throw new IllegalStateException(
"Only requests with properties (request options) can be used when using feed ranges");
}

return this.properties;
}

private static Map<String, Object> getProperties(Object options) {
if (options == null) {
return null;
Expand Down
Loading