Skip to content

Commit

Permalink
[Segment Replication] Added source-side classes for orchestrating rep…
Browse files Browse the repository at this point in the history
…lication events (#3470)

This change expands on the existing SegmentReplicationSource interface and its corresponding Factory class by introducing an implementation where the replication source is a primary shard (PrimaryShardReplicationSource). These code paths execute on the target. The primary shard implementation creates the requests to be send to the source/primary shard.

Correspondingly, this change also defines two request classes for the GET_CHECKPOINT_INFO and GET_SEGMENT_FILES requests as well as an abstract superclass.

A CopyState class has been introduced that captures point-in-time, file-level details from an IndexShard. This implementation mirrors Lucene's NRT CopyState implementation.

Finally, a service class has been introduce for segment replication that runs on the source side (SegmentReplicationSourceService) which handles these two types of incoming requests. This includes private handler classes that house the logic to respond to these requests, with some functionality stubbed for now. The service class also uses a simple map to cache CopyState objects that would be needed by replication targets.

Unit tests have been added/updated for all new functionality.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
(cherry picked from commit b902add)
  • Loading branch information
kartg authored and github-actions[bot] committed Aug 2, 2022
1 parent 343045f commit 7e7cb66
Show file tree
Hide file tree
Showing 22 changed files with 1,204 additions and 51 deletions.
21 changes: 21 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,21 @@ protected SegmentInfos getLatestSegmentInfos() {
return null;
};

/**
* In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos}
* object directly, this method returns a {@link GatedCloseable} reference to the same object.
* This allows the engine to include a clean-up {@link org.opensearch.common.CheckedRunnable}
* which is run when the reference is closed. The default implementation of the clean-up
* procedure is a no-op.
*
* @return {@link GatedCloseable} - A wrapper around a {@link SegmentInfos} instance that
* must be closed for segment files to be deleted.
*/
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
// default implementation
return new GatedCloseable<>(getLatestSegmentInfos(), () -> {});
}

public MergeStats getMergeStats() {
return new MergeStats();
}
Expand Down Expand Up @@ -865,6 +880,12 @@ public final CommitStats commitStats() {
*/
public abstract long getPersistedLocalCheckpoint();

/**
* @return the latest checkpoint that has been processed but not necessarily persisted.
* Also see {@link #getPersistedLocalCheckpoint()}
*/
public abstract long getProcessedLocalCheckpoint();

/**
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2333,6 +2333,22 @@ public SegmentInfos getLatestSegmentInfos() {
}
}

/**
* Fetch the latest {@link SegmentInfos} object via {@link #getLatestSegmentInfos()}
* but also increment the ref-count to ensure that these segment files are retained
* until the reference is closed. On close, the ref-count is decremented.
*/
@Override
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
final SegmentInfos segmentInfos = getLatestSegmentInfos();
try {
indexWriter.incRefDeleter(segmentInfos);
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
return new GatedCloseable<>(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos));
}

@Override
protected final void writerSegmentStats(SegmentsStats stats) {
stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());
Expand Down Expand Up @@ -2742,6 +2758,7 @@ public long getLastSyncedGlobalCheckpoint() {
return translogManager.getTranslog().getLastSyncedGlobalCheckpoint();
}

@Override
public long getProcessedLocalCheckpoint() {
return localCheckpointTracker.getProcessedCheckpoint();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ public long getPersistedLocalCheckpoint() {
return localCheckpointTracker.getPersistedCheckpoint();
}

@Override
public long getProcessedLocalCheckpoint() {
return localCheckpointTracker.getProcessedCheckpoint();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,13 @@ public long getProcessedLocalCheckpoint() {
return getPersistedLocalCheckpoint();
}

@Override
public long getProcessedLocalCheckpoint() {
// the read-only engine does not process checkpoints, so its
// processed checkpoint is identical to its persisted one.
return getPersistedLocalCheckpoint();
}

@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint);
Expand Down
18 changes: 18 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2650,6 +2650,14 @@ public long getLocalCheckpoint() {
return getEngine().getPersistedLocalCheckpoint();
}

/**
* Fetch the latest checkpoint that has been processed but not necessarily persisted.
* Also see {@link #getLocalCheckpoint()}.
*/
public long getProcessedLocalCheckpoint() {
return getEngine().getProcessedLocalCheckpoint();
}

/**
* Returns the global checkpoint for the shard.
*
Expand Down Expand Up @@ -4017,4 +4025,14 @@ public void verifyShardBeforeIndexClosing() throws IllegalStateException {
RetentionLeaseSyncer getRetentionLeaseSyncer() {
return retentionLeaseSyncer;
}

/**
* Fetch the latest SegmentInfos held by the shard's underlying Engine, wrapped
* by a a {@link GatedCloseable} to ensure files are not deleted/merged away.
*
* @throws EngineException - When segment infos cannot be safely retrieved
*/
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}
}
97 changes: 62 additions & 35 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
return getMetadata(commit, false);
}

/**
* Convenience wrapper around the {@link #getMetadata(IndexCommit)} method for null input.
*/
public MetadataSnapshot getMetadata() throws IOException {
return getMetadata(null, false);
}

/**
* Returns a new MetadataSnapshot for the given commit. If the given commit is <code>null</code>
* the latest commit point is used.
Expand Down Expand Up @@ -315,6 +322,16 @@ public MetadataSnapshot getMetadata(IndexCommit commit, boolean lockDirectory) t
}
}

/**
* Returns a new {@link MetadataSnapshot} for the given {@link SegmentInfos} object.
* In contrast to {@link #getMetadata(IndexCommit)}, this method is useful for scenarios
* where we need to construct a MetadataSnapshot from an in-memory SegmentInfos object that
* may not have a IndexCommit associated with it, such as with segment replication.
*/
public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOException {
return new MetadataSnapshot(segmentInfos, directory, logger);
}

/**
* Renames all the given files from the key of the map to the
* value of the map. All successfully renamed files are removed from the map in-place.
Expand Down Expand Up @@ -477,7 +494,7 @@ public static MetadataSnapshot readMetadataSnapshot(
Directory dir = new NIOFSDirectory(indexLocation)
) {
failIfCorrupted(dir);
return new MetadataSnapshot(null, dir, logger);
return new MetadataSnapshot((IndexCommit) null, dir, logger);
} catch (IndexNotFoundException ex) {
// that's fine - happens all the time no need to log
} catch (FileNotFoundException | NoSuchFileException ex) {
Expand Down Expand Up @@ -682,7 +699,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}
}
directory.syncMetaData();
final Store.MetadataSnapshot metadataOrEmpty = getMetadata(null);
final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
verifyAfterCleanup(sourceMetadata, metadataOrEmpty);
} finally {
metadataLock.writeLock().unlock();
Expand Down Expand Up @@ -822,7 +839,14 @@ public MetadataSnapshot(Map<String, StoreFileMetadata> metadata, Map<String, Str
}

MetadataSnapshot(IndexCommit commit, Directory directory, Logger logger) throws IOException {
LoadedMetadata loadedMetadata = loadMetadata(commit, directory, logger);
this(loadMetadata(commit, directory, logger));
}

MetadataSnapshot(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
this(loadMetadata(segmentInfos, directory, logger));
}

private MetadataSnapshot(LoadedMetadata loadedMetadata) {
metadata = loadedMetadata.fileMetadata;
commitUserData = loadedMetadata.userData;
numDocs = loadedMetadata.numDocs;
Expand Down Expand Up @@ -890,40 +914,9 @@ static class LoadedMetadata {
}

static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException {
long numDocs;
Map<String, StoreFileMetadata> builder = new HashMap<>();
Map<String, String> commitUserDataBuilder = new HashMap<>();
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
numDocs = Lucene.getNumDocs(segmentCommitInfos);
commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
// we don't know which version was used to write so we take the max version.
Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion();
for (SegmentCommitInfo info : segmentCommitInfos) {
final Version version = info.info.getVersion();
if (version == null) {
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
checksumFromLuceneFile(
directory,
file,
builder,
logger,
version,
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file))
);
}
}
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentCommitInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
return loadMetadata(segmentCommitInfos, directory, logger);
} catch (CorruptIndexException | IndexNotFoundException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we either know the index is corrupted or it's just not there
throw ex;
Expand All @@ -949,6 +942,40 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
}
throw ex;
}
}

static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
long numDocs = Lucene.getNumDocs(segmentInfos);
Map<String, String> commitUserDataBuilder = new HashMap<>();
commitUserDataBuilder.putAll(segmentInfos.getUserData());
Map<String, StoreFileMetadata> builder = new HashMap<>();
// we don't know which version was used to write so we take the max version.
Version maxVersion = segmentInfos.getMinSegmentLuceneVersion();
for (SegmentCommitInfo info : segmentInfos) {
final Version version = info.info.getVersion();
if (version == null) {
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
checksumFromLuceneFile(
directory,
file,
builder,
logger,
version,
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file))
);
}
}
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest;

import java.io.IOException;

/**
* Request object for fetching segment metadata for a {@link ReplicationCheckpoint} from
* a {@link SegmentReplicationSource}. This object is created by the target node and sent
* to the source node.
*
* @opensearch.internal
*/
public class CheckpointInfoRequest extends SegmentReplicationTransportRequest {

private final ReplicationCheckpoint checkpoint;

public CheckpointInfoRequest(StreamInput in) throws IOException {
super(in);
checkpoint = new ReplicationCheckpoint(in);
}

public CheckpointInfoRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
ReplicationCheckpoint checkpoint
) {
super(replicationId, targetAllocationId, targetNode);
this.checkpoint = checkpoint;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
checkpoint.writeTo(out);
}

public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest;

import java.io.IOException;
import java.util.List;

/**
* Request object for fetching a list of segment files metadata from a {@link SegmentReplicationSource}.
* This object is created by the target node and sent to the source node.
*
* @opensearch.internal
*/
public class GetSegmentFilesRequest extends SegmentReplicationTransportRequest {

private final List<StoreFileMetadata> filesToFetch;
private final ReplicationCheckpoint checkpoint;

public GetSegmentFilesRequest(StreamInput in) throws IOException {
super(in);
this.filesToFetch = in.readList(StoreFileMetadata::new);
this.checkpoint = new ReplicationCheckpoint(in);
}

public GetSegmentFilesRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
List<StoreFileMetadata> filesToFetch,
ReplicationCheckpoint checkpoint
) {
super(replicationId, targetAllocationId, targetNode);
this.filesToFetch = filesToFetch;
this.checkpoint = checkpoint;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(filesToFetch);
checkpoint.writeTo(out);
}

public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
}
Loading

0 comments on commit 7e7cb66

Please sign in to comment.