From 330b2497a9380fa73ee548e61f0c85196eea7acf Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 12 Sep 2024 14:49:45 +0530 Subject: [PATCH 01/14] Add restriction to have a single repository with shallow snapshot v2 setting (#15901) * Add restriction to have a single repository with shallow snapshot v2 setting Signed-off-by: Sachin Kale * Do not allow shallow snapshot v2 repo name to contain SNAPSHOT_PINNED_TIMESTAMP_DELIMITER Signed-off-by: Sachin Kale --------- Signed-off-by: Sachin Kale --- .../RemoteStorePinnedTimestampService.java | 31 ++++- .../repositories/RepositoriesService.java | 79 ++++++++++- .../BlobStoreRepositoryRemoteIndexTests.java | 124 ++++++++++++++++++ 3 files changed, 228 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index 3a7734fc0538f..71133615ed056 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -30,6 +30,8 @@ import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -75,25 +77,46 @@ public RemoteStorePinnedTimestampService( * and starts the asynchronous update task. */ public void start() { - validateRemoteStoreConfiguration(); + blobContainer = validateAndCreateBlobContainer(settings, repositoriesService.get()); startAsyncUpdateTask(RemoteStoreSettings.getPinnedTimestampsSchedulerInterval()); } - private void validateRemoteStoreConfiguration() { + private static BlobContainer validateAndCreateBlobContainer(Settings settings, RepositoriesService repositoriesService) { final String remoteStoreRepo = settings.get( Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY ); assert remoteStoreRepo != null : "Remote Segment Store repository is not configured"; - final Repository repository = repositoriesService.get().repository(remoteStoreRepo); + final Repository repository = repositoriesService.repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; - blobContainer = blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath().add(PINNED_TIMESTAMPS_PATH_TOKEN)); + return blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath().add(PINNED_TIMESTAMPS_PATH_TOKEN)); } private void startAsyncUpdateTask(TimeValue pinnedTimestampsSchedulerInterval) { asyncUpdatePinnedTimestampTask = new AsyncUpdatePinnedTimestampTask(logger, threadPool, pinnedTimestampsSchedulerInterval, true); } + public static Map> fetchPinnedTimestamps(Settings settings, RepositoriesService repositoriesService) + throws IOException { + BlobContainer blobContainer = validateAndCreateBlobContainer(settings, repositoriesService); + Set pinnedTimestamps = blobContainer.listBlobs().keySet(); + Map> pinningEntityTimestampMap = new HashMap<>(); + for (String pinnedTimestamp : pinnedTimestamps) { + try { + String[] tokens = pinnedTimestamp.split(PINNED_TIMESTAMPS_FILENAME_SEPARATOR); + Long timestamp = Long.parseLong(tokens[tokens.length - 1]); + String pinningEntity = pinnedTimestamp.substring(0, pinnedTimestamp.lastIndexOf(PINNED_TIMESTAMPS_FILENAME_SEPARATOR)); + if (pinningEntityTimestampMap.containsKey(pinningEntity) == false) { + pinningEntityTimestampMap.put(pinningEntity, new HashSet<>()); + } + pinningEntityTimestampMap.get(pinningEntity).add(timestamp); + } catch (NumberFormatException e) { + logger.error("Exception while parsing pinned timestamp from {}, skipping this entry", pinnedTimestamp); + } + } + return pinningEntityTimestampMap; + } + /** * Pins a timestamp in the remote store. * diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index 68669feb16abc..7da52147661dc 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -68,7 +68,9 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository; +import org.opensearch.snapshots.SnapshotsService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -84,6 +86,7 @@ import java.util.stream.Stream; import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2; import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING; /** @@ -123,6 +126,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C private final RepositoriesStatsArchive repositoriesStatsArchive; private final ClusterManagerTaskThrottler.ThrottlingKey putRepositoryTaskKey; private final ClusterManagerTaskThrottler.ThrottlingKey deleteRepositoryTaskKey; + private final Settings settings; public RepositoriesService( Settings settings, @@ -132,6 +136,7 @@ public RepositoriesService( Map internalTypesRegistry, ThreadPool threadPool ) { + this.settings = settings; this.typesRegistry = typesRegistry; this.internalTypesRegistry = internalTypesRegistry; this.clusterService = clusterService; @@ -173,7 +178,7 @@ public void registerOrUpdateRepository(final PutRepositoryRequest request, final CryptoMetadata.fromRequest(request.cryptoSettings()) ); validate(request.name()); - validateRepositoryMetadataSettings(clusterService, request.name(), request.settings()); + validateRepositoryMetadataSettings(clusterService, request.name(), request.settings(), repositories, settings, this); if (newRepositoryMetadata.cryptoMetadata() != null) { validate(newRepositoryMetadata.cryptoMetadata().keyProviderName()); } @@ -684,7 +689,10 @@ public static void validate(final String identifier) { public static void validateRepositoryMetadataSettings( ClusterService clusterService, final String repositoryName, - final Settings repositoryMetadataSettings + final Settings repositoryMetadataSettings, + Map repositories, + Settings settings, + RepositoriesService repositoriesService ) { // We can add more validations here for repository settings in the future. Version minVersionInCluster = clusterService.state().getNodes().getMinNodeVersion(); @@ -699,6 +707,51 @@ public static void validateRepositoryMetadataSettings( + minVersionInCluster ); } + if (SHALLOW_SNAPSHOT_V2.get(repositoryMetadataSettings)) { + if (minVersionInCluster.onOrAfter(Version.V_2_17_0) == false) { + throw new RepositoryException( + repositoryName, + "setting " + + SHALLOW_SNAPSHOT_V2.getKey() + + " cannot be enabled as some of the nodes in cluster are on version older than " + + Version.V_2_17_0 + + ". Minimum node version in cluster is: " + + minVersionInCluster + ); + } + if (repositoryName.contains(SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER)) { + throw new RepositoryException( + repositoryName, + "setting " + + SHALLOW_SNAPSHOT_V2.getKey() + + " cannot be enabled for repository with " + + SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + + " in the name as this delimiter is used to create pinning entity" + ); + } + if (repositoryWithShallowV2Exists(repositories)) { + throw new RepositoryException( + repositoryName, + "setting " + + SHALLOW_SNAPSHOT_V2.getKey() + + " cannot be enabled as this setting can be enabled only on one repository " + + " and one or more repositories in the cluster have the setting as enabled" + ); + } + try { + if (pinnedTimestampExistsWithDifferentRepository(repositoryName, settings, repositoriesService)) { + throw new RepositoryException( + repositoryName, + "setting " + + SHALLOW_SNAPSHOT_V2.getKey() + + " cannot be enabled if there are existing snapshots created with shallow V2 " + + "setting using different repository." + ); + } + } catch (IOException e) { + throw new RepositoryException(repositoryName, "Exception while fetching pinned timestamp details"); + } + } // Validation to not allow users to create system repository via put repository call. if (isSystemRepositorySettingPresent(repositoryMetadataSettings)) { throw new RepositoryException( @@ -710,6 +763,28 @@ public static void validateRepositoryMetadataSettings( } } + private static boolean repositoryWithShallowV2Exists(Map repositories) { + return repositories.values().stream().anyMatch(repo -> SHALLOW_SNAPSHOT_V2.get(repo.getMetadata().settings())); + } + + private static boolean pinnedTimestampExistsWithDifferentRepository( + String newRepoName, + Settings settings, + RepositoriesService repositoriesService + ) throws IOException { + Map> pinningEntityTimestampMap = RemoteStorePinnedTimestampService.fetchPinnedTimestamps( + settings, + repositoriesService + ); + for (String pinningEntity : pinningEntityTimestampMap.keySet()) { + String repoNameWithPinnedTimestamps = pinningEntity.split(SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER)[0]; + if (repoNameWithPinnedTimestamps.equals(newRepoName) == false) { + return true; + } + } + return false; + } + private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) { if (isRepositoryInUse(clusterState, repository)) { throw new IllegalStateException("trying to modify or unregister repository that is currently used"); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRemoteIndexTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRemoteIndexTests.java index 9cca495cced72..e280ab8c7a73c 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRemoteIndexTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRemoteIndexTests.java @@ -33,6 +33,8 @@ package org.opensearch.repositories.blobstore; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; +import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.common.settings.Settings; @@ -41,13 +43,16 @@ import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.index.IndexSettings; import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; +import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryData; +import org.opensearch.repositories.RepositoryException; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; +import org.opensearch.snapshots.SnapshotsService; import org.opensearch.test.OpenSearchIntegTestCase; import java.io.IOException; @@ -64,6 +69,9 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; /** @@ -81,6 +89,7 @@ protected Settings nodeSettings() { .put(Environment.PATH_HOME_SETTING.getKey(), tempDir) .put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo")) .put(Environment.PATH_SHARED_DATA_SETTING.getKey(), tempDir.getParent()) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) .build(); } @@ -373,4 +382,119 @@ public void testRetrieveShallowCopySnapshotCase2() throws IOException { assertThat(snapshotIds, equalTo(originalSnapshots)); } + public void testRepositoryCreationShallowV2() throws Exception { + Client client = client(); + + Settings snapshotRepoSettings1 = Settings.builder() + .put(node().settings()) + .put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings())) + .put(REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(SHALLOW_SNAPSHOT_V2.getKey(), true) + .build(); + + String invalidRepoName = "test" + SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + "repo-1"; + try { + createRepository(client, invalidRepoName, snapshotRepoSettings1); + } catch (RepositoryException e) { + assertEquals( + "[" + + invalidRepoName + + "] setting shallow_snapshot_v2 cannot be enabled for repository with __ in the name as this delimiter is used to create pinning entity", + e.getMessage() + ); + } + + // Create repo with shallow snapshot V2 enabled + createRepository(client, "test-repo-1", snapshotRepoSettings1); + + logger.info("--> verify the repository"); + VerifyRepositoryResponse verifyRepositoryResponse = client.admin().cluster().prepareVerifyRepository("test-repo-1").get(); + assertNotNull(verifyRepositoryResponse.getNodes()); + + GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get(); + assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings())); + + Settings snapshotRepoSettings2 = Settings.builder() + .put(node().settings()) + .put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings())) + .put(REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(SHALLOW_SNAPSHOT_V2.getKey(), true) + .build(); + + // Create another repo with shallow snapshot V2 enabled, this should fail. + try { + createRepository(client, "test-repo-2", snapshotRepoSettings2); + } catch (RepositoryException e) { + assertEquals( + "[test-repo-2] setting shallow_snapshot_v2 cannot be enabled as this setting can be enabled only on one repository and one or more repositories in the cluster have the setting as enabled", + e.getMessage() + ); + } + + // Disable shallow snapshot V2 setting on test-repo-1 + updateRepository( + client, + "test-repo-1", + Settings.builder().put(snapshotRepoSettings1).put(SHALLOW_SNAPSHOT_V2.getKey(), false).build() + ); + getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get(); + assertFalse(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings())); + + // Create test-repo-2 with shallow snapshot V2 enabled, this should pass now. + createRepository(client, "test-repo-2", snapshotRepoSettings2); + getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-2").get(); + assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings())); + + final String indexName = "test-idx"; + createIndex(indexName); + ensureGreen(); + indexDocuments(client, indexName); + + // Create pinned timestamp snapshot in test-repo-2 + SnapshotInfo snapshotInfo = createSnapshot("test-repo-2", "test-snap-2", new ArrayList<>()); + assertNotNull(snapshotInfo.snapshotId()); + + // As snapshot is present, even after disabling shallow snapshot setting in test-repo-2, we will not be able to + // enable shallow snapshot v2 setting in test-repo-1 + updateRepository( + client, + "test-repo-2", + Settings.builder().put(snapshotRepoSettings2).put(SHALLOW_SNAPSHOT_V2.getKey(), false).build() + ); + getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-2").get(); + assertFalse(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings())); + + try { + updateRepository(client, "test-repo-1", snapshotRepoSettings1); + } catch (RepositoryException e) { + assertEquals( + "[test-repo-1] setting shallow_snapshot_v2 cannot be enabled if there are existing snapshots created with shallow V2 setting using different repository.", + e.getMessage() + ); + } + + // After deleting the snapshot, we will be able to enable shallow snapshot v2 setting in test-repo-1 + AcknowledgedResponse deleteSnapshotResponse = client().admin().cluster().prepareDeleteSnapshot("test-repo-2", "test-snap-2").get(); + + assertAcked(deleteSnapshotResponse); + + updateRepository(client, "test-repo-1", snapshotRepoSettings1); + getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get(); + assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings())); + + // Having a snapshot in the same repo should allow disabling and re-enabling shallow snapshot v2 setting + snapshotInfo = createSnapshot("test-repo-1", "test-snap-1", new ArrayList<>()); + assertNotNull(snapshotInfo.snapshotId()); + updateRepository( + client, + "test-repo-1", + Settings.builder().put(snapshotRepoSettings1).put(SHALLOW_SNAPSHOT_V2.getKey(), false).build() + ); + getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get(); + assertFalse(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings())); + + updateRepository(client, "test-repo-1", snapshotRepoSettings1); + getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get(); + assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings())); + } } From 4223fab40bedd0571324f76f8838ccea4a8b1b8a Mon Sep 17 00:00:00 2001 From: Sagar <99425694+sgup432@users.noreply.github.com> Date: Thu, 12 Sep 2024 15:31:46 -0700 Subject: [PATCH 02/14] Removing task Id from map using remove() (#15918) Signed-off-by: Sagar Upadhyaya --- .../org/opensearch/tasks/TaskCancellationMonitoringService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/tasks/TaskCancellationMonitoringService.java b/server/src/main/java/org/opensearch/tasks/TaskCancellationMonitoringService.java index 343d4571593a7..2040703d88c38 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskCancellationMonitoringService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskCancellationMonitoringService.java @@ -125,7 +125,7 @@ public void onTaskCompleted(Task task) { if (!TASKS_TO_TRACK.contains(task.getClass())) { return; } - this.cancelledTaskTracker.entrySet().removeIf(entry -> entry.getKey() == task.getId()); + this.cancelledTaskTracker.remove(task.getId()); } /** From 07029b2513f0cffbd788d95836dbc6cff0b4dbf4 Mon Sep 17 00:00:00 2001 From: Navneet Verma Date: Fri, 13 Sep 2024 05:53:57 -0700 Subject: [PATCH 03/14] Add the capability to override indices.breaker.total.use_real_memory setting for test clusters (#15906) Signed-off-by: Navneet Verma --- .../org/opensearch/gradle/testclusters/OpenSearchNode.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java index 268de50340cbf..cd22560af9a96 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java @@ -116,7 +116,12 @@ public class OpenSearchNode implements TestClusterConfiguration { private static final TimeUnit NODE_UP_TIMEOUT_UNIT = TimeUnit.MINUTES; private static final int ADDITIONAL_CONFIG_TIMEOUT = 15; private static final TimeUnit ADDITIONAL_CONFIG_TIMEOUT_UNIT = TimeUnit.SECONDS; - private static final List OVERRIDABLE_SETTINGS = Arrays.asList("path.repo", "discovery.seed_providers", "discovery.seed_hosts"); + private static final List OVERRIDABLE_SETTINGS = Arrays.asList( + "path.repo", + "discovery.seed_providers", + "discovery.seed_hosts", + "indices.breaker.total.use_real_memory" + ); private static final int TAIL_LOG_MESSAGES_COUNT = 40; private static final List MESSAGES_WE_DONT_CARE_ABOUT = Arrays.asList( From 260edc5f13debc04858da4e73dfc396eeb459ef9 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Fri, 13 Sep 2024 10:44:16 -0400 Subject: [PATCH 04/14] Fix flaky org.opensearch.rest.ReactorNetty4StreamingStressIT.testCloseClientStreamingRequest test case (#15859) Signed-off-by: Andriy Redko --- .../rest/ReactorNetty4StreamingStressIT.java | 56 +++++++------------ 1 file changed, 21 insertions(+), 35 deletions(-) diff --git a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java index a978af1b11db4..a853ed56fad32 100644 --- a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java +++ b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java @@ -8,7 +8,6 @@ package org.opensearch.rest; -import org.apache.hc.core5.http.ConnectionClosedException; import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.StreamingRequest; @@ -16,24 +15,20 @@ import org.opensearch.test.rest.OpenSearchRestTestCase; import org.junit.After; +import java.io.IOException; import java.io.InterruptedIOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import reactor.core.publisher.Flux; -import reactor.test.subscriber.TestSubscriber; +import reactor.test.StepVerifier; +import reactor.test.scheduler.VirtualTimeScheduler; -import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.collection.IsEmptyCollection.empty; public class ReactorNetty4StreamingStressIT extends OpenSearchRestTestCase { @After @@ -49,6 +44,8 @@ public void tearDown() throws Exception { } public void testCloseClientStreamingRequest() throws Exception { + final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true); + final AtomicInteger id = new AtomicInteger(0); final Stream stream = Stream.generate( () -> "{ \"index\": { \"_index\": \"test-stress-streaming\", \"_id\": \"" @@ -57,39 +54,28 @@ public void testCloseClientStreamingRequest() throws Exception { + "{ \"name\": \"josh\" }\n" ); + final Duration delay = Duration.ofMillis(1); final StreamingRequest streamingRequest = new StreamingRequest<>( "POST", "/_bulk/stream", - Flux.fromStream(stream).delayElements(Duration.ofMillis(500)).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + Flux.fromStream(stream).delayElements(delay, scheduler).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) ); streamingRequest.addParameter("refresh", "true"); final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); - TestSubscriber subscriber = TestSubscriber.create(); - streamingResponse.getBody().subscribe(subscriber); - - final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - try { - // Await for subscriber to receive at least one chunk - assertBusy(() -> assertThat(subscriber.getReceivedOnNext(), not(empty()))); - - // Close client forceably - executor.schedule(() -> { - client().close(); - return null; - }, 2, TimeUnit.SECONDS); + scheduler.advanceTimeBy(delay); /* emit first element */ - // Await for subscriber to terminate - subscriber.block(Duration.ofSeconds(10)); - assertThat( - subscriber.expectTerminalError(), - anyOf(instanceOf(InterruptedIOException.class), instanceOf(ConnectionClosedException.class)) - ); - } finally { - executor.shutdown(); - if (executor.awaitTermination(1, TimeUnit.SECONDS) == false) { - executor.shutdownNow(); - } - } + StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) + .expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\"")) + .then(() -> { + try { + client().close(); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + }) + .then(() -> scheduler.advanceTimeBy(delay)) + .expectErrorMatches(t -> t instanceof InterruptedIOException) + .verify(); } } From 36c89bf4f975bd2750423d1308d2cd3408d8553f Mon Sep 17 00:00:00 2001 From: Finn Date: Fri, 13 Sep 2024 08:00:37 -0700 Subject: [PATCH 05/14] Update TESTING.md 'Building with extra plugins' documentation (#15893) * Update TESTING.md 'Building with extra plugins' documentation Signed-off-by: Finn Carroll * Typo Signed-off-by: Finn Carroll * Fix TOC Signed-off-by: Finn Carroll --------- Signed-off-by: Finn Carroll --- TESTING.md | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/TESTING.md b/TESTING.md index de7ab3eefe2f8..9127f005ced75 100644 --- a/TESTING.md +++ b/TESTING.md @@ -39,7 +39,7 @@ OpenSearch uses [jUnit](https://junit.org/junit5/) for testing, it also uses ran - [Expect a specific segment topology](#expect-a-specific-segment-topology) - [Leave environment in an unstable state after test](#leave-environment-in-an-unstable-state-after-test) - [Test coverage analysis](#test-coverage-analysis) -- [Building with extra plugins](#building-with-extra-plugins) +- [Testing with plugins](#testing-with-plugins) - [Environment misc](#environment-misc) # Requirements @@ -552,11 +552,17 @@ Apart from using Gradle, it is also possible to gain insight in code coverage us Please read your IDE documentation for how to attach a debugger to a JVM process. -# Building with extra plugins +# Testing with plugins -Additional plugins may be built alongside OpenSearch, where their dependency on OpenSearch will be substituted with the local OpenSearch build. To add your plugin, create a directory called `opensearch-extra` as a sibling of OpenSearch. Checkout your plugin underneath `opensearch-extra` and the build will automatically pick it up. You can verify the plugin is included as part of the build by checking the projects of the build. +To test a plugin with a custom build of OpenSearch, build OpenSearch and use the `customDistributionUrl` setting supported by each plugin to override the OpenSearch distribution. - ./gradlew projects +For example, in your OpenSearch repository assemble a custom distribution. + + ./gradlew :distribution:archives:linux-tar:assemble + +Then in your plugin repository, substitute in your OpenSearch build + + ./gradlew run -PcustomDistributionUrl="/distribution/archives/linux-tar/build/distributions/opensearch-min-3.0.0-SNAPSHOT-linux-x64.tar.gz" # Environment misc From aaa92aeeb75a745ee7b058cc4b30b734e861cfc4 Mon Sep 17 00:00:00 2001 From: "Samuel.G" <1148690954@qq.com> Date: Sat, 14 Sep 2024 04:54:02 +0800 Subject: [PATCH 06/14] Fix case insensitive query on wildcard field (#15882) * fix case insensitive query on wildcard field Signed-off-by: gesong.samuel * fix YAML test Signed-off-by: gesong.samuel * add change log Signed-off-by: gesong.samuel --------- Signed-off-by: gesong.samuel Co-authored-by: gesong.samuel --- CHANGELOG.md | 2 +- .../search/270_wildcard_fieldtype_queries.yml | 41 +++++++++++++++++-- .../index/mapper/WildcardFieldMapper.java | 15 +++++-- 3 files changed, 49 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3268852cc99f9..ec4ac7504b9c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Fixed - Fix wildcard query containing escaped character ([#15737](https://github.com/opensearch-project/OpenSearch/pull/15737)) - +- Fix case-insensitive query on wildcard field ([#15882](https://github.com/opensearch-project/OpenSearch/pull/15882)) ### Security [Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.17...2.x diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/270_wildcard_fieldtype_queries.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/270_wildcard_fieldtype_queries.yml index c449a06cf633d..d92538824232d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search/270_wildcard_fieldtype_queries.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/270_wildcard_fieldtype_queries.yml @@ -56,6 +56,12 @@ setup: id: 6 body: other_field: "test" + - do: + index: + index: test + id: 7 + body: + my_field: "ABCD" - do: indices.refresh: {} @@ -90,8 +96,9 @@ setup: query: term: my_field.lower: "abcd" - - match: { hits.total.value: 1 } + - match: { hits.total.value: 2 } - match: { hits.hits.0._id: "5" } + - match: { hits.hits.1._id: "7" } - do: search: @@ -100,8 +107,9 @@ setup: query: term: my_field.lower: "ABCD" - - match: { hits.total.value: 1 } + - match: { hits.total.value: 2 } - match: { hits.hits.0._id: "5" } + - match: { hits.hits.1._id: "7" } - do: search: @@ -215,7 +223,7 @@ setup: wildcard: my_field: value: "*" - - match: { hits.total.value: 5 } + - match: { hits.total.value: 6 } --- "regexp match-all works": - do: @@ -226,7 +234,7 @@ setup: regexp: my_field: value: ".*" - - match: { hits.total.value: 5 } + - match: { hits.total.value: 6 } --- "terms query on wildcard field matches": - do: @@ -237,3 +245,28 @@ setup: terms: { my_field: ["AbCd"] } - match: { hits.total.value: 1 } - match: { hits.hits.0._id: "5" } +--- +"case insensitive query on wildcard field": + - do: + search: + index: test + body: + query: + wildcard: + my_field: + value: "AbCd" + - match: { hits.total.value: 1 } + - match: { hits.hits.0._id: "5" } + + - do: + search: + index: test + body: + query: + wildcard: + my_field: + value: "AbCd" + case_insensitive: true + - match: { hits.total.value: 2 } + - match: { hits.hits.0._id: "5" } + - match: { hits.hits.1._id: "7" } diff --git a/server/src/main/java/org/opensearch/index/mapper/WildcardFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/WildcardFieldMapper.java index 0cb416a9b8370..e43e3bda692e7 100644 --- a/server/src/main/java/org/opensearch/index/mapper/WildcardFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/WildcardFieldMapper.java @@ -40,6 +40,7 @@ import org.apache.lucene.util.automaton.RegExp; import org.opensearch.common.lucene.BytesRefs; import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.lucene.search.AutomatonQueries; import org.opensearch.common.unit.Fuzziness; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.analysis.IndexAnalyzers; @@ -464,7 +465,7 @@ public Query wildcardQuery(String value, MultiTermQuery.RewriteMethod method, bo return existsQuery(context); } } else { - approximation = matchAllTermsQuery(name(), requiredNGrams); + approximation = matchAllTermsQuery(name(), requiredNGrams, caseInsensitive); } return new WildcardMatchingQuery(name(), approximation, matchPredicate, value, context, this); } @@ -678,7 +679,7 @@ public Query termsQuery(List values, QueryShardContext context) { StringBuilder pattern = new StringBuilder(); for (Object value : values) { String stringVal = BytesRefs.toString(value); - builder.add(matchAllTermsQuery(name(), getRequiredNGrams(stringVal)), BooleanClause.Occur.SHOULD); + builder.add(matchAllTermsQuery(name(), getRequiredNGrams(stringVal), false), BooleanClause.Occur.SHOULD); expectedValues.add(stringVal); if (pattern.length() > 0) { pattern.append('|'); @@ -688,10 +689,16 @@ public Query termsQuery(List values, QueryShardContext context) { return new WildcardMatchingQuery(name(), builder.build(), expectedValues::contains, pattern.toString(), context, this); } - private static BooleanQuery matchAllTermsQuery(String fieldName, Set terms) { + private static BooleanQuery matchAllTermsQuery(String fieldName, Set terms, boolean caseInsensitive) { BooleanQuery.Builder matchAllTermsBuilder = new BooleanQuery.Builder(); + Query query; for (String term : terms) { - matchAllTermsBuilder.add(new TermQuery(new Term(fieldName, term)), BooleanClause.Occur.FILTER); + if (caseInsensitive) { + query = AutomatonQueries.caseInsensitiveTermQuery(new Term(fieldName, term)); + } else { + query = new TermQuery(new Term(fieldName, term)); + } + matchAllTermsBuilder.add(query, BooleanClause.Occur.FILTER); } return matchAllTermsBuilder.build(); } From bd26056669f41826a500f944869b198149c476a2 Mon Sep 17 00:00:00 2001 From: Lakshya Taragi <157457166+ltaragi@users.noreply.github.com> Date: Mon, 16 Sep 2024 14:58:07 +0530 Subject: [PATCH 07/14] Remove TooManyShardsInSnapshotsStatusException (#15877) Signed-off-by: Lakshya Taragi --- .../opensearch/OpenSearchServerException.java | 8 --- ...oManyShardsInSnapshotsStatusException.java | 69 ------------------- .../ExceptionSerializationTests.java | 2 - 3 files changed, 79 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/snapshots/TooManyShardsInSnapshotsStatusException.java diff --git a/server/src/main/java/org/opensearch/OpenSearchServerException.java b/server/src/main/java/org/opensearch/OpenSearchServerException.java index a1fc61834f69b..b0ab62259d5c0 100644 --- a/server/src/main/java/org/opensearch/OpenSearchServerException.java +++ b/server/src/main/java/org/opensearch/OpenSearchServerException.java @@ -1210,14 +1210,6 @@ public static void registerExceptions() { V_2_17_0 ) ); - registerExceptionHandle( - new OpenSearchExceptionHandle( - org.opensearch.snapshots.TooManyShardsInSnapshotsStatusException.class, - org.opensearch.snapshots.TooManyShardsInSnapshotsStatusException::new, - 175, - V_2_17_0 - ) - ); registerExceptionHandle( new OpenSearchExceptionHandle( org.opensearch.cluster.block.IndexCreateBlockException.class, diff --git a/server/src/main/java/org/opensearch/snapshots/TooManyShardsInSnapshotsStatusException.java b/server/src/main/java/org/opensearch/snapshots/TooManyShardsInSnapshotsStatusException.java deleted file mode 100644 index 1689b3e4941ec..0000000000000 --- a/server/src/main/java/org/opensearch/snapshots/TooManyShardsInSnapshotsStatusException.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.snapshots; - -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.rest.RestStatus; - -import java.io.IOException; - -/** - * Thrown if the number of shards across the requested resources (snapshot(s) or the index/indices of a particular snapshot) - * breaches the limit of snapshot.max_shards_allowed_in_status_api cluster setting - * - * @opensearch.internal - */ -public class TooManyShardsInSnapshotsStatusException extends SnapshotException { - - public TooManyShardsInSnapshotsStatusException( - final String repositoryName, - final SnapshotId snapshotId, - final String message, - final Throwable cause - ) { - super(repositoryName, snapshotId, message, cause); - } - - public TooManyShardsInSnapshotsStatusException(final String repositoryName, final String message, String... snapshotName) { - super(repositoryName, String.join(", ", snapshotName), message); - } - - public TooManyShardsInSnapshotsStatusException(StreamInput in) throws IOException { - super(in); - } - - @Override - public RestStatus status() { - return RestStatus.REQUEST_ENTITY_TOO_LARGE; - } -} diff --git a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java index eff312a36dbc6..2e4a2d7bdd59c 100644 --- a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java @@ -119,7 +119,6 @@ import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInProgressException; import org.opensearch.snapshots.SnapshotInUseDeletionException; -import org.opensearch.snapshots.TooManyShardsInSnapshotsStatusException; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.transport.ActionNotFoundTransportException; @@ -899,7 +898,6 @@ public void testIds() { ids.put(172, ViewNotFoundException.class); ids.put(173, ViewAlreadyExistsException.class); ids.put(174, InvalidIndexContextException.class); - ids.put(175, TooManyShardsInSnapshotsStatusException.class); ids.put(10001, IndexCreateBlockException.class); Map, Integer> reverse = new HashMap<>(); From c709400543a16d7f7d6d644f26bb2f7a10b90caf Mon Sep 17 00:00:00 2001 From: Lakshya Taragi <157457166+ltaragi@users.noreply.github.com> Date: Mon, 16 Sep 2024 15:28:29 +0530 Subject: [PATCH 08/14] Revert "Mute failing snapshot status tests (#15652)" (#15876) This reverts commit 48cf5f061094d89aaa564b97667f4965556a8e49. Signed-off-by: Lakshya Taragi --- .../test/snapshot.status/10_basic.yml | 70 +++++++++---------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.status/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.status/10_basic.yml index 819f04407b219..c35f2419bdc91 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.status/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.status/10_basic.yml @@ -25,40 +25,38 @@ setup: snapshot: test_snapshot wait_for_completion: true -# TODO: fix and unmute tests + - do: + snapshot.status: + repository: test_repo_status_1 + snapshot: test_snapshot + + - is_true: snapshots + - match: { snapshots.0.snapshot: test_snapshot } + - match: { snapshots.0.state: SUCCESS } + - gt: { snapshots.0.stats.incremental.file_count: 0 } + - gt: { snapshots.0.stats.incremental.size_in_bytes: 0 } + - gt: { snapshots.0.stats.total.file_count: 0 } + - gt: { snapshots.0.stats.total.size_in_bytes: 0 } + - is_true: snapshots.0.stats.start_time_in_millis +## fast in memory snapshots can take less than one millisecond to complete. + - gte: { snapshots.0.stats.time_in_millis: 0 } + +--- +"Get missing snapshot status throws an exception": + + - do: + catch: /snapshot_missing_exception.+ is missing/ + snapshot.status: + repository: test_repo_status_1 + snapshot: test_nonexistent_snapshot + +--- +"Get missing snapshot status succeeds when ignoreUnavailable is true": + + - do: + snapshot.status: + repository: test_repo_status_1 + snapshot: test_nonexistent_snapshot + ignore_unavailable: true -# - do: -# snapshot.status: -# repository: test_repo_status_1 -# snapshot: test_snapshot -# -# - is_true: snapshots -# - match: { snapshots.0.snapshot: test_snapshot } -# - match: { snapshots.0.state: SUCCESS } -# - gt: { snapshots.0.stats.incremental.file_count: 0 } -# - gt: { snapshots.0.stats.incremental.size_in_bytes: 0 } -# - gt: { snapshots.0.stats.total.file_count: 0 } -# - gt: { snapshots.0.stats.total.size_in_bytes: 0 } -# - is_true: snapshots.0.stats.start_time_in_millis -### fast in memory snapshots can take less than one millisecond to complete. -# - gte: { snapshots.0.stats.time_in_millis: 0 } -# -#--- -#"Get missing snapshot status throws an exception": -# -# - do: -# catch: /snapshot_missing_exception.+ is missing/ -# snapshot.status: -# repository: test_repo_status_1 -# snapshot: test_nonexistent_snapshot -# -#--- -#"Get missing snapshot status succeeds when ignoreUnavailable is true": -# -# - do: -# snapshot.status: -# repository: test_repo_status_1 -# snapshot: test_nonexistent_snapshot -# ignore_unavailable: true -# -# - is_true: snapshots + - is_true: snapshots From 45a8ed41f0c67f8853e36d95c1cc05d921b92739 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 16 Sep 2024 11:09:17 -0400 Subject: [PATCH 09/14] Bump com.gradle.develocity from 3.18 to 3.18.1 (#15947) Bumps com.gradle.develocity from 3.18 to 3.18.1. --- updated-dependencies: - dependency-name: com.gradle.develocity dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- settings.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/settings.gradle b/settings.gradle index b79c2aee135fc..8412d198a2a29 100644 --- a/settings.gradle +++ b/settings.gradle @@ -10,7 +10,7 @@ */ plugins { - id "com.gradle.develocity" version "3.18" + id "com.gradle.develocity" version "3.18.1" } ext.disableBuildCache = hasProperty('DISABLE_BUILD_CACHE') || System.getenv().containsKey('DISABLE_BUILD_CACHE') From e3bbc74868dcba47b4231c8d6e095c2721d019e3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 16 Sep 2024 11:40:39 -0400 Subject: [PATCH 10/14] Bump ch.qos.logback:logback-core from 1.5.6 to 1.5.8 in /test/fixtures/hdfs-fixture (#15946) * Bump ch.qos.logback:logback-core in /test/fixtures/hdfs-fixture Bumps [ch.qos.logback:logback-core](https://github.com/qos-ch/logback) from 1.5.6 to 1.5.8. - [Commits](https://github.com/qos-ch/logback/compare/v_1.5.6...v_1.5.8) --- updated-dependencies: - dependency-name: ch.qos.logback:logback-core dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- CHANGELOG.md | 1 + test/fixtures/hdfs-fixture/build.gradle | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ec4ac7504b9c2..cc313fdcdb849 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `org.apache.logging.log4j:log4j-core` from 2.23.1 to 2.24.0 ([#15858](https://github.com/opensearch-project/OpenSearch/pull/15858)) - Bump `peter-evans/create-pull-request` from 6 to 7 ([#15863](https://github.com/opensearch-project/OpenSearch/pull/15863)) - Bump `com.nimbusds:oauth2-oidc-sdk` from 11.9.1 to 11.19.1 ([#15862](https://github.com/opensearch-project/OpenSearch/pull/15862)) +- Bump `ch.qos.logback:logback-core` from 1.5.6 to 1.5.8 ([#15946](https://github.com/opensearch-project/OpenSearch/pull/15946)) ### Changed diff --git a/test/fixtures/hdfs-fixture/build.gradle b/test/fixtures/hdfs-fixture/build.gradle index b5cd12ef0c11f..20fd1058a181d 100644 --- a/test/fixtures/hdfs-fixture/build.gradle +++ b/test/fixtures/hdfs-fixture/build.gradle @@ -74,7 +74,7 @@ dependencies { api 'org.apache.zookeeper:zookeeper:3.9.2' api "org.apache.commons:commons-text:1.12.0" api "commons-net:commons-net:3.11.1" - api "ch.qos.logback:logback-core:1.5.6" + api "ch.qos.logback:logback-core:1.5.8" api "ch.qos.logback:logback-classic:1.2.13" api "org.jboss.xnio:xnio-nio:3.8.16.Final" api 'org.jline:jline:3.26.3' From 81288b1044348d99cce6d23770755a464121f7cc Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Wed, 18 Sep 2024 00:28:59 +0530 Subject: [PATCH 11/14] remote publication checksum stats (#15957) * Remote publication checksum stats Signed-off-by: Himshikha Gupta Co-authored-by: Himshikha Gupta --- .../remote/RemoteStatePublicationIT.java | 19 ++++++++++ .../remote/RemoteClusterStateService.java | 6 ++++ .../gateway/remote/RemoteDownloadStats.java | 36 +++++++++++++++++++ .../remote/RemotePersistenceStats.java | 24 ++++++++++--- .../RemoteClusterStateServiceTests.java | 10 ++++++ 5 files changed, 91 insertions(+), 4 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index faab3645ae894..ffb9352e8ba47 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -56,6 +56,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteDownloadStats.CHECKSUM_VALIDATION_FAILED_COUNT; import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA; @@ -405,10 +406,28 @@ private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0); assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount()); assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0); + assertEquals( + 0, + dataNodeDiscoveryStats.getClusterStateStats() + .getPersistenceStats() + .get(0) + .getExtendedFields() + .get(CHECKSUM_VALIDATION_FAILED_COUNT) + .get() + ); assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getSuccessCount() > 0); assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getFailedCount()); assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getTotalTimeInMillis() > 0); + assertEquals( + 0, + dataNodeDiscoveryStats.getClusterStateStats() + .getPersistenceStats() + .get(1) + .getExtendedFields() + .get(CHECKSUM_VALIDATION_FAILED_COUNT) + .get() + ); } private Map getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 12d10fd908b44..ece29180f9cf5 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1644,6 +1644,12 @@ void validateClusterStateFromChecksum( failedValidation ) ); + if (isFullStateDownload) { + remoteStateStats.stateFullDownloadValidationFailed(); + } else { + remoteStateStats.stateDiffDownloadValidationFailed(); + } + if (isFullStateDownload && remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.FAILURE)) { throw new IllegalStateException( "Cluster state checksums do not match during full state read. Validation failed for " + failedValidation diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java new file mode 100644 index 0000000000000..a8f4b33a19c37 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.cluster.coordination.PersistedStateStats; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Download stats for remote state + * + * @opensearch.internal + */ +public class RemoteDownloadStats extends PersistedStateStats { + static final String CHECKSUM_VALIDATION_FAILED_COUNT = "checksum_validation_failed_count"; + private AtomicLong checksumValidationFailedCount = new AtomicLong(0); + + public RemoteDownloadStats(String statsName) { + super(statsName); + addToExtendedFields(CHECKSUM_VALIDATION_FAILED_COUNT, checksumValidationFailedCount); + } + + public void checksumValidationFailedCount() { + checksumValidationFailedCount.incrementAndGet(); + } + + public long getChecksumValidationFailedCount() { + return checksumValidationFailedCount.get(); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java index 417ebdafd3ba7..11f26ac8b3ed9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java @@ -18,16 +18,16 @@ public class RemotePersistenceStats { RemoteUploadStats remoteUploadStats; - PersistedStateStats remoteDiffDownloadStats; - PersistedStateStats remoteFullDownloadStats; + RemoteDownloadStats remoteDiffDownloadStats; + RemoteDownloadStats remoteFullDownloadStats; final String FULL_DOWNLOAD_STATS = "remote_full_download"; final String DIFF_DOWNLOAD_STATS = "remote_diff_download"; public RemotePersistenceStats() { remoteUploadStats = new RemoteUploadStats(); - remoteDiffDownloadStats = new PersistedStateStats(DIFF_DOWNLOAD_STATS); - remoteFullDownloadStats = new PersistedStateStats(FULL_DOWNLOAD_STATS); + remoteDiffDownloadStats = new RemoteDownloadStats(DIFF_DOWNLOAD_STATS); + remoteFullDownloadStats = new RemoteDownloadStats(FULL_DOWNLOAD_STATS); } public void cleanUpAttemptFailed() { @@ -90,6 +90,22 @@ public void stateDiffDownloadFailed() { remoteDiffDownloadStats.stateFailed(); } + public void stateDiffDownloadValidationFailed() { + remoteDiffDownloadStats.checksumValidationFailedCount(); + } + + public void stateFullDownloadValidationFailed() { + remoteFullDownloadStats.checksumValidationFailedCount(); + } + + public long getStateDiffDownloadValidationFailed() { + return remoteDiffDownloadStats.getChecksumValidationFailedCount(); + } + + public long getStateFullDownloadValidationFailed() { + return remoteFullDownloadStats.getChecksumValidationFailedCount(); + } + public PersistedStateStats getUploadStats() { return remoteUploadStats; } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index b11d5e48bec06..56857285fa8d3 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -3342,6 +3342,7 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabledWithNullC anyString(), anyBoolean() ); + assertEquals(0, remoteClusterStateService.getRemoteStateStats().getStateFullDownloadValidationFailed()); } public void testGetClusterStateForManifestWithChecksumValidationEnabled() throws IOException { @@ -3374,6 +3375,7 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabled() throws ); mockService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true); verify(mockService, times(1)).validateClusterStateFromChecksum(manifest, clusterState, ClusterName.DEFAULT.value(), NODE_ID, true); + assertEquals(0, remoteClusterStateService.getRemoteStateStats().getStateFullDownloadValidationFailed()); } public void testGetClusterStateForManifestWithChecksumValidationModeNone() throws IOException { @@ -3406,6 +3408,7 @@ public void testGetClusterStateForManifestWithChecksumValidationModeNone() throw ); mockService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true); verify(mockService, times(0)).validateClusterStateFromChecksum(any(), any(), anyString(), anyString(), anyBoolean()); + assertEquals(0, remoteClusterStateService.getRemoteStateStats().getStateFullDownloadValidationFailed()); } public void testGetClusterStateForManifestWithChecksumValidationEnabledWithMismatch() throws IOException { @@ -3448,6 +3451,7 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabledWithMisma NODE_ID, true ); + assertEquals(1, remoteClusterStateService.getRemoteStateStats().getStateFullDownloadValidationFailed()); } public void testGetClusterStateForManifestWithChecksumValidationDebugWithMismatch() throws IOException { @@ -3494,6 +3498,7 @@ public void testGetClusterStateForManifestWithChecksumValidationDebugWithMismatc NODE_ID, true ); + assertEquals(1, remoteClusterStateService.getRemoteStateStats().getStateFullDownloadValidationFailed()); } public void testGetClusterStateUsingDiffWithChecksum() throws IOException { @@ -3535,6 +3540,7 @@ public void testGetClusterStateUsingDiffWithChecksum() throws IOException { eq(NODE_ID), eq(false) ); + assertEquals(0, remoteClusterStateService.getRemoteStateStats().getStateDiffDownloadValidationFailed()); } public void testGetClusterStateUsingDiffWithChecksumModeNone() throws IOException { @@ -3576,6 +3582,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeNone() throws IOExceptio eq(NODE_ID), eq(false) ); + assertEquals(0, remoteClusterStateService.getRemoteStateStats().getStateDiffDownloadValidationFailed()); } public void testGetClusterStateUsingDiffWithChecksumModeDebugMismatch() throws IOException { @@ -3616,6 +3623,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeDebugMismatch() throws I eq(NODE_ID), eq(false) ); + assertEquals(1, remoteClusterStateService.getRemoteStateStats().getStateDiffDownloadValidationFailed()); } public void testGetClusterStateUsingDiffWithChecksumModeTraceMismatch() throws IOException { @@ -3677,6 +3685,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeTraceMismatch() throws I eq(NODE_ID), eq(false) ); + assertEquals(1, remoteClusterStateService.getRemoteStateStats().getStateDiffDownloadValidationFailed()); } public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOException { @@ -3738,6 +3747,7 @@ public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOExceptio eq(NODE_ID), eq(false) ); + assertEquals(1, remoteClusterStateService.getRemoteStateStats().getStateDiffDownloadValidationFailed()); } private void mockObjectsForGettingPreviousClusterUUID(Map clusterUUIDsPointers) throws IOException { From 8347d0ec22aa865dea3bc4b2027fb7bdf486fab2 Mon Sep 17 00:00:00 2001 From: gaobinlong Date: Wed, 18 Sep 2024 03:12:05 +0800 Subject: [PATCH 12/14] Add validation for the search backpressure cancellation settings (#15501) * Fix updating search backpressure settings crashing the cluster Signed-off-by: Gao Binlong * Modify change log Signed-off-by: Gao Binlong * Fix version check Signed-off-by: Gao Binlong * Increase test coverage Signed-off-by: Gao Binlong * Format the code Signed-off-by: Gao Binlong * Optimize some code Signed-off-by: Gao Binlong * Fix typo Signed-off-by: Gao Binlong --------- Signed-off-by: Gao Binlong --- CHANGELOG.md | 2 + .../test/cluster.put_settings/10_basic.yml | 113 ++++++++++++++++++ .../opensearch/common/settings/Setting.java | 13 ++ .../SearchBackpressureService.java | 6 +- .../backpressure/SearchBackpressureState.java | 5 +- .../settings/SearchBackpressureSettings.java | 16 ++- .../settings/SearchShardTaskSettings.java | 16 ++- .../settings/SearchTaskSettings.java | 16 ++- .../common/settings/SettingTests.java | 28 +++++ .../SearchBackpressureSettingsTests.java | 28 +++++ 10 files changed, 231 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cc313fdcdb849..db41e2eee16f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Fixed - Fix wildcard query containing escaped character ([#15737](https://github.com/opensearch-project/OpenSearch/pull/15737)) - Fix case-insensitive query on wildcard field ([#15882](https://github.com/opensearch-project/OpenSearch/pull/15882)) +- Add validation for the search backpressure cancellation settings ([#15501](https://github.com/opensearch-project/OpenSearch/pull/15501)) + ### Security [Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.17...2.x diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cluster.put_settings/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cluster.put_settings/10_basic.yml index 107d298b597d3..0f4a45e4591a3 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cluster.put_settings/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cluster.put_settings/10_basic.yml @@ -98,3 +98,116 @@ - match: {error.root_cause.0.type: "illegal_argument_exception"} - match: { error.root_cause.0.reason: "Invalid SearchBackpressureMode: monitor-only" } - match: { status: 400 } + + +--- +"Test setting search backpressure cancellation settings": + - skip: + version: "- 2.99.99" + reason: "Fixed in 3.0.0" + + - do: + cluster.put_settings: + body: + transient: + search_backpressure.search_task.cancellation_burst: 11 + - is_true: acknowledged + + - do: + cluster.get_settings: + flat_settings: false + - match: {transient.search_backpressure.search_task.cancellation_burst: "11"} + + - do: + cluster.put_settings: + body: + transient: + search_backpressure.search_task.cancellation_rate: 0.1 + - is_true: acknowledged + + - do: + cluster.get_settings: + flat_settings: false + - match: {transient.search_backpressure.search_task.cancellation_rate: "0.1"} + + - do: + cluster.put_settings: + body: + transient: + search_backpressure.search_task.cancellation_ratio: 0.2 + - is_true: acknowledged + + - do: + cluster.get_settings: + flat_settings: false + - match: {transient.search_backpressure.search_task.cancellation_ratio: "0.2"} + + - do: + cluster.put_settings: + body: + transient: + search_backpressure.search_shard_task.cancellation_burst: 12 + - is_true: acknowledged + + - do: + cluster.get_settings: + flat_settings: false + - match: {transient.search_backpressure.search_shard_task.cancellation_burst: "12"} + + - do: + cluster.put_settings: + body: + transient: + search_backpressure.search_shard_task.cancellation_rate: 0.3 + - is_true: acknowledged + + - do: + cluster.get_settings: + flat_settings: false + - match: {transient.search_backpressure.search_shard_task.cancellation_rate: "0.3"} + + - do: + cluster.put_settings: + body: + transient: + search_backpressure.search_shard_task.cancellation_ratio: 0.4 + - is_true: acknowledged + + - do: + cluster.get_settings: + flat_settings: false + - match: {transient.search_backpressure.search_shard_task.cancellation_ratio: "0.4"} + +--- +"Test setting invalid search backpressure cancellation_rate and cancellation_ratio": + - skip: + version: "- 2.99.99" + reason: "Fixed in 3.0.0" + + - do: + catch: /search_backpressure.search_task.cancellation_rate must be > 0/ + cluster.put_settings: + body: + transient: + search_backpressure.search_task.cancellation_rate: 0.0 + + - do: + catch: /search_backpressure.search_task.cancellation_ratio must be > 0/ + cluster.put_settings: + body: + transient: + search_backpressure.search_task.cancellation_ratio: 0.0 + + - do: + catch: /search_backpressure.search_shard_task.cancellation_rate must be > 0/ + cluster.put_settings: + body: + transient: + search_backpressure.search_shard_task.cancellation_rate: 0.0 + + - do: + catch: /search_backpressure.search_shard_task.cancellation_ratio must be > 0/ + cluster.put_settings: + body: + transient: + search_backpressure.search_shard_task.cancellation_ratio: 0.0 diff --git a/server/src/main/java/org/opensearch/common/settings/Setting.java b/server/src/main/java/org/opensearch/common/settings/Setting.java index fea4c165809ba..081029c1c106c 100644 --- a/server/src/main/java/org/opensearch/common/settings/Setting.java +++ b/server/src/main/java/org/opensearch/common/settings/Setting.java @@ -1855,6 +1855,10 @@ public static Setting doubleSetting( ); } + public static Setting doubleSetting(String key, double defaultValue, Validator validator, Property... properties) { + return new Setting<>(key, Double.toString(defaultValue), Double::parseDouble, validator, properties); + } + /** * A writeable parser for double * @@ -1961,6 +1965,15 @@ public static Setting doubleSetting( ); } + public static Setting doubleSetting( + String key, + Setting fallbackSetting, + Validator validator, + Property... properties + ) { + return new Setting<>(new SimpleKey(key), fallbackSetting, fallbackSetting::getRaw, Double::parseDouble, validator, properties); + } + /// simpleString public static Setting simpleString(String key, Property... properties) { diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java index 43b9f8ae87529..e98046ba1dede 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java @@ -158,14 +158,16 @@ public SearchBackpressureService( timeNanosSupplier, getSettings().getSearchTaskSettings().getCancellationRateNanos(), getSettings().getSearchTaskSettings().getCancellationBurst(), - getSettings().getSearchTaskSettings().getCancellationRatio() + getSettings().getSearchTaskSettings().getCancellationRatio(), + getSettings().getSearchTaskSettings().getCancellationRate() ), SearchShardTask.class, new SearchBackpressureState( timeNanosSupplier, getSettings().getSearchShardTaskSettings().getCancellationRateNanos(), getSettings().getSearchShardTaskSettings().getCancellationBurst(), - getSettings().getSearchShardTaskSettings().getCancellationRatio() + getSettings().getSearchShardTaskSettings().getCancellationRatio(), + getSettings().getSearchShardTaskSettings().getCancellationRate() ) ); this.settings.getSearchTaskSettings().addListener(searchBackpressureStates.get(SearchTask.class)); diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureState.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureState.java index 5f086bd498036..36f5b25e002c3 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureState.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureState.java @@ -43,12 +43,15 @@ public class SearchBackpressureState implements CancellationSettingsListener { LongSupplier timeNanosSupplier, double cancellationRateNanos, double cancellationBurst, - double cancellationRatio + double cancellationRatio, + double cancellationRate ) { rateLimiter = new AtomicReference<>(new TokenBucket(timeNanosSupplier, cancellationRateNanos, cancellationBurst)); ratioLimiter = new AtomicReference<>(new TokenBucket(this::getCompletionCount, cancellationRatio, cancellationBurst)); this.timeNanosSupplier = timeNanosSupplier; this.cancellationBurst = cancellationBurst; + this.cancellationRatio = cancellationRatio; + this.cancellationRate = cancellationRate; } public long getCompletionCount() { diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java index 79494eb0d3c24..55a031382f282 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java @@ -61,8 +61,14 @@ private static class Defaults { public static final Setting SETTING_CANCELLATION_RATIO = Setting.doubleSetting( "search_backpressure.cancellation_ratio", Defaults.CANCELLATION_RATIO, - 0.0, - 1.0, + value -> { + if (value <= 0.0) { + throw new IllegalArgumentException("search_backpressure.cancellation_ratio must be > 0"); + } + if (value > 1.0) { + throw new IllegalArgumentException("search_backpressure.cancellation_ratio must be <= 1.0"); + } + }, Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope @@ -78,7 +84,11 @@ private static class Defaults { public static final Setting SETTING_CANCELLATION_RATE = Setting.doubleSetting( "search_backpressure.cancellation_rate", Defaults.CANCELLATION_RATE, - 0.0, + value -> { + if (value <= 0.0) { + throw new IllegalArgumentException("search_backpressure.cancellation_rate must be > 0"); + } + }, Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java index 6d016c7466362..38213506c55b7 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java @@ -44,8 +44,14 @@ private static class Defaults { public static final Setting SETTING_CANCELLATION_RATIO = Setting.doubleSetting( "search_backpressure.search_shard_task.cancellation_ratio", SearchBackpressureSettings.SETTING_CANCELLATION_RATIO, - 0.0, - 1.0, + value -> { + if (value <= 0.0) { + throw new IllegalArgumentException("search_backpressure.search_shard_task.cancellation_ratio must be > 0"); + } + if (value > 1.0) { + throw new IllegalArgumentException("search_backpressure.search_shard_task.cancellation_ratio must be <= 1.0"); + } + }, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -58,7 +64,11 @@ private static class Defaults { public static final Setting SETTING_CANCELLATION_RATE = Setting.doubleSetting( "search_backpressure.search_shard_task.cancellation_rate", SearchBackpressureSettings.SETTING_CANCELLATION_RATE, - 0.0, + value -> { + if (value <= 0.0) { + throw new IllegalArgumentException("search_backpressure.search_shard_task.cancellation_rate must be > 0"); + } + }, Setting.Property.Dynamic, Setting.Property.NodeScope ); diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchTaskSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchTaskSettings.java index 4b34323b1ddc6..f9af7f9b59fdb 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchTaskSettings.java +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchTaskSettings.java @@ -48,8 +48,14 @@ private static class Defaults { public static final Setting SETTING_CANCELLATION_RATIO = Setting.doubleSetting( "search_backpressure.search_task.cancellation_ratio", Defaults.CANCELLATION_RATIO, - 0.0, - 1.0, + value -> { + if (value <= 0.0) { + throw new IllegalArgumentException("search_backpressure.search_task.cancellation_ratio must be > 0"); + } + if (value > 1.0) { + throw new IllegalArgumentException("search_backpressure.search_task.cancellation_ratio must be <= 1.0"); + } + }, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -62,7 +68,11 @@ private static class Defaults { public static final Setting SETTING_CANCELLATION_RATE = Setting.doubleSetting( "search_backpressure.search_task.cancellation_rate", Defaults.CANCELLATION_RATE, - 0.0, + value -> { + if (value <= 0.0) { + throw new IllegalArgumentException("search_backpressure.search_task.cancellation_rate must be > 0"); + } + }, Setting.Property.Dynamic, Setting.Property.NodeScope ); diff --git a/server/src/test/java/org/opensearch/common/settings/SettingTests.java b/server/src/test/java/org/opensearch/common/settings/SettingTests.java index 7ebee680e8e52..c3c399a9d88b2 100644 --- a/server/src/test/java/org/opensearch/common/settings/SettingTests.java +++ b/server/src/test/java/org/opensearch/common/settings/SettingTests.java @@ -1274,6 +1274,20 @@ public void testFloatParser() throws Exception { public void testDoubleWithDefaultValue() { Setting doubleSetting = Setting.doubleSetting("foo.bar", 42.1); assertEquals(doubleSetting.get(Settings.EMPTY), Double.valueOf(42.1)); + + Setting doubleSettingWithValidator = Setting.doubleSetting("foo.bar", 42.1, value -> { + if (value <= 0.0) { + throw new IllegalArgumentException("The setting foo.bar must be >0"); + } + }); + try { + assertThrows( + IllegalArgumentException.class, + () -> doubleSettingWithValidator.get(Settings.builder().put("foo.bar", randomFrom(-1, 0)).build()) + ); + } catch (IllegalArgumentException ex) { + assertEquals("The setting foo.bar must be >0", ex.getMessage()); + } } public void testDoubleWithFallbackValue() { @@ -1282,6 +1296,20 @@ public void testDoubleWithFallbackValue() { assertEquals(doubleSetting.get(Settings.EMPTY), Double.valueOf(2.1)); assertEquals(doubleSetting.get(Settings.builder().put("foo.bar", 3.2).build()), Double.valueOf(3.2)); assertEquals(doubleSetting.get(Settings.builder().put("foo.baz", 3.2).build()), Double.valueOf(3.2)); + + Setting doubleSettingWithValidator = Setting.doubleSetting("foo.bar", fallbackSetting, value -> { + if (value <= 0.0) { + throw new IllegalArgumentException("The setting foo.bar must be >0"); + } + }); + try { + assertThrows( + IllegalArgumentException.class, + () -> doubleSettingWithValidator.get(Settings.builder().put("foo.bar", randomFrom(-1, 0)).build()) + ); + } catch (IllegalArgumentException ex) { + assertEquals("The setting foo.bar must be >0", ex.getMessage()); + } } public void testDoubleWithMinMax() throws Exception { diff --git a/server/src/test/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettingsTests.java b/server/src/test/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettingsTests.java index a02ca3cf877ad..683ada76c7683 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettingsTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettingsTests.java @@ -37,4 +37,32 @@ public void testSearchBackpressureSettingValidateInvalidMode() { () -> new SearchBackpressureSettings(settings, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) ); } + + public void testInvalidCancellationRate() { + Settings settings1 = Settings.builder().put("search_backpressure.search_task.cancellation_rate", randomFrom(-1, 0)).build(); + assertThrows( + IllegalArgumentException.class, + () -> new SearchBackpressureSettings(settings1, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ); + + Settings settings2 = Settings.builder().put("search_backpressure.search_shard_task.cancellation_rate", randomFrom(-1, 0)).build(); + assertThrows( + IllegalArgumentException.class, + () -> new SearchBackpressureSettings(settings2, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ); + } + + public void testInvalidCancellationRatio() { + Settings settings1 = Settings.builder().put("search_backpressure.search_task.cancellation_ratio", randomFrom(-1, 0)).build(); + assertThrows( + IllegalArgumentException.class, + () -> new SearchBackpressureSettings(settings1, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ); + + Settings settings2 = Settings.builder().put("search_backpressure.search_shard_task.cancellation_ratio", randomFrom(-1, 0)).build(); + assertThrows( + IllegalArgumentException.class, + () -> new SearchBackpressureSettings(settings2, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ); + } } From eb5b70370e548cd0f3b51721397df818b3050daa Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Tue, 17 Sep 2024 15:44:32 -0400 Subject: [PATCH 13/14] Fix flaky terminaton conditions for org.opensearch.rest.ReactorNetty4StreamingStressIT.testCloseClientStreamingRequest test case (#15959) Signed-off-by: Andriy Redko --- .../org/opensearch/rest/ReactorNetty4StreamingStressIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java index a853ed56fad32..9da456f618ffc 100644 --- a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java +++ b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java @@ -8,6 +8,7 @@ package org.opensearch.rest; +import org.apache.hc.core5.http.ConnectionClosedException; import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.StreamingRequest; @@ -75,7 +76,7 @@ public void testCloseClientStreamingRequest() throws Exception { } }) .then(() -> scheduler.advanceTimeBy(delay)) - .expectErrorMatches(t -> t instanceof InterruptedIOException) + .expectErrorMatches(t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException) .verify(); } } From 7c427d9fd588597343654d2f534952f7116d1df4 Mon Sep 17 00:00:00 2001 From: David Zane <38449481+dzane17@users.noreply.github.com> Date: Tue, 17 Sep 2024 15:29:12 -0700 Subject: [PATCH 14/14] Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder (#15916) Signed-off-by: David Zane --- CHANGELOG.md | 1 + .../support/ValuesSourceAggregationBuilder.java | 10 +++++++++- .../org/opensearch/search/sort/FieldSortBuilder.java | 8 +++++++- .../bucket/range/RangeAggregationBuilderTests.java | 1 + .../opensearch/search/sort/FieldSortBuilderTests.java | 1 + 5 files changed, 19 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index db41e2eee16f5..309d9cb06cc94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637)) - [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651)) - Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424)) +- Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916)) ### Dependencies - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578)) diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregationBuilder.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregationBuilder.java index 7a73fafb4a809..1ccceb1d77dcb 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregationBuilder.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregationBuilder.java @@ -40,6 +40,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.query.WithFieldName; import org.opensearch.script.Script; import org.opensearch.search.aggregations.AbstractAggregationBuilder; import org.opensearch.search.aggregations.AggregationInitializationException; @@ -57,7 +58,9 @@ * * @opensearch.internal */ -public abstract class ValuesSourceAggregationBuilder> extends AbstractAggregationBuilder { +public abstract class ValuesSourceAggregationBuilder> extends AbstractAggregationBuilder + implements + WithFieldName { public static void declareFields( AbstractObjectParser, T> objectParser, @@ -292,6 +295,11 @@ public String field() { return field; } + @Override + public String fieldName() { + return field(); + } + /** * Sets the script to use for this aggregation. */ diff --git a/server/src/main/java/org/opensearch/search/sort/FieldSortBuilder.java b/server/src/main/java/org/opensearch/search/sort/FieldSortBuilder.java index 5cecda1346b90..9825b2cbbe08e 100644 --- a/server/src/main/java/org/opensearch/search/sort/FieldSortBuilder.java +++ b/server/src/main/java/org/opensearch/search/sort/FieldSortBuilder.java @@ -65,6 +65,7 @@ import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.query.QueryShardContext; import org.opensearch.index.query.QueryShardException; +import org.opensearch.index.query.WithFieldName; import org.opensearch.search.DocValueFormat; import org.opensearch.search.MultiValueMode; import org.opensearch.search.SearchSortValuesAndFormats; @@ -86,7 +87,7 @@ * * @opensearch.internal */ -public class FieldSortBuilder extends SortBuilder { +public class FieldSortBuilder extends SortBuilder implements WithFieldName { private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(FieldSortBuilder.class); public static final String NAME = "field_sort"; @@ -184,6 +185,11 @@ public String getFieldName() { return this.fieldName; } + @Override + public String fieldName() { + return getFieldName(); + } + /** * Sets the value when a field is missing in a doc. Can also be set to {@code _last} or * {@code _first} to sort missing last or first respectively. diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/range/RangeAggregationBuilderTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/range/RangeAggregationBuilderTests.java index 4362ce48003cc..14532e30f8984 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/range/RangeAggregationBuilderTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/range/RangeAggregationBuilderTests.java @@ -128,6 +128,7 @@ public void testNumericKeys() throws IOException { ); assertThat(builder.getName(), equalTo("test")); assertThat(builder.field(), equalTo("f")); + assertThat(builder.fieldName(), equalTo("f")); assertThat(builder.ranges, equalTo(List.of(new RangeAggregator.Range("1", null, 0d)))); } } diff --git a/server/src/test/java/org/opensearch/search/sort/FieldSortBuilderTests.java b/server/src/test/java/org/opensearch/search/sort/FieldSortBuilderTests.java index 9b8cd1b5f1ce0..ced952db555aa 100644 --- a/server/src/test/java/org/opensearch/search/sort/FieldSortBuilderTests.java +++ b/server/src/test/java/org/opensearch/search/sort/FieldSortBuilderTests.java @@ -196,6 +196,7 @@ protected void sortFieldAssertions(FieldSortBuilder builder, SortField sortField assertEquals(builder.order() == SortOrder.ASC ? false : true, sortField.getReverse()); if (expectedType == SortField.Type.CUSTOM) { assertEquals(builder.getFieldName(), sortField.getField()); + assertEquals(builder.fieldName(), sortField.getField()); } assertEquals(DocValueFormat.RAW, format); }