Skip to content

Commit

Permalink
Adding condition to check if fork of thread is necessary.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Jul 20, 2022
1 parent d9b2e71 commit e0ec134
Showing 1 changed file with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,22 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
);
return;
}
final Thread thread = Thread.currentThread();
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
// if we received a checkpoint during the copy event that is ahead of this
// try and process it.
if (latestReceivedCheckpoint.get(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) {
threadPool.generic()
.execute(() -> onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard));
Runnable runnable = () -> onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard);
// Checks if we are using same thread and forks if necessary.
if (thread == Thread.currentThread()){
threadPool.generic().execute(runnable);
}
else{
runnable.run();
}
}
}

Expand Down

0 comments on commit e0ec134

Please sign in to comment.