Skip to content

Commit

Permalink
Making getLatestReceivedCheckpoint() as private and adding java docs
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Jun 23, 2022
1 parent b7c5d6d commit fcdfbd1
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class SegmentReplicationTargetService implements IndexEventListener {

private final SegmentReplicationSourceFactory sourceFactory;

private static final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = new HashMap<>();
private final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = new HashMap<>();

/**
* The internal actions
Expand Down Expand Up @@ -88,7 +88,12 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
}
}

ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) {
/**
* Returns the Latest checkpoint received by replica shard based on shard ID.
* Returns null if replica shard has not received a checkpoint before
* @param shardId Shard Id of replica shard
*/
private ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) {
if (latestReceivedCheckpoint.containsKey(shardId)) {
return latestReceivedCheckpoint.get(shardId);
}
Expand All @@ -102,6 +107,8 @@ ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) {
* @param replicaShard replica shard on which checkpoint is received
*/
public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {

// Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
if (getLatestReceivedCheckpoint(replicaShard.shardId()) != null) {
if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) {
latestReceivedCheckpoint.replace(replicaShard.shardId(), receivedCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,6 @@ public void testReplicationOnDone() throws IOException {

}

public void testLatestReceivedCheckpoint() {
SegmentReplicationTargetService spy = spy(sut);
assertEquals(null, spy.getLatestReceivedCheckpoint(indexShard.shardId()));
spy.onNewCheckpoint(checkpoint, indexShard);
assertEquals(checkpoint, spy.getLatestReceivedCheckpoint(indexShard.shardId()));
spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard);
assertEquals(indexShard.getLatestReplicationCheckpoint(), spy.getLatestReceivedCheckpoint(indexShard.shardId()));
}

public void testBeforeIndexShardClosed_CancelsOngoingReplications() {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
Expand Down

0 comments on commit fcdfbd1

Please sign in to comment.