Commit 5b355ed

Merge branch 'opensearch-project:main' into tracer_injection
suranjay committed Jul 19, 2023
2 parents 908c3bf + 57d5e90 commit 5b355ed
Showing 21 changed files with 105 additions and 147 deletions.
2 changes: 0 additions & 2 deletions client/rest-high-level/build.gradle
@@ -106,8 +106,6 @@ testClusters.all {
   extraConfigFile nodeTrustStore.name, nodeTrustStore
   extraConfigFile pkiTrustCert.name, pkiTrustCert

-  // Enable APIs behind feature flags
-  setting 'opensearch.experimental.feature.search_pipeline.enabled', 'true'
 }

 thirdPartyAudit.ignoreMissingClasses(
6 changes: 0 additions & 6 deletions distribution/src/config/opensearch.yml
@@ -128,12 +128,6 @@ ${path.logs}
 #opensearch.experimental.feature.extensions.enabled: false
 #
 #
-# Gates the search pipeline feature. This feature enables configurable processors
-# for search requests and search responses, similar to ingest pipelines.
-#
-#opensearch.experimental.feature.search_pipeline.enabled: false
-#
-#
 # Gates the concurrent segment search feature. This feature enables concurrent segment search in a separate
 # index searcher threadpool.
 #
3 changes: 0 additions & 3 deletions modules/search-pipeline-common/build.gradle
@@ -28,9 +28,6 @@ restResources {
   }
 }

-testClusters.all {
-  setting 'opensearch.experimental.feature.search_pipeline.enabled', 'true'
-}

 thirdPartyAudit.ignoreMissingClasses(
   // from log4j
SearchPipelineCommonIT.java (modules/search-pipeline-common)

@@ -19,8 +19,6 @@
 import org.opensearch.action.search.SearchResponse;
 import org.opensearch.action.support.master.AcknowledgedResponse;
 import org.opensearch.core.common.bytes.BytesArray;
-import org.opensearch.common.settings.Settings;
-import org.opensearch.common.util.FeatureFlags;
 import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.index.query.MatchAllQueryBuilder;
 import org.opensearch.plugins.Plugin;

@@ -35,11 +33,6 @@
 @OpenSearchIntegTestCase.SuiteScopeTestCase
 public class SearchPipelineCommonIT extends OpenSearchIntegTestCase {

-    @Override
-    protected Settings featureFlagSettings() {
-        return Settings.builder().put(FeatureFlags.SEARCH_PIPELINE, "true").build();
-    }
-
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
         return List.of(SearchPipelineCommonModulePlugin.class);
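With the flag override gone, the module's tests exercise the search pipeline APIs directly. A minimal sketch of the round trip, modeled on this test's imports (`PutSearchPipelineRequest`, `AcknowledgedResponse`, `BytesArray`, `XContentType`, `MatchAllQueryBuilder`); the exact client entry points and the builder method for attaching a pipeline to a search are assumptions, not taken from this diff:

```java
// Sketch: register a search pipeline, then search through it.
// Assumes an OpenSearchIntegTestCase context (client(), a populated "test_index", etc.).
PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest(
    "my_pipeline",
    new BytesArray("{\"request_processors\":[{\"filter_query\":{\"query\":{\"term\":{\"visible\":true}}}}]}"),
    XContentType.JSON
);
AcknowledgedResponse ack = client().admin().cluster().putSearchPipeline(putRequest).actionGet();
assert ack.isAcknowledged();

// Route a search through the pipeline; the filter_query request processor
// rewrites the query before it executes. (Hypothetical setter name; the REST
// equivalent is the ?search_pipeline=my_pipeline query parameter.)
SearchResponse response = client().prepareSearch("test_index")
    .setQuery(new MatchAllQueryBuilder())
    .setPipeline("my_pipeline")
    .get();
```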
1 change: 0 additions & 1 deletion qa/mixed-cluster/build.gradle
@@ -65,7 +65,6 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
   numberOfNodes = 4

   setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
-  setting 'opensearch.experimental.feature.search_pipeline.enabled', 'true'
 }
 }

1 change: 0 additions & 1 deletion qa/smoke-test-multinode/build.gradle
@@ -43,7 +43,6 @@ File repo = file("$buildDir/testclusters/repo")
 testClusters.integTest {
   numberOfNodes = 2
   setting 'path.repo', repo.absolutePath
-  setting 'opensearch.experimental.feature.search_pipeline.enabled', 'true'
 }

 integTest {
1 change: 0 additions & 1 deletion rest-api-spec/build.gradle
@@ -28,7 +28,6 @@ artifacts {

 testClusters.all {
   module ':modules:mapper-extras'
-  setting 'opensearch.experimental.feature.search_pipeline.enabled', 'true'
 }

 test.enabled = false
8 changes: 3 additions & 5 deletions server/src/main/java/org/opensearch/action/ActionModule.java
@@ -946,11 +946,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
     registerHandler.accept(new RestDeleteDecommissionStateAction());

     // Search pipelines API
-    if (FeatureFlags.isEnabled(FeatureFlags.SEARCH_PIPELINE)) {
-        registerHandler.accept(new RestPutSearchPipelineAction());
-        registerHandler.accept(new RestGetSearchPipelineAction());
-        registerHandler.accept(new RestDeleteSearchPipelineAction());
-    }
+    registerHandler.accept(new RestPutSearchPipelineAction());
+    registerHandler.accept(new RestGetSearchPipelineAction());
+    registerHandler.accept(new RestDeleteSearchPipelineAction());

     // Extensions API
     if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) {
FeatureFlagSettings.java

@@ -38,7 +38,6 @@ protected FeatureFlagSettings(
     FeatureFlags.REMOTE_STORE_SETTING,
     FeatureFlags.EXTENSIONS_SETTING,
     FeatureFlags.IDENTITY_SETTING,
-    FeatureFlags.SEARCH_PIPELINE_SETTING,
     FeatureFlags.CONCURRENT_SEGMENT_SEARCH_SETTING,
     FeatureFlags.TELEMETRY_SETTING
 )
FeatureFlags.java

@@ -46,12 +46,6 @@ public class FeatureFlags {
      */
     public static final String EXTENSIONS = "opensearch.experimental.feature.extensions.enabled";

-    /**
-     * Gates the search pipeline features during initial development.
-     * Once the feature is complete and ready for release, this feature flag can be removed.
-     */
-    public static final String SEARCH_PIPELINE = "opensearch.experimental.feature.search_pipeline.enabled";
-
     /**
      * Gates the functionality of identity.
      */

@@ -106,8 +100,6 @@ public static boolean isEnabled(String featureFlagName) {

     public static final Setting<Boolean> EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope);

-    public static final Setting<Boolean> SEARCH_PIPELINE_SETTING = Setting.boolSetting(SEARCH_PIPELINE, true, Property.NodeScope);
-
     public static final Setting<Boolean> IDENTITY_SETTING = Setting.boolSetting(IDENTITY, false, Property.NodeScope);

     public static final Setting<Boolean> TELEMETRY_SETTING = Setting.boolSetting(TELEMETRY, false, Property.NodeScope);
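For contrast, this is the gate pattern that remains for still-experimental features (EXTENSIONS, IDENTITY, TELEMETRY, and the others listed above): a string key, a `Setting.boolSetting(..., Property.NodeScope)` registered through FeatureFlagSettings, and `FeatureFlags.isEnabled(...)` checks at call sites. A condensed, self-contained sketch of that lifecycle; the real class is more involved, so treat this as illustrative only:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the feature-flag gate pattern, not the actual class.
final class FeatureFlagsSketch {
    static final String EXTENSIONS = "opensearch.experimental.feature.extensions.enabled";

    private static final Map<String, Boolean> FLAGS = new ConcurrentHashMap<>();

    // Called once during node startup with values parsed from opensearch.yml.
    static void initialize(Map<String, Boolean> fromNodeSettings) {
        FLAGS.putAll(fromNodeSettings);
    }

    static boolean isEnabled(String flag) {
        // A JVM system property wins (handy in tests), then node settings, then false.
        String sysProp = System.getProperty(flag);
        return sysProp != null ? Boolean.parseBoolean(sysProp) : FLAGS.getOrDefault(flag, false);
    }
}
```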
14 changes: 1 addition & 13 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -41,7 +41,6 @@
 import org.opensearch.common.settings.Setting;
 import org.opensearch.common.settings.Setting.Property;
 import org.opensearch.common.settings.Settings;
-import org.opensearch.common.settings.SettingsException;
 import org.opensearch.common.unit.ByteSizeUnit;
 import org.opensearch.common.unit.ByteSizeValue;
 import org.opensearch.common.unit.TimeValue;

@@ -64,7 +63,6 @@
 import java.util.function.UnaryOperator;

 import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
-import static org.opensearch.common.util.FeatureFlags.SEARCH_PIPELINE;
 import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
 import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING;
 import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING;

@@ -1625,16 +1623,6 @@ public String getDefaultSearchPipeline() {
     }

     public void setDefaultSearchPipeline(String defaultSearchPipeline) {
-        if (FeatureFlags.isEnabled(SEARCH_PIPELINE)) {
-            this.defaultSearchPipeline = defaultSearchPipeline;
-        } else {
-            throw new SettingsException(
-                "Unable to update setting: "
-                    + DEFAULT_SEARCH_PIPELINE.getKey()
-                    + ". This is an experimental feature that is currently disabled, please enable the "
-                    + SEARCH_PIPELINE
-                    + " feature flag first."
-            );
-        }
+        this.defaultSearchPipeline = defaultSearchPipeline;
     }
 }
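With the guard removed, the default search pipeline behaves like any other dynamic index setting. A hedged sketch of flipping it at runtime; the setting key and the `_none` sentinel are assumptions based on `DEFAULT_SEARCH_PIPELINE` and the ingest `default_pipeline` convention:

```java
// Point an index at a default search pipeline; searches that name no pipeline
// will run through "my_pipeline" from here on.
client().admin().indices().prepareUpdateSettings("my-index")
    .setSettings(Settings.builder().put("index.search.default_pipeline", "my_pipeline"))
    .get();

// Clear it again by resetting to the no-op sentinel.
client().admin().indices().prepareUpdateSettings("my-index")
    .setSettings(Settings.builder().put("index.search.default_pipeline", "_none"))
    .get();
```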
NRTReplicationEngine.java

@@ -56,6 +56,7 @@ public class NRTReplicationEngine extends Engine {
     private final CompletionStatsCache completionStatsCache;
     private final LocalCheckpointTracker localCheckpointTracker;
     private final WriteOnlyTranslogManager translogManager;
+    private final boolean shouldCommit;

     private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;

@@ -113,6 +114,7 @@ public void onAfterTranslogSync() {
                 engineConfig.getPrimaryModeSupplier()
             );
             this.translogManager = translogManagerRef;
+            this.shouldCommit = engineConfig.getIndexSettings().isRemoteStoreEnabled() == false;
         } catch (IOException e) {
             IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef);
             throw new EngineCreationFailureException(shardId, "failed to create engine", e);

@@ -163,7 +165,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOException
      * @throws IOException - When there is an IO error committing the SegmentInfos.
      */
     private void commitSegmentInfos(SegmentInfos infos) throws IOException {
-        store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
+        if (shouldCommit) {
+            store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
+        }
         this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
         translogManager.syncTranslog();
     }

@@ -383,15 +387,21 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
         assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
             : "Either the write lock must be held or the engine must be currently be failing itself";
         try {
-            final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
-            /*
-             This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
-             from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
-             used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
-            */
-            latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
-            latestSegmentInfos.changed();
-            commitSegmentInfos(latestSegmentInfos);
+            // if remote store is enabled, all segments durably persisted
+            if (shouldCommit) {
+                final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
+                /*
+                 This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
+                 from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
+                 used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
+                */
+                latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
+                latestSegmentInfos.changed();
+                commitSegmentInfos(latestSegmentInfos);
+            } else {
+                store.directory().sync(List.of(store.directory().listAll()));
+                store.directory().syncMetaData();
+            }
             IOUtils.close(readerManager, translogManager, store::decRef);
         } catch (Exception e) {
             logger.warn("failed to close engine", e);
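The net effect of `shouldCommit`, condensed: when the index is backed by a remote segment store, the engine never writes a local commit point and only fsyncs what is already on disk. An illustrative sketch of the decision, with names taken from the diff rather than the full engine logic:

```java
// Fixed at engine construction: commit locally only when there is no remote segment store.
boolean shouldCommit = engineConfig.getIndexSettings().isRemoteStoreEnabled() == false;

if (shouldCommit) {
    // Document-replication path: persist a new commit point (segments_N).
    store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
} else {
    // Remote-store path: segments are durable remotely, so fsync local files
    // and directory metadata without creating a new commit point.
    store.directory().sync(List.of(store.directory().listAll()));
    store.directory().syncMetaData();
}
```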
47 changes: 25 additions & 22 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -4661,31 +4661,34 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
         indexInput,
         remoteSegmentMetadata.getGeneration()
     );
-    // Replicas never need a local commit
-    if (shouldCommit) {
+    if (this.shardRouting.primary()) {
         long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
         // Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
         // with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
         // after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
-        // policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest
-        // commit.
+        // policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the
+        // latest commit.
         Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
             .filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
             .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
         if (localMaxSegmentInfos.isPresent()
             && infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
                 - 1) {
             // If remote translog is not enabled, local translog will be created with different UUID.
             // This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
             // to be same. Following code block make sure to have the same UUID.
             if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
                 SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
                 Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
                 userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
                 infosSnapshot.setUserData(userData, false);
             }
             storeDirectory.deleteFile(localMaxSegmentInfos.get());
         }
         store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
     } else {
         finalizeReplication(infosSnapshot);
     }
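Condensed, the sync path now branches on the shard's role instead of a caller-supplied `shouldCommit` argument; an illustrative sketch with names from the diff:

```java
// After downloading segments from the remote store and building infosSnapshot:
if (this.shardRouting.primary()) {
    // Primaries reconcile commit generations and write a local commit point.
    long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
    store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
} else {
    // Replicas skip the commit entirely and finalize replication from the snapshot.
    finalizeReplication(infosSnapshot);
}
```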
4 changes: 1 addition & 3 deletions server/src/main/java/org/opensearch/node/Node.java
@@ -259,7 +259,6 @@
 import java.util.stream.Stream;

 import static java.util.stream.Collectors.toList;
-import static org.opensearch.common.util.FeatureFlags.SEARCH_PIPELINE;
 import static org.opensearch.common.util.FeatureFlags.TELEMETRY;
 import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath;
 import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY;

@@ -981,8 +980,6 @@ protected Node(
     xContentRegistry,
     namedWriteableRegistry,
     pluginsService.filterPlugins(SearchPipelinePlugin.class),
-    client,
-    FeatureFlags.isEnabled(SEARCH_PIPELINE)
+    client
 );
 final TaskCancellationMonitoringSettings taskCancellationMonitoringSettings = new TaskCancellationMonitoringSettings(
     settings,
SearchSourceBuilder.java

@@ -36,7 +36,6 @@
 import org.opensearch.Version;
 import org.opensearch.common.Booleans;
 import org.opensearch.common.Nullable;
-import org.opensearch.common.util.FeatureFlags;
 import org.opensearch.core.common.ParsingException;
 import org.opensearch.core.common.io.stream.StreamInput;
 import org.opensearch.core.common.io.stream.StreamOutput;

@@ -1250,16 +1249,15 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) throws IOException {
     collapse = CollapseBuilder.fromXContent(parser);
 } else if (POINT_IN_TIME.match(currentFieldName, parser.getDeprecationHandler())) {
     pointInTimeBuilder = PointInTimeBuilder.fromXContent(parser);
-} else if (FeatureFlags.isEnabled(FeatureFlags.SEARCH_PIPELINE)
-    && SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
-    searchPipelineSource = parser.mapOrdered();
-} else {
-    throw new ParsingException(
-        parser.getTokenLocation(),
-        "Unknown key for a " + token + " in [" + currentFieldName + "].",
-        parser.getTokenLocation()
-    );
-}
+} else if (SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
+    searchPipelineSource = parser.mapOrdered();
+} else {
+    throw new ParsingException(
+        parser.getTokenLocation(),
+        "Unknown key for a " + token + " in [" + currentFieldName + "].",
+        parser.getTokenLocation()
+    );
+}
 } else if (token == XContentParser.Token.START_ARRAY) {
     if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
         storedFieldsContext = StoredFieldsContext.fromXContent(STORED_FIELDS_FIELD.getPreferredName(), parser);
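Removing the flag check means `search_pipeline` is now always accepted in a search request body, so callers can define an ad-hoc pipeline inline rather than referencing a stored one. A hedged sketch of a body the parser now accepts; the processor layout follows the search-pipeline-common module:

```java
// Previously this body threw "Unknown key for a START_OBJECT in [search_pipeline]."
// unless the feature flag was set; it now parses unconditionally, landing in
// searchPipelineSource via parser.mapOrdered().
String body = "{"
    + "\"query\": { \"match_all\": {} },"
    + "\"search_pipeline\": {"
    + "  \"request_processors\": ["
    + "    { \"filter_query\": { \"query\": { \"term\": { \"visible\": true } } } }"
    + "  ]"
    + "}"
    + "}";
```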