diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java index e96dedaa3e6a0..5074971ab1a1f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java @@ -108,6 +108,9 @@ public void testRemoteCleanupDeleteStale() throws Exception { .add("cluster-state") .add(getClusterState().metadata().clusterUUID()); BlobPath manifestContainerPath = baseMetadataPath.add("manifest"); + RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance( + RemoteClusterStateCleanupManager.class + ); // set cleanup interval to 100 ms to make the test faster ClusterUpdateSettingsResponse response = client().admin() @@ -117,6 +120,7 @@ public void testRemoteCleanupDeleteStale() throws Exception { .get(); assertTrue(response.isAcknowledged()); + assertBusy(() -> assertEquals(100, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis())); assertBusy(() -> { int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size(); @@ -128,7 +132,7 @@ public void testRemoteCleanupDeleteStale() throws Exception { "Current number of manifest files: " + manifestFiles, manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES ); - }, 500, TimeUnit.MILLISECONDS); + }); // disable the clean up to avoid race condition during shutdown response = client().admin() diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 4098993246073..b052b6e1a613d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -15,7 +15,6 @@ import org.opensearch.common.remote.RemoteWritableEntityStore; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; -import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.model.RemoteClusterBlocks; import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore; import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms; @@ -121,7 +120,7 @@ public CheckedRunnable getAsyncMetadataReadAction( LatchedActionListener listener ) { final ActionListener actionListener = ActionListener.wrap( - response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)), + response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)), listener::onFailure ); return () -> getStore(blobEntity).readAsync(blobEntity, actionListener); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index bd371ae671cf4..ada29fdb57c57 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -276,7 +276,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( ClusterState clusterState, ClusterMetadataManifest previousManifest ) throws IOException { - logger.info("WRITING INCREMENTAL STATE"); + logger.trace("WRITING INCREMENTAL STATE"); final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { @@ -766,7 +766,7 @@ private UploadedMetadataResults writeMetadataInParallel( throw new IllegalStateException("Unknown metadata component name " + name); } }); - logger.info("response {}", response.uploadedIndicesRoutingMetadata.toString()); + logger.trace("response {}", response.uploadedIndicesRoutingMetadata.toString()); return response; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index cd29114e05684..3053095368972 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -25,7 +25,6 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore; import org.opensearch.gateway.remote.model.RemoteCoordinationMetadata; import org.opensearch.gateway.remote.model.RemoteCustomMetadata; @@ -194,7 +193,7 @@ CheckedRunnable getAsyncMetadataReadAction( LatchedActionListener listener ) { ActionListener actionListener = ActionListener.wrap( - response -> listener.onResponse(new RemoteReadResult((ToXContent) response, readEntity.getType(), componentName)), + response -> listener.onResponse(new RemoteReadResult(response, readEntity.getType(), componentName)), listener::onFailure ); return () -> getStore(readEntity).readAsync(readEntity, actionListener); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java index 328601139c150..1dc56712d4ab5 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java @@ -131,16 +131,17 @@ public ClusterMetadataManifest deserialize(final InputStream inputStream) throws return blobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); } - private int getManifestCodecVersion() { + // package private for testing + int getManifestCodecVersion() { assert blobName != null; - String[] splitName = blobName.split(DELIMITER); + String[] splitName = getBlobFileName().split(DELIMITER); if (splitName.length == SPLITTED_MANIFEST_FILE_LENGTH) { return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version. } else if (splitName.length < SPLITTED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0 // is used. return ClusterMetadataManifest.CODEC_V0; } else { - throw new IllegalArgumentException("Manifest file name is corrupted"); + throw new IllegalArgumentException("Manifest file name is corrupted : " + blobName); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java index adee09eaeffef..06d3b88ae1ecf 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java @@ -8,24 +8,22 @@ package org.opensearch.gateway.remote.model; -import org.opensearch.core.xcontent.ToXContent; - /** * Container class for entity read from remote store */ public class RemoteReadResult { - ToXContent obj; + Object obj; String component; String componentName; - public RemoteReadResult(ToXContent obj, String component, String componentName) { + public RemoteReadResult(Object obj, String component, String componentName) { this.obj = obj; this.component = component; this.componentName = componentName; } - public ToXContent getObj() { + public Object getObj() { return obj; } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java new file mode 100644 index 0000000000000..0aff1c4b0e5e2 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -0,0 +1,141 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.gateway.remote.model.RemoteClusterBlocks; +import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes; +import org.opensearch.gateway.remote.model.RemoteReadResult; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Collections.emptyList; +import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES; +import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS; +import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS_FORMAT; +import static org.opensearch.gateway.remote.model.RemoteClusterBlocksTests.randomClusterBlocks; +import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodes.DISCOVERY_NODES_FORMAT; +import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodesTests.getDiscoveryNodes; +import static org.mockito.ArgumentMatchers.anyIterable; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteClusterStateAttributesManagerTests extends OpenSearchTestCase { + private RemoteClusterStateAttributesManager remoteClusterStateAttributesManager; + private BlobStoreTransferService blobStoreTransferService; + private BlobStoreRepository blobStoreRepository; + private Compressor compressor; + private ThreadPool threadpool = new TestThreadPool(RemoteClusterStateAttributesManagerTests.class.getName()); + + @Before + public void setup() throws Exception { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(emptyList()); + blobStoreRepository = mock(BlobStoreRepository.class); + blobStoreTransferService = mock(BlobStoreTransferService.class); + compressor = new NoneCompressor(); + + remoteClusterStateAttributesManager = new RemoteClusterStateAttributesManager( + "test-cluster", + blobStoreRepository, + blobStoreTransferService, + namedWriteableRegistry, + threadpool + ); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadpool.shutdown(); + } + + public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException { + DiscoveryNodes discoveryNodes = getDiscoveryNodes(); + String fileName = randomAlphaOfLength(10); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, fileName, compressor).streamInput() + ); + RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference readDiscoveryNodes = new AtomicReference<>(); + LatchedActionListener assertingListener = new LatchedActionListener<>( + ActionListener.wrap(response -> readDiscoveryNodes.set((DiscoveryNodes) response.getObj()), Assert::assertNull), + latch + ); + CheckedRunnable runnable = remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + DISCOVERY_NODES, + remoteObjForDownload, + assertingListener + ); + + try { + runnable.run(); + latch.await(); + assertEquals(discoveryNodes.getSize(), readDiscoveryNodes.get().getSize()); + discoveryNodes.getNodes().forEach((nodeId, node) -> assertEquals(readDiscoveryNodes.get().get(nodeId), node)); + assertEquals(discoveryNodes.getClusterManagerNodeId(), readDiscoveryNodes.get().getClusterManagerNodeId()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException { + ClusterBlocks clusterBlocks = randomClusterBlocks(); + String fileName = randomAlphaOfLength(10); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, fileName, compressor).streamInput() + ); + RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(fileName, "cluster-uuid", compressor); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference readClusterBlocks = new AtomicReference<>(); + LatchedActionListener assertingListener = new LatchedActionListener<>( + ActionListener.wrap(response -> readClusterBlocks.set((ClusterBlocks) response.getObj()), Assert::assertNull), + latch + ); + + CheckedRunnable runnable = remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + CLUSTER_BLOCKS, + remoteClusterBlocks, + assertingListener + ); + + try { + runnable.run(); + latch.await(); + assertEquals(clusterBlocks.global(), readClusterBlocks.get().global()); + assertEquals(clusterBlocks.indices().keySet(), readClusterBlocks.get().indices().keySet()); + for (String index : clusterBlocks.indices().keySet()) { + assertEquals(clusterBlocks.indices().get(index), readClusterBlocks.get().indices().get(index)); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksTests.java index fe273c73c651b..3c1e141b81360 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksTests.java @@ -136,7 +136,7 @@ public void testSerDe() throws IOException { } } - static ClusterBlocks randomClusterBlocks() { + public static ClusterBlocks randomClusterBlocks() { ClusterBlocks.Builder builder = ClusterBlocks.builder(); int randomGlobalBlocks = randomIntBetween(1, 10); for (int i = 0; i < randomGlobalBlocks; i++) { diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java index 7cb80a1600c03..de1befbecd924 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java @@ -41,6 +41,8 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.toList; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V0; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST; import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.hamcrest.Matchers.greaterThan; @@ -236,6 +238,28 @@ public void testSerDe() throws IOException { assertThrows(IllegalArgumentException.class, () -> invalidRemoteObject.deserialize(new ByteArrayInputStream(new byte[0]))); } + public void testGetManifestCodecVersion() { + String manifestFileWithDelimiterInPath = + "123456789012_test-cluster/cluster-state/dsgYj10__Nkso7/manifest/manifest__9223372036854775806__9223372036854775804__C__9223370319103329556__2"; + RemoteClusterMetadataManifest remoteManifestForDownload = new RemoteClusterMetadataManifest( + manifestFileWithDelimiterInPath, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertEquals(CODEC_V2, remoteManifestForDownload.getManifestCodecVersion()); + + String v0ManifestFileWithDelimiterInPath = + "123456789012_test-cluster/cluster-state/dsgYj10__Nkso7/manifest/manifest__9223372036854775806__9223372036854775804__C__9223370319103329556"; + RemoteClusterMetadataManifest remoteManifestV0ForDownload = new RemoteClusterMetadataManifest( + v0ManifestFileWithDelimiterInPath, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertEquals(CODEC_V0, remoteManifestV0ForDownload.getManifestCodecVersion()); + } + private ClusterMetadataManifest getClusterMetadataManifest() { return ClusterMetadataManifest.builder() .opensearchVersion(Version.CURRENT) diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustomsTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustomsTests.java index 1f7a5e8bfffb1..1b020e13324a4 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustomsTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustomsTests.java @@ -232,12 +232,12 @@ public void testSerDe() throws IOException { try (InputStream inputStream = remoteObjectForUpload.serialize()) { remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath()); assertThat(inputStream.available(), greaterThan(0)); - Custom readclusterStateCustoms = remoteObjectForUpload.deserialize(inputStream); - assertThat(readclusterStateCustoms, is(clusterStateCustoms)); + Custom readClusterStateCustoms = remoteObjectForUpload.deserialize(inputStream); + assertThat(readClusterStateCustoms, is(clusterStateCustoms)); } } - private Custom getClusterStateCustom() { + public static SnapshotsInProgress getClusterStateCustom() { return SnapshotsInProgress.of( List.of( new SnapshotsInProgress.Entry( diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java index b9b6e02e8274f..f1bced2bdf855 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java @@ -156,7 +156,7 @@ public void testExceptionDuringDeserialize() throws IOException { IOException ioe = assertThrows(IOException.class, () -> remoteObjectForDownload.deserialize(in)); } - private DiscoveryNodes getDiscoveryNodes() { + public static DiscoveryNodes getDiscoveryNodes() { return DiscoveryNodes.builder() .add( new DiscoveryNode(