Skip to content

Commit

Permalink
Changing binding/inject logic and addressing comments from PR
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Jun 9, 2022
1 parent 8b288d8 commit 02c1be3
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public final EngineConfig config() {
* Return the latest active SegmentInfos from the engine.
* @return {@link SegmentInfos}
*/
public abstract SegmentInfos getLatestSegmentInfos();
protected abstract SegmentInfos getLatestSegmentInfos();

/**
* In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2289,7 +2289,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
}

@Override
public SegmentInfos getLatestSegmentInfos() {
protected SegmentInfos getLatestSegmentInfos() {
OpenSearchDirectoryReader reader = null;
try {
reader = internalReaderManager.acquire();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
}

@Override
public SegmentInfos getLatestSegmentInfos() {
protected SegmentInfos getLatestSegmentInfos() {
return readerManager.getSegmentInfos();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
}

@Override
public SegmentInfos getLatestSegmentInfos() {
protected SegmentInfos getLatestSegmentInfos() {
return lastCommittedSegmentInfos;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1372,18 +1372,11 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}
}

/**
* Returns the IndexShardSate of current shard
*/
public IndexShardState getState() {
return this.state;
}

/**
* Returns the lastest segmentInfos
*/
public SegmentInfos getLatestSegmentInfos() {
return getEngine().getLatestSegmentInfos();
return getEngine().getSegmentInfosSnapshot().get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ public class RecoverySettings {

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;

@Inject
public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public class SegmentReplicationSourceFactory {
private RecoverySettings recoverySettings;
private ClusterService clusterService;

@Inject
public SegmentReplicationSourceFactory(
TransportService transportService,
RecoverySettings recoverySettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
/**
* Invoked when a new checkpoint is received from a primary shard.
* It checks if a new checkpoint should be processed or not and starts replication if needed.
* @param requestCheckpoint received checkpoint that is checked for processing
* @param indexShard replica shard on which checkpoint is received
*/
public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) {
logger.trace("Checkpoint received {}", () -> requestCheckpoint);
Expand Down Expand Up @@ -130,26 +132,25 @@ public void startReplication(final SegmentReplicationTarget target) {
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @param indexshard replica shard on which checkpoint is received
* @param indexShard replica shard on which checkpoint is received
* @return true if checkpoint should be processed
*/
private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, IndexShard indexshard) {
if (indexshard.getState().equals(IndexShardState.STARTED) == false) {
logger.debug("Ignore - shard is not started {}", indexshard.getState());
private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, IndexShard indexShard) {
if (indexShard.state().equals(IndexShardState.STARTED) == false) {
logger.trace("Ignoring new replication checkpoint - shard is not started {}", indexShard.state());
return false;
}
ReplicationCheckpoint localCheckpoint = indexshard.getLatestReplicationCheckpoint();
logger.debug("Local Checkpoint {}", indexshard.getLatestReplicationCheckpoint());
if (onGoingReplications.isShardReplicating(indexshard.shardId())) {
logger.debug("Ignore - shard is currently replicating to a checkpoint");
ReplicationCheckpoint localCheckpoint = indexShard.getLatestReplicationCheckpoint();
if (onGoingReplications.isShardReplicating(indexShard.shardId())) {
logger.trace("Ignoring new replication checkpoint - shard is currently replicating to a checkpoint");
return false;
}
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.debug("Ignore - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint);
logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint);
return false;
}
if (localCheckpoint.equals(requestCheckpoint)) {
logger.debug("Ignore - Shard is already on checkpoint {}", requestCheckpoint);
logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint);
return false;
}
return true;
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.SetOnce;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.Assertions;
import org.opensearch.Build;
Expand Down Expand Up @@ -932,6 +934,8 @@ protected Node(
.toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings));
b.bind(PeerRecoveryTargetService.class)
.toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService));
b.bind(SegmentReplicationTargetService.class)
.toInstance(new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService)));
}
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
Expand Down

0 comments on commit 02c1be3

Please sign in to comment.