From 9aec21548f6944e2d4bc5a468cc225f5d1976f92 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 22 Jun 2022 00:18:19 +0000 Subject: [PATCH] Addressing comments on PR Signed-off-by: Rishikesh1159 --- .../org/opensearch/index/engine/Engine.java | 1 + .../opensearch/index/shard/IndexShard.java | 54 +++++++++++-------- .../org/opensearch/indices/IndicesModule.java | 28 +++++++++- .../SegmentReplicationTargetService.java | 22 ++++---- ...SegmentReplicationCheckpointPublisher.java | 3 +- .../main/java/org/opensearch/node/Node.java | 18 +++---- .../PublishCheckpointActionTests.java | 27 ++++++---- 7 files changed, 99 insertions(+), 54 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 4829148322b31..fa0c4673a437a 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -173,6 +173,7 @@ public final EngineConfig config() { * Return the latest active SegmentInfos from the engine. 
* @return {@link SegmentInfos} */ + @Nullable protected abstract SegmentInfos getLatestSegmentInfos(); /** diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 32d1db647732c..9441c133fba09 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1376,25 +1376,33 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } } - /** - * Returns the lastest segmentInfos - */ - public SegmentInfos getLatestSegmentInfos() { - return getEngine().getSegmentInfosSnapshot().get(); - } - /** * Returns the lastest Replication Checkpoint that shard received */ public ReplicationCheckpoint getLatestReplicationCheckpoint() { - final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); - return new ReplicationCheckpoint( - this.shardId, - getOperationPrimaryTerm(), - latestSegmentInfos.getGeneration(), - getProcessedLocalCheckpoint(), - latestSegmentInfos.getVersion() - ); + try (final GatedCloseable snapshot = getSegmentInfosSnapshot()) { + return Optional.ofNullable(snapshot.get()) + .map( + segmentInfos -> new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + segmentInfos.getGeneration(), + getProcessedLocalCheckpoint(), + segmentInfos.getVersion() + ) + ) + .orElse( + new ReplicationCheckpoint( + shardId, + getOperationPrimaryTerm(), + SequenceNumbers.NO_OPS_PERFORMED, + getProcessedLocalCheckpoint(), + SequenceNumbers.NO_OPS_PERFORMED + ) + ); + } catch (IOException ex) { + throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex); + } } /** @@ -1403,22 +1411,26 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { * @param requestCheckpoint received checkpoint that is checked for processing * @return true if checkpoint should be processed */ - public boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { 
+ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { if (state().equals(IndexShardState.STARTED) == false) { - logger.trace("Ignoring new replication checkpoint - shard is not started {}", state()); + logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); return false; } ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); if (localCheckpoint.isAheadOf(requestCheckpoint)) { logger.trace( - "Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", - localCheckpoint, - requestCheckpoint + () -> new ParameterizedMessage( + "Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", + localCheckpoint, + requestCheckpoint + ) ); return false; } if (localCheckpoint.equals(requestCheckpoint)) { - logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint); + logger.trace( + () -> new ParameterizedMessage("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint) + ); return false; } return true; diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index 0cb2ff958c787..e7aa79d817ede 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -39,6 +39,8 @@ import org.opensearch.action.resync.TransportResyncReplicationAction; import org.opensearch.common.ParseField; import org.opensearch.common.inject.AbstractModule; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.inject.Provider; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.NamedWriteableRegistry.Entry; import org.opensearch.common.util.FeatureFlags; @@ -74,6 +76,7 @@ import 
org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.mapper.MapperRegistry; +import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.store.IndicesStore; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; @@ -281,7 +284,30 @@ protected void configure() { bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton(); bind(RetentionLeaseSyncer.class).asEagerSingleton(); if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) { - bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); + bind(SegmentReplicationCheckpointPublisher.class).toProvider(CheckpointPublisherProvider.class).asEagerSingleton(); + } else { + bind(SegmentReplicationCheckpointPublisher.class).toInstance(SegmentReplicationCheckpointPublisher.EMPTY); + } + } + + /** + * This provider is necessary while segment replication is behind a feature flag. + * We don't want to initialize a PublishCheckpointAction with the feature flag disabled. 
+ * + * @opensearch.internal + */ + public final static class CheckpointPublisherProvider implements Provider { + + private final PublishCheckpointAction action; + + @Inject + public CheckpointPublisherProvider(PublishCheckpointAction action) { + this.action = action; + } + + @Override + public SegmentReplicationCheckpointPublisher get() { + return new SegmentReplicationCheckpointPublisher(action); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 1eead75d0f718..c44b27911bb7a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -14,7 +14,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.common.Nullable; -import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -59,7 +58,6 @@ public static class Actions { public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk"; } - @Inject public SegmentReplicationTargetService( final ThreadPool threadPool, final RecoverySettings recoverySettings, @@ -89,19 +87,21 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh /** * Invoked when a new checkpoint is received from a primary shard. * It checks if a new checkpoint should be processed or not and starts replication if needed. 
- * @param requestCheckpoint received checkpoint that is checked for processing - * @param indexShard replica shard on which checkpoint is received + * @param receivedCheckpoint received checkpoint that is checked for processing + * @param replicaShard replica shard on which checkpoint is received */ - public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) { - if (onGoingReplications.isShardReplicating(indexShard.shardId())) { + public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { + if (onGoingReplications.isShardReplicating(replicaShard.shardId())) { logger.trace( - "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", - indexShard.getLatestReplicationCheckpoint() + () -> new ParameterizedMessage( + "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", + replicaShard.getLatestReplicationCheckpoint() + ) ); return; } - if (indexShard.shouldProcessCheckpoint(requestCheckpoint)) { - startReplication(requestCheckpoint, indexShard, new SegmentReplicationListener() { + if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { + startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) {} @@ -109,7 +109,7 @@ public void onReplicationDone(SegmentReplicationState state) {} public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { if (sendShardFailure == true) { logger.error("replication failure", e); - indexShard.failShard("replication failure", e); + replicaShard.failShard("replication failure", e); } } }); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java 
b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java index 2b09901a947fe..39b7a6fef1abd 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java @@ -8,7 +8,6 @@ package org.opensearch.indices.replication.checkpoint; -import org.opensearch.common.inject.Inject; import org.opensearch.index.shard.IndexShard; import java.util.Objects; @@ -22,7 +21,7 @@ public class SegmentReplicationCheckpointPublisher { private final PublishAction publishAction; - @Inject + // This component is behind a feature flag, so it is bound manually in IndicesModule. public SegmentReplicationCheckpointPublisher(PublishCheckpointAction publishAction) { this(publishAction::publish); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 8c894e7b57616..4b4fdc974f8cb 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -937,16 +937,16 @@ protected Node( .toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class) .toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); - b.bind(SegmentReplicationTargetService.class) - .toInstance( - new SegmentReplicationTargetService( - threadPool, - recoverySettings, - transportService, - new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) - ) - ); if (FeatureFlags.isEnabled(REPLICATION_TYPE)) { + b.bind(SegmentReplicationTargetService.class) + .toInstance( + new SegmentReplicationTargetService( + threadPool, + recoverySettings, + transportService, + new SegmentReplicationSourceFactory(transportService, recoverySettings, 
clusterService) + ) + ); b.bind(SegmentReplicationSourceService.class) .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); } diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index da6ab32cb2517..77cc1d744f0dc 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -20,15 +20,15 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import java.io.IOException; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,7 +36,7 @@ import static org.mockito.Mockito.*; import static org.opensearch.test.ClusterServiceUtils.createClusterService; -public class PublishCheckpointActionTests extends IndexShardTestCase { +public class PublishCheckpointActionTests extends OpenSearchTestCase { private ThreadPool threadPool; private CapturingTransport transport; @@ -73,14 +73,19 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testPublishCheckpointActionOnPrimary() throws InterruptedException, IOException { + public void testPublishCheckpointActionOnPrimary() { final IndicesService indicesService = 
mock(IndicesService.class); final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - final IndexShard indexShard = newShard(true); + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); @@ -103,18 +108,21 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException, // we should forward the request containing the current publish checkpoint to the replica assertThat(result.replicaRequest(), sameInstance(request)); })); - closeShards(indexShard); } - public void testPublishCheckpointActionOnReplica() throws IOException { + public void testPublishCheckpointActionOnReplica() { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); - final IndexShard indexShard = newShard(false); + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); @@ -138,13 +146,12 @@ public void testPublishCheckpointActionOnReplica() throws IOException { final TransportReplicationAction.ReplicaResult result = listener.actionGet(); // onNewCheckpoint should be called on shard with checkpoint request - verify(mockTargetService).onNewCheckpoint(checkpoint, indexShard); + 
verify(mockTargetService, times(1)).onNewCheckpoint(checkpoint, indexShard); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); result.runPostReplicaActions(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString()))); assertTrue(success.get()); - closeShards(indexShard); }