Skip to content

Commit

Permalink
Moving RecoveryState.Index to a top-level class and renaming (#3075)
Browse files Browse the repository at this point in the history
* Moving RecoveryState.Index to a top-level class and renaming

This class is a building block of replication and will be re-used between peer recovery and segment replication. Thus, the inner class has been extracted to a top-level class and moved to the replication.common package. It has been renamed to ReplicationLuceneIndex to better reflect what it represents. It has two dependent inner classes from RecoveryState that have also been moved along with it - these remain inner classes since they are not currently used anywhere else. The RecoveryFilesDetails class has been renamed to FilesDetails and the FileDetail class has been renamed to FileMetadata.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Incorporate PR comments

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Revert Project_Default.xml

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Update REST Action test to no longer mock a final class

Instead, the test now populates dummy data.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Removing mocking of ReplicationLuceneIndex in RecoverySourceHandlerTests

The class has been marked final, so it can no longer be mocked. Instead, the test class sets up the lucene index class by adding the smae file metadata that is set up for the store.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Configure @opensearch.internal as custom Javadoc tag

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Revert "Configure @opensearch.internal as custom Javadoc tag"

This reverts commit 2077d76.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
  • Loading branch information
kartg committed May 5, 2022
1 parent 6003921 commit 44573ba
Show file tree
Hide file tree
Showing 14 changed files with 634 additions and 559 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
Expand Down Expand Up @@ -547,7 +548,7 @@ public void testReuseInFileBasedPeerRecovery() throws Exception {
final Set<String> files = new HashSet<>();
for (final RecoveryState recoveryState : initialRecoveryReponse.shardRecoveryStates().get("test")) {
if (recoveryState.getTargetNode().getName().equals(replicaNode)) {
for (final RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) {
for (final ReplicationLuceneIndex.FileMetadata file : recoveryState.getIndex().fileDetails()) {
files.add(file.name());
}
break;
Expand Down Expand Up @@ -607,7 +608,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
long reused = 0;
int filesRecovered = 0;
int filesReused = 0;
for (final RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) {
for (final ReplicationLuceneIndex.FileMetadata file : recoveryState.getIndex().fileDetails()) {
if (files.contains(file.name()) == false) {
recovered += file.length();
filesRecovered++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.indices.analysis.AnalysisModule;
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.node.NodeClosedException;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.AnalysisPlugin;
Expand Down Expand Up @@ -836,7 +837,7 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount,
return client().admin().indices().prepareStats(name).execute().actionGet();
}

private void validateIndexRecoveryState(RecoveryState.Index indexState) {
private void validateIndexRecoveryState(ReplicationLuceneIndex indexState) {
assertThat(indexState.time(), greaterThanOrEqualTo(0L));
assertThat(indexState.recoveredFilesPercent(), greaterThanOrEqualTo(0.0f));
assertThat(indexState.recoveredFilesPercent(), lessThanOrEqualTo(100.0f));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.Repository;

Expand Down Expand Up @@ -176,7 +177,7 @@ void recoverFromLocalShards(
}

void addIndices(
final RecoveryState.Index indexRecoveryStats,
final ReplicationLuceneIndex indexRecoveryStats,
final Directory target,
final Sort indexSort,
final Directory[] sources,
Expand Down Expand Up @@ -231,9 +232,9 @@ void addIndices(
* Directory wrapper that records copy process for recovery statistics
*/
static final class StatsDirectoryWrapper extends FilterDirectory {
private final RecoveryState.Index index;
private final ReplicationLuceneIndex index;

StatsDirectoryWrapper(Directory in, RecoveryState.Index indexRecoveryStats) {
StatsDirectoryWrapper(Directory in, ReplicationLuceneIndex indexRecoveryStats) {
super(in);
this.index = indexRecoveryStats;
}
Expand Down Expand Up @@ -354,7 +355,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
+ "]";

if (logger.isTraceEnabled()) {
RecoveryState.Index index = recoveryState.getIndex();
ReplicationLuceneIndex index = recoveryState.getIndex();
StringBuilder sb = new StringBuilder();
sb.append(" index : files [")
.append(index.totalFileCount())
Expand Down Expand Up @@ -471,7 +472,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
writeEmptyRetentionLeasesFile(indexShard);
}
// since we recover from local, just fill the files and size
final RecoveryState.Index index = recoveryState.getIndex();
final ReplicationLuceneIndex index = recoveryState.getIndex();
try {
if (si != null) {
addRecoveredFileDetails(si, store, index);
Expand Down Expand Up @@ -509,7 +510,7 @@ private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws
assert indexShard.loadRetentionLeases().leases().isEmpty();
}

private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
private void addRecoveredFileDetails(SegmentInfos si, Store store, ReplicationLuceneIndex index) throws IOException {
final Directory directory = store.directory();
for (String name : Lucene.files(si)) {
long length = directory.fileLength(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.transport.Transports;

import java.io.IOException;
Expand All @@ -58,7 +59,7 @@

public class MultiFileWriter extends AbstractRefCounted implements Releasable {

public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
public MultiFileWriter(Store store, ReplicationLuceneIndex indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
super("multi_file_writer");
this.store = store;
this.indexState = indexState;
Expand All @@ -71,7 +72,7 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Logger logger;
private final Store store;
private final RecoveryState.Index indexState;
private final ReplicationLuceneIndex indexState;
private final String tempFilePrefix;

private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -521,8 +522,8 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha
return;
}

final RecoveryState.Index indexState = recoveryTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
final ReplicationLuceneIndex indexState = recoveryTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != ReplicationLuceneIndex.UNKNOWN) {
indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
}

Expand Down
Loading

0 comments on commit 44573ba

Please sign in to comment.