Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Use engine codec and replica shard ReplicationCheckpoint for replication events #7732

Merged
merged 8 commits into from
May 26, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,9 @@ public ReplicationCheckpoint getCheckpoint() {
return this.checkpoint;
}

public SegmentReplicationTarget(
ReplicationCheckpoint checkpoint,
IndexShard indexShard,
SegmentReplicationSource source,
ReplicationListener listener
) {
public SegmentReplicationTarget(IndexShard indexShard, SegmentReplicationSource source, ReplicationListener listener) {
super("replication_target", indexShard, new ReplicationLuceneIndex(), listener);
this.checkpoint = checkpoint;
this.checkpoint = indexShard.getLatestReplicationCheckpoint();
this.source = source;
this.state = new SegmentReplicationState(
indexShard.routingEntry(),
Expand Down Expand Up @@ -101,7 +96,7 @@ public SegmentReplicationState state() {
}

public SegmentReplicationTarget retryCopy() {
return new SegmentReplicationTarget(checkpoint, indexShard, source, listener);
return new SegmentReplicationTarget(indexShard, source, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
}
final Thread thread = Thread.currentThread();
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
startReplication(replicaShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
Expand Down Expand Up @@ -301,17 +301,8 @@ protected void updateLatestReceivedCheckpoint(ReplicationCheckpoint receivedChec
}
}

public SegmentReplicationTarget startReplication(
final ReplicationCheckpoint checkpoint,
final IndexShard indexShard,
final SegmentReplicationListener listener
) {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
indexShard,
sourceFactory.get(indexShard),
listener
);
public SegmentReplicationTarget startReplication(final IndexShard indexShard, final SegmentReplicationListener listener) {
final SegmentReplicationTarget target = new SegmentReplicationTarget(indexShard, sourceFactory.get(indexShard), listener);
startReplication(target);
return target;
}
Expand Down Expand Up @@ -429,57 +420,49 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha
channel.sendResponse(TransportResponse.Empty.INSTANCE);
return;
}
startReplication(
ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete to {}, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
indexShard.getLatestReplicationCheckpoint(),
state.getTimingData()
)
);
try {
// Promote engine type for primary target
if (indexShard.recoveryState().getPrimary() == true) {
indexShard.resetToWriteableEngine();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (InterruptedException | TimeoutException | IOException e) {
throw new RuntimeException(e);
startReplication(indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete to {}, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
indexShard.getLatestReplicationCheckpoint(),
state.getTimingData()
)
);
try {
// Promote engine type for primary target
if (indexShard.recoveryState().getPrimary() == true) {
indexShard.resetToWriteableEngine();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (InterruptedException | TimeoutException | IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
indexShard.failShard("replication failure", e);
}
try {
channel.sendResponse(e);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
@Override
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
indexShard.failShard("replication failure", e);
}
try {
channel.sendResponse(e);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
);
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.SnapshotMatchers;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.CheckpointInfoResponse;
Expand Down Expand Up @@ -294,7 +295,7 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
IndexShard primaryShard = newStartedShard(true);
SegmentReplicationTargetService sut;
sut = prepareForReplication(primaryShard, null);
sut = prepareForReplication(primaryShard, null, mock(TransportService.class), mock(IndicesService.class));
SegmentReplicationTargetService spy = spy(sut);

// Starting a new shard in PrimaryMode and shard routing primary.
Expand All @@ -314,7 +315,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard);

// Verify that checkpoint is not processed as shard routing is primary.
verify(spy, times(0)).startReplication(any(), any(), any());
verify(spy, times(0)).startReplication(any(), any());
closeShards(primaryShard);
}

Expand Down Expand Up @@ -1027,7 +1028,10 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun

private void resolveCheckpointInfoResponseListener(ActionListener<CheckpointInfoResponse> listener, IndexShard primary) {
try {
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary);
final CopyState copyState = new CopyState(
ReplicationCheckpoint.empty(primary.shardId, primary.getLatestReplicationCheckpoint().getCodec()),
primary
);
listener.onResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
Expand All @@ -1041,7 +1045,6 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe
throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final SegmentReplicationTarget target = targetService.startReplication(
ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()),
replica,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down
Loading