Skip to content

Commit

Permalink
[Segment Replication] Use engine codec and replica shard ReplicationC…
Browse files Browse the repository at this point in the history
…heckpoint for replication events

Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 committed May 24, 2023
1 parent 49fafd1 commit a577c91
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,10 @@ public String getDefaultCodecName() {
return codecService.codec(CodecService.DEFAULT_CODEC).getName();
}

public String getEngineCodec() {
return getEngine().config().getCodec().getName();
}

/**
* USE THIS METHOD WITH CARE!
* Returns the primary term the index shard is supposed to be on. In case of primary promotion or when a replica learns about
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
}
final Thread thread = Thread.currentThread();
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
startReplication(replicaShard.getLatestReplicationCheckpoint(), replicaShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
Expand Down Expand Up @@ -430,7 +430,7 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha
return;
}
startReplication(
ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()),
ReplicationCheckpoint.empty(request.getShardId(), indexShard.getEngineCodec()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun

private void resolveCheckpointInfoResponseListener(ActionListener<CheckpointInfoResponse> listener, IndexShard primary) {
try {
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary);
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getEngineCodec()), primary);
listener.onResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
Expand All @@ -1041,7 +1041,7 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe
throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final SegmentReplicationTarget target = targetService.startReplication(
ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()),
ReplicationCheckpoint.empty(replica.shardId, replica.getEngineCodec()),
replica,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,11 @@ public void setUp() throws Exception {
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName())
.build();
CodecService codecService = new CodecService(null, null);
String defaultCodecName = codecService.codec(CodecService.DEFAULT_CODEC).getName();
primaryShard = newStartedShard(true, settings);
String primaryCodec = primaryShard.getEngineCodec();
replicaShard = newShard(false, settings, new NRTReplicationEngineFactory());
recoverReplica(replicaShard, primaryShard, true, getReplicationFunc(replicaShard));
checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, defaultCodecName);
checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, replicaShard.getEngineCodec());
SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
replicationSource = mock(SegmentReplicationSource.class);
when(replicationSourceFactory.get(replicaShard)).thenReturn(replicationSource);
Expand All @@ -86,14 +85,14 @@ public void setUp() throws Exception {
initialCheckpoint.getPrimaryTerm(),
initialCheckpoint.getSegmentsGen(),
initialCheckpoint.getSegmentInfosVersion() + 1,
defaultCodecName
primaryCodec
);
newPrimaryCheckpoint = new ReplicationCheckpoint(
initialCheckpoint.getShardId(),
initialCheckpoint.getPrimaryTerm() + 1,
initialCheckpoint.getSegmentsGen(),
initialCheckpoint.getSegmentInfosVersion() + 1,
defaultCodecName
primaryCodec
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void setUp() throws Exception {
spyIndexShard.getPendingPrimaryTerm(),
testSegmentInfos.getGeneration(),
testSegmentInfos.version,
Codec.getDefault().getName()
indexShard.getEngineCodec()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@ public void getCheckpointMetadata(
) {
try {
final CopyState copyState = new CopyState(
ReplicationCheckpoint.empty(primaryShard.shardId, primaryShard.getDefaultCodecName()),
ReplicationCheckpoint.empty(primaryShard.shardId, primaryShard.getEngineCodec()),
primaryShard
);
listener.onResponse(
Expand Down Expand Up @@ -1373,7 +1373,7 @@ public final List<SegmentReplicationTarget> replicateSegments(IndexShard primary
for (IndexShard replica : replicaShards) {
final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard, replica);
final SegmentReplicationTarget target = targetService.startReplication(
ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()),
ReplicationCheckpoint.empty(replica.shardId, replica.getEngineCodec()),
replica,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down

0 comments on commit a577c91

Please sign in to comment.