diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 9311f1fabd2b2..4829148322b31 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -173,7 +173,7 @@ public final EngineConfig config() { * Return the latest active SegmentInfos from the engine. * @return {@link SegmentInfos} */ - public abstract SegmentInfos getLatestSegmentInfos(); + protected abstract SegmentInfos getLatestSegmentInfos(); /** * In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos} diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index b63a39ebb1222..d2d688a90353e 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2289,7 +2289,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - public SegmentInfos getLatestSegmentInfos() { + protected SegmentInfos getLatestSegmentInfos() { OpenSearchDirectoryReader reader = null; try { reader = internalReaderManager.acquire(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index c9b5ccaca8800..e4f4bbbba8f16 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -431,7 +431,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - public SegmentInfos getLatestSegmentInfos() { + protected SegmentInfos getLatestSegmentInfos() { return readerManager.getSegmentInfos(); } diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index bc54bb9433fb4..6262a9269c01c 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -271,7 +271,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - public SegmentInfos getLatestSegmentInfos() { + protected SegmentInfos getLatestSegmentInfos() { return lastCommittedSegmentInfos; } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index c01260ef4909b..b6083580964a0 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1372,18 +1372,11 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } } - /** - * Returns the IndexShardSate of current shard - */ - public IndexShardState getState() { - return this.state; - } - /** * Returns the lastest segmentInfos */ public SegmentInfos getLatestSegmentInfos() { - return getEngine().getLatestSegmentInfos(); + return getEngine().getSegmentInfosSnapshot().get(); } /** diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 40a36e44f4f98..08dbbb194d16c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -160,7 +160,6 @@ public class RecoverySettings { private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; - @Inject public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings); this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index 242f4c953beb6..453075bf7d24e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -28,7 +28,6 @@ public class SegmentReplicationSourceFactory { private RecoverySettings recoverySettings; private ClusterService clusterService; - @Inject public SegmentReplicationSourceFactory( TransportService transportService, RecoverySettings recoverySettings, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index da4145322ccd7..6e5a5e3f57e4f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -90,6 +90,8 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh /** * Invoked when a new checkpoint is received from a primary shard. * It checks if a new checkpoint should be processed or not and starts replication if needed. + * @param requestCheckpoint received checkpoint that is checked for processing + * @param indexShard replica shard on which checkpoint is received */ public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) { logger.trace("Checkpoint received {}", () -> requestCheckpoint); @@ -130,26 +132,25 @@ public void startReplication(final SegmentReplicationTarget target) { * Checks if checkpoint should be processed * * @param requestCheckpoint received checkpoint that is checked for processing - * @param indexshard replica shard on which checkpoint is received + * @param indexShard replica shard on which checkpoint is received * @return true if checkpoint should be processed */ - private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, IndexShard indexshard) { - if (indexshard.getState().equals(IndexShardState.STARTED) == false) { - logger.debug("Ignore - shard is not started {}", indexshard.getState()); + private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, IndexShard indexShard) { + if (indexShard.state().equals(IndexShardState.STARTED) == false) { + logger.trace("Ignoring new replication checkpoint - shard is not started {}", indexShard.state()); return false; } - ReplicationCheckpoint localCheckpoint = indexshard.getLatestReplicationCheckpoint(); - logger.debug("Local Checkpoint {}", indexshard.getLatestReplicationCheckpoint()); - if (onGoingReplications.isShardReplicating(indexshard.shardId())) { - logger.debug("Ignore - shard is currently replicating to a checkpoint"); + ReplicationCheckpoint localCheckpoint = indexShard.getLatestReplicationCheckpoint(); + if (onGoingReplications.isShardReplicating(indexShard.shardId())) { + logger.trace("Ignoring new replication checkpoint - shard is currently replicating to a checkpoint"); return false; } if (localCheckpoint.isAheadOf(requestCheckpoint)) { - logger.debug("Ignore - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint); + logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint); return false; } if (localCheckpoint.equals(requestCheckpoint)) { - logger.debug("Ignore - Shard is already on checkpoint {}", requestCheckpoint); + logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint); return false; } return true; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 7e205b88e9eb1..937a93f7211de 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -37,6 +37,8 @@ import org.apache.lucene.util.Constants; import org.apache.lucene.util.SetOnce; import org.opensearch.index.IndexingPressureService; +import org.opensearch.indices.replication.SegmentReplicationSourceFactory; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.Assertions; import org.opensearch.Build; @@ -932,6 +934,8 @@ protected Node( .toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class) .toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); + b.bind(SegmentReplicationTargetService.class) + .toInstance(new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService))); } b.bind(HttpServerTransport.class).toInstance(httpServerTransport); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));