diff --git a/CHANGELOG.md b/CHANGELOG.md index aee27a1c4dd67..2b7d3852f7be1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308)) - Update to Netty 4.1.80.Final ([#4359](https://github.com/opensearch-project/OpenSearch/pull/4359)) - Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240)) +- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253)) ### Deprecated @@ -48,6 +49,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - PR reference to checkout code for changelog verifier ([#4296](https://github.com/opensearch-project/OpenSearch/pull/4296)) - Restore using the class ClusterInfoRequest and ClusterInfoRequestBuilder from package 'org.opensearch.action.support.master.info' for subclasses ([#4324](https://github.com/opensearch-project/OpenSearch/pull/4324)) - Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225)) +- [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363)) - [Segment Replication] Bump segment infos counter before commit during replica promotion ([#4365](https://github.com/opensearch-project/OpenSearch/pull/4365)) ### Security diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index f3b01333f363a..72697c44295cf 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -284,6 +284,9 @@ public Iterator> settings() { ); public static final String SETTING_REMOTE_STORE_ENABLED = "index.remote_store.enabled"; + + public static final String SETTING_REMOTE_STORE_REPOSITORY = "index.remote_store.repository"; + /** * Used to specify if the index data should be persisted in the remote store. */ @@ -320,6 +323,50 @@ public Iterator> settings() { Property.Final ); + /** + * Used to specify remote store repository to use for this index. + */ + public static final Setting INDEX_REMOTE_STORE_REPOSITORY_SETTING = Setting.simpleString( + SETTING_REMOTE_STORE_REPOSITORY, + new Setting.Validator<>() { + + @Override + public void validate(final String value) {} + + @Override + public void validate(final String value, final Map, Object> settings) { + if (value == null || value.isEmpty()) { + throw new IllegalArgumentException( + "Setting " + INDEX_REMOTE_STORE_REPOSITORY_SETTING.getKey() + " should be provided with non-empty repository ID" + ); + } else { + validateRemoteStoreSettingEnabled(settings, INDEX_REMOTE_STORE_REPOSITORY_SETTING); + } + } + + @Override + public Iterator> settings() { + final List> settings = Collections.singletonList(INDEX_REMOTE_STORE_ENABLED_SETTING); + return settings.iterator(); + } + }, + Property.IndexScope, + Property.Final + ); + + private static void validateRemoteStoreSettingEnabled(final Map, Object> settings, Setting setting) { + final Boolean isRemoteSegmentStoreEnabled = (Boolean) settings.get(INDEX_REMOTE_STORE_ENABLED_SETTING); + if (isRemoteSegmentStoreEnabled == false) { + throw new IllegalArgumentException( + "Settings " + + setting.getKey() + + " can ont be set/enabled when " + + INDEX_REMOTE_STORE_ENABLED_SETTING.getKey() + + " is set to true" + ); + } + } + public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas"; public static final Setting INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING; diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index b0efaab733231..dc28fcaf2596f 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -61,6 +61,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Predicate; @@ -218,11 +219,11 @@ public final class IndexScopedSettings extends AbstractScopedSettings { * is ready for production release, the feature flag can be removed, and the * setting should be moved to {@link #BUILT_IN_INDEX_SETTINGS}. */ - public static final Map FEATURE_FLAGGED_INDEX_SETTINGS = Map.of( + public static final Map> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of( FeatureFlags.REPLICATION_TYPE, - IndexMetadata.INDEX_REPLICATION_TYPE_SETTING, + Collections.singletonList(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING), FeatureFlags.REMOTE_STORE, - IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING + Arrays.asList(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING, IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING) ); public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS); diff --git a/server/src/main/java/org/opensearch/common/settings/SettingsModule.java b/server/src/main/java/org/opensearch/common/settings/SettingsModule.java index 16b39bb2e33f9..7b4dfb7d64bb6 100644 --- a/server/src/main/java/org/opensearch/common/settings/SettingsModule.java +++ b/server/src/main/java/org/opensearch/common/settings/SettingsModule.java @@ -88,9 +88,9 @@ public SettingsModule( registerSetting(setting); } - for (Map.Entry featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) { + for (Map.Entry> featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) { if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) { - registerSetting(featureFlaggedSetting.getValue()); + featureFlaggedSetting.getValue().forEach(feature -> registerSetting(feature)); } } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 7c3675fab423c..475129b62177b 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -510,7 +510,7 @@ public synchronized IndexShard createShard( Store remoteStore = null; if (this.indexSettings.isRemoteStoreEnabled()) { Directory remoteDirectory = remoteDirectoryFactory.newDirectory( - clusterService.state().metadata().clusterUUID(), + this.indexSettings.getRemoteStoreRepository(), this.indexSettings, path ); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index d1363540709c8..74b104392133f 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -559,6 +559,7 @@ public final class IndexSettings { private final int numberOfShards; private final ReplicationType replicationType; private final boolean isRemoteStoreEnabled; + private final String remoteStoreRepository; // volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock private volatile Settings settings; private volatile IndexMetadata indexMetadata; @@ -719,7 +720,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null); replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE)); isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); - + remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY); this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings); this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings); this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings); @@ -971,6 +972,13 @@ public boolean isRemoteStoreEnabled() { return isRemoteStoreEnabled; } + /** + * Returns remote store repository configured for this index. + */ + public String getRemoteStoreRepository() { + return remoteStoreRepository; + } + /** * Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the * index settings and the node settings where node settings are overwritten by index settings. diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 0d32e8d56e4d2..a8ca9891d9743 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -59,6 +59,13 @@ public RemoteStoreRefreshListener(IndexShard indexShard) { .getDelegate()).getDelegate(); this.primaryTerm = indexShard.getOperationPrimaryTerm(); localSegmentChecksumMap = new HashMap<>(); + if (indexShard.shardRouting.primary()) { + try { + this.remoteDirectory.init(); + } catch (IOException e) { + logger.error("Exception while initialising RemoteSegmentStoreDirectory", e); + } + } } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index d1d6104a416ca..7c28406036ddd 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -56,6 +56,10 @@ public class SegmentReplicationTarget extends ReplicationTarget { private final SegmentReplicationState state; protected final MultiFileWriter multiFileWriter; + public ReplicationCheckpoint getCheckpoint() { + return this.checkpoint; + } + public SegmentReplicationTarget( ReplicationCheckpoint checkpoint, IndexShard indexShard, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 9e6b66dc4d7d6..8fc53ccd3bc08 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -18,6 +18,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; @@ -34,7 +35,6 @@ import org.opensearch.transport.TransportRequestHandler; import org.opensearch.transport.TransportService; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -54,7 +54,7 @@ public class SegmentReplicationTargetService implements IndexEventListener { private final SegmentReplicationSourceFactory sourceFactory; - private final Map latestReceivedCheckpoint = new HashMap<>(); + private final Map latestReceivedCheckpoint = ConcurrentCollections.newConcurrentMap(); // Empty Implementation, only required while Segment Replication is under feature flag. public static final SegmentReplicationTargetService NO_OP = new SegmentReplicationTargetService() { @@ -151,14 +151,23 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe } else { latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint); } - if (onGoingReplications.isShardReplicating(replicaShard.shardId())) { - logger.trace( - () -> new ParameterizedMessage( - "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", - replicaShard.getLatestReplicationCheckpoint() - ) - ); - return; + SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId()); + if (ongoingReplicationTarget != null) { + if (ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) { + logger.trace( + "Cancelling ongoing replication from old primary with primary term {}", + ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() + ); + onGoingReplications.cancel(ongoingReplicationTarget.getId(), "Cancelling stuck target after new primary"); + } else { + logger.trace( + () -> new ParameterizedMessage( + "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", + replicaShard.getLatestReplicationCheckpoint() + ) + ); + return; + } } final Thread thread = Thread.currentThread(); if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index d648ca6041ff8..20600856c9444 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -49,6 +49,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; /** * This class holds a collection of all on going replication events on the current node (i.e., the node is the target node @@ -236,13 +237,18 @@ public boolean cancelForShard(ShardId shardId, String reason) { } /** - * check if a shard is currently replicating + * Get target for shard * - * @param shardId shardId for which to check if replicating - * @return true if shard is currently replicating + * @param shardId shardId + * @return ReplicationTarget for input shardId */ - public boolean isShardReplicating(ShardId shardId) { - return onGoingTargetEvents.values().stream().anyMatch(t -> t.indexShard.shardId().equals(shardId)); + public T getOngoingReplicationTarget(ShardId shardId) { + final List replicationTargetList = onGoingTargetEvents.values() + .stream() + .filter(t -> t.indexShard.shardId().equals(shardId)) + .collect(Collectors.toList()); + assert replicationTargetList.size() <= 1 : "More than one on-going replication targets"; + return replicationTargetList.size() > 0 ? replicationTargetList.get(0) : null; } /** diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 63e8b3aa423f2..2d232cccef8b2 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -42,7 +42,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchTestCase; @@ -59,7 +58,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.StringContains.containsString; import static org.hamcrest.object.HasToString.hasToString; -import static org.opensearch.common.settings.IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS; public class IndexSettingsTests extends OpenSearchTestCase { @@ -784,7 +782,7 @@ public void testRemoteStoreExplicitSetting() { public void testUpdateRemoteStoreFails() { Set> remoteStoreSettingSet = new HashSet<>(); - remoteStoreSettingSet.add(FEATURE_FLAGGED_INDEX_SETTINGS.get(FeatureFlags.REMOTE_STORE)); + remoteStoreSettingSet.add(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING); IndexScopedSettings settings = new IndexScopedSettings(Settings.EMPTY, remoteStoreSettingSet); IllegalArgumentException error = expectThrows( IllegalArgumentException.class, @@ -818,4 +816,71 @@ public void testEnablingRemoteStoreFailsWhenReplicationTypeIsDefault() { ); assertEquals("To enable index.remote_store.enabled, index.replication.type should be set to SEGMENT", iae.getMessage()); } + + public void testRemoteRepositoryDefaultSetting() { + IndexMetadata metadata = newIndexMeta( + "index", + Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build() + ); + IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertNull(settings.getRemoteStoreRepository()); + } + + public void testRemoteRepositoryExplicitSetting() { + IndexMetadata metadata = newIndexMeta( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, "repo1") + .build() + ); + IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertEquals("repo1", settings.getRemoteStoreRepository()); + } + + public void testUpdateRemoteRepositoryFails() { + Set> remoteStoreSettingSet = new HashSet<>(); + remoteStoreSettingSet.add(IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING); + IndexScopedSettings settings = new IndexScopedSettings(Settings.EMPTY, remoteStoreSettingSet); + IllegalArgumentException error = expectThrows( + IllegalArgumentException.class, + () -> settings.updateSettings( + Settings.builder().put("index.remote_store.repository", randomUnicodeOfLength(10)).build(), + Settings.builder(), + Settings.builder(), + "index" + ) + ); + assertEquals(error.getMessage(), "final index setting [index.remote_store.repository], not updateable"); + } + + public void testSetRemoteRepositoryFailsWhenRemoteStoreIsNotEnabled() { + Settings indexSettings = Settings.builder() + .put("index.replication.type", ReplicationType.SEGMENT) + .put("index.remote_store.enabled", false) + .put("index.remote_store.repository", "repo1") + .build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING.get(indexSettings) + ); + assertEquals( + "Settings index.remote_store.repository can ont be set/enabled when index.remote_store.enabled is set to true", + iae.getMessage() + ); + } + + public void testSetRemoteRepositoryFailsWhenEmptyString() { + Settings indexSettings = Settings.builder() + .put("index.replication.type", ReplicationType.SEGMENT) + .put("index.remote_store.enabled", false) + .put("index.remote_store.repository", "") + .build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING.get(indexSettings) + ); + assertEquals("Setting index.remote_store.repository should be provided with non-empty repository ID", iae.getMessage()); + } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 7d9b0f09f21cd..1d253b0a9a300 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -49,6 +49,8 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { private ReplicationCheckpoint initialCheckpoint; private ReplicationCheckpoint aheadCheckpoint; + private ReplicationCheckpoint newPrimaryCheckpoint; + @Override public void setUp() throws Exception { super.setUp(); @@ -74,6 +76,13 @@ public void setUp() throws Exception { initialCheckpoint.getSeqNo(), initialCheckpoint.getSegmentInfosVersion() + 1 ); + newPrimaryCheckpoint = new ReplicationCheckpoint( + initialCheckpoint.getShardId(), + initialCheckpoint.getPrimaryTerm() + 1, + initialCheckpoint.getSegmentsGen(), + initialCheckpoint.getSeqNo(), + initialCheckpoint.getSegmentInfosVersion() + 1 + ); } @Override @@ -160,7 +169,7 @@ public void testShardAlreadyReplicating() throws InterruptedException { // Create a spy of Target Service so that we can verify invocation of startReplication call with specific checkpoint on it. SegmentReplicationTargetService serviceSpy = spy(sut); final SegmentReplicationTarget target = new SegmentReplicationTarget( - checkpoint, + initialCheckpoint, replicaShard, replicationSource, mock(SegmentReplicationTargetService.SegmentReplicationListener.class) @@ -185,9 +194,47 @@ public void testShardAlreadyReplicating() throws InterruptedException { // wait for the new checkpoint to arrive, before the listener completes. latch.await(30, TimeUnit.SECONDS); + verify(targetSpy, times(0)).cancel(any()); verify(serviceSpy, times(0)).startReplication(eq(aheadCheckpoint), eq(replicaShard), any()); } + public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws IOException, InterruptedException { + // Create a spy of Target Service so that we can verify invocation of startReplication call with specific checkpoint on it. + SegmentReplicationTargetService serviceSpy = spy(sut); + // Create a Mockito spy of target to stub response of few method calls. + final SegmentReplicationTarget targetSpy = spy( + new SegmentReplicationTarget( + initialCheckpoint, + replicaShard, + replicationSource, + mock(SegmentReplicationTargetService.SegmentReplicationListener.class) + ) + ); + + CountDownLatch latch = new CountDownLatch(1); + // Mocking response when startReplication is called on targetSpy we send a new checkpoint to serviceSpy and later reduce countdown + // of latch. + doAnswer(invocation -> { + final ActionListener listener = invocation.getArgument(0); + // a new checkpoint arrives before we've completed. + serviceSpy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard); + listener.onResponse(null); + latch.countDown(); + return null; + }).when(targetSpy).startReplication(any()); + doNothing().when(targetSpy).onDone(); + + // start replication. This adds the target to on-ongoing replication collection + serviceSpy.startReplication(targetSpy); + + // wait for the new checkpoint to arrive, before the listener completes. + latch.await(5, TimeUnit.SECONDS); + doNothing().when(targetSpy).startReplication(any()); + verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary"); + verify(serviceSpy, times(1)).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any()); + closeShards(replicaShard); + } + public void testNewCheckpointBehindCurrentCheckpoint() { SegmentReplicationTargetService spy = spy(sut); spy.onNewCheckpoint(checkpoint, replicaShard); diff --git a/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java index 7587f48503625..1789dd3b2a288 100644 --- a/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java +++ b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java @@ -105,7 +105,25 @@ public void onFailure(ReplicationState state, OpenSearchException e, boolean sen collection.cancel(recoveryId, "meh"); } } + } + public void testMultiReplicationsForSingleShard() throws Exception { + try (ReplicationGroup shards = createGroup(0)) { + final ReplicationCollection collection = new ReplicationCollection<>(logger, threadPool); + final IndexShard shard1 = shards.addReplica(); + final IndexShard shard2 = shards.addReplica(); + final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shard1); + final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shard2); + try { + collection.getOngoingReplicationTarget(shard1.shardId()); + } catch (AssertionError e) { + assertEquals(e.getMessage(), "More than one on-going replication targets"); + } finally { + collection.cancel(recoveryId, "meh"); + collection.cancel(recoveryId2, "meh"); + } + closeShards(shard1, shard2); + } } public void testRecoveryCancellation() throws Exception {