Skip to content

Commit

Permalink
Making databaseAccount read dynamic after V4 master merge (#8466)
Browse files Browse the repository at this point in the history
* making databaseAccount update dynamic

* Revert "making databaseAccount update dynamic"

This reverts commit 1d8856a.

* making databaseAccount update dynamic

* resolving comments

* removing cahce logic

* resolving conflicts
  • Loading branch information
simplynaveen20 committed Feb 27, 2020
1 parent 124e3d9 commit d937bf3
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,28 @@

package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.LocationHelper;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConnectionPolicy;
import com.azure.cosmos.DatabaseAccount;
import com.azure.cosmos.implementation.caches.AsyncCache;
import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.LocationHelper;
import org.apache.commons.collections4.list.UnmodifiableList;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.net.URISyntaxException;
import java.net.URI;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -45,6 +47,8 @@ public class GlobalEndpointManager implements AutoCloseable {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final Scheduler scheduler = Schedulers.fromExecutor(executor);
private volatile boolean isClosed;
private AtomicBoolean firstTimeDatabaseAccountInitialization = new AtomicBoolean(true);
private volatile DatabaseAccount latestDatabaseAccount;

public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPolicy connectionPolicy, Configs configs) {
this.backgroundRefreshLocationTimeIntervalInMS = configs.getUnavailableLocationsExpirationTimeInSeconds() * 1000;
Expand Down Expand Up @@ -159,6 +163,16 @@ public Mono<Void> refreshLocationAsync(DatabaseAccount databaseAccount, boolean
});
}

/**
* This will provide the latest databaseAccount.
* If due to some reason last databaseAccount update was null,
* this method will return previous valid value
* @return DatabaseAccount
*/
public DatabaseAccount getLatestDatabaseAccount() {
return this.latestDatabaseAccount;
}

private Mono<Void> refreshLocationPrivateAsync(DatabaseAccount databaseAccount) {
return Mono.defer(() -> {
logger.debug("refreshLocationPrivateAsync() refreshing locations");
Expand Down Expand Up @@ -254,7 +268,13 @@ private Mono<Void> startRefreshLocationTimerAsync(boolean initialization) {

private Mono<DatabaseAccount> getDatabaseAccountAsync(URI serviceEndpoint) {
return this.owner.getDatabaseAccountFromEndpoint(serviceEndpoint)
.doOnNext(i -> logger.debug("account retrieved: {}", i)).single();
.doOnNext(databaseAccount -> {
if(databaseAccount != null) {
this.latestDatabaseAccount = databaseAccount;
}

logger.debug("account retrieved: {}", databaseAccount);
}).single();
}

public boolean isClosed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,29 +251,16 @@ private RxDocumentClientImpl(URI serviceEndpoint,
}

private void initializeGatewayConfigurationReader() {
String resourceToken;
if(this.tokenResolver != null) {
resourceToken = this.tokenResolver.getAuthorizationToken(RequestVerb.GET, "", CosmosResourceType.System, null);
} else if(!this.hasAuthKeyResourceToken && this.authorizationTokenProvider == null) {
resourceToken = this.firstResourceTokenFromPermissionFeed;
} else {
assert this.masterKeyOrResourceToken != null || this.cosmosKeyCredential != null;
resourceToken = this.masterKeyOrResourceToken;
}

this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint,
this.hasAuthKeyResourceToken,
resourceToken,
this.connectionPolicy,
this.authorizationTokenProvider,
this.reactorHttpClient);

DatabaseAccount databaseAccount = this.gatewayConfigurationReader.initializeReaderAsync().block();
this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.globalEndpointManager);
DatabaseAccount databaseAccount = this.globalEndpointManager.getLatestDatabaseAccount();
//Database account should not be null here,
// this.globalEndpointManager.init() must have been already called
// hence asserting it
assert(databaseAccount != null);
this.useMultipleWriteLocations = this.connectionPolicy.getUsingMultipleWriteLocations() && BridgeInternal.isEnableMultipleWriteLocations(databaseAccount);

// TODO: add support for openAsync
// https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589
this.globalEndpointManager.refreshLocationAsync(databaseAccount, false).block();
}

public void init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,11 @@
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConnectionPolicy;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.RequestVerb;
import com.azure.cosmos.implementation.BaseAuthorizationTokenProvider;
import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.DatabaseAccount;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ReplicationPolicy;
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.HttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import reactor.core.publisher.Mono;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -42,118 +24,25 @@
*/
public class GatewayServiceConfigurationReader {

public static final String GATEWAY_READER_NOT_INITIALIZED = "GatewayServiceConfigurationReader has not been initialized";
private GlobalEndpointManager globalEndpointManager;

public ReplicationPolicy userReplicationPolicy;
private ReplicationPolicy systemReplicationPolicy;
private ConsistencyLevel consistencyLevel;
private volatile boolean initialized;
private URI serviceEndpoint;
private final ConnectionPolicy connectionPolicy;
private Map<String, Object> queryEngineConfiguration;
private final BaseAuthorizationTokenProvider baseAuthorizationTokenProvider;
private final boolean hasAuthKeyResourceToken;
private final String authKeyResourceToken;
private HttpClient httpClient;

public GatewayServiceConfigurationReader(URI serviceEndpoint, boolean hasResourceToken, String resourceToken,
ConnectionPolicy connectionPolicy, BaseAuthorizationTokenProvider baseAuthorizationTokenProvider,
HttpClient httpClient) {
this.serviceEndpoint = serviceEndpoint;
this.baseAuthorizationTokenProvider = baseAuthorizationTokenProvider;
this.hasAuthKeyResourceToken = hasResourceToken;
this.authKeyResourceToken = resourceToken;
this.connectionPolicy = connectionPolicy;
this.httpClient = httpClient;
public GatewayServiceConfigurationReader(GlobalEndpointManager globalEndpointManager) {
this.globalEndpointManager = globalEndpointManager;
}

public ReplicationPolicy getUserReplicationPolicy() {
this.throwIfNotInitialized();
return this.userReplicationPolicy;
return BridgeInternal.getReplicationPolicy(this.globalEndpointManager.getLatestDatabaseAccount());
}

public ReplicationPolicy getSystemReplicationPolicy() {
this.throwIfNotInitialized();
return this.systemReplicationPolicy;
}

public boolean enableAuthorization() {
return true;
return BridgeInternal.getSystemReplicationPolicy(this.globalEndpointManager.getLatestDatabaseAccount());
}

public ConsistencyLevel getDefaultConsistencyLevel() {
this.throwIfNotInitialized();
return this.consistencyLevel;
}

public void setDefaultConsistencyLevel(ConsistencyLevel value) {
this.throwIfNotInitialized();
this.consistencyLevel = value;
return BridgeInternal.getConsistencyPolicy(this.globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel();
}

public Map<String, Object> getQueryEngineConfiguration() {
this.throwIfNotInitialized();
return this.queryEngineConfiguration;
}

private Mono<DatabaseAccount> getDatabaseAccountAsync(URI serviceEndpoint) {

HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.set(HttpConstants.HttpHeaders.VERSION, HttpConstants.Versions.CURRENT_VERSION);

UserAgentContainer userAgentContainer = new UserAgentContainer();
String userAgentSuffix = this.connectionPolicy.getUserAgentSuffix();
if (userAgentSuffix != null && userAgentSuffix.length() > 0) {
userAgentContainer.setSuffix(userAgentSuffix);
}

httpHeaders.set(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent());
httpHeaders.set(HttpConstants.HttpHeaders.API_TYPE, Constants.Properties.SQL_API_TYPE);

String xDate = Utils.nowAsRFC1123();
httpHeaders.set(HttpConstants.HttpHeaders.X_DATE, xDate);

String authorizationToken;
if (this.hasAuthKeyResourceToken || baseAuthorizationTokenProvider == null) {
authorizationToken = HttpUtils.urlEncode(this.authKeyResourceToken);
} else {
// Retrieve the document service properties.
Map<String, String> header = new HashMap<>();
header.put(HttpConstants.HttpHeaders.X_DATE, xDate);
authorizationToken = baseAuthorizationTokenProvider
.generateKeyAuthorizationSignature(RequestVerb.GET, serviceEndpoint, header);
}
httpHeaders.set(HttpConstants.HttpHeaders.AUTHORIZATION, authorizationToken);

HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, serviceEndpoint, serviceEndpoint.getPort(), httpHeaders);
Mono<HttpResponse> httpResponse = httpClient.send(httpRequest);
return toDatabaseAccountObservable(httpResponse, httpRequest);
}

public Mono<DatabaseAccount> initializeReaderAsync() {
return GlobalEndpointManager.getDatabaseAccountFromAnyLocationsAsync(this.serviceEndpoint,

new ArrayList<>(this.connectionPolicy.getPreferredLocations()), url -> {
return getDatabaseAccountAsync(url);

}).doOnSuccess(databaseAccount -> {
userReplicationPolicy = BridgeInternal.getReplicationPolicy(databaseAccount);
systemReplicationPolicy = BridgeInternal.getSystemReplicationPolicy(databaseAccount);
queryEngineConfiguration = BridgeInternal.getQueryEngineConfiuration(databaseAccount);
consistencyLevel = BridgeInternal.getConsistencyPolicy(databaseAccount).getDefaultConsistencyLevel();
initialized = true;
});
}

private Mono<DatabaseAccount> toDatabaseAccountObservable(Mono<HttpResponse> httpResponse, HttpRequest httpRequest) {

return HttpClientUtils.parseResponseAsync(httpResponse, httpRequest)
.map(rxDocumentServiceResponse -> rxDocumentServiceResponse.getResource(DatabaseAccount.class));
}

private void throwIfNotInitialized() {
if (!this.initialized) {
throw new IllegalArgumentException(GATEWAY_READER_NOT_INITIALIZED);
}
return BridgeInternal.getQueryEngineConfiuration(this.globalEndpointManager.getLatestDatabaseAccount());
}
}
Loading

0 comments on commit d937bf3

Please sign in to comment.