diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index b417133e4a89d..44add3fac4271 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -512,6 +512,10 @@ public String getDefaultCodecName() { return codecService.codec(CodecService.DEFAULT_CODEC).getName(); } + public String getEngineCodec() { + return getEngine().config().getCodec().getName(); + } + /** * USE THIS METHOD WITH CARE! * Returns the primary term the index shard is supposed to be on. In case of primary promotion or when a replica learns about diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 6c1547fbee82b..f1b49753cc039 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -228,7 +228,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe } final Thread thread = Thread.currentThread(); if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { - startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { + startReplication(replicaShard.getLatestReplicationCheckpoint(), replicaShard, new SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { logger.trace( @@ -430,7 +430,7 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha return; } startReplication( - ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()), + ReplicationCheckpoint.empty(request.getShardId(), indexShard.getEngineCodec()), indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 0d95f40652523..591c3589aeb22 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -1027,7 +1027,7 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun private void resolveCheckpointInfoResponseListener(ActionListener listener, IndexShard primary) { try { - final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary); + final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getEngineCodec()), primary); listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); @@ -1041,7 +1041,7 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); final SegmentReplicationTarget target = targetService.startReplication( - ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()), + ReplicationCheckpoint.empty(replica.shardId, replica.getEngineCodec()), replica, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index a3c016d5ba0df..83bd4111e0911 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -69,12 +69,11 @@ public void setUp() throws Exception { .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()) .build(); - CodecService codecService = new CodecService(null, null); - String defaultCodecName = codecService.codec(CodecService.DEFAULT_CODEC).getName(); primaryShard = newStartedShard(true, settings); + String primaryCodec = primaryShard.getEngineCodec(); replicaShard = newShard(false, settings, new NRTReplicationEngineFactory()); recoverReplica(replicaShard, primaryShard, true, getReplicationFunc(replicaShard)); - checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, defaultCodecName); + checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, replicaShard.getEngineCodec()); SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class); replicationSource = mock(SegmentReplicationSource.class); when(replicationSourceFactory.get(replicaShard)).thenReturn(replicationSource); @@ -86,14 +85,14 @@ public void setUp() throws Exception { initialCheckpoint.getPrimaryTerm(), initialCheckpoint.getSegmentsGen(), initialCheckpoint.getSegmentInfosVersion() + 1, - defaultCodecName + primaryCodec ); newPrimaryCheckpoint = new ReplicationCheckpoint( initialCheckpoint.getShardId(), initialCheckpoint.getPrimaryTerm() + 1, initialCheckpoint.getSegmentsGen(), initialCheckpoint.getSegmentInfosVersion() + 1, - defaultCodecName + primaryCodec ); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index a029d87f4a575..3c481952a143d 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -108,7 +108,7 @@ public void setUp() throws Exception { spyIndexShard.getPendingPrimaryTerm(), testSegmentInfos.getGeneration(), testSegmentInfos.version, - Codec.getDefault().getName() + indexShard.getEngineCodec() ); } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index b785574ca52b2..8134698a64285 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1317,7 +1317,7 @@ public void getCheckpointMetadata( ) { try { final CopyState copyState = new CopyState( - ReplicationCheckpoint.empty(primaryShard.shardId, primaryShard.getDefaultCodecName()), + ReplicationCheckpoint.empty(primaryShard.shardId, primaryShard.getEngineCodec()), primaryShard ); listener.onResponse( @@ -1373,7 +1373,7 @@ public final List replicateSegments(IndexShard primary for (IndexShard replica : replicaShards) { final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard, replica); final SegmentReplicationTarget target = targetService.startReplication( - ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()), + ReplicationCheckpoint.empty(replica.shardId, replica.getEngineCodec()), replica, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override