Addressing comments on PR
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
Rishikesh1159 committed Jun 22, 2022
1 parent 2a1d718 commit 9aec215
Showing 7 changed files with 99 additions and 54 deletions.
@@ -173,6 +173,7 @@ public final EngineConfig config() {
* Return the latest active SegmentInfos from the engine.
* @return {@link SegmentInfos}
*/
@Nullable
protected abstract SegmentInfos getLatestSegmentInfos();

/**
54 changes: 33 additions & 21 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1376,25 +1376,33 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}
}

/**
* Returns the latest segmentInfos
*/
public SegmentInfos getLatestSegmentInfos() {
return getEngine().getSegmentInfosSnapshot().get();
}

/**
* Returns the latest Replication Checkpoint that the shard received
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
return new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
latestSegmentInfos.getGeneration(),
getProcessedLocalCheckpoint(),
latestSegmentInfos.getVersion()
);
try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
return Optional.ofNullable(snapshot.get())
.map(
segmentInfos -> new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
getProcessedLocalCheckpoint(),
segmentInfos.getVersion()
)
)
.orElse(
new ReplicationCheckpoint(
shardId,
getOperationPrimaryTerm(),
SequenceNumbers.NO_OPS_PERFORMED,
getProcessedLocalCheckpoint(),
SequenceNumbers.NO_OPS_PERFORMED
)
);
} catch (IOException ex) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex);
}
}

/**
@@ -1403,22 +1411,26 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
*/
public boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (state().equals(IndexShardState.STARTED) == false) {
logger.trace("Ignoring new replication checkpoint - shard is not started {}", state());
logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state()));
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
"Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}",
localCheckpoint,
requestCheckpoint
() -> new ParameterizedMessage(
"Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}",
localCheckpoint,
requestCheckpoint
)
);
return false;
}
if (localCheckpoint.equals(requestCheckpoint)) {
logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint);
logger.trace(
() -> new ParameterizedMessage("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint)
);
return false;
}
return true;
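The logging changes in shouldProcessCheckpoint replace eagerly formatted trace messages with Supplier-based ones, so the ParameterizedMessage (and any argument evaluation inside the lambda) only happens when TRACE is actually enabled. A minimal standalone sketch of the idiom, assuming only Log4j2 on the classpath; the class name is made up and the message text is borrowed from the diff purely for illustration:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

public class LazyTraceExample {
    private static final Logger logger = LogManager.getLogger(LazyTraceExample.class);

    public static void main(String[] args) {
        String state = "RECOVERING";
        // Eager form: the arguments are evaluated at the call site regardless of the log level.
        logger.trace("Ignoring new replication checkpoint - shard is not started {}", state);
        // Lazy form: the lambda body only runs when TRACE is enabled, so the
        // ParameterizedMessage is never allocated on the hot path otherwise.
        logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state));
    }
}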
28 changes: 27 additions & 1 deletion server/src/main/java/org/opensearch/indices/IndicesModule.java
@@ -39,6 +39,8 @@
import org.opensearch.action.resync.TransportResyncReplicationAction;
import org.opensearch.common.ParseField;
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.inject.Provider;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.opensearch.common.util.FeatureFlags;
@@ -74,6 +76,7 @@
import org.opensearch.index.shard.PrimaryReplicaSyncer;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
@@ -281,7 +284,30 @@ protected void configure() {
bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton();
bind(RetentionLeaseSyncer.class).asEagerSingleton();
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton();
bind(SegmentReplicationCheckpointPublisher.class).toProvider(CheckpointPublisherProvider.class).asEagerSingleton();
} else {
bind(SegmentReplicationCheckpointPublisher.class).toInstance(SegmentReplicationCheckpointPublisher.EMPTY);
}
}

/**
* This provider is necessary while segment replication is behind a feature flag.
* We don't want to initialize a PublishCheckpointAction with the feature flag disabled.
*
* @opensearch.internal
*/
public final static class CheckpointPublisherProvider implements Provider<SegmentReplicationCheckpointPublisher> {

private final PublishCheckpointAction action;

@Inject
public CheckpointPublisherProvider(PublishCheckpointAction action) {
this.action = action;
}

@Override
public SegmentReplicationCheckpointPublisher get() {
return new SegmentReplicationCheckpointPublisher(action);
}
}

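The Javadoc above explains why CheckpointPublisherProvider exists: with the REPLICATION_TYPE flag off, PublishCheckpointAction should never be constructed, so the publisher is bound to the existing SegmentReplicationCheckpointPublisher.EMPTY instance instead. A minimal self-contained sketch of the same flag-gated binding pattern, using the same org.opensearch.common.inject calls that appear in the diff; ExpensiveAction, GatedPublisher and GatedModule are hypothetical placeholder types, not OpenSearch classes:

import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.inject.Provider;

// Stand-ins: ExpensiveAction plays the role of PublishCheckpointAction,
// GatedPublisher the role of SegmentReplicationCheckpointPublisher.
class ExpensiveAction {}

class GatedPublisher {
    static final GatedPublisher EMPTY = new GatedPublisher(null); // no-op fallback
    private final ExpensiveAction action;

    GatedPublisher(ExpensiveAction action) {
        this.action = action;
    }
}

class GatedModule extends AbstractModule {
    private final boolean featureEnabled;

    GatedModule(boolean featureEnabled) {
        this.featureEnabled = featureEnabled;
    }

    // Provider indirection: ExpensiveAction is injected (and therefore constructed)
    // only when this provider is bound, i.e. only when the feature is enabled.
    static final class PublisherProvider implements Provider<GatedPublisher> {
        private final ExpensiveAction action;

        @Inject
        PublisherProvider(ExpensiveAction action) {
            this.action = action;
        }

        @Override
        public GatedPublisher get() {
            return new GatedPublisher(action);
        }
    }

    @Override
    protected void configure() {
        if (featureEnabled) {
            bind(GatedPublisher.class).toProvider(PublisherProvider.class).asEagerSingleton();
        } else {
            // With the flag off, nothing ever asks the injector for ExpensiveAction.
            bind(GatedPublisher.class).toInstance(GatedPublisher.EMPTY);
        }
    }
}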
@@ -14,7 +14,6 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.common.Nullable;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
@@ -59,7 +58,6 @@ public static class Actions {
public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk";
}

@Inject
public SegmentReplicationTargetService(
final ThreadPool threadPool,
final RecoverySettings recoverySettings,
@@ -89,27 +87,29 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
/**
* Invoked when a new checkpoint is received from a primary shard.
* It checks if a new checkpoint should be processed or not and starts replication if needed.
* @param requestCheckpoint received checkpoint that is checked for processing
* @param indexShard replica shard on which checkpoint is received
* @param receivedCheckpoint received checkpoint that is checked for processing
* @param replicaShard replica shard on which checkpoint is received
*/
public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) {
if (onGoingReplications.isShardReplicating(indexShard.shardId())) {
public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {
if (onGoingReplications.isShardReplicating(replicaShard.shardId())) {
logger.trace(
"Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}",
indexShard.getLatestReplicationCheckpoint()
() -> new ParameterizedMessage(
"Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}",
replicaShard.getLatestReplicationCheckpoint()
)
);
return;
}
if (indexShard.shouldProcessCheckpoint(requestCheckpoint)) {
startReplication(requestCheckpoint, indexShard, new SegmentReplicationListener() {
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
if (sendShardFailure == true) {
logger.error("replication failure", e);
indexShard.failShard("replication failure", e);
replicaShard.failShard("replication failure", e);
}
}
});
@@ -8,7 +8,6 @@

package org.opensearch.indices.replication.checkpoint;

import org.opensearch.common.inject.Inject;
import org.opensearch.index.shard.IndexShard;

import java.util.Objects;
@@ -22,7 +21,7 @@ public class SegmentReplicationCheckpointPublisher {

private final PublishAction publishAction;

@Inject
// This component is behind a feature flag, so we bind it manually in IndicesModule.
public SegmentReplicationCheckpointPublisher(PublishCheckpointAction publishAction) {
this(publishAction::publish);
}
18 changes: 9 additions & 9 deletions server/src/main/java/org/opensearch/node/Node.java
@@ -937,16 +937,16 @@ protected Node(
.toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings));
b.bind(PeerRecoveryTargetService.class)
.toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService));
b.bind(SegmentReplicationTargetService.class)
.toInstance(
new SegmentReplicationTargetService(
threadPool,
recoverySettings,
transportService,
new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService)
)
);
if (FeatureFlags.isEnabled(REPLICATION_TYPE)) {
b.bind(SegmentReplicationTargetService.class)
.toInstance(
new SegmentReplicationTargetService(
threadPool,
recoverySettings,
transportService,
new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService)
)
);
b.bind(SegmentReplicationSourceService.class)
.toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings));
}
@@ -20,23 +20,23 @@
import org.opensearch.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.*;
import static org.opensearch.test.ClusterServiceUtils.createClusterService;

public class PublishCheckpointActionTests extends IndexShardTestCase {
public class PublishCheckpointActionTests extends OpenSearchTestCase {

private ThreadPool threadPool;
private CapturingTransport transport;
@@ -73,14 +73,19 @@ public void tearDown() throws Exception {
super.tearDown();
}

public void testPublishCheckpointActionOnPrimary() throws InterruptedException, IOException {
public void testPublishCheckpointActionOnPrimary() {
final IndicesService indicesService = mock(IndicesService.class);

final Index index = new Index("index", "uuid");
final IndexService indexService = mock(IndexService.class);
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);

final IndexShard indexShard = newShard(true);
final int id = randomIntBetween(0, 4);
final IndexShard indexShard = mock(IndexShard.class);
when(indexService.getShard(id)).thenReturn(indexShard);

final ShardId shardId = new ShardId(index, id);
when(indexShard.shardId()).thenReturn(shardId);

final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class);

@@ -103,18 +108,21 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException,
// we should forward the request containing the current publish checkpoint to the replica
assertThat(result.replicaRequest(), sameInstance(request));
}));
closeShards(indexShard);

}

public void testPublishCheckpointActionOnReplica() throws IOException {
public void testPublishCheckpointActionOnReplica() {
final IndicesService indicesService = mock(IndicesService.class);

final Index index = new Index("index", "uuid");
final IndexService indexService = mock(IndexService.class);
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);
final int id = randomIntBetween(0, 4);
final IndexShard indexShard = mock(IndexShard.class);
when(indexService.getShard(id)).thenReturn(indexShard);

final IndexShard indexShard = newShard(false);
final ShardId shardId = new ShardId(index, id);
when(indexShard.shardId()).thenReturn(shardId);

final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class);

@@ -138,13 +146,12 @@ public void testPublishCheckpointActionOnReplica() throws IOException {
final TransportReplicationAction.ReplicaResult result = listener.actionGet();

// onNewCheckpoint should be called on shard with checkpoint request
verify(mockTargetService).onNewCheckpoint(checkpoint, indexShard);
verify(mockTargetService, times(1)).onNewCheckpoint(checkpoint, indexShard);

// the result should indicate success
final AtomicBoolean success = new AtomicBoolean();
result.runPostReplicaActions(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
assertTrue(success.get());
closeShards(indexShard);

}

Expand Down
