Skip to content

Commit

Permalink
Initial search pipelines implementation
Browse files Browse the repository at this point in the history
This commit includes the basic features of search pipelines
(see opensearch-project/search-processor#80).

Search pipelines are modeled after ingest pipelines and provide a
simple, clean API for components to modify search requests and
responses.

With this commit we can:

1. Can create, retrieve, update, and delete search pipelines.
2. Transform search requests and responses by explicitly referencing a
   pipeline.

Later work will include:

1. Adding an index setting to specify a default search pipeline.
2. Allowing search pipelines to be defined within a search request (for
   development/testing purposes, akin to simulating an ingest
   pipeline).
3. Adding a collection of search pipeline processors to support common
   useful transformations. (Suggestions welcome!)

Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh committed Mar 9, 2023
1 parent f79b4dc commit b97d3de
Show file tree
Hide file tree
Showing 61 changed files with 3,636 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Disallow multiple data paths for search nodes ([#6427](https://github.com/opensearch-project/OpenSearch/pull/6427))
- [Segment Replication] Allocation and rebalancing based on average primary shard count per index ([#6422](https://github.com/opensearch-project/OpenSearch/pull/6422))
- Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558))
- Add initial search pipelines ([#dummy](https://github.com/opensearch-project/OpenSearch/pull/dummy))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down
68 changes: 68 additions & 0 deletions modules/search-pipeline-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
apply plugin: 'opensearch.yaml-rest-test'
apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description 'Module for search pipeline processors that do not require additional security permissions or have large dependencies and resources'
classname 'org.opensearch.search.pipeline.common.SearchPipelineCommonModulePlugin'
extendedPlugins = ['lang-painless']
}

dependencies {
compileOnly project(':modules:lang-painless')
api project(':libs:opensearch-grok')
api project(':libs:opensearch-dissect')
}

restResources {
restApi {
includeCore '_common', 'search', 'cluster', 'indices', 'index', 'bulk', 'nodes', 'get', 'update', 'cat', 'mget'
}
}

testClusters.all {
// Needed in order to test ingest pipeline templating:
// (this is because the integTest node is not using default distribution, but only the minimal number of required modules)
module ':modules:lang-mustache'
}

thirdPartyAudit.ignoreMissingClasses(
// from log4j
'org.osgi.framework.AdaptPermission',
'org.osgi.framework.AdminPermission',
'org.osgi.framework.Bundle',
'org.osgi.framework.BundleActivator',
'org.osgi.framework.BundleContext',
'org.osgi.framework.BundleEvent',
'org.osgi.framework.SynchronousBundleListener',
'org.osgi.framework.wiring.BundleWire',
'org.osgi.framework.wiring.BundleWiring'
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.test.OpenSearchIntegTestCase;

public class SearchPipelineCommonIT extends OpenSearchIntegTestCase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.search.pipeline.Processor;

/**
* Base class for common processor behavior.
*/
abstract class AbstractProcessor implements Processor {
private final String tag;
private final String description;

protected AbstractProcessor(String tag, String description) {
this.tag = tag;
this.description = description;
}

@Override
public String getTag() {
return tag;
}

@Override
public String getDescription() {
return description;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.ParseField;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;

import java.io.InputStream;
import java.util.Map;

import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;

/**
* This is a {@link SearchRequestProcessor} that replaces the incoming query with a BooleanQuery
* that MUST match the incoming query with a FILTER clause based on the configured query.
*/
public class FilterQueryRequestProcessor extends AbstractProcessor implements SearchRequestProcessor {
/**
* Key to reference this processor type from a search pipeline.
*/
public static final String TYPE = "filter_query";

final QueryBuilder filterQuery;

@Override
public String getType() {
return TYPE;
}

/**
* Constructor that takes a filter query.
*
* @param tag processor tag
* @param description processor description
* @param filterQuery the query that will be added as a filter to incoming queries
*/
public FilterQueryRequestProcessor(String tag, String description, QueryBuilder filterQuery) {
super(tag, description);
this.filterQuery = filterQuery;
}

@Override
public SearchRequest processRequest(SearchRequest request) throws Exception {
QueryBuilder originalQuery = null;
if (request.source() != null) {
originalQuery = request.source().query();
}

BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(filterQuery);
if (originalQuery != null) {
filteredQuery.must(originalQuery);
}
if (request.source() == null) {
request.source(new SearchSourceBuilder());
}
request.source().query(filteredQuery);
return request;
}

static class Factory implements Processor.Factory {
private final NamedXContentRegistry namedXContentRegistry;
public static final ParseField QUERY_FIELD = new ParseField("query");

Factory(NamedXContentRegistry namedXContentRegistry) {
this.namedXContentRegistry = namedXContentRegistry;
}

@Override
public FilterQueryRequestProcessor create(
Map<String, Processor.Factory> processorFactories,
String tag,
String description,
Map<String, Object> config
) throws Exception {
try (
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config);
InputStream stream = BytesReference.bytes(builder).streamInput();
XContentParser parser = XContentType.JSON.xContent()
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)
) {
XContentParser.Token token = parser.nextToken();
assert token == XContentParser.Token.START_OBJECT;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser));
}
}
}
}
throw new IllegalArgumentException(
"Did not specify the " + QUERY_FIELD.getPreferredName() + " property in processor of type " + TYPE
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPipelinesPlugin;
import org.opensearch.search.pipeline.Processor;

import java.util.Map;

/**
* Plugin providing common search request/response processors for use in search pipelines.
*/
public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPipelinesPlugin {

/**
* No constructor needed, but build complains if we don't have a constructor with JavaDoc.
*/
public SearchPipelineCommonModulePlugin() {}

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* A collection of commonly-useful request and response processors for use in search pipelines.
*/
package org.opensearch.search.pipeline.common;
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.AbstractBuilderTestCase;

import java.util.Collections;
import java.util.Map;

public class FilterQueryRequestProcessorTests extends AbstractBuilderTestCase {

public void testFilterQuery() throws Exception {
QueryBuilder filterQuery = new TermQueryBuilder("field", "value");
FilterQueryRequestProcessor filterQueryRequestProcessor = new FilterQueryRequestProcessor(null, null, filterQuery);
QueryBuilder incomingQuery = new TermQueryBuilder("text", "foo");
SearchSourceBuilder source = new SearchSourceBuilder().query(incomingQuery);
SearchRequest request = new SearchRequest().source(source);
SearchRequest transformedRequest = filterQueryRequestProcessor.processRequest(request);
assertEquals(new BoolQueryBuilder().must(incomingQuery).filter(filterQuery), transformedRequest.source().query());

// Test missing incoming query
request = new SearchRequest();
transformedRequest = filterQueryRequestProcessor.processRequest(request);
assertEquals(new BoolQueryBuilder().filter(filterQuery), transformedRequest.source().query());
}

public void testFactory() throws Exception {
FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry());
FilterQueryRequestProcessor processor = factory.create(
Collections.emptyMap(),
null,
null,
Map.of("query", Map.of("term", Map.of("field", "value")))
);
assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery);

// Missing "query" parameter:
expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.opensearch.test.rest.yaml.ClientYamlTestCandidate;
import org.opensearch.test.rest.yaml.OpenSearchClientYamlSuiteTestCase;

public class SearchPipelineCommonYamlTestSuiteIT extends OpenSearchClientYamlSuiteTestCase {
public SearchPipelineCommonYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}

@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return OpenSearchClientYamlSuiteTestCase.createParameters();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"Search pipeline common installed":
- skip:
reason: "contains is a newly added assertion"
features: contains
- do:
cluster.state: {}

# Get cluster-manager node id
- set: { cluster_manager_node: cluster_manager }

- do:
nodes.info: {}

- contains: { nodes.$cluster_manager.modules: { name: search-pipeline-common } }
- contains: { nodes.$cluster_manager.search_pipelines.processors: { type: filter_query } }
Loading

0 comments on commit b97d3de

Please sign in to comment.