Skip to content

Commit

Permalink
Adding Latest Recevied checkpoint, replay checkpoint logic along with…
Browse files Browse the repository at this point in the history
… tests

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Jun 23, 2022
1 parent 7005b9e commit b7c5d6d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportService;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -49,6 +51,8 @@ public class SegmentReplicationTargetService implements IndexEventListener {

private final SegmentReplicationSourceFactory sourceFactory;

private static final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = new HashMap<>();

/**
* The internal actions
*
Expand Down Expand Up @@ -84,13 +88,27 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
}
}

ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) {
if (latestReceivedCheckpoint.containsKey(shardId)) {
return latestReceivedCheckpoint.get(shardId);
}
return null;
}

/**
* Invoked when a new checkpoint is received from a primary shard.
* It checks if a new checkpoint should be processed or not and starts replication if needed.
* @param receivedCheckpoint received checkpoint that is checked for processing
* @param replicaShard replica shard on which checkpoint is received
*/
public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {
if (getLatestReceivedCheckpoint(replicaShard.shardId()) != null) {
if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) {
latestReceivedCheckpoint.replace(replicaShard.shardId(), receivedCheckpoint);
}
} else {
latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint);
}
if (onGoingReplications.isShardReplicating(replicaShard.shardId())) {
logger.trace(
() -> new ParameterizedMessage(
Expand All @@ -103,7 +121,13 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {}
public void onReplicationDone(SegmentReplicationState state) {
// if we received a checkpoint during the copy event that is ahead of this
// try and process it.
if (getLatestReceivedCheckpoint(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) {
onNewCheckpoint(getLatestReceivedCheckpoint(replicaShard.shardId()), replicaShard);
}
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,48 @@ public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOExc
closeShard(indexShard, false);
}

public void testReplicationOnDone() throws IOException {
SegmentReplicationTargetService spy = spy(sut);
IndexShard spyShard = spy(indexShard);
ReplicationCheckpoint cp = indexShard.getLatestReplicationCheckpoint();
ReplicationCheckpoint newCheckpoint = new ReplicationCheckpoint(
cp.getShardId(),
cp.getPrimaryTerm(),
cp.getSegmentsGen(),
cp.getSeqNo(),
cp.getSegmentInfosVersion() + 1
);
ReplicationCheckpoint anotherNewCheckpoint = new ReplicationCheckpoint(
cp.getShardId(),
cp.getPrimaryTerm(),
cp.getSegmentsGen(),
cp.getSeqNo(),
cp.getSegmentInfosVersion() + 2
);
ArgumentCaptor<SegmentReplicationTargetService.SegmentReplicationListener> captor = ArgumentCaptor.forClass(
SegmentReplicationTargetService.SegmentReplicationListener.class
);
doNothing().when(spy).startReplication(any(), any(), any());
spy.onNewCheckpoint(newCheckpoint, spyShard);
spy.onNewCheckpoint(anotherNewCheckpoint, spyShard);
verify(spy, times(1)).startReplication(eq(newCheckpoint), any(), captor.capture());
verify(spy, times(1)).onNewCheckpoint(eq(anotherNewCheckpoint), any());
SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue();
listener.onDone(new SegmentReplicationState(new ReplicationLuceneIndex()));
verify(spy, times(2)).onNewCheckpoint(eq(anotherNewCheckpoint), any());
closeShard(indexShard, false);

}

public void testLatestReceivedCheckpoint() {
SegmentReplicationTargetService spy = spy(sut);
assertEquals(null, spy.getLatestReceivedCheckpoint(indexShard.shardId()));
spy.onNewCheckpoint(checkpoint, indexShard);
assertEquals(checkpoint, spy.getLatestReceivedCheckpoint(indexShard.shardId()));
spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard);
assertEquals(indexShard.getLatestReplicationCheckpoint(), spy.getLatestReceivedCheckpoint(indexShard.shardId()));
}

public void testBeforeIndexShardClosed_CancelsOngoingReplications() {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
Expand Down

0 comments on commit b7c5d6d

Please sign in to comment.