Skip to content

Commit

Permalink
[Backport 2.x] [Segment Replication] Added source-side classes for or…
Browse files Browse the repository at this point in the history
…chestrating replication events. (#4128)

* [Segment Replication] Added source-side classes for orchestrating replication events (#3470)

This change expands on the existing SegmentReplicationSource interface and its corresponding Factory class by introducing an implementation where the replication source is a primary shard (PrimaryShardReplicationSource). These code paths execute on the target. The primary shard implementation creates the requests to be send to the source/primary shard.

Correspondingly, this change also defines two request classes for the GET_CHECKPOINT_INFO and GET_SEGMENT_FILES requests as well as an abstract superclass.

A CopyState class has been introduced that captures point-in-time, file-level details from an IndexShard. This implementation mirrors Lucene's NRT CopyState implementation.

Finally, a service class has been introduce for segment replication that runs on the source side (SegmentReplicationSourceService) which handles these two types of incoming requests. This includes private handler classes that house the logic to respond to these requests, with some functionality stubbed for now. The service class also uses a simple map to cache CopyState objects that would be needed by replication targets.

Unit tests have been added/updated for all new functionality.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Remove abstract getProcessedLocalCheckpoint() from Engine.java as it is a breaking change and fix other uses of getProcessedLocalCheckpoint()

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Apply spotlessCheck

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fixing error in ReadOnlyEngineTests by casting.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Addressing comments on PR.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

Co-authored-by: Kartik Ganesh <gkart@amazon.com>
  • Loading branch information
Rishikesh1159 and kartg committed Aug 9, 2022
1 parent b202245 commit 313b919
Show file tree
Hide file tree
Showing 20 changed files with 1,206 additions and 52 deletions.
15 changes: 15 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,21 @@ protected SegmentInfos getLatestSegmentInfos() {
return null;
};

/**
* In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos}
* object directly, this method returns a {@link GatedCloseable} reference to the same object.
* This allows the engine to include a clean-up {@link org.opensearch.common.CheckedRunnable}
* which is run when the reference is closed. The default implementation of the clean-up
* procedure is a no-op.
*
* @return {@link GatedCloseable} - A wrapper around a {@link SegmentInfos} instance that
* must be closed for segment files to be deleted.
*/
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
// default implementation
return new GatedCloseable<>(getLatestSegmentInfos(), () -> {});
}

public MergeStats getMergeStats() {
return new MergeStats();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2301,6 +2301,22 @@ public SegmentInfos getLatestSegmentInfos() {
}
}

/**
* Fetch the latest {@link SegmentInfos} object via {@link #getLatestSegmentInfos()}
* but also increment the ref-count to ensure that these segment files are retained
* until the reference is closed. On close, the ref-count is decremented.
*/
@Override
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
final SegmentInfos segmentInfos = getLatestSegmentInfos();
try {
indexWriter.incRefDeleter(segmentInfos);
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
return new GatedCloseable<>(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos));
}

@Override
protected final void writerSegmentStats(SegmentsStats stats) {
stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());
Expand Down
34 changes: 34 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.ReadOnlyEngine;
import org.opensearch.index.engine.RefreshFailedEngineException;
import org.opensearch.index.engine.SafeCommitInfo;
Expand Down Expand Up @@ -2650,6 +2652,28 @@ public long getLocalCheckpoint() {
return getEngine().getPersistedLocalCheckpoint();
}

/**
* Fetch the latest checkpoint that has been processed but not necessarily persisted. This should be used only when Segment Replication is enabled.
* Also see {@link #getLocalCheckpoint()}.
*/
public long getProcessedLocalCheckpoint() {
assert indexSettings.isSegRepEnabled();
// Returns checkpoint only if the current engine is an instance of NRTReplicationEngine or InternalEngine
return getReplicationEngine().map(NRTReplicationEngine::getProcessedLocalCheckpoint).orElseGet(() -> {
final Engine engine = getEngine();
assert engine instanceof InternalEngine;
return ((InternalEngine) engine).getProcessedLocalCheckpoint();
});
}

private Optional<NRTReplicationEngine> getReplicationEngine() {
if (getEngine() instanceof NRTReplicationEngine) {
return Optional.of((NRTReplicationEngine) getEngine());
} else {
return Optional.empty();
}
}

/**
* Returns the global checkpoint for the shard.
*
Expand Down Expand Up @@ -4017,4 +4041,14 @@ public void verifyShardBeforeIndexClosing() throws IllegalStateException {
RetentionLeaseSyncer getRetentionLeaseSyncer() {
return retentionLeaseSyncer;
}

/**
* Fetch the latest SegmentInfos held by the shard's underlying Engine, wrapped
* by a a {@link GatedCloseable} to ensure files are not deleted/merged away.
*
* @throws EngineException - When segment infos cannot be safely retrieved
*/
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}
}
97 changes: 62 additions & 35 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
return getMetadata(commit, false);
}

/**
* Convenience wrapper around the {@link #getMetadata(IndexCommit)} method for null input.
*/
public MetadataSnapshot getMetadata() throws IOException {
return getMetadata(null, false);
}

/**
* Returns a new MetadataSnapshot for the given commit. If the given commit is <code>null</code>
* the latest commit point is used.
Expand Down Expand Up @@ -315,6 +322,16 @@ public MetadataSnapshot getMetadata(IndexCommit commit, boolean lockDirectory) t
}
}

/**
* Returns a new {@link MetadataSnapshot} for the given {@link SegmentInfos} object.
* In contrast to {@link #getMetadata(IndexCommit)}, this method is useful for scenarios
* where we need to construct a MetadataSnapshot from an in-memory SegmentInfos object that
* may not have a IndexCommit associated with it, such as with segment replication.
*/
public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOException {
return new MetadataSnapshot(segmentInfos, directory, logger);
}

/**
* Renames all the given files from the key of the map to the
* value of the map. All successfully renamed files are removed from the map in-place.
Expand Down Expand Up @@ -477,7 +494,7 @@ public static MetadataSnapshot readMetadataSnapshot(
Directory dir = new NIOFSDirectory(indexLocation)
) {
failIfCorrupted(dir);
return new MetadataSnapshot(null, dir, logger);
return new MetadataSnapshot((IndexCommit) null, dir, logger);
} catch (IndexNotFoundException ex) {
// that's fine - happens all the time no need to log
} catch (FileNotFoundException | NoSuchFileException ex) {
Expand Down Expand Up @@ -682,7 +699,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}
}
directory.syncMetaData();
final Store.MetadataSnapshot metadataOrEmpty = getMetadata(null);
final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
verifyAfterCleanup(sourceMetadata, metadataOrEmpty);
} finally {
metadataLock.writeLock().unlock();
Expand Down Expand Up @@ -822,7 +839,14 @@ public MetadataSnapshot(Map<String, StoreFileMetadata> metadata, Map<String, Str
}

MetadataSnapshot(IndexCommit commit, Directory directory, Logger logger) throws IOException {
LoadedMetadata loadedMetadata = loadMetadata(commit, directory, logger);
this(loadMetadata(commit, directory, logger));
}

MetadataSnapshot(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
this(loadMetadata(segmentInfos, directory, logger));
}

private MetadataSnapshot(LoadedMetadata loadedMetadata) {
metadata = loadedMetadata.fileMetadata;
commitUserData = loadedMetadata.userData;
numDocs = loadedMetadata.numDocs;
Expand Down Expand Up @@ -890,40 +914,9 @@ static class LoadedMetadata {
}

static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException {
long numDocs;
Map<String, StoreFileMetadata> builder = new HashMap<>();
Map<String, String> commitUserDataBuilder = new HashMap<>();
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
numDocs = Lucene.getNumDocs(segmentCommitInfos);
commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
// we don't know which version was used to write so we take the max version.
Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion();
for (SegmentCommitInfo info : segmentCommitInfos) {
final Version version = info.info.getVersion();
if (version == null) {
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
checksumFromLuceneFile(
directory,
file,
builder,
logger,
version,
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file))
);
}
}
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentCommitInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
return loadMetadata(segmentCommitInfos, directory, logger);
} catch (CorruptIndexException | IndexNotFoundException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we either know the index is corrupted or it's just not there
throw ex;
Expand All @@ -949,6 +942,40 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
}
throw ex;
}
}

static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
long numDocs = Lucene.getNumDocs(segmentInfos);
Map<String, String> commitUserDataBuilder = new HashMap<>();
commitUserDataBuilder.putAll(segmentInfos.getUserData());
Map<String, StoreFileMetadata> builder = new HashMap<>();
// we don't know which version was used to write so we take the max version.
Version maxVersion = segmentInfos.getMinSegmentLuceneVersion();
for (SegmentCommitInfo info : segmentInfos) {
final Version version = info.info.getVersion();
if (version == null) {
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
checksumFromLuceneFile(
directory,
file,
builder,
logger,
version,
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file))
);
}
}
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest;

import java.io.IOException;

/**
* Request object for fetching segment metadata for a {@link ReplicationCheckpoint} from
* a {@link SegmentReplicationSource}. This object is created by the target node and sent
* to the source node.
*
* @opensearch.internal
*/
public class CheckpointInfoRequest extends SegmentReplicationTransportRequest {

private final ReplicationCheckpoint checkpoint;

public CheckpointInfoRequest(StreamInput in) throws IOException {
super(in);
checkpoint = new ReplicationCheckpoint(in);
}

public CheckpointInfoRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
ReplicationCheckpoint checkpoint
) {
super(replicationId, targetAllocationId, targetNode);
this.checkpoint = checkpoint;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
checkpoint.writeTo(out);
}

public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest;

import java.io.IOException;
import java.util.List;

/**
* Request object for fetching a list of segment files metadata from a {@link SegmentReplicationSource}.
* This object is created by the target node and sent to the source node.
*
* @opensearch.internal
*/
public class GetSegmentFilesRequest extends SegmentReplicationTransportRequest {

private final List<StoreFileMetadata> filesToFetch;
private final ReplicationCheckpoint checkpoint;

public GetSegmentFilesRequest(StreamInput in) throws IOException {
super(in);
this.filesToFetch = in.readList(StoreFileMetadata::new);
this.checkpoint = new ReplicationCheckpoint(in);
}

public GetSegmentFilesRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
List<StoreFileMetadata> filesToFetch,
ReplicationCheckpoint checkpoint
) {
super(replicationId, targetAllocationId, targetNode);
this.filesToFetch = filesToFetch;
this.checkpoint = checkpoint;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(filesToFetch);
checkpoint.writeTo(out);
}

public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
}
Loading

0 comments on commit 313b919

Please sign in to comment.