Merge remote-tracking branch 'upstream/main' into remote-state-stats
Aman Khare committed Oct 23, 2023
2 parents c8bfed4 + b5ef078 commit a099a86
Showing 98 changed files with 2,644 additions and 608 deletions.
1 change: 1 addition & 0 deletions .github/pull_request_template.md
@@ -17,6 +17,7 @@ Resolves #[Issue number to be closed when this PR is merged]
- [ ] All tests pass
- [ ] New functionality has been documented.
- [ ] New functionality has javadoc added
- [ ] Failing checks are inspected and point to the corresponding known issue(s) (See: [Troubleshooting Failing Builds](../blob/main/CONTRIBUTING.md#troubleshooting-failing-builds))
- [ ] Commits are signed per the DCO using --signoff
- [ ] Commit changes are listed out in CHANGELOG.md file (See: [Changelog](../blob/main/CONTRIBUTING.md#changelog))
- [ ] Public documentation issue/PR [created](https://github.com/opensearch-project/documentation-website/issues/new/choose)
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))
- [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
@@ -114,6 +115,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add Remote Store backpressure rejection stats to `_nodes/stats` ([#10524](https://github.com/opensearch-project/OpenSearch/pull/10524))
- [BUG] Fix java.lang.SecurityException in repository-gcs plugin ([#10642](https://github.com/opensearch-project/OpenSearch/pull/10642))
- Add telemetry tracer/metric enable flag and integ test. ([#10395](https://github.com/opensearch-project/OpenSearch/pull/10395))
- Add instrumentation for indexing in transport bulk action and transport shard bulk action. ([#10273](https://github.com/opensearch-project/OpenSearch/pull/10273))

### Deprecated

12 changes: 12 additions & 0 deletions CONTRIBUTING.md
@@ -8,6 +8,7 @@
- [Developer Certificate of Origin](#developer-certificate-of-origin)
- [Changelog](#changelog)
- [Review Process](#review-process)
- [Troubleshooting Failing Builds](#troubleshooting-failing-builds)

# Contributing to OpenSearch

@@ -162,3 +163,14 @@ During the PR process, expect that there will be some back-and-forth. Please try
If we accept the PR, a [maintainer](MAINTAINERS.md) will merge your change and usually take care of backporting it to the appropriate branches.

If we reject the PR, we will close the pull request with a comment explaining why. This decision isn't always final: if you feel we have misunderstood your intended change or otherwise think that we should reconsider then please continue the conversation with a comment on the PR and we'll do our best to address any further points you raise.

## Troubleshooting Failing Builds

The OpenSearch testing framework offers many capabilities but is also quite complex (it performs a lot of randomization internally to cover as many edge cases and variations as possible). Unfortunately, this complexity makes it harder to discover important issues/bugs in a straightforward way and may lead to so-called flaky tests: tests that flip randomly from success to failure without any code changes.

If your pull request reports one or more failing tests on any of the checks, please:
- check whether there is an existing [issue](https://github.com/opensearch-project/OpenSearch/issues) reported for the test in question
- if not, make sure the failure is not caused by your changes by running the failing test(s) locally a number of times (see the example after this list)
- if you are sure the failure is not related, open a new [bug](https://github.com/opensearch-project/OpenSearch/issues/new?assignees=&labels=bug%2C+untriaged&projects=&template=bug_template.md&title=%5BBUG%5D) with the `flaky-test` label
- add a comment to your pull request referencing the issue(s) or bug report(s) to explain the failing build(s)
- as a bonus, consider contributing a fix for the flaky test(s)
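
As a rough sketch, a single suspect test class can be re-run locally with Gradle along these lines (the module path, test class name, iteration count, and seed below are illustrative placeholders rather than values from this change; the project's TESTING.md describes the supported options):

```sh
# Re-run one suspect test class several times; repeated green runs
# suggest the failure is unrelated to your change.
./gradlew ':server:test' --tests "org.opensearch.example.MyFlakyTests" -Dtests.iters=20

# Reproduce a specific CI failure by reusing the random seed reported in the build log.
./gradlew ':server:test' --tests "org.opensearch.example.MyFlakyTests" -Dtests.seed=DEADBEEF
```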
(next changed file)
@@ -37,6 +37,7 @@ public AcknowledgedResponse createDataStream(String name) throws Exception {
CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(name);
AcknowledgedResponse response = client().admin().indices().createDataStream(request).get();
assertThat(response.isAcknowledged(), is(true));
performRemoteStoreTestAction();
return response;
}

@@ -67,6 +68,7 @@ public RolloverResponse rolloverDataStream(String name) throws Exception {
RolloverResponse response = client().admin().indices().rolloverIndex(request).get();
assertThat(response.isAcknowledged(), is(true));
assertThat(response.isRolledOver(), is(true));
performRemoteStoreTestAction();
return response;
}

@@ -109,5 +111,4 @@ public AcknowledgedResponse deleteIndexTemplate(String name) throws Exception {
assertThat(response.isAcknowledged(), is(true));
return response;
}

}
(next changed file)
@@ -26,6 +26,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.junit.annotations.TestLogging;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

@@ -55,6 +56,7 @@ private void createIndex(int replicaCount) {
* This test verifies the happy path when the primary shard is relocated to a newly added node (target) in the cluster. Before
* relocation and after relocation, documents are indexed and verified.
*/
@TestLogging(reason = "Getting trace logs from replication,shard and allocation package", value = "org.opensearch.indices.replication:TRACE, org.opensearch.index.shard:TRACE, org.opensearch.cluster.routing.allocation:TRACE")
public void testPrimaryRelocation() throws Exception {
final String oldPrimary = internalCluster().startNode();
createIndex(1);
(next changed file)
@@ -46,7 +46,10 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

protected void restore(String... indices) {
boolean restoreAllShards = randomBoolean();
restore(randomBoolean(), indices);
}

protected void restore(boolean restoreAllShards, String... indices) {
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices));
}
(next changed file)
@@ -56,7 +56,7 @@ public void testWritesRejectedDueToBytesLagBreach() throws Exception {
public void testWritesRejectedDueToTimeLagBreach() throws Exception {
// Initially, indexing happens with a doc size of 1KB; then all remote store interactions start failing. Now, the
// indexing happens with a doc size of 1 byte, causing the time lag limit to be exceeded and requests to be rejected.
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 15, "time_lag");
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 3, "time_lag");
}

private void validateBackpressure(
@@ -133,11 +133,13 @@ private RemoteSegmentTransferTracker.Stats stats() {
return matches.get(0).getSegmentStats();
}

private void indexDocAndRefresh(BytesReference source, int iterations) {
private void indexDocAndRefresh(BytesReference source, int iterations) throws InterruptedException {
for (int i = 0; i < iterations; i++) {
client().prepareIndex(INDEX_NAME).setSource(source, MediaTypeRegistry.JSON).get();
refresh(INDEX_NAME);
}
Thread.sleep(250);
client().prepareIndex(INDEX_NAME).setSource(source, MediaTypeRegistry.JSON).get();
}

/**
(next changed file)
@@ -9,6 +9,7 @@
package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.datastream.DataStreamRolloverIT;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.opensearch.cluster.ClusterState;
@@ -21,16 +22,19 @@
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_SETTING;
import static org.opensearch.cluster.metadata.Metadata.CLUSTER_READ_ONLY_BLOCK;
import static org.opensearch.cluster.metadata.Metadata.SETTING_READ_ONLY_SETTING;
@@ -65,6 +69,13 @@ private void resetCluster(int dataNodeCount, int clusterManagerNodeCount) {
internalCluster().startDataOnlyNodes(dataNodeCount);
}

protected void verifyRedIndicesAndTriggerRestore(Map<String, Long> indexStats, String indexName, boolean indexMoreDocs)
throws Exception {
ensureRed(indexName);
restore(false, indexName);
verifyRestoredData(indexStats, indexName, indexMoreDocs);
}

public void testFullClusterRestore() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
@@ -83,7 +94,73 @@ public void testFullClusterRestore() throws Exception {

// Step - 3 Trigger full cluster restore and validate
validateMetadata(List.of(INDEX_NAME));
verifyRestoredData(indexStats, INDEX_NAME);
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true);
}

/**
* This test scenario covers the case where, right after the remote state is restored and persisted to disk via LucenePersistedState, the full cluster restarts.
* This is a special case for remote state because at this point the cluster uuid in the restored state is still ClusterState.UNKNOWN_UUID when we persist it to disk.
* After restart the local disk state will be read but should be again overridden with remote state.
*
* 1. Form a cluster and index a few docs
* 2. Replace all nodes to remove all local disk state
* 3. Start cluster manager node without correct seeding to ensure local disk state is written with cluster uuid ClusterState.UNKNOWN_UUID but with remote restored Metadata
* 4. Restart the cluster manager node with correct seeding.
* 5. After restart, the cluster manager picks up the local disk state, which has the same Metadata as remote, but the cluster uuid is still ClusterState.UNKNOWN_UUID
* 6. The cluster manager will try to restore from remote again.
* 7. Metadata loaded from local disk state will be overridden with remote Metadata and no conflict should arise.
* 8. Add data nodes to recover index data
* 9. Verify Metadata and index data is restored.
*/
public void testFullClusterRestoreDoesntFailWithConflictingLocalState() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

// index some data to generate files in remote directory
Map<String, Long> indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, 1);
String prevClusterUUID = clusterService().state().metadata().clusterUUID();

// stop all nodes
internalCluster().stopAllNodes();

// start a cluster manager node with no cluster manager seeding.
// This should fail with IllegalStateException as cluster manager fails to form without any initial seed
assertThrows(
IllegalStateException.class,
() -> internalCluster().startClusterManagerOnlyNodes(
clusterManagerNodeCount,
Settings.builder()
.putList(INITIAL_CLUSTER_MANAGER_NODES_SETTING.getKey()) // disable seeding during bootstrapping
.build()
)
);

// verify cluster manager not elected
String newClusterUUID = clusterService().state().metadata().clusterUUID();
assert Objects.equals(newClusterUUID, ClusterState.UNKNOWN_UUID)
: "Disabling Cluster manager seeding failed. cluster uuid is not unknown";

// restart cluster manager with correct seed
internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) {
return Settings.builder()
.putList(INITIAL_CLUSTER_MANAGER_NODES_SETTING.getKey(), nodeName) // Seed with correct Cluster Manager node
.build();
}
});

// validate new cluster state formed
newClusterUUID = clusterService().state().metadata().clusterUUID();
assert !Objects.equals(newClusterUUID, ClusterState.UNKNOWN_UUID) : "cluster restart not successful. cluster uuid is still unknown";
assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. cluster uuid is same";
validateMetadata(List.of(INDEX_NAME));

// start data nodes to trigger index data recovery
internalCluster().startDataOnlyNodes(dataNodeCount);
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true);
}

public void testFullClusterRestoreMultipleIndices() throws Exception {
@@ -112,8 +189,8 @@ public void testFullClusterRestoreMultipleIndices() throws Exception {

// Step - 3 Trigger full cluster restore
validateMetadata(List.of(INDEX_NAME, secondIndexName));
verifyRestoredData(indexStats, INDEX_NAME);
verifyRestoredData(indexStats2, secondIndexName, false);
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, false);
verifyRedIndicesAndTriggerRestore(indexStats2, secondIndexName, false);
assertTrue(INDEX_READ_ONLY_SETTING.get(clusterService().state().metadata().index(secondIndexName).getSettings()));
assertThrows(ClusterBlockException.class, () -> indexSingleDoc(secondIndexName));
// Test is complete
@@ -181,7 +258,7 @@ public void testRemoteStateFullRestart() throws Exception {
String newClusterUUID = clusterService().state().metadata().clusterUUID();
assert Objects.equals(newClusterUUID, prevClusterUUID) : "Full restart not successful. cluster uuid has changed";
validateCurrentMetadata();
verifyRestoredData(indexStats, INDEX_NAME);
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true);
}

private void validateMetadata(List<String> indexNames) {
@@ -215,6 +292,14 @@ private void validateCurrentMetadata() throws Exception {
});
}

public void testDataStreamPostRemoteStateRestore() throws Exception {
new DataStreamRolloverIT() {
protected boolean triggerRemoteStateRestore() {
return true;
}
}.testDataStreamRollover();
}

public void testFullClusterRestoreGlobalMetadata() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
@@ -226,8 +311,7 @@ public void testFullClusterRestoreGlobalMetadata() throws Exception {
String prevClusterUUID = clusterService().state().metadata().clusterUUID();

// Create global metadata - register a custom repo
// TODO - uncomment after all customs is also uploaded for all repos - https://github.com/opensearch-project/OpenSearch/issues/10691
// registerCustomRepository();
Path repoPath = registerCustomRepository();

// Create global metadata - persistent settings
updatePersistentSettings(Settings.builder().put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), 34).build());
@@ -246,41 +330,46 @@

// Step - 3 Trigger full cluster restore and validate
// validateCurrentMetadata();
verifyRestoredData(indexStats, INDEX_NAME, false);

// validate global metadata restored
verifyRestoredRepositories();
verifyRestoredIndexTemplate();
assertEquals(Integer.valueOf(34), SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(clusterService().state().metadata().settings()));
assertEquals(true, SETTING_READ_ONLY_SETTING.get(clusterService().state().metadata().settings()));
assertTrue(clusterService().state().blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));
// Test is complete

// Remove the cluster read only block to ensure proper cleanup
updatePersistentSettings(Settings.builder().put(SETTING_READ_ONLY_SETTING.getKey(), false).build());
assertFalse(clusterService().state().blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));

verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, false);

// validate global metadata restored
verifyRestoredRepositories(repoPath);
verifyRestoredIndexTemplate();
}

private void registerCustomRepository() {
private Path registerCustomRepository() {
Path path = randomRepoPath();
assertAcked(
client().admin()
.cluster()
.preparePutRepository("custom-repo")
.setType("fs")
.setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", false))
.setSettings(Settings.builder().put("location", path).put("compress", false))
.get()
);
return path;
}

private void verifyRestoredRepositories() {
private void verifyRestoredRepositories(Path repoPath) {
RepositoriesMetadata repositoriesMetadata = clusterService().state().metadata().custom(RepositoriesMetadata.TYPE);
assertEquals(2, repositoriesMetadata.repositories().size()); // includes remote store repo as well
assertEquals(3, repositoriesMetadata.repositories().size()); // includes remote store repo as well
assertTrue(SYSTEM_REPOSITORY_SETTING.get(repositoriesMetadata.repository(REPOSITORY_NAME).settings()));
assertTrue(SYSTEM_REPOSITORY_SETTING.get(repositoriesMetadata.repository(REPOSITORY_2_NAME).settings()));
// TODO - uncomment after all customs is also uploaded for all repos - https://github.com/opensearch-project/OpenSearch/issues/10691
// assertEquals("fs", repositoriesMetadata.repository("custom-repo").type());
// assertEquals(Settings.builder().put("location", randomRepoPath()).put("compress", false).build(),
// repositoriesMetadata.repository("custom-repo").settings());
assertEquals("fs", repositoriesMetadata.repository("custom-repo").type());
assertEquals(
Settings.builder().put("location", repoPath).put("compress", false).build(),
repositoriesMetadata.repository("custom-repo").settings()
);

// repo cleanup post verification
clusterAdmin().prepareDeleteRepository("custom-repo").get();
}

private void addClusterLevelReadOnlyBlock() throws InterruptedException, ExecutionException {
(Diffs for the remaining changed files are not shown.)