Skip to content

Commit

Permalink
[Segment Replication] Use engine codec and replica shard ReplicationC…
Browse files Browse the repository at this point in the history
…heckpoint for replication events (opensearch-project#7732)

* [Segment Replication] Use engine codec and replica shard ReplicationCheckpoint for replication events

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Spotless fix

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Fix failing unit test

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments rename

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Add unit test for ForceSegmentSync transport handler

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Add unit test for ForceSegmentSync transport failure

Signed-off-by: Suraj Singh <surajrider@gmail.com>

---------

Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 authored and austintlee committed Jun 2, 2023
1 parent 4470aa5 commit 73c0705
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,9 @@ public ReplicationCheckpoint getCheckpoint() {
return this.checkpoint;
}

public SegmentReplicationTarget(
ReplicationCheckpoint checkpoint,
IndexShard indexShard,
SegmentReplicationSource source,
ReplicationListener listener
) {
public SegmentReplicationTarget(IndexShard indexShard, SegmentReplicationSource source, ReplicationListener listener) {
super("replication_target", indexShard, new ReplicationLuceneIndex(), listener);
this.checkpoint = checkpoint;
this.checkpoint = indexShard.getLatestReplicationCheckpoint();
this.source = source;
this.state = new SegmentReplicationState(
indexShard.routingEntry(),
Expand Down Expand Up @@ -101,7 +96,7 @@ public SegmentReplicationState state() {
}

public SegmentReplicationTarget retryCopy() {
return new SegmentReplicationTarget(checkpoint, indexShard, source, listener);
return new SegmentReplicationTarget(indexShard, source, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
}
final Thread thread = Thread.currentThread();
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
startReplication(replicaShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
Expand Down Expand Up @@ -301,17 +301,8 @@ protected void updateLatestReceivedCheckpoint(ReplicationCheckpoint receivedChec
}
}

public SegmentReplicationTarget startReplication(
final ReplicationCheckpoint checkpoint,
final IndexShard indexShard,
final SegmentReplicationListener listener
) {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
indexShard,
sourceFactory.get(indexShard),
listener
);
public SegmentReplicationTarget startReplication(final IndexShard indexShard, final SegmentReplicationListener listener) {
final SegmentReplicationTarget target = new SegmentReplicationTarget(indexShard, sourceFactory.get(indexShard), listener);
startReplication(target);
return target;
}
Expand Down Expand Up @@ -429,57 +420,49 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha
channel.sendResponse(TransportResponse.Empty.INSTANCE);
return;
}
startReplication(
ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete to {}, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
indexShard.getLatestReplicationCheckpoint(),
state.getTimingData()
)
);
try {
// Promote engine type for primary target
if (indexShard.recoveryState().getPrimary() == true) {
indexShard.resetToWriteableEngine();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (InterruptedException | TimeoutException | IOException e) {
throw new RuntimeException(e);
startReplication(indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete to {}, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
indexShard.getLatestReplicationCheckpoint(),
state.getTimingData()
)
);
try {
// Promote engine type for primary target
if (indexShard.recoveryState().getPrimary() == true) {
indexShard.resetToWriteableEngine();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (InterruptedException | TimeoutException | IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
indexShard.failShard("replication failure", e);
}
try {
channel.sendResponse(e);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
@Override
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
indexShard.failShard("replication failure", e);
}
try {
channel.sendResponse(e);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
);
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.SnapshotMatchers;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.CheckpointInfoResponse;
Expand Down Expand Up @@ -294,7 +295,7 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
IndexShard primaryShard = newStartedShard(true);
SegmentReplicationTargetService sut;
sut = prepareForReplication(primaryShard, null);
sut = prepareForReplication(primaryShard, null, mock(TransportService.class), mock(IndicesService.class));
SegmentReplicationTargetService spy = spy(sut);

// Starting a new shard in PrimaryMode and shard routing primary.
Expand All @@ -314,7 +315,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard);

// Verify that checkpoint is not processed as shard routing is primary.
verify(spy, times(0)).startReplication(any(), any(), any());
verify(spy, times(0)).startReplication(any(), any());
closeShards(primaryShard);
}

Expand Down Expand Up @@ -1027,7 +1028,10 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun

private void resolveCheckpointInfoResponseListener(ActionListener<CheckpointInfoResponse> listener, IndexShard primary) {
try {
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary);
final CopyState copyState = new CopyState(
ReplicationCheckpoint.empty(primary.shardId, primary.getLatestReplicationCheckpoint().getCodec()),
primary
);
listener.onResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
Expand All @@ -1041,7 +1045,6 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe
throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final SegmentReplicationTarget target = targetService.startReplication(
ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()),
replica,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down
Loading

0 comments on commit 73c0705

Please sign in to comment.