Override local disk state if we are able to restore from remote (#10748)
* Override local disk state if we are able to restore from remote

Signed-off-by: bansvaru <bansvaru@amazon.com>
linuxpi committed Oct 22, 2023
1 parent a09047a commit 9b7a9d0
Showing 3 changed files with 89 additions and 81 deletions.
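At a high level, the change makes the gateway hand the remote restore service a cluster state whose metadata has been emptied whenever the last known cluster UUID signals an in-progress remote restore, so remote metadata always overrides whatever was read from local disk. A condensed sketch of the resulting flow, simplified from the diff below (not the verbatim committed code):

    // Sketch: remote metadata must win over local disk metadata during restore.
    if (ClusterState.UNKNOWN_UUID.equals(lastKnownClusterUUID) == false) {
        // Local disk may hold metadata persisted with UNKNOWN_UUID from an
        // earlier restore; blank it out so the remote copy wins cleanly.
        ClusterState emptiedState = ClusterState.builder(clusterState)
            .metadata(Metadata.EMPTY_METADATA)
            .build();
        remoteStoreRestoreService.restore(emptiedState, lastKnownClusterUUID, false, new String[] {});
    }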
@@ -21,6 +21,7 @@
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
@@ -32,6 +33,7 @@
import java.util.Objects;
import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_SETTING;
import static org.opensearch.cluster.metadata.Metadata.CLUSTER_READ_ONLY_BLOCK;
import static org.opensearch.cluster.metadata.Metadata.SETTING_READ_ONLY_SETTING;
@@ -94,6 +96,72 @@ public void testFullClusterRestore() throws Exception {
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true);
}

/**
* This test scenario covers the case where the full cluster restarts right after a remote state restore has been persisted to disk via LucenePersistedState.
* This is a special case for remote state because at this point the cluster UUID in the restored state is still ClusterState.UNKNOWN_UUID when we persist it to disk.
* After the restart, the local disk state will be read but should again be overridden with the remote state.
*
* 1. Form a cluster and index a few docs
* 2. Replace all nodes to remove all local disk state
* 3. Start a cluster manager node without correct seeding to ensure the local disk state is written with cluster UUID ClusterState.UNKNOWN_UUID but with the remote-restored Metadata
* 4. Restart the cluster manager node with correct seeding.
* 5. After the restart, the cluster manager picks up the local disk state, which has the same Metadata as remote but whose cluster UUID is still ClusterState.UNKNOWN_UUID
* 6. The cluster manager will try to restore from remote again.
* 7. Metadata loaded from the local disk state will be overridden with the remote Metadata and no conflict should arise.
* 8. Add data nodes to recover index data
* 9. Verify Metadata and index data are restored.
*/
public void testFullClusterStateRestore() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

// index some data to generate files in remote directory
Map<String, Long> indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, 1);
String prevClusterUUID = clusterService().state().metadata().clusterUUID();

// stop all nodes
internalCluster().stopAllNodes();

// start a cluster manager node with no cluster manager seeding.
// This should fail with IllegalStateException as cluster manager fails to form without any initial seed
assertThrows(
IllegalStateException.class,
() -> internalCluster().startClusterManagerOnlyNodes(
clusterManagerNodeCount,
Settings.builder()
.putList(INITIAL_CLUSTER_MANAGER_NODES_SETTING.getKey()) // disable seeding during bootstrapping
.build()
)
);

// verify cluster manager not elected
String newClusterUUID = clusterService().state().metadata().clusterUUID();
assert Objects.equals(newClusterUUID, ClusterState.UNKNOWN_UUID)
: "Disabling Cluster manager seeding failed. cluster uuid is not unknown";

// restart cluster manager with correct seed
internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) {
return Settings.builder()
.putList(INITIAL_CLUSTER_MANAGER_NODES_SETTING.getKey(), nodeName) // Seed with correct Cluster Manager node
.build();
}
});

// validate new cluster state formed
newClusterUUID = clusterService().state().metadata().clusterUUID();
assert !Objects.equals(newClusterUUID, ClusterState.UNKNOWN_UUID) : "cluster restart not successful. cluster uuid is still unknown";
assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. cluster uuid is same";
validateMetadata(List.of(INDEX_NAME));

// start data nodes to trigger index data recovery
internalCluster().startDataOnlyNodes(dataNodeCount);
verifyRestoredData(indexStats, INDEX_NAME);
}

public void testFullClusterRestoreMultipleIndices() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
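A note on the seeding trick used in the test above: an empty value for INITIAL_CLUSTER_MANAGER_NODES_SETTING (cluster.initial_cluster_manager_nodes) disables bootstrapping, so the node comes up, restores Metadata from remote, and persists it with ClusterState.UNKNOWN_UUID without ever forming a cluster; re-seeding with a real node name then allows election. A minimal sketch of the two settings (the node name is an illustrative value):

    // Sketch: an empty list disables cluster manager bootstrapping entirely.
    Settings noSeed = Settings.builder()
        .putList(INITIAL_CLUSTER_MANAGER_NODES_SETTING.getKey())
        .build();

    // Sketch: seeding with a node name ("node-0" is hypothetical) allows election.
    Settings seeded = Settings.builder()
        .putList(INITIAL_CLUSTER_MANAGER_NODES_SETTING.getKey(), "node-0")
        .build();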
@@ -174,7 +174,9 @@ public void start(
if (ClusterState.UNKNOWN_UUID.equals(lastKnownClusterUUID) == false) {
// Load state from remote
final RemoteRestoreResult remoteRestoreResult = remoteStoreRestoreService.restore(
clusterState,
// Remote Metadata should always override local disk Metadata
// if local disk Metadata's cluster uuid is UNKNOWN_UUID
ClusterState.builder(clusterState).metadata(Metadata.EMPTY_METADATA).build(),
lastKnownClusterUUID,
false,
new String[] {}
@@ -549,6 +551,9 @@ static class LucenePersistedState implements PersistedState {
// out by this version of OpenSearch. TODO TBD should we avoid indexing when possible?
final PersistedClusterStateService.Writer writer = persistedClusterStateService.createWriter();
try {
// During remote state restore, non-empty metadata will be persisted with the cluster UUID set to
// ClusterState.UNKNOWN_UUID. The valid UUID will be generated and persisted along with the first cluster state that
// gets published.
writer.writeFullStateAndCommit(currentTerm, lastAcceptedState);
} catch (Exception e) {
try {
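For context on the comment above, a minimal sketch of the UUID lifecycle it describes, assuming the Metadata.Builder helpers generateClusterUuidIfNeeded() and clusterUUIDCommitted(boolean) from the coordination layer (an illustration, not code from this commit):

    // Right after a remote restore, the persisted metadata still carries
    // ClusterState.UNKNOWN_UUID as its cluster UUID.
    Metadata restored = Metadata.builder(remoteMetadata)
        .clusterUUID(ClusterState.UNKNOWN_UUID)
        .build();

    // The elected cluster manager generates and commits the real UUID with
    // the first cluster state it publishes.
    Metadata published = Metadata.builder(restored)
        .generateClusterUuidIfNeeded()
        .clusterUUIDCommitted(true)
        .build();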
@@ -40,12 +40,10 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -146,6 +144,11 @@ public RemoteRestoreResult restore(
|| restoreClusterUUID.isBlank()) == false;
if (metadataFromRemoteStore) {
try {
// Restore with the current cluster UUID will fail, as the same indices we are trying to restore
// would already be present in the cluster
if (currentState.metadata().clusterUUID().equals(restoreClusterUUID)) {
throw new IllegalArgumentException("clusterUUID to restore from should be different from current cluster UUID");
}
remoteMetadata = remoteClusterStateService.getLatestMetadata(currentState.getClusterName().value(), restoreClusterUUID);
remoteMetadata.getIndices().values().forEach(indexMetadata -> {
indexMetadataMap.put(indexMetadata.getIndex().getName(), new Tuple<>(true, indexMetadata));
@@ -158,12 +161,21 @@
IndexMetadata indexMetadata = currentState.metadata().index(indexName);
if (indexMetadata == null) {
logger.warn("Index restore is not supported for non-existent index. Skipping: {}", indexName);
} else if (indexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false) == false) {
logger.warn("Remote store is not enabled for index: {}", indexName);
} else if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState()) == false) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"cannot restore index [%s] because an open index with same name/uuid already exists in the cluster.",
indexName
) + " Close the existing index."
);
} else {
indexMetadataMap.put(indexName, new Tuple<>(false, indexMetadata));
}
}
}
validate(currentState, indexMetadataMap, restoreClusterUUID, restoreAllShards);
return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteMetadata);
}

@@ -272,83 +284,6 @@ private void restoreGlobalMetadata(Metadata.Builder mdBuilder, Metadata remoteMe
repositoriesMetadata.ifPresent(metadata -> mdBuilder.putCustom(RepositoriesMetadata.TYPE, metadata));
}

/**
* Performs various validations needed before executing restore
* @param currentState current cluster state
* @param indexMetadataMap map of index metadata to restore
* @param restoreClusterUUID cluster UUID used to restore IndexMetadata
* @param restoreAllShards indicates if all shards of the index need to be restored. This flag is ignored if remoteClusterUUID is provided
*/
private void validate(
ClusterState currentState,
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap,
@Nullable String restoreClusterUUID,
boolean restoreAllShards
) throws IllegalStateException, IllegalArgumentException {
String errorMsg = "cannot restore index [%s] because an open index with same name/uuid already exists in the cluster.";

// Restore with the current cluster UUID will fail, as the same indices we are trying to restore
// would already be present in the cluster
if (currentState.metadata().clusterUUID().equals(restoreClusterUUID)) {
throw new IllegalArgumentException("clusterUUID to restore from should be different from current cluster UUID");
}
for (Map.Entry<String, Tuple<Boolean, IndexMetadata>> indexMetadataEntry : indexMetadataMap.entrySet()) {
String indexName = indexMetadataEntry.getKey();
IndexMetadata indexMetadata = indexMetadataEntry.getValue().v2();
String indexUUID = indexMetadata.getIndexUUID();
boolean metadataFromRemoteStore = indexMetadataEntry.getValue().v1();
if (indexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) {
if (metadataFromRemoteStore) {
Set<String> graveyardIndexNames = new HashSet<>();
Set<String> graveyardIndexUUID = new HashSet<>();
Set<String> liveClusterIndexUUIDs = currentState.metadata()
.indices()
.values()
.stream()
.map(IndexMetadata::getIndexUUID)
.collect(Collectors.toSet());

currentState.metadata().indexGraveyard().getTombstones().forEach(tombstone -> {
graveyardIndexNames.add(tombstone.getIndex().getName());
graveyardIndexUUID.add(tombstone.getIndex().getUUID());
});

// Since updates to the graveyard are synced to remote, we should never land in a situation where remote contains index
// metadata for a graveyard index.
assert graveyardIndexNames.contains(indexName) == false : String.format(
Locale.ROOT,
"Index name [%s] exists in graveyard!",
indexName
);
assert graveyardIndexUUID.contains(indexUUID) == false : String.format(
Locale.ROOT,
"Index UUID [%s] exists in graveyard!",
indexUUID
);

// Any indices being restored from remote cluster state should not already be part of the cluster as this causes
// conflict
boolean sameNameIndexExists = currentState.metadata().hasIndex(indexName);
boolean sameUUIDIndexExists = liveClusterIndexUUIDs.contains(indexUUID);
if (sameNameIndexExists || sameUUIDIndexExists) {
String finalErrorMsg = String.format(Locale.ROOT, errorMsg, indexName);
logger.info(finalErrorMsg);
throw new IllegalStateException(finalErrorMsg);
}

boolean isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.getSettings());
createIndexService.validateIndexName(indexName, currentState);
createIndexService.validateDotIndex(indexName, isHidden);
shardLimitValidator.validateShardLimit(indexName, indexMetadata.getSettings(), currentState);
} else if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState()) == false) {
throw new IllegalStateException(String.format(Locale.ROOT, errorMsg, indexName) + " Close the existing index.");
}
} else {
logger.warn("Remote store is not enabled for index: {}", indexName);
}
}
}

/**
* Result of a remote restore operation.
*/
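With validate() removed, the cluster UUID guard now lives in restore() itself, so callers hit it before any index metadata is fetched from remote. A short sketch of the behavior callers can expect, using only names from the diff above:

    // Sketch: restoring with the cluster's own UUID is rejected up front.
    try {
        remoteStoreRestoreService.restore(
            currentState,
            currentState.metadata().clusterUUID(), // same as the live cluster's UUID
            false,
            new String[] {}
        );
    } catch (IllegalArgumentException e) {
        // "clusterUUID to restore from should be different from current cluster UUID"
    }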
