Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into seg-rep/checkpoint-replay
Browse files Browse the repository at this point in the history
  • Loading branch information
Rishikesh1159 committed Jun 29, 2022
2 parents 666ef0c + 30209d8 commit d9b2e71
Show file tree
Hide file tree
Showing 43 changed files with 792 additions and 926 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/gradle-check.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
name: Jenkins Gradle Check
on: [pull_request]
name: Gradle Check (Jenkins)
on:
pull_request_target:
types: [opened, synchronize, reopened]

jobs:
gradle-check:
Expand Down
2 changes: 2 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
# GitHub history for details.
#

# Enable build caching
org.gradle.caching=true
org.gradle.warning.mode=none
org.gradle.parallel=true
org.gradle.jvmargs=-Xmx3g -XX:+HeapDumpOnOutOfMemoryError -Xss2m \
Expand Down
2 changes: 1 addition & 1 deletion plugins/repository-hdfs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ dependencies {
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
api 'com.google.code.gson:gson:2.9.0'
runtimeOnly 'com.google.guava:guava:31.1-jre'
api 'com.google.protobuf:protobuf-java:3.21.1'
api 'com.google.protobuf:protobuf-java:3.21.2'
api "commons-logging:commons-logging:${versions.commonslogging}"
api 'commons-cli:commons-cli:1.5.0'
api "commons-codec:commons-codec:${versions.commonscodec}"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
b7755d218ee7c15541afb51f2d247ca951603e0b
13 changes: 6 additions & 7 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ public final class IndexModule {

private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();

private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory();

private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;

public static final Setting<String> INDEX_STORE_TYPE_SETTING = new Setting<>(
Expand Down Expand Up @@ -189,9 +187,9 @@ public final class IndexModule {
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
* via {@link org.opensearch.plugins.PluginsService#onIndexModule(IndexModule)}.
*
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param directoryFactories the available store types
*/
public IndexModule(
Expand Down Expand Up @@ -476,7 +474,8 @@ public IndexService newIndexService(
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry
ValuesSourceRegistry valuesSourceRegistry,
RemoteDirectoryFactory remoteDirectoryFactory
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -519,7 +518,7 @@ public IndexService newIndexService(
client,
queryCache,
directoryFactory,
REMOTE_DIRECTORY_FACTORY,
remoteDirectoryFactory,
eventListener,
readerWrapperFactory,
mapperRegistry,
Expand Down
37 changes: 17 additions & 20 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardNotFoundException;
Expand All @@ -97,9 +96,6 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -437,8 +433,7 @@ public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RepositoriesService repositoriesService
final SegmentReplicationCheckpointPublisher checkpointPublisher
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -511,22 +506,24 @@ public synchronized IndexShard createShard(
warmer.warm(reader, shard, IndexService.this.indexSettings);
}
};
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Directory remoteDirectory = null;
RemoteStoreRefreshListener remoteStoreRefreshListener = null;

Store remoteStore = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
try {
Repository repository = repositoriesService.repository(clusterService.state().metadata().clusterUUID());
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException(
"Repository should be created before creating index with remote_store enabled setting",
e
);
}
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(
clusterService.state().metadata().clusterUUID(),
this.indexSettings,
path
);
remoteStore = new Store(
shardId,
this.indexSettings,
remoteDirectory,
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
);
}

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
store = new Store(
shardId,
this.indexSettings,
Expand Down Expand Up @@ -557,7 +554,7 @@ public synchronized IndexShard createShard(
retentionLeaseSyncer,
circuitBreakerService,
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null,
remoteStoreRefreshListener
remoteStore
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
99 changes: 21 additions & 78 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.io.Closeable;
Expand All @@ -107,7 +109,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand Down Expand Up @@ -167,6 +168,8 @@ public final EngineConfig config() {
return engineConfig;
}

public abstract TranslogManager translogManager();

protected abstract SegmentInfos getLastCommittedSegmentInfos();

/**
Expand Down Expand Up @@ -346,12 +349,6 @@ boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and te
*/
public abstract boolean isThrottled();

/**
* Trims translog for terms below <code>belowTerm</code> and seq# above <code>aboveSeqNo</code>
* @see Translog#trimOperations(long, long)
*/
public abstract void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException;

/**
* A Lock implementation that always allows the lock to be acquired
*
Expand Down Expand Up @@ -784,18 +781,6 @@ public enum SearcherScope {
INTERNAL
}

/**
* Checks if the underlying storage sync is required.
*/
public abstract boolean isTranslogSyncNeeded();

/**
* Ensures that all locations in the given stream have been written to the underlying storage.
*/
public abstract boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException;

public abstract void syncTranslog() throws IOException;

/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
Expand Down Expand Up @@ -831,13 +816,6 @@ public abstract Translog.Snapshot newChangesSnapshot(
*/
public abstract long getMinRetainedSeqNo();

public abstract TranslogStats getTranslogStats();

/**
* Returns the last location that the translog of this engine has written into.
*/
public abstract Translog.Location getTranslogLastWriteLocation();

protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
Expand Down Expand Up @@ -905,6 +883,22 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl
return stats;
}

protected TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) {
TranslogDeletionPolicy customTranslogDeletionPolicy = null;
if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
.create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
}
return Objects.requireNonNullElseGet(
customTranslogDeletionPolicy,
() -> new DefaultTranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
)
);
}

protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegmentFileSizes, SegmentsStats stats) {
stats.add(1);
if (includeSegmentFileSizes) {
Expand Down Expand Up @@ -1152,25 +1146,6 @@ public final void flush() throws EngineException {
flush(false, false);
}

/**
* checks and removes translog files that no longer need to be retained. See
* {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details
*/
public abstract void trimUnreferencedTranslogFiles() throws EngineException;

/**
* Tests whether or not the translog generation should be rolled to a new generation.
* This test is based on the size of the current generation compared to the configured generation threshold size.
*
* @return {@code true} if the current generation should be rolled to a new generation
*/
public abstract boolean shouldRollTranslogGeneration();

/**
* Rolls the translog generation and cleans unneeded.
*/
public abstract void rollTranslogGeneration() throws EngineException;

/**
* Triggers a forced merge on this engine
*/
Expand Down Expand Up @@ -1982,14 +1957,6 @@ public interface Warmer {
*/
public abstract void deactivateThrottling();

/**
* This method replays translog to restore the Lucene index which might be reverted previously.
* This ensures that all acknowledged writes are restored correctly when this engine is promoted.
*
* @return the number of translog operations have been recovered
*/
public abstract int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException;

/**
* Fills up the local checkpoints history with no-ops until the local checkpoint
* and the max seen sequence ID are identical.
Expand All @@ -1998,20 +1965,6 @@ public interface Warmer {
*/
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;

/**
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
* This operation will close the engine if the recovery fails.
*
* @param translogRecoveryRunner the translog recovery runner
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
*/
public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;

/**
* Do not replay translog operations, but make the engine be ready.
*/
public abstract void skipTranslogRecovery();

/**
* Tries to prune buffered deletes from the version map.
*/
Expand All @@ -2032,16 +1985,6 @@ public long getMaxSeenAutoIdTimestamp() {
*/
public abstract void updateMaxUnsafeAutoIdTimestamp(long newTimestamp);

/**
* The runner for translog recovery
*
* @opensearch.internal
*/
@FunctionalInterface
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
}

/**
* Returns the maximum sequence number of either update or delete operations have been processed in this engine
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
Expand Down
Loading

0 comments on commit d9b2e71

Please sign in to comment.