diff --git a/CHANGELOG.md b/CHANGELOG.md index df3ac6ae0fa1b..bfd313e9a8f37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Disallow multiple data paths for search nodes ([#6427](https://github.com/opensearch-project/OpenSearch/pull/6427)) - [Segment Replication] Allocation and rebalancing based on average primary shard count per index ([#6422](https://github.com/opensearch-project/OpenSearch/pull/6422)) - Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558)) +- Add initial search pipelines ([#dummy](https://github.com/opensearch-project/OpenSearch/pull/dummy)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490)) diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java index e97a42d971a85..d0d823575b563 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java @@ -268,6 +268,7 @@ public class RestHighLevelClient implements Closeable { private final IngestClient ingestClient = new IngestClient(this); private final SnapshotClient snapshotClient = new SnapshotClient(this); private final TasksClient tasksClient = new TasksClient(this); + private final SearchPipelinesClient searchPipelinesClient = new SearchPipelinesClient(this); /** * Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the @@ -354,6 +355,10 @@ public final TasksClient tasks() { return tasksClient; } + public final SearchPipelinesClient searchPipelines() { + return searchPipelinesClient; + } + /** * Executes a bulk request using the Bulk API. * @param bulkRequest the request diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/SearchPipelinesClient.java b/client/rest-high-level/src/main/java/org/opensearch/client/SearchPipelinesClient.java new file mode 100644 index 0000000000000..36a390a805f4b --- /dev/null +++ b/client/rest-high-level/src/main/java/org/opensearch/client/SearchPipelinesClient.java @@ -0,0 +1,153 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.search.DeleteSearchPipelineRequest; +import org.opensearch.action.search.GetSearchPipelineRequest; +import org.opensearch.action.search.GetSearchPipelineResponse; +import org.opensearch.action.search.PutSearchPipelineRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; + +import java.io.IOException; +import java.util.Collections; + +import static java.util.Collections.emptySet; + +public final class SearchPipelinesClient { + private final RestHighLevelClient restHighLevelClient; + + SearchPipelinesClient(RestHighLevelClient restHighLevelClient) { + this.restHighLevelClient = restHighLevelClient; + } + + /** + * Add a pipeline or update an existing pipeline. + * + * @param request the request + * @param options the request options (e.g. 
headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public AcknowledgedResponse putPipeline(PutSearchPipelineRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity( + request, + SearchPipelinesRequestConverters::putPipeline, + options, + AcknowledgedResponse::fromXContent, + emptySet() + ); + } + + /** + * Asynchronously add a pipeline or update an existing pipeline. + * + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + * @return cancellable that may be used to cancel the request + */ + public Cancellable putPipelineAsync( + PutSearchPipelineRequest request, + RequestOptions options, + ActionListener listener + ) { + return restHighLevelClient.performRequestAsyncAndParseEntity( + request, + SearchPipelinesRequestConverters::putPipeline, + options, + AcknowledgedResponse::fromXContent, + listener, + emptySet() + ); + } + + /** + * Get an existing pipeline. + * + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public GetSearchPipelineResponse getPipeline(GetSearchPipelineRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity( + request, + SearchPipelinesRequestConverters::getPipeline, + options, + GetSearchPipelineResponse::fromXContent, + Collections.singleton(404) + ); + } + + /** + * Asynchronously get an existing pipeline. + * + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + * @return cancellable that may be used to cancel the request + */ + public Cancellable getPipelineAsync( + GetSearchPipelineRequest request, + RequestOptions options, + ActionListener listener + ) { + return restHighLevelClient.performRequestAsyncAndParseEntity( + request, + SearchPipelinesRequestConverters::getPipeline, + options, + GetSearchPipelineResponse::fromXContent, + listener, + Collections.singleton(404) + ); + } + + /** + * Delete an existing pipeline. + * + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public AcknowledgedResponse deletePipeline(DeleteSearchPipelineRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity( + request, + SearchPipelinesRequestConverters::deletePipeline, + options, + AcknowledgedResponse::fromXContent, + emptySet() + ); + } + + /** + * Asynchronously delete an existing pipeline. + * + * @param request the request + * @param options the request options (e.g. 
headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + * @return cancellable that may be used to cancel the request + */ + public Cancellable deletePipelineAsync( + DeleteSearchPipelineRequest request, + RequestOptions options, + ActionListener listener + ) { + return restHighLevelClient.performRequestAsyncAndParseEntity( + request, + SearchPipelinesRequestConverters::deletePipeline, + options, + AcknowledgedResponse::fromXContent, + listener, + emptySet() + ); + } + +} diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/SearchPipelinesRequestConverters.java b/client/rest-high-level/src/main/java/org/opensearch/client/SearchPipelinesRequestConverters.java new file mode 100644 index 0000000000000..53df19d2115d5 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/opensearch/client/SearchPipelinesRequestConverters.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client; + +import org.apache.hc.client5.http.classic.methods.HttpDelete; +import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.classic.methods.HttpPut; +import org.opensearch.action.search.DeleteSearchPipelineRequest; +import org.opensearch.action.search.GetSearchPipelineRequest; +import org.opensearch.action.search.PutSearchPipelineRequest; + +import java.io.IOException; + +final class SearchPipelinesRequestConverters { + private SearchPipelinesRequestConverters() {} + + static Request putPipeline(PutSearchPipelineRequest putPipelineRequest) throws IOException { + String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") + .addPathPart(putPipelineRequest.getId()) + .build(); + Request request = new Request(HttpPut.METHOD_NAME, endpoint); + + RequestConverters.Params params = new RequestConverters.Params(); + params.withTimeout(putPipelineRequest.timeout()); + params.withClusterManagerTimeout(putPipelineRequest.clusterManagerNodeTimeout()); + request.addParameters(params.asMap()); + request.setEntity(RequestConverters.createEntity(putPipelineRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE)); + return request; + } + + static Request deletePipeline(DeleteSearchPipelineRequest deletePipelineRequest) { + String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") + .addPathPart(deletePipelineRequest.getId()) + .build(); + Request request = new Request(HttpDelete.METHOD_NAME, endpoint); + + RequestConverters.Params parameters = new RequestConverters.Params(); + parameters.withTimeout(deletePipelineRequest.timeout()); + parameters.withClusterManagerTimeout(deletePipelineRequest.clusterManagerNodeTimeout()); + request.addParameters(parameters.asMap()); + return request; + } + + static Request getPipeline(GetSearchPipelineRequest getPipelineRequest) { + String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") + .addCommaSeparatedPathParts(getPipelineRequest.getIds()) + .build(); + Request request = new Request(HttpGet.METHOD_NAME, endpoint); + + RequestConverters.Params parameters = new RequestConverters.Params(); + parameters.withClusterManagerTimeout(getPipelineRequest.clusterManagerNodeTimeout()); + request.addParameters(parameters.asMap()); + return request; 
+ } +} diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/SearchPipelinesClientIT.java b/client/rest-high-level/src/test/java/org/opensearch/client/SearchPipelinesClientIT.java new file mode 100644 index 0000000000000..6f01cb5144eba --- /dev/null +++ b/client/rest-high-level/src/test/java/org/opensearch/client/SearchPipelinesClientIT.java @@ -0,0 +1,116 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client; + +import org.opensearch.action.search.DeleteSearchPipelineRequest; +import org.opensearch.action.search.GetSearchPipelineRequest; +import org.opensearch.action.search.GetSearchPipelineResponse; +import org.opensearch.action.search.PutSearchPipelineRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.search.pipeline.Pipeline; + +import java.io.IOException; + +public class SearchPipelinesClientIT extends OpenSearchRestHighLevelClientTestCase { + + public void testPutPipeline() throws IOException { + String id = "some_pipeline_id"; + XContentBuilder pipelineBuilder = buildSearchPipeline(); + PutSearchPipelineRequest request = new PutSearchPipelineRequest( + id, + BytesReference.bytes(pipelineBuilder), + pipelineBuilder.contentType() + ); + createPipeline(request); + } + + private static void createPipeline(PutSearchPipelineRequest request) throws IOException { + AcknowledgedResponse response = execute( + request, + highLevelClient().searchPipelines()::putPipeline, + highLevelClient().searchPipelines()::putPipelineAsync + ); + assertTrue(response.isAcknowledged()); + } + + public void testGetPipeline() throws IOException { + String id = "some_pipeline_id"; + XContentBuilder pipelineBuilder = buildSearchPipeline(); + PutSearchPipelineRequest request = new PutSearchPipelineRequest( + id, + BytesReference.bytes(pipelineBuilder), + pipelineBuilder.contentType() + ); + createPipeline(request); + + GetSearchPipelineRequest getRequest = new GetSearchPipelineRequest(id); + GetSearchPipelineResponse response = execute( + getRequest, + highLevelClient().searchPipelines()::getPipeline, + highLevelClient().searchPipelines()::getPipelineAsync + ); + assertTrue(response.isFound()); + assertEquals(1, response.pipelines().size()); + assertEquals(id, response.pipelines().get(0).getId()); + } + + public void testDeletePipeline() throws IOException { + String id = "some_pipeline_id"; + XContentBuilder pipelineBuilder = buildSearchPipeline(); + PutSearchPipelineRequest request = new PutSearchPipelineRequest( + id, + BytesReference.bytes(pipelineBuilder), + pipelineBuilder.contentType() + ); + createPipeline(request); + + DeleteSearchPipelineRequest deleteRequest = new DeleteSearchPipelineRequest(id); + AcknowledgedResponse response = execute( + deleteRequest, + highLevelClient().searchPipelines()::deletePipeline, + highLevelClient().searchPipelines()::deletePipelineAsync + ); + assertTrue(response.isAcknowledged()); + } + + private static XContentBuilder buildSearchPipeline() throws IOException { + XContentType xContentType = randomFrom(XContentType.values()); + XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent()); + return buildSearchPipeline(pipelineBuilder); + } + + 
private static XContentBuilder buildSearchPipeline(XContentBuilder builder) throws IOException { + builder.startObject(); + { + builder.field("description", "a pipeline description"); + builder.startArray(Pipeline.REQUEST_PROCESSORS_KEY); + { + builder.startObject().startObject("filter_query"); + { + builder.startObject("query"); + { + builder.startObject("term"); + { + builder.field("field", "value"); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject().endObject(); + } + builder.endArray(); + } + builder.endObject(); + return builder; + } +} diff --git a/modules/search-pipeline-common/build.gradle b/modules/search-pipeline-common/build.gradle new file mode 100644 index 0000000000000..dd37384923930 --- /dev/null +++ b/modules/search-pipeline-common/build.gradle @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +apply plugin: 'opensearch.yaml-rest-test' +apply plugin: 'opensearch.internal-cluster-test' + +opensearchplugin { + description 'Module for search pipeline processors that do not require additional security permissions or have large dependencies and resources' + classname 'org.opensearch.search.pipeline.common.SearchPipelineCommonModulePlugin' + extendedPlugins = ['lang-painless'] +} + +dependencies { + compileOnly project(':modules:lang-painless') + api project(':libs:opensearch-grok') + api project(':libs:opensearch-dissect') +} + +restResources { + restApi { + includeCore '_common', 'search_pipelines', 'cluster', 'indices', 'index', 'nodes', 'get', 'update', 'cat', 'mget' + } +} + +testClusters.all { + // Needed in order to test ingest pipeline templating: + // (this is because the integTest node is not using default distribution, but only the minimal number of required modules) + module ':modules:lang-mustache' +} + +thirdPartyAudit.ignoreMissingClasses( + // from log4j + 'org.osgi.framework.AdaptPermission', + 'org.osgi.framework.AdminPermission', + 'org.osgi.framework.Bundle', + 'org.osgi.framework.BundleActivator', + 'org.osgi.framework.BundleContext', + 'org.osgi.framework.BundleEvent', + 'org.osgi.framework.SynchronousBundleListener', + 'org.osgi.framework.wiring.BundleWire', + 'org.osgi.framework.wiring.BundleWiring' +) diff --git a/modules/search-pipeline-common/src/internalClusterTest/java/org/opensearch/search/pipeline/common/SearchPipelineCommonIT.java b/modules/search-pipeline-common/src/internalClusterTest/java/org/opensearch/search/pipeline/common/SearchPipelineCommonIT.java new file mode 100644 index 0000000000000..e855d0c2ae880 --- /dev/null +++ b/modules/search-pipeline-common/src/internalClusterTest/java/org/opensearch/search/pipeline/common/SearchPipelineCommonIT.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common; + +import org.opensearch.test.OpenSearchIntegTestCase; + +public class SearchPipelineCommonIT extends OpenSearchIntegTestCase {} diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/AbstractProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/AbstractProcessor.java new file mode 100644 index 0000000000000..e62497cb54db5 --- /dev/null +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/AbstractProcessor.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common; + +import org.opensearch.search.pipeline.Processor; + +/** + * Base class for common processor behavior. 
+ */ +abstract class AbstractProcessor implements Processor { + private final String tag; + private final String description; + + protected AbstractProcessor(String tag, String description) { + this.tag = tag; + this.description = description; + } + + @Override + public String getTag() { + return tag; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java new file mode 100644 index 0000000000000..81c00012daec6 --- /dev/null +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java @@ -0,0 +1,117 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.ParseField; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.pipeline.Processor; +import org.opensearch.search.pipeline.SearchRequestProcessor; + +import java.io.InputStream; +import java.util.Map; + +import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; + +/** + * This is a {@link SearchRequestProcessor} that replaces the incoming query with a BooleanQuery + * that MUST match the incoming query with a FILTER clause based on the configured query. + */ +public class FilterQueryRequestProcessor extends AbstractProcessor implements SearchRequestProcessor { + /** + * Key to reference this processor type from a search pipeline. + */ + public static final String TYPE = "filter_query"; + + final QueryBuilder filterQuery; + + @Override + public String getType() { + return TYPE; + } + + /** + * Constructor that takes a filter query. 
+ * + * @param tag processor tag + * @param description processor description + * @param filterQuery the query that will be added as a filter to incoming queries + */ + public FilterQueryRequestProcessor(String tag, String description, QueryBuilder filterQuery) { + super(tag, description); + this.filterQuery = filterQuery; + } + + @Override + public SearchRequest processRequest(SearchRequest request) throws Exception { + QueryBuilder originalQuery = null; + if (request.source() != null) { + originalQuery = request.source().query(); + } + + BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(filterQuery); + if (originalQuery != null) { + filteredQuery.must(originalQuery); + } + if (request.source() == null) { + request.source(new SearchSourceBuilder()); + } + request.source().query(filteredQuery); + return request; + } + + static class Factory implements Processor.Factory { + private final NamedXContentRegistry namedXContentRegistry; + public static final ParseField QUERY_FIELD = new ParseField("query"); + + Factory(NamedXContentRegistry namedXContentRegistry) { + this.namedXContentRegistry = namedXContentRegistry; + } + + @Override + public FilterQueryRequestProcessor create( + Map processorFactories, + String tag, + String description, + Map config + ) throws Exception { + try ( + XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config); + InputStream stream = BytesReference.bytes(builder).streamInput(); + XContentParser parser = XContentType.JSON.xContent() + .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, stream) + ) { + XContentParser.Token token = parser.nextToken(); + assert token == XContentParser.Token.START_OBJECT; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { + return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser)); + } + } + } + } + throw new IllegalArgumentException( + "Did not specify the " + QUERY_FIELD.getPreferredName() + " property in processor of type " + TYPE + ); + } + } +} diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java new file mode 100644 index 0000000000000..fc8f68cf3db70 --- /dev/null +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common; + +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.SearchPipelinesPlugin; +import org.opensearch.search.pipeline.Processor; + +import java.util.Map; + +/** + * Plugin providing common search request/response processors for use in search pipelines. + */ +public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPipelinesPlugin { + + /** + * No constructor needed, but build complains if we don't have a constructor with JavaDoc. 
+ */ + public SearchPipelineCommonModulePlugin() {} + + @Override + public Map getProcessors(Processor.Parameters parameters) { + return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry)); + } +} diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/package-info.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/package-info.java new file mode 100644 index 0000000000000..9b065b4008021 --- /dev/null +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * A collection of commonly-useful request and response processors for use in search pipelines. + */ +package org.opensearch.search.pipeline.common; diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java new file mode 100644 index 0000000000000..1f355ac97c801 --- /dev/null +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.AbstractBuilderTestCase; + +import java.util.Collections; +import java.util.Map; + +public class FilterQueryRequestProcessorTests extends AbstractBuilderTestCase { + + public void testFilterQuery() throws Exception { + QueryBuilder filterQuery = new TermQueryBuilder("field", "value"); + FilterQueryRequestProcessor filterQueryRequestProcessor = new FilterQueryRequestProcessor(null, null, filterQuery); + QueryBuilder incomingQuery = new TermQueryBuilder("text", "foo"); + SearchSourceBuilder source = new SearchSourceBuilder().query(incomingQuery); + SearchRequest request = new SearchRequest().source(source); + SearchRequest transformedRequest = filterQueryRequestProcessor.processRequest(request); + assertEquals(new BoolQueryBuilder().must(incomingQuery).filter(filterQuery), transformedRequest.source().query()); + + // Test missing incoming query + request = new SearchRequest(); + transformedRequest = filterQueryRequestProcessor.processRequest(request); + assertEquals(new BoolQueryBuilder().filter(filterQuery), transformedRequest.source().query()); + } + + public void testFactory() throws Exception { + FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry()); + FilterQueryRequestProcessor processor = factory.create( + Collections.emptyMap(), + null, + null, + Map.of("query", Map.of("term", Map.of("field", "value"))) + ); + assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery); + + // Missing 
"query" parameter: + expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap())); + } +} diff --git a/modules/search-pipeline-common/src/yamlRestTest/java/org/opensearch/search/pipeline/common/SearchPipelineCommonYamlTestSuiteIT.java b/modules/search-pipeline-common/src/yamlRestTest/java/org/opensearch/search/pipeline/common/SearchPipelineCommonYamlTestSuiteIT.java new file mode 100644 index 0000000000000..cd1cbbc995f8b --- /dev/null +++ b/modules/search-pipeline-common/src/yamlRestTest/java/org/opensearch/search/pipeline/common/SearchPipelineCommonYamlTestSuiteIT.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.opensearch.test.rest.yaml.ClientYamlTestCandidate; +import org.opensearch.test.rest.yaml.OpenSearchClientYamlSuiteTestCase; + +public class SearchPipelineCommonYamlTestSuiteIT extends OpenSearchClientYamlSuiteTestCase { + public SearchPipelineCommonYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws Exception { + return OpenSearchClientYamlSuiteTestCase.createParameters(); + } +} diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml new file mode 100644 index 0000000000000..473d92aa18052 --- /dev/null +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml @@ -0,0 +1,15 @@ +"Search pipeline common installed": + - skip: + reason: "contains is a newly added assertion" + features: contains + - do: + cluster.state: {} + + # Get cluster-manager node id + - set: { cluster_manager_node: cluster_manager } + + - do: + nodes.info: {} + + - contains: { nodes.$cluster_manager.modules: { name: search-pipeline-common } } + - contains: { nodes.$cluster_manager.search_pipelines.processors: { type: filter_query } } diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/20_crud.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/20_crud.yml new file mode 100644 index 0000000000000..cffb1c326462c --- /dev/null +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/20_crud.yml @@ -0,0 +1,47 @@ +--- +teardown: + - do: + search_pipelines.delete_pipeline: + id: "my_pipeline" + ignore: 404 + +--- +"Test basic pipeline crud": + - do: + search_pipelines.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "request_processors": [ + { + "filter_query" : { + "query" : { + "term" : { + "field" : "value" + } + } + } + } + ] + } + - match: { acknowledged: true } + + - do: + search_pipelines.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.description: "_description" } + + - do: + search_pipelines.get_pipeline: {} + - match: { my_pipeline.description: "_description" } + + - do: + search_pipelines.delete_pipeline: + id: "my_pipeline" + - match: { 
acknowledged: true } + + - do: + catch: missing + search_pipelines.get_pipeline: + id: "my_pipeline" diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search_pipelines.delete_pipeline.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search_pipelines.delete_pipeline.json new file mode 100644 index 0000000000000..475d83807b1c0 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search_pipelines.delete_pipeline.json @@ -0,0 +1,35 @@ +{ + "search_pipelines.delete_pipeline": { + "documentation": { + "description": "Deletes a search pipeline.", + "url": "https://opensearch.org/docs/latest/opensearch/rest-api/search_pipelines/" + }, + "stability": "stable", + "url": { + "paths": [ + { + "path": "/_search/pipeline/{id}", + "methods": [ + "DELETE" + ], + "parts": { + "id": { + "type": "string", + "description": "Pipeline ID" + } + } + } + ] + }, + "params": { + "cluster_manager_timeout":{ + "type":"time", + "description":"Explicit operation timeout for connection to cluster-manager node" + }, + "timeout":{ + "type":"time", + "description":"Explicit operation timeout" + } + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search_pipelines.get_pipeline.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search_pipelines.get_pipeline.json new file mode 100644 index 0000000000000..93f9cb76f9da1 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search_pipelines.get_pipeline.json @@ -0,0 +1,37 @@ +{ + "search_pipelines.get_pipeline": { + "documentation": { + "description": "Returns a search pipeline", + "url": "https://opensearch.org/docs/latest/opensearch/rest-api/search_pipelines/" + }, + "stability": "stable", + "url": { + "paths": [ + { + "path": "/_search/pipeline", + "methods": [ + "GET" + ] + }, + { + "path": "/_search/pipeline/{id}", + "methods": [ + "GET" + ], + "parts": { + "id": { + "type": "string", + "description": "Comma-separated list of search pipeline ids. Wildcards supported." 
+ } + } + } + ] + }, + "params": { + "cluster_manager_timeout":{ + "type":"time", + "description":"Explicit operation timeout for connection to cluster-manager node" + } + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search_pipelines.put_pipeline.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search_pipelines.put_pipeline.json new file mode 100644 index 0000000000000..8bddc5bbd61af --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search_pipelines.put_pipeline.json @@ -0,0 +1,39 @@ +{ + "search_pipelines.put_pipeline": { + "documentation": { + "description": "Creates or updates a search pipeline.", + "url": "https://opensearch.org/docs/latest/opensearch/rest-api/search_pipelines/" + }, + "stability": "stable", + "url": { + "paths": [ + { + "path": "/_search/pipeline/{id}", + "methods": [ + "PUT" + ], + "parts": { + "id": { + "type": "string", + "description": "Pipeline ID" + } + } + } + ] + }, + "params": { + "cluster_manager_timeout": { + "type": "time", + "description": "Explicit operation timeout for connection to cluster-manager node" + }, + "timeout": { + "type": "time", + "description": "Explicit operation timeout" + } + }, + "body": { + "description": "The search pipeline definition", + "required": true + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search_pipelines/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search_pipelines/10_basic.yml new file mode 100644 index 0000000000000..ac5a309bd14eb --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search_pipelines/10_basic.yml @@ -0,0 +1,154 @@ +--- +"Test basic pipeline crud": + - do: + search_pipelines.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "request_processors": [ + ] + } + - match: { acknowledged: true } + + - do: + search_pipelines.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.description: "_description" } + + - do: + search_pipelines.delete_pipeline: + id: "my_pipeline" + - match: { acknowledged: true } + + - do: + catch: missing + search_pipelines.get_pipeline: + id: "my_pipeline" + +--- +"Test Put Versioned Pipeline": + - do: + search_pipelines.put_pipeline: + id: "my_pipeline" + body: > + { + "version": 10, + "request_processors": [ ] + } + - match: { acknowledged: true } + + - do: + search_pipelines.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.version: 10 } + + # Lower version + - do: + search_pipelines.put_pipeline: + id: "my_pipeline" + body: > + { + "version": 9, + "request_processors": [ ] + } + - match: { acknowledged: true } + + - do: + search_pipelines.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.version: 9 } + + # Higher version + - do: + search_pipelines.put_pipeline: + id: "my_pipeline" + body: > + { + "version": 6789, + "request_processors": [ ] + } + - match: { acknowledged: true } + + - do: + search_pipelines.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.version: 6789 } + + # No version + - do: + search_pipelines.put_pipeline: + id: "my_pipeline" + body: > + { + "request_processors": [ ] + } + - match: { acknowledged: true } + + - do: + search_pipelines.get_pipeline: + id: "my_pipeline" + - is_false: my_pipeline.version + + # Coming back with a version + - do: + search_pipelines.put_pipeline: + id: "my_pipeline" + body: > + { + "version": 5385, + "request_processors": [ ] + } + - match: { acknowledged: true } + + - do: + search_pipelines.get_pipeline: + id: "my_pipeline" + - match: { 
my_pipeline.version: 5385 } + + # Able to delete the versioned pipeline + - do: + search_pipelines.delete_pipeline: + id: "my_pipeline" + - match: { acknowledged: true } + + - do: + catch: missing + search_pipelines.get_pipeline: + id: "my_pipeline" +--- +"Test Get All Pipelines": + - do: + search_pipelines.put_pipeline: + id: "first_pipeline" + body: > + { + "description": "first", + "request_processors": [] + } + - do: + search_pipelines.put_pipeline: + id: "second_pipeline" + body: > + { + "description": "second", + "request_processors": [] + } + + - do: + search_pipelines.get_pipeline: {} + - match: { first_pipeline.description: "first" } + - match: { second_pipeline.description: "second" } + +--- +"Test invalid config": + - do: + catch: /parse_exception/ + search_pipelines.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "request_processors": [], + "invalid_field" : {} + } diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 2cb11a0586c98..7d9e147485058 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -252,8 +252,14 @@ import org.opensearch.action.search.ClearScrollAction; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.DeletePitAction; +import org.opensearch.action.search.DeleteSearchPipelineAction; +import org.opensearch.action.search.DeleteSearchPipelineTransportAction; +import org.opensearch.action.search.GetSearchPipelineAction; +import org.opensearch.action.search.GetSearchPipelineTransportAction; import org.opensearch.action.search.MultiSearchAction; import org.opensearch.action.search.GetAllPitsAction; +import org.opensearch.action.search.PutSearchPipelineAction; +import org.opensearch.action.search.PutSearchPipelineTransportAction; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchScrollAction; import org.opensearch.action.search.TransportClearScrollAction; @@ -434,9 +440,12 @@ import org.opensearch.rest.action.search.RestCountAction; import org.opensearch.rest.action.search.RestCreatePitAction; import org.opensearch.rest.action.search.RestDeletePitAction; +import org.opensearch.rest.action.search.RestDeleteSearchPipelineAction; import org.opensearch.rest.action.search.RestExplainAction; import org.opensearch.rest.action.search.RestGetAllPitsAction; +import org.opensearch.rest.action.search.RestGetSearchPipelineAction; import org.opensearch.rest.action.search.RestMultiSearchAction; +import org.opensearch.rest.action.search.RestPutSearchPipelineAction; import org.opensearch.rest.action.search.RestSearchAction; import org.opensearch.rest.action.search.RestSearchScrollAction; import org.opensearch.tasks.Task; @@ -719,6 +728,11 @@ public void reg actions.register(GetDecommissionStateAction.INSTANCE, TransportGetDecommissionStateAction.class); actions.register(DeleteDecommissionStateAction.INSTANCE, TransportDeleteDecommissionStateAction.class); + // Search Pipelines + actions.register(PutSearchPipelineAction.INSTANCE, PutSearchPipelineTransportAction.class); + actions.register(GetSearchPipelineAction.INSTANCE, GetSearchPipelineTransportAction.class); + actions.register(DeleteSearchPipelineAction.INSTANCE, DeleteSearchPipelineTransportAction.class); + return unmodifiableMap(actions.getRegistry()); } @@ -903,6 +917,11 @@ public void initRestHandlers(Supplier nodesInCluster) { 
registerHandler.accept(new RestPitSegmentsAction(nodesInCluster)); registerHandler.accept(new RestDeleteDecommissionStateAction()); + // Search pipelines API + registerHandler.accept(new RestPutSearchPipelineAction()); + registerHandler.accept(new RestGetSearchPipelineAction()); + registerHandler.accept(new RestDeleteSearchPipelineAction()); + for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( settings, diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodeInfo.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodeInfo.java index fc54ecf795a1d..c939d80e9b1d8 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodeInfo.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodeInfo.java @@ -48,6 +48,7 @@ import org.opensearch.monitor.process.ProcessInfo; import org.opensearch.node.ReportingService; import org.opensearch.search.aggregations.support.AggregationInfo; +import org.opensearch.search.pipeline.SearchPipelinesInfo; import org.opensearch.threadpool.ThreadPoolInfo; import org.opensearch.transport.TransportInfo; @@ -99,6 +100,7 @@ public NodeInfo(StreamInput in) throws IOException { addInfoIfNonNull(PluginsAndModules.class, in.readOptionalWriteable(PluginsAndModules::new)); addInfoIfNonNull(IngestInfo.class, in.readOptionalWriteable(IngestInfo::new)); addInfoIfNonNull(AggregationInfo.class, in.readOptionalWriteable(AggregationInfo::new)); + addInfoIfNonNull(SearchPipelinesInfo.class, in.readOptionalWriteable(SearchPipelinesInfo::new)); } public NodeInfo( @@ -115,7 +117,8 @@ public NodeInfo( @Nullable PluginsAndModules plugins, @Nullable IngestInfo ingest, @Nullable AggregationInfo aggsInfo, - @Nullable ByteSizeValue totalIndexingBuffer + @Nullable ByteSizeValue totalIndexingBuffer, + @Nullable SearchPipelinesInfo searchPipelinesInfo ) { super(node); this.version = version; @@ -130,6 +133,7 @@ public NodeInfo( addInfoIfNonNull(PluginsAndModules.class, plugins); addInfoIfNonNull(IngestInfo.class, ingest); addInfoIfNonNull(AggregationInfo.class, aggsInfo); + addInfoIfNonNull(SearchPipelinesInfo.class, searchPipelinesInfo); this.totalIndexingBuffer = totalIndexingBuffer; } @@ -218,5 +222,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(getInfo(PluginsAndModules.class)); out.writeOptionalWriteable(getInfo(IngestInfo.class)); out.writeOptionalWriteable(getInfo(AggregationInfo.class)); + out.writeOptionalWriteable(getInfo(SearchPipelinesInfo.class)); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoRequest.java index 77ffd98513698..a078199adb64a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoRequest.java @@ -168,7 +168,8 @@ public enum Metric { PLUGINS("plugins"), INGEST("ingest"), AGGREGATIONS("aggregations"), - INDICES("indices"); + INDICES("indices"), + SEARCH_PIPELINES("search_pipelines"); private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoResponse.java index 475602e6b328f..93ecbac1bdcd9 100644 --- 
a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoResponse.java @@ -49,6 +49,7 @@ import org.opensearch.monitor.os.OsInfo; import org.opensearch.monitor.process.ProcessInfo; import org.opensearch.search.aggregations.support.AggregationInfo; +import org.opensearch.search.pipeline.SearchPipelinesInfo; import org.opensearch.threadpool.ThreadPoolInfo; import org.opensearch.transport.TransportInfo; @@ -147,6 +148,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (nodeInfo.getInfo(AggregationInfo.class) != null) { nodeInfo.getInfo(AggregationInfo.class).toXContent(builder, params); } + if (nodeInfo.getInfo(SearchPipelinesInfo.class) != null) { + nodeInfo.getInfo(SearchPipelinesInfo.class).toXContent(builder, params); + } builder.endObject(); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index ee7b287b878e7..0bfe24f5b47df 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -117,7 +117,8 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) { metrics.contains(NodesInfoRequest.Metric.PLUGINS.metricName()), metrics.contains(NodesInfoRequest.Metric.INGEST.metricName()), metrics.contains(NodesInfoRequest.Metric.AGGREGATIONS.metricName()), - metrics.contains(NodesInfoRequest.Metric.INDICES.metricName()) + metrics.contains(NodesInfoRequest.Metric.INDICES.metricName()), + metrics.contains(NodesInfoRequest.Metric.SEARCH_PIPELINES.metricName()) ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 2fdaa46de01bc..26332f762bdf2 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -145,7 +145,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce @Override protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) { - NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false, false); + NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false, false, false); NodeStats nodeStats = nodeService.stats( CommonStatsFlags.NONE, true, diff --git a/server/src/main/java/org/opensearch/action/ingest/GetPipelineResponse.java b/server/src/main/java/org/opensearch/action/ingest/GetPipelineResponse.java index b958c3ee4f0fb..6890252cec014 100644 --- a/server/src/main/java/org/opensearch/action/ingest/GetPipelineResponse.java +++ b/server/src/main/java/org/opensearch/action/ingest/GetPipelineResponse.java @@ -146,7 +146,12 @@ public boolean equals(Object other) { GetPipelineResponse otherResponse = (GetPipelineResponse) other; if (pipelines == null) { return otherResponse.pipelines == null; + } else if (otherResponse.pipelines == null) { + return false; } else { + if (otherResponse.pipelines.size() != pipelines.size()) { + return false; + } // We need a map here because 
order does not matter for equality Map otherPipelineMap = new HashMap<>(); for (PipelineConfiguration pipeline : otherResponse.pipelines) { diff --git a/server/src/main/java/org/opensearch/action/search/DeleteSearchPipelineAction.java b/server/src/main/java/org/opensearch/action/search/DeleteSearchPipelineAction.java new file mode 100644 index 0000000000000..65f8cf3de9506 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/DeleteSearchPipelineAction.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.master.AcknowledgedResponse; + +/** + * Action type to delete a search pipeline + * + * @opensearch.internal + */ +public class DeleteSearchPipelineAction extends ActionType { + public static final DeleteSearchPipelineAction INSTANCE = new DeleteSearchPipelineAction(); + public static final String NAME = "cluster:admin/search/pipeline/delete"; + + public DeleteSearchPipelineAction() { + super(NAME, AcknowledgedResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/DeleteSearchPipelineRequest.java b/server/src/main/java/org/opensearch/action/search/DeleteSearchPipelineRequest.java new file mode 100644 index 0000000000000..b6ba0bee87932 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/DeleteSearchPipelineRequest.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.master.AcknowledgedRequest; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +/** + * Request to delete a search pipeline + * + * @opensearch.internal + */ +public class DeleteSearchPipelineRequest extends AcknowledgedRequest { + private String id; + + public DeleteSearchPipelineRequest(String id) { + this.id = Objects.requireNonNull(id); + } + + public DeleteSearchPipelineRequest() {} + + public DeleteSearchPipelineRequest(StreamInput in) throws IOException { + super(in); + id = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/server/src/main/java/org/opensearch/action/search/DeleteSearchPipelineTransportAction.java b/server/src/main/java/org/opensearch/action/search/DeleteSearchPipelineTransportAction.java new file mode 100644 index 0000000000000..7af687663833f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/DeleteSearchPipelineTransportAction.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.search.pipeline.SearchPipelineService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; + +/** + * Perform the action of deleting a search pipeline + * + * @opensearch.internal + */ +public class DeleteSearchPipelineTransportAction extends TransportClusterManagerNodeAction< + DeleteSearchPipelineRequest, + AcknowledgedResponse> { + private final SearchPipelineService searchPipelineService; + + @Inject + public DeleteSearchPipelineTransportAction( + ThreadPool threadPool, + SearchPipelineService searchPipelineService, + TransportService transportService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + DeleteSearchPipelineAction.NAME, + transportService, + searchPipelineService.getClusterService(), + threadPool, + actionFilters, + DeleteSearchPipelineRequest::new, + indexNameExpressionResolver + ); + this.searchPipelineService = searchPipelineService; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void clusterManagerOperation( + DeleteSearchPipelineRequest request, + ClusterState state, + ActionListener listener + ) throws Exception { + searchPipelineService.deletePipeline(request, listener); + } + + @Override + protected ClusterBlockException checkBlock(DeleteSearchPipelineRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetSearchPipelineAction.java b/server/src/main/java/org/opensearch/action/search/GetSearchPipelineAction.java new file mode 100644 index 0000000000000..b80ffa781b0ec --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetSearchPipelineAction.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionType; + +/** + * Action type to get search pipelines + * + * @opensearch.internal + */ +public class GetSearchPipelineAction extends ActionType { + public static final GetSearchPipelineAction INSTANCE = new GetSearchPipelineAction(); + public static final String NAME = "cluster:admin/search/pipeline/get"; + + public GetSearchPipelineAction() { + super(NAME, GetSearchPipelineResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetSearchPipelineRequest.java b/server/src/main/java/org/opensearch/action/search/GetSearchPipelineRequest.java new file mode 100644 index 0000000000000..4a1d3eaba87a0 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetSearchPipelineRequest.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +/** + * Request to get search pipelines + * + * @opensearch.internal + */ +public class GetSearchPipelineRequest extends ClusterManagerNodeReadRequest { + private final String[] ids; + + public GetSearchPipelineRequest(String... ids) { + this.ids = Objects.requireNonNull(ids); + } + + public GetSearchPipelineRequest() { + ids = Strings.EMPTY_ARRAY; + } + + public GetSearchPipelineRequest(StreamInput in) throws IOException { + super(in); + ids = in.readStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(ids); + } + + public String[] getIds() { + return ids; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetSearchPipelineResponse.java b/server/src/main/java/org/opensearch/action/search/GetSearchPipelineResponse.java new file mode 100644 index 0000000000000..4fd86febcb0ae --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetSearchPipelineResponse.java @@ -0,0 +1,144 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.Strings; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.StatusToXContentObject; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.rest.RestStatus; +import org.opensearch.search.pipeline.PipelineConfiguration; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +/** + * transport response for getting a search pipeline + * + * @opensearch.internal + */ +public class GetSearchPipelineResponse extends ActionResponse implements StatusToXContentObject { + + private final List pipelines; + + public GetSearchPipelineResponse(StreamInput in) throws IOException { + super(in); + int size = in.readVInt(); + pipelines = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + pipelines.add(PipelineConfiguration.readFrom(in)); + } + + } + + public GetSearchPipelineResponse(List pipelines) { + this.pipelines = pipelines; + } + + public List pipelines() { + return Collections.unmodifiableList(pipelines); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + for (PipelineConfiguration pipeline : pipelines) { + builder.field(pipeline.getId(), pipeline.getConfigAsMap()); + } + builder.endObject(); + return builder; + } + + /** + * + * @param parser the parser for the XContent that contains the serialized GetPipelineResponse. + * @return an instance of GetPipelineResponse read from the parser + * @throws IOException If the parsing fails + */ + public static GetSearchPipelineResponse fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + List pipelines = new ArrayList<>(); + while (parser.nextToken().equals(XContentParser.Token.FIELD_NAME)) { + String pipelineId = parser.currentName(); + parser.nextToken(); + try (XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent())) { + contentBuilder.generator().copyCurrentStructure(parser); + PipelineConfiguration pipeline = new PipelineConfiguration( + pipelineId, + BytesReference.bytes(contentBuilder), + contentBuilder.contentType() + ); + pipelines.add(pipeline); + } + } + ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser); + return new GetSearchPipelineResponse(pipelines); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(pipelines.size()); + for (PipelineConfiguration pipeline : pipelines) { + pipeline.writeTo(out); + } + } + + public boolean isFound() { + return !pipelines.isEmpty(); + } + + @Override + public RestStatus status() { + return isFound() ? 
RestStatus.OK : RestStatus.NOT_FOUND; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GetSearchPipelineResponse otherResponse = (GetSearchPipelineResponse) o; + if (pipelines == null) { + return otherResponse.pipelines == null; + } else if (otherResponse.pipelines == null) { + return false; + } + // Convert to a map to ignore order; + return toMap(pipelines).equals(toMap(otherResponse.pipelines)); + } + + private static Map toMap(List pipelines) { + return pipelines.stream().collect(Collectors.toMap(PipelineConfiguration::getId, p -> p)); + } + + @Override + public String toString() { + return Strings.toString(XContentType.JSON, this); + } + + @Override + public int hashCode() { + int result = 1; + for (PipelineConfiguration pipeline : pipelines) { + // We only take the sum here to ensure that the order does not matter. + result += (pipeline == null ? 0 : pipeline.hashCode()); + } + return result; + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetSearchPipelineTransportAction.java b/server/src/main/java/org/opensearch/action/search/GetSearchPipelineTransportAction.java new file mode 100644 index 0000000000000..690990a0c4151 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetSearchPipelineTransportAction.java @@ -0,0 +1,78 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.search.pipeline.SearchPipelineService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; + +/** + * Perform the action of getting a search pipeline + * + * @opensearch.internal + */ +public class GetSearchPipelineTransportAction extends TransportClusterManagerNodeReadAction< + GetSearchPipelineRequest, + GetSearchPipelineResponse> { + + @Inject + public GetSearchPipelineTransportAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + GetSearchPipelineAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + GetSearchPipelineRequest::new, + indexNameExpressionResolver + ); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected GetSearchPipelineResponse read(StreamInput in) throws IOException { + return new GetSearchPipelineResponse(in); + } + + @Override + protected void clusterManagerOperation( + GetSearchPipelineRequest request, + ClusterState state, + ActionListener listener + ) throws Exception { + listener.onResponse(new 
GetSearchPipelineResponse(SearchPipelineService.getPipelines(state, request.getIds()))); + } + + @Override + protected ClusterBlockException checkBlock(GetSearchPipelineRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/PutSearchPipelineAction.java b/server/src/main/java/org/opensearch/action/search/PutSearchPipelineAction.java new file mode 100644 index 0000000000000..798c8211ee505 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/PutSearchPipelineAction.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.master.AcknowledgedResponse; + +/** + * Action type to put a new search pipeline + * + * @opensearch.internal + */ +public class PutSearchPipelineAction extends ActionType { + public static final PutSearchPipelineAction INSTANCE = new PutSearchPipelineAction(); + public static final String NAME = "cluster:admin/search/pipeline/put"; + + public PutSearchPipelineAction() { + super(NAME, AcknowledgedResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/PutSearchPipelineRequest.java b/server/src/main/java/org/opensearch/action/search/PutSearchPipelineRequest.java new file mode 100644 index 0000000000000..c8365efbbd665 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/PutSearchPipelineRequest.java @@ -0,0 +1,87 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.action.search;
+
+import org.opensearch.action.ActionRequestValidationException;
+import org.opensearch.action.support.master.AcknowledgedRequest;
+import org.opensearch.common.bytes.BytesReference;
+import org.opensearch.common.io.stream.StreamInput;
+import org.opensearch.common.io.stream.StreamOutput;
+import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.core.xcontent.MediaType;
+import org.opensearch.core.xcontent.ToXContentObject;
+import org.opensearch.core.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Request to put a search pipeline
+ *
+ * @opensearch.internal
+ */
+public class PutSearchPipelineRequest extends AcknowledgedRequest<PutSearchPipelineRequest> implements ToXContentObject {
+    private String id;
+    private BytesReference source;
+    private XContentType xContentType;
+
+    public PutSearchPipelineRequest(String id, BytesReference source, MediaType mediaType) {
+        this.id = Objects.requireNonNull(id);
+        this.source = Objects.requireNonNull(source);
+        if (mediaType instanceof XContentType == false) {
+            throw new IllegalArgumentException(
+                PutSearchPipelineRequest.class.getSimpleName() + " found unsupported media type [" + mediaType.getClass().getName() + "]"
+            );
+        }
+        this.xContentType = XContentType.fromMediaType(Objects.requireNonNull(mediaType));
+    }
+
+    public PutSearchPipelineRequest(StreamInput in) throws IOException {
+        super(in);
+        id = in.readString();
+        source = in.readBytesReference();
+        xContentType = in.readEnum(XContentType.class);
+    }
+
+    @Override
+    public ActionRequestValidationException validate() {
+        return null;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public BytesReference getSource() {
+        return source;
+    }
+
+    public XContentType getXContentType() {
+        return xContentType;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeString(id);
+        out.writeBytesReference(source);
+        out.writeEnum(xContentType);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        if (source != null) {
+            builder.rawValue(source.streamInput(), xContentType);
+        } else {
+            builder.startObject().endObject();
+        }
+        return builder;
+    }
+
+}
diff --git a/server/src/main/java/org/opensearch/action/search/PutSearchPipelineTransportAction.java b/server/src/main/java/org/opensearch/action/search/PutSearchPipelineTransportAction.java
new file mode 100644
index 0000000000000..342a1f4cb26c6
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/search/PutSearchPipelineTransportAction.java
@@ -0,0 +1,99 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.OriginSettingClient; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.search.pipeline.SearchPipelineService; +import org.opensearch.search.pipeline.SearchPipelinesInfo; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.search.pipeline.SearchPipelineService.SEARCH_PIPELINE_ORIGIN; + +/** + * Perform the action of putting a search pipeline + * + * @opensearch.internal + */ +public class PutSearchPipelineTransportAction extends TransportClusterManagerNodeAction { + + private final SearchPipelineService searchPipelineService; + private final OriginSettingClient client; + + @Inject + public PutSearchPipelineTransportAction( + ThreadPool threadPool, + TransportService transportService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + SearchPipelineService searchPipelineService, + NodeClient client + ) { + super( + PutSearchPipelineAction.NAME, + transportService, + searchPipelineService.getClusterService(), + threadPool, + actionFilters, + PutSearchPipelineRequest::new, + indexNameExpressionResolver + ); + this.client = new OriginSettingClient(client, SEARCH_PIPELINE_ORIGIN); + this.searchPipelineService = searchPipelineService; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void clusterManagerOperation( + PutSearchPipelineRequest request, + ClusterState state, + ActionListener listener + ) throws Exception { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(nodeInfos -> { + Map searchPipelinesInfos = new HashMap<>(); + for (NodeInfo nodeInfo : nodeInfos.getNodes()) { + searchPipelinesInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(SearchPipelinesInfo.class)); + } + searchPipelineService.putPipeline(searchPipelinesInfos, request, listener); + }, listener::onFailure)); + } + + @Override + protected ClusterBlockException checkBlock(PutSearchPipelineRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index 21448bf9d7e94..40a59d81d6d69 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ 
b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -115,6 +115,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private TimeValue cancelAfterTimeInterval; + private String pipeline; + public SearchRequest() { this.localClusterAlias = null; this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; @@ -248,6 +250,7 @@ public SearchRequest(StreamInput in) throws IOException { } ccsMinimizeRoundtrips = in.readBoolean(); cancelAfterTimeInterval = in.readOptionalTimeValue(); + pipeline = in.readOptionalString(); } @Override @@ -276,6 +279,7 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeBoolean(ccsMinimizeRoundtrips); out.writeOptionalTimeValue(cancelAfterTimeInterval); + out.writeOptionalString(pipeline); } @Override @@ -654,6 +658,15 @@ public TimeValue getCancelAfterTimeInterval() { return cancelAfterTimeInterval; } + public SearchRequest pipeline(String pipeline) { + this.pipeline = pipeline; + return this; + } + + public String pipeline() { + return pipeline; + } + @Override public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval); @@ -700,7 +713,8 @@ public boolean equals(Object o) { && Objects.equals(localClusterAlias, that.localClusterAlias) && absoluteStartMillis == that.absoluteStartMillis && ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips - && Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval); + && Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval) + && Objects.equals(pipeline, that.pipeline); } @Override @@ -762,6 +776,8 @@ public String toString() { + source + ", cancelAfterTimeInterval=" + cancelAfterTimeInterval + + ", pipeline=" + + pipeline + "}"; } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 1ca477942cdf6..221022fcfea80 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -80,6 +80,7 @@ import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.tasks.CancellableTask; @@ -153,6 +154,7 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); this.client = client; @@ -180,6 +183,7 @@ public TransportSearchAction( this.searchService = searchService; this.indexNameExpressionResolver = indexNameExpressionResolver; this.namedWriteableRegistry = namedWriteableRegistry; + this.searchPipelineService = searchPipelineService; } private Map buildPerIndexAliasFilter( @@ -375,16 +379,29 @@ boolean buildPointInTimeFromSearchResults() { private void executeRequest( Task task, - SearchRequest searchRequest, + SearchRequest originalSearchRequest, SearchAsyncActionProvider searchAsyncActionProvider, - ActionListener listener + ActionListener originalListener ) { final long relativeStartNanos = System.nanoTime(); final SearchTimeProvider timeProvider = new SearchTimeProvider( - searchRequest.getOrCreateAbsoluteStartMillis(), + 
originalSearchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime ); + SearchRequest searchRequest; + try { + searchRequest = searchPipelineService.transformRequest(originalSearchRequest); + } catch (Exception e) { + originalListener.onFailure(e); + throw new RuntimeException(e); + } + ActionListener listener = ActionListener.wrap( + // TODO: Should we transform responses with the original request or the transformed request? Or both? + r -> originalListener.onResponse(searchPipelineService.transformResponse(originalSearchRequest, r)), + originalListener::onFailure + ); + ActionListener rewriteListener = ActionListener.wrap(source -> { if (source != searchRequest.source()) { // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch diff --git a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java index 4ab438ec064f1..39aa1ed80d0d7 100644 --- a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java @@ -150,6 +150,10 @@ import org.opensearch.action.ingest.SimulatePipelineRequest; import org.opensearch.action.ingest.SimulatePipelineRequestBuilder; import org.opensearch.action.ingest.SimulatePipelineResponse; +import org.opensearch.action.search.DeleteSearchPipelineRequest; +import org.opensearch.action.search.GetSearchPipelineRequest; +import org.opensearch.action.search.GetSearchPipelineResponse; +import org.opensearch.action.search.PutSearchPipelineRequest; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.xcontent.XContentType; @@ -899,4 +903,34 @@ public interface ClusterAdminClient extends OpenSearchClient { * Deletes the decommission metadata. 
*/ DeleteDecommissionStateRequestBuilder prepareDeleteDecommissionRequest(); + + /** + * Stores a search pipeline + */ + void putSearchPipeline(PutSearchPipelineRequest request, ActionListener listener); + + /** + * Stores a search pipeline + */ + ActionFuture putSearchPipeline(PutSearchPipelineRequest request); + + /** + * Returns a stored search pipeline + */ + void getSearchPipeline(GetSearchPipelineRequest request, ActionListener listener); + + /** + * Returns a stored search pipeline + */ + ActionFuture getSearchPipeline(GetSearchPipelineRequest request); + + /** + * Deletes a stored search pipeline + */ + void deleteSearchPipeline(DeleteSearchPipelineRequest request, ActionListener listener); + + /** + * Deletes a stored search pipeline + */ + ActionFuture deleteSearchPipeline(DeleteSearchPipelineRequest request); } diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 261709394e531..6a38254a03e71 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -363,13 +363,20 @@ import org.opensearch.action.search.DeletePitAction; import org.opensearch.action.search.DeletePitRequest; import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.DeleteSearchPipelineAction; +import org.opensearch.action.search.DeleteSearchPipelineRequest; import org.opensearch.action.search.GetAllPitNodesRequest; import org.opensearch.action.search.GetAllPitNodesResponse; +import org.opensearch.action.search.GetSearchPipelineAction; +import org.opensearch.action.search.GetSearchPipelineRequest; +import org.opensearch.action.search.GetSearchPipelineResponse; import org.opensearch.action.search.MultiSearchAction; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchRequestBuilder; import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.GetAllPitsAction; +import org.opensearch.action.search.PutSearchPipelineAction; +import org.opensearch.action.search.PutSearchPipelineRequest; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchRequestBuilder; @@ -1452,6 +1459,36 @@ public void deleteDecommissionState( public DeleteDecommissionStateRequestBuilder prepareDeleteDecommissionRequest() { return new DeleteDecommissionStateRequestBuilder(this, DeleteDecommissionStateAction.INSTANCE); } + + @Override + public void putSearchPipeline(PutSearchPipelineRequest request, ActionListener listener) { + execute(PutSearchPipelineAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture putSearchPipeline(PutSearchPipelineRequest request) { + return execute(PutSearchPipelineAction.INSTANCE, request); + } + + @Override + public void getSearchPipeline(GetSearchPipelineRequest request, ActionListener listener) { + execute(GetSearchPipelineAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture getSearchPipeline(GetSearchPipelineRequest request) { + return execute(GetSearchPipelineAction.INSTANCE, request); + } + + @Override + public void deleteSearchPipeline(DeleteSearchPipelineRequest request, ActionListener listener) { + execute(DeleteSearchPipelineAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture deleteSearchPipeline(DeleteSearchPipelineRequest 
request) { + return execute(DeleteSearchPipelineAction.INSTANCE, request); + } } static class IndicesAdmin implements IndicesAdminClient { diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index a33bd7e8057ed..28961d24c111a 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -96,6 +96,7 @@ import org.opensearch.persistent.PersistentTasksNodeService; import org.opensearch.plugins.ClusterPlugin; import org.opensearch.script.ScriptMetadata; +import org.opensearch.search.pipeline.SearchPipelineMetadata; import org.opensearch.snapshots.SnapshotsInfoService; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskResultsService; @@ -173,6 +174,7 @@ public static List getNamedWriteables() { // Metadata registerMetadataCustom(entries, RepositoriesMetadata.TYPE, RepositoriesMetadata::new, RepositoriesMetadata::readDiffFrom); registerMetadataCustom(entries, IngestMetadata.TYPE, IngestMetadata::new, IngestMetadata::readDiffFrom); + registerMetadataCustom(entries, SearchPipelineMetadata.TYPE, SearchPipelineMetadata::new, SearchPipelineMetadata::readDiffFrom); registerMetadataCustom(entries, ScriptMetadata.TYPE, ScriptMetadata::new, ScriptMetadata::readDiffFrom); registerMetadataCustom(entries, IndexGraveyard.TYPE, IndexGraveyard::new, IndexGraveyard::readDiffFrom); registerMetadataCustom( @@ -250,6 +252,13 @@ public static List getNamedXWriteables() { entries.add( new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(IngestMetadata.TYPE), IngestMetadata::fromXContent) ); + entries.add( + new NamedXContentRegistry.Entry( + Metadata.Custom.class, + new ParseField(SearchPipelineMetadata.TYPE), + SearchPipelineMetadata::fromXContent + ) + ); entries.add( new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(ScriptMetadata.TYPE), ScriptMetadata::fromXContent) ); diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java index 0743997c23c9a..c88bea56cb9bd 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java @@ -32,6 +32,9 @@ public final class ClusterManagerTaskKeys { public static final String REMOVE_INDEX_TEMPLATE_V2_KEY = "remove-index-template-v2"; public static final String PUT_PIPELINE_KEY = "put-pipeline"; public static final String DELETE_PIPELINE_KEY = "delete-pipeline"; + + public static final String PUT_SEARCH_PIPELINE_KEY = "put-search-pipeline"; + public static final String DELETE_SEARCH_PIPELINE_KEY = "delete-search-pipeline"; public static final String CREATE_PERSISTENT_TASK_KEY = "create-persistent-task"; public static final String FINISH_PERSISTENT_TASK_KEY = "finish-persistent-task"; public static final String REMOVE_PERSISTENT_TASK_KEY = "remove-persistent-task"; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 6b7d810e9f0d9..9e2df65d984c5 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -46,8 +46,10 @@ import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.extensions.ExtensionsManager; import 
org.opensearch.extensions.NoopExtensionsManager; +import org.opensearch.plugins.SearchPipelinesPlugin; import org.opensearch.search.backpressure.SearchBackpressureService; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; @@ -541,6 +543,7 @@ protected Node( pluginsService.filterPlugins(IngestPlugin.class), client ); + final SetOnce repositoriesServiceReference = new SetOnce<>(); final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); final UsageService usageService = new UsageService(); @@ -950,6 +953,16 @@ protected Node( rerouteService, fsHealthService ); + final SearchPipelineService searchPipelineService = new SearchPipelineService( + clusterService, + threadPool, + this.environment, + scriptService, + analysisModule.getAnalysisRegistry(), + xContentRegistry, + pluginsService.filterPlugins(SearchPipelinesPlugin.class), + client + ); this.nodeService = new NodeService( settings, threadPool, @@ -969,7 +982,8 @@ protected Node( indexingPressureService, searchModule.getValuesSourceRegistry().getUsageService(), searchBackpressureService, - nodeEnvironment + nodeEnvironment, + searchPipelineService ); final SearchService searchService = newSearchService( @@ -1027,6 +1041,7 @@ protected Node( b.bind(ScriptService.class).toInstance(scriptService); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); b.bind(IngestService.class).toInstance(ingestService); + b.bind(SearchPipelineService.class).toInstance(searchPipelineService); b.bind(IndexingPressureService.class).toInstance(indexingPressureService); b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService); b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService); diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index b4446085243df..767e257523819 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -56,6 +56,7 @@ import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.AggregationUsageService; import org.opensearch.search.backpressure.SearchBackpressureService; +import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -85,6 +86,7 @@ public class NodeService implements Closeable { private final IndexingPressureService indexingPressureService; private final AggregationUsageService aggregationUsageService; private final SearchBackpressureService searchBackpressureService; + private final SearchPipelineService searchPipelineService; private final ClusterService clusterService; private final Discovery discovery; private final NodeEnvironment nodeEnvironment; @@ -108,7 +110,8 @@ public class NodeService implements Closeable { IndexingPressureService indexingPressureService, AggregationUsageService aggregationUsageService, SearchBackpressureService searchBackpressureService, - NodeEnvironment nodeEnvironment + NodeEnvironment nodeEnvironment, + SearchPipelineService searchPipelineService ) { this.settings = settings; this.threadPool = 
threadPool; @@ -127,9 +130,11 @@ public class NodeService implements Closeable { this.indexingPressureService = indexingPressureService; this.aggregationUsageService = aggregationUsageService; this.searchBackpressureService = searchBackpressureService; + this.searchPipelineService = searchPipelineService; this.clusterService = clusterService; this.nodeEnvironment = nodeEnvironment; clusterService.addStateApplier(ingestService); + clusterService.addStateApplier(searchPipelineService); } public NodeInfo info( @@ -143,7 +148,8 @@ public NodeInfo info( boolean plugin, boolean ingest, boolean aggs, - boolean indices + boolean indices, + boolean searchPipelines ) { return new NodeInfo( Version.CURRENT, @@ -159,7 +165,8 @@ public NodeInfo info( plugin ? (pluginService == null ? null : pluginService.info()) : null, ingest ? (ingestService == null ? null : ingestService.info()) : null, aggs ? (aggregationUsageService == null ? null : aggregationUsageService.info()) : null, - indices ? indicesService.getTotalIndexingBufferBytes() : null + indices ? indicesService.getTotalIndexingBufferBytes() : null, + searchPipelines ? (searchPipelineService == null ? null : searchPipelineService.info()) : null ); } diff --git a/server/src/main/java/org/opensearch/plugins/SearchPipelinesPlugin.java b/server/src/main/java/org/opensearch/plugins/SearchPipelinesPlugin.java new file mode 100644 index 0000000000000..1a43fd57cd72f --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/SearchPipelinesPlugin.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.search.pipeline.Processor; + +import java.util.Collections; +import java.util.Map; + +/** + * An extension point for {@link Plugin} implementation to add custom search pipeline processors. + * + * @opensearch.api + */ +public interface SearchPipelinesPlugin { + /** + * Returns additional search pipeline processor types added by this plugin. + * + * The key of the returned {@link Map} is the unique name for the processor which is specified + * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} + * to create the processor from a given pipeline configuration. + */ + default Map getProcessors(Processor.Parameters parameters) { + return Collections.emptyMap(); + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestDeleteSearchPipelineAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestDeleteSearchPipelineAction.java new file mode 100644 index 0000000000000..cb89a5276fae1 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/search/RestDeleteSearchPipelineAction.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.rest.action.search; + +import org.opensearch.action.search.DeleteSearchPipelineRequest; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.DELETE; + +/** + * REST action to delete a search pipeline + * + * @opensearch.internal + */ +public class RestDeleteSearchPipelineAction extends BaseRestHandler { + @Override + public String getName() { + return "search_delete_pipeline_action"; + } + + @Override + public List routes() { + return List.of(new Route(DELETE, "/_search/pipeline/{id}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + DeleteSearchPipelineRequest request = new DeleteSearchPipelineRequest(restRequest.param("id")); + request.clusterManagerNodeTimeout(restRequest.paramAsTime("cluster_manager_timeout", request.clusterManagerNodeTimeout())); + request.timeout(restRequest.paramAsTime("timeout", request.timeout())); + return channel -> client.admin().cluster().deleteSearchPipeline(request, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestGetSearchPipelineAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestGetSearchPipelineAction.java new file mode 100644 index 0000000000000..d5003de00ec35 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/search/RestGetSearchPipelineAction.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.rest.action.search; + +import org.opensearch.action.search.GetSearchPipelineRequest; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.Strings; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestStatusToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * REST action to retrieve search pipelines + * + * @opensearch.internal + */ +public class RestGetSearchPipelineAction extends BaseRestHandler { + @Override + public String getName() { + return "search_get_pipeline_action"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/_search/pipeline"), new Route(GET, "/_search/pipeline/{id}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + GetSearchPipelineRequest request = new GetSearchPipelineRequest(Strings.splitStringByCommaToArray(restRequest.param("id"))); + request.clusterManagerNodeTimeout(restRequest.paramAsTime("cluster_manager_timeout", request.clusterManagerNodeTimeout())); + return channel -> client.admin().cluster().getSearchPipeline(request, new RestStatusToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestPutSearchPipelineAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestPutSearchPipelineAction.java new file mode 100644 index 0000000000000..a2bb061e52c32 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/search/RestPutSearchPipelineAction.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.rest.action.search; + +import org.opensearch.action.search.PutSearchPipelineRequest; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.PUT; + +/** + * REST action to put a search pipeline + * + * @opensearch.internal + */ +public class RestPutSearchPipelineAction extends BaseRestHandler { + @Override + public String getName() { + return "search_put_pipeline_action"; + } + + @Override + public List routes() { + return List.of(new Route(PUT, "/_search/pipeline/{id}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Tuple sourceTuple = restRequest.contentOrSourceParam(); + PutSearchPipelineRequest request = new PutSearchPipelineRequest(restRequest.param("id"), sourceTuple.v2(), sourceTuple.v1()); + request.clusterManagerNodeTimeout(restRequest.paramAsTime("cluster_manager_timeout", request.clusterManagerNodeTimeout())); + request.timeout(restRequest.paramAsTime("timeout", request.timeout())); + return channel -> client.admin().cluster().putSearchPipeline(request, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java index 1c6526701d91d..d624e1bf826b2 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java @@ -200,6 +200,7 @@ public static void parseSearchRequest( searchRequest.routing(request.param("routing")); searchRequest.preference(request.param("preference")); searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions())); + searchRequest.pipeline(request.param("search_pipeline")); checkRestTotalHits(request, searchRequest); diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java new file mode 100644 index 0000000000000..1b052e4a7a673 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -0,0 +1,158 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.Nullable; +import org.opensearch.ingest.ConfigurationUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.opensearch.ingest.ConfigurationUtils.TAG_KEY; +import static org.opensearch.ingest.Pipeline.DESCRIPTION_KEY; +import static org.opensearch.ingest.Pipeline.VERSION_KEY; + +/** + * Concrete representation of a search pipeline, holding multiple processors. 
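To show how the REST handler, the `search_pipeline` request parameter, and the new cluster-admin client methods fit together, here is a minimal usage sketch. It is illustrative only: the pipeline id, index name, `filter_query` processor, and query body are hypothetical placeholders, not identifiers introduced by this change.

import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.xcontent.XContentType;

public final class SearchPipelineUsageSketch {

    // Stores a pipeline definition, then issues a search that references it by id.
    public static SearchResponse storeAndSearch(Client client) {
        // "filter_query" and the query body are hypothetical; real processor types come from plugins.
        String pipelineBody = "{"
            + "\"description\": \"example pipeline\","
            + "\"request_processors\": [ { \"filter_query\": { \"query\": { \"term\": { \"visible\": true } } } } ]"
            + "}";
        PutSearchPipelineRequest put = new PutSearchPipelineRequest("my_pipeline", new BytesArray(pipelineBody), XContentType.JSON);
        client.admin().cluster().putSearchPipeline(put).actionGet();

        // Equivalent to GET /my-index/_search?search_pipeline=my_pipeline through RestSearchAction.
        SearchRequest search = new SearchRequest("my-index").pipeline("my_pipeline");
        return client.search(search).actionGet();
    }
}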
+ */ +public class Pipeline { + + public static final String REQUEST_PROCESSORS_KEY = "request_processors"; + public static final String RESPONSE_PROCESSORS_KEY = "response_processors"; + private final String id; + private final String description; + private final Integer version; + + // TODO: Refactor org.opensearch.ingest.CompoundProcessor to implement our generic Processor interface + // Then these can be CompoundProcessors instead of lists. + private final List searchRequestProcessors; + private final List searchResponseProcessors; + + Pipeline( + String id, + @Nullable String description, + @Nullable Integer version, + List requestProcessors, + List responseProcessors + ) { + this.id = id; + this.description = description; + this.version = version; + this.searchRequestProcessors = requestProcessors; + this.searchResponseProcessors = responseProcessors; + } + + public static Pipeline create(String id, Map config, Map processorFactories) + throws Exception { + String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); + Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); + List> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY); + List requestProcessors = readProcessors( + SearchRequestProcessor.class, + processorFactories, + requestProcessorConfigs + ); + List> responseProcessorConfigs = ConfigurationUtils.readOptionalList( + null, + null, + config, + RESPONSE_PROCESSORS_KEY + ); + List responseProcessors = readProcessors( + SearchResponseProcessor.class, + processorFactories, + responseProcessorConfigs + ); + if (config.isEmpty() == false) { + throw new OpenSearchParseException( + "pipeline [" + + id + + "] doesn't support one or more provided configuration parameters " + + Arrays.toString(config.keySet().toArray()) + ); + } + return new Pipeline(id, description, version, requestProcessors, responseProcessors); + } + + @SuppressWarnings("unchecked") // Cast is checked using isInstance + private static List readProcessors( + Class processorType, + Map processorFactories, + List> requestProcessorConfigs + ) throws Exception { + List processors = new ArrayList<>(); + if (requestProcessorConfigs == null) { + return processors; + } + for (Map processorConfigWithKey : requestProcessorConfigs) { + for (Map.Entry entry : processorConfigWithKey.entrySet()) { + String type = entry.getKey(); + if (!processorFactories.containsKey(type)) { + throw new IllegalArgumentException("Invalid processor type " + type); + } + Map config = (Map) entry.getValue(); + String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); + String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY); + Processor processor = processorFactories.get(type).create(processorFactories, tag, description, config); + if (processorType.isInstance(processor)) { + processors.add((T) processor); + } else { + throw new IllegalArgumentException("Processor type " + type + " is not a " + processorType.getSimpleName()); + } + } + } + return processors; + } + + List flattenAllProcessors() { + List allProcessors = new ArrayList<>(searchRequestProcessors.size() + searchResponseProcessors.size()); + allProcessors.addAll(searchRequestProcessors); + allProcessors.addAll(searchResponseProcessors); + return allProcessors; + } + + String getId() { + return id; + } + + String getDescription() { + return description; + } + + Integer 
getVersion() { + return version; + } + + List getSearchRequestProcessors() { + return searchRequestProcessors; + } + + List getSearchResponseProcessors() { + return searchResponseProcessors; + } + + SearchRequest transformRequest(SearchRequest request) throws Exception { + for (SearchRequestProcessor searchRequestProcessor : searchRequestProcessors) { + request = searchRequestProcessor.processRequest(request); + } + return request; + } + + SearchResponse transformResponse(SearchRequest request, SearchResponse response) throws Exception { + for (SearchResponseProcessor responseProcessor : searchResponseProcessors) { + response = responseProcessor.processResponse(request, response); + } + return response; + } +} diff --git a/server/src/main/java/org/opensearch/search/pipeline/PipelineConfiguration.java b/server/src/main/java/org/opensearch/search/pipeline/PipelineConfiguration.java new file mode 100644 index 0000000000000..e2599fd78908c --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/PipelineConfiguration.java @@ -0,0 +1,160 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.cluster.AbstractDiffable; +import org.opensearch.cluster.Diff; +import org.opensearch.common.Strings; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.ParseField; +import org.opensearch.core.xcontent.ContextParser; +import org.opensearch.core.xcontent.MediaType; +import org.opensearch.core.xcontent.ObjectParser; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +/** + * TODO: Copied verbatim from {@link org.opensearch.ingest.PipelineConfiguration}. + * + * See if we can refactor into a common class. 
I suspect not, just because this one will hold + */ +public class PipelineConfiguration extends AbstractDiffable implements ToXContentObject { + private static final ObjectParser PARSER = new ObjectParser<>( + "pipeline_config", + true, + PipelineConfiguration.Builder::new + ); + static { + PARSER.declareString(PipelineConfiguration.Builder::setId, new ParseField("id")); + PARSER.declareField((parser, builder, aVoid) -> { + XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent()); + contentBuilder.generator().copyCurrentStructure(parser); + builder.setConfig(BytesReference.bytes(contentBuilder), contentBuilder.contentType()); + }, new ParseField("config"), ObjectParser.ValueType.OBJECT); + + } + + public static ContextParser getParser() { + return (parser, context) -> PARSER.apply(parser, null).build(); + } + + private static class Builder { + + private String id; + private BytesReference config; + private XContentType xContentType; + + void setId(String id) { + this.id = id; + } + + void setConfig(BytesReference config, MediaType mediaType) { + if (mediaType instanceof XContentType == false) { + throw new IllegalArgumentException("PipelineConfiguration does not support media type [" + mediaType.getClass() + "]"); + } + this.config = config; + this.xContentType = XContentType.fromMediaType(mediaType); + } + + PipelineConfiguration build() { + return new PipelineConfiguration(id, config, xContentType); + } + } + + private final String id; + // Store config as bytes reference, because the config is only used when the pipeline store reads the cluster state + // and the way the map of maps config is read requires a deep copy (it removes instead of gets entries to check for unused options) + // also the get pipeline api just directly returns this to the caller + private final BytesReference config; + private final XContentType xContentType; + + public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) { + this.id = Objects.requireNonNull(id); + this.config = Objects.requireNonNull(config); + this.xContentType = Objects.requireNonNull(xContentType); + } + + public PipelineConfiguration(String id, BytesReference config, MediaType mediaType) { + this(id, config, XContentType.fromMediaType(mediaType)); + } + + public String getId() { + return id; + } + + public Map getConfigAsMap() { + return XContentHelper.convertToMap(config, true, xContentType).v2(); + } + + // pkg-private for tests + XContentType getXContentType() { + return xContentType; + } + + // pkg-private for tests + BytesReference getConfig() { + return config; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("id", id); + builder.field("config", getConfigAsMap()); + builder.endObject(); + return builder; + } + + public static PipelineConfiguration readFrom(StreamInput in) throws IOException { + return new PipelineConfiguration(in.readString(), in.readBytesReference(), in.readEnum(XContentType.class)); + } + + public static Diff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(PipelineConfiguration::readFrom, in); + } + + @Override + public String toString() { + return Strings.toString(XContentType.JSON, this); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + out.writeBytesReference(config); + out.writeEnum(xContentType); + } + + @Override + public boolean equals(Object o) { + if (this == o) 
return true; + if (o == null || getClass() != o.getClass()) return false; + + PipelineConfiguration that = (PipelineConfiguration) o; + + if (!id.equals(that.id)) return false; + return getConfigAsMap().equals(that.getConfigAsMap()); + + } + + @Override + public int hashCode() { + int result = id.hashCode(); + result = 31 * result + getConfigAsMap().hashCode(); + return result; + } +} diff --git a/server/src/main/java/org/opensearch/search/pipeline/Processor.java b/server/src/main/java/org/opensearch/search/pipeline/Processor.java new file mode 100644 index 0000000000000..245db15a771b9 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/Processor.java @@ -0,0 +1,166 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.client.Client; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.index.analysis.AnalysisRegistry; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.Scheduler; + +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.LongSupplier; + +/** + * A processor implementation may modify the request or response from a search call. + * Whether changes are made and what exactly is modified is up to the implementation. + *
+ * Processors may get called concurrently and thus need to be thread-safe. + * + * + * TODO: Refactor {@link org.opensearch.ingest.Processor} to extend this interface, and specialize to IngestProcessor. + * + * @opensearch.internal + */ +public interface Processor { + + /** + * Gets the type of processor + */ + String getType(); + + /** + * Gets the tag of a processor. + */ + String getTag(); + + /** + * Gets the description of a processor. + */ + String getDescription(); + + /** + * A factory that knows how to construct a processor based on a map of maps. + */ + interface Factory { + + /** + * Creates a processor based on the specified map of maps config. + * + * @param processorFactories Other processors which may be created inside this processor + * @param tag The tag for the processor + * @param description A short description of what this processor does + * @param config The configuration for the processor + * + * Note: Implementations are responsible for removing the used configuration + * keys, so that after creation the config map should be empty. + */ + Processor create(Map processorFactories, String tag, String description, Map config) + throws Exception; + } + + /** + * Infrastructure class that holds services that can be used by processor factories to create processor instances + * and that gets passed around to all {@link org.opensearch.plugins.SearchPipelinesPlugin}s. + */ + class Parameters { + + /** + * Useful to provide access to the node's environment like config directory to processor factories. + */ + public final Environment env; + + /** + * Provides processors script support. + */ + public final ScriptService scriptService; + + /** + * Provide analyzer support + */ + public final AnalysisRegistry analysisRegistry; + + /** + * Allows processors to read headers set by {@link org.opensearch.action.support.ActionFilter} + * instances that have run while handling the current search. 
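As a sketch of how these extension points might be exercised, the hypothetical processor below caps the requested result size. It assumes `SearchRequestProcessor` (in the same package) exposes `processRequest(SearchRequest)` as invoked by `Pipeline.transformRequest` above; the `size_limit` type name, the `max_size` option, and the example package are made up for illustration.

package org.opensearch.example.pipeline; // hypothetical package, for illustration only

import org.opensearch.action.search.SearchRequest;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPipelinesPlugin;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;

import java.util.Map;

/**
 * Hypothetical request processor that caps the number of hits a search may ask for.
 */
public class SizeLimitRequestProcessor implements SearchRequestProcessor {
    private final String tag;
    private final String description;
    private final int maxSize;

    SizeLimitRequestProcessor(String tag, String description, int maxSize) {
        this.tag = tag;
        this.description = description;
        this.maxSize = maxSize;
    }

    @Override
    public String getType() {
        return "size_limit"; // hypothetical processor type name
    }

    @Override
    public String getTag() {
        return tag;
    }

    @Override
    public String getDescription() {
        return description;
    }

    // Called by Pipeline.transformRequest for every search that names this pipeline.
    @Override
    public SearchRequest processRequest(SearchRequest request) {
        SearchSourceBuilder source = request.source();
        if (source != null && source.size() > maxSize) {
            source.size(maxSize);
        }
        return request;
    }

    /** Factory invoked with the per-processor config map taken from the pipeline definition. */
    public static class Factory implements Processor.Factory {
        @Override
        public Processor create(Map<String, Processor.Factory> processorFactories, String tag, String description, Map<String, Object> config) {
            int maxSize = ConfigurationUtils.readIntProperty(null, tag, config, "max_size", 10);
            return new SizeLimitRequestProcessor(tag, description, maxSize);
        }
    }

    /** Plugin wiring through SearchPipelinesPlugin#getProcessors. */
    public static class ExamplePlugin extends Plugin implements SearchPipelinesPlugin {
        @Override
        public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
            return Map.of("size_limit", new Factory());
        }
    }
}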
+ */ + public final ThreadContext threadContext; + + public final LongSupplier relativeTimeSupplier; + + public final SearchPipelineService searchPipelineService; + + public final Consumer genericExecutor; + + public final NamedXContentRegistry namedXContentRegistry; + + /** + * Provides scheduler support + */ + public final BiFunction scheduler; + + /** + * Provides access to the node's cluster client + */ + public final Client client; + + public Parameters( + Environment env, + ScriptService scriptService, + AnalysisRegistry analysisRegistry, + ThreadContext threadContext, + LongSupplier relativeTimeSupplier, + BiFunction scheduler, + SearchPipelineService searchPipelineService, + Client client, + Consumer genericExecutor, + NamedXContentRegistry namedXContentRegistry + ) { + this.env = env; + this.scriptService = scriptService; + this.threadContext = threadContext; + this.analysisRegistry = analysisRegistry; + this.relativeTimeSupplier = relativeTimeSupplier; + this.scheduler = scheduler; + this.searchPipelineService = searchPipelineService; + this.client = client; + this.genericExecutor = genericExecutor; + this.namedXContentRegistry = namedXContentRegistry; + } + + } +} diff --git a/server/src/main/java/org/opensearch/search/pipeline/ProcessorInfo.java b/server/src/main/java/org/opensearch/search/pipeline/ProcessorInfo.java new file mode 100644 index 0000000000000..0864fecb6c7f1 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/ProcessorInfo.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Information about a search pipeline processor + * + * TODO: This is copy/pasted from the ingest ProcessorInfo. + * Can/should we share implementation or is this just boilerplate? + * + * @opensearch.internal + */ +public class ProcessorInfo implements Writeable, ToXContentObject, Comparable { + + private final String type; + + public ProcessorInfo(String type) { + this.type = type; + } + + /** + * Read from a stream. 
+     */
+    public ProcessorInfo(StreamInput input) throws IOException {
+        type = input.readString();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeString(this.type);
+    }
+
+    /**
+     * @return The unique processor type
+     */
+    public String getType() {
+        return type;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field("type", type);
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ProcessorInfo that = (ProcessorInfo) o;
+
+        return type.equals(that.type);
+    }
+
+    @Override
+    public int hashCode() {
+        return type.hashCode();
+    }
+
+    @Override
+    public int compareTo(ProcessorInfo o) {
+        return type.compareTo(o.type);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineMetadata.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineMetadata.java
new file mode 100644
index 0000000000000..bfbf5cd24bf92
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineMetadata.java
@@ -0,0 +1,163 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.pipeline;
+
+import org.opensearch.Version;
+import org.opensearch.cluster.Diff;
+import org.opensearch.cluster.DiffableUtils;
+import org.opensearch.cluster.NamedDiff;
+import org.opensearch.cluster.metadata.Metadata;
+import org.opensearch.common.io.stream.StreamInput;
+import org.opensearch.common.io.stream.StreamOutput;
+import org.opensearch.core.ParseField;
+import org.opensearch.core.xcontent.ObjectParser;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.core.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the search pipelines that are available in the cluster
+ *
+ * @opensearch.internal
+ */
+public class SearchPipelineMetadata implements Metadata.Custom {
+    public static final String TYPE = "search_pipeline";
+
+    private static final ParseField PIPELINES_FIELD = new ParseField("pipeline");
+    private static final ObjectParser<List<PipelineConfiguration>, Void> SEARCH_PIPELINE_METADATA_PARSER = new ObjectParser<>(
+        "search_pipeline_metadata",
+        ArrayList::new
+    );
+    static {
+        SEARCH_PIPELINE_METADATA_PARSER.declareObjectArray(List::addAll, PipelineConfiguration.getParser(), PIPELINES_FIELD);
+    }
+    // Mapping from pipeline ID to each pipeline's configuration.
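+    // Keys are pipeline IDs; each value holds the raw, still-unparsed pipeline definition exactly as stored in
+    // cluster state. An illustrative definition (the same shape the tests below use):
+    //   { "request_processors": [ { "scale_request_size": { "scale": 2 } } ],
+    //     "response_processors": [ { "fixed_score": { "score": 1.0 } } ] }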
+ private final Map pipelines; + + public SearchPipelineMetadata(Map pipelines) { + this.pipelines = Collections.unmodifiableMap(pipelines); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + + public Map getPipelines() { + return pipelines; + } + + public SearchPipelineMetadata(StreamInput in) throws IOException { + int size = in.readVInt(); + Map pipelines = new HashMap<>(size); + for (int i = 0; i < size; i++) { + PipelineConfiguration pipeline = PipelineConfiguration.readFrom(in); + pipelines.put(pipeline.getId(), pipeline); + } + this.pipelines = Collections.unmodifiableMap(pipelines); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(pipelines.size()); + for (PipelineConfiguration pipeline : pipelines.values()) { + pipeline.writeTo(out); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(PIPELINES_FIELD.getPreferredName()); + for (PipelineConfiguration pipeline : pipelines.values()) { + pipeline.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + + public static SearchPipelineMetadata fromXContent(XContentParser parser) throws IOException { + Map pipelines = new HashMap<>(); + List configs = SEARCH_PIPELINE_METADATA_PARSER.parse(parser, null); + for (PipelineConfiguration pipeline : configs) { + pipelines.put(pipeline.getId(), pipeline); + } + return new SearchPipelineMetadata(pipelines); + } + + @Override + public EnumSet context() { + return Metadata.ALL_CONTEXTS; + } + + @Override + public Diff diff(Metadata.Custom previousState) { + return new SearchPipelineMetadataDiff((SearchPipelineMetadata) previousState, this); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return new SearchPipelineMetadataDiff(in); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SearchPipelineMetadata that = (SearchPipelineMetadata) o; + return pipelines.equals(that.pipelines); + } + + @Override + public int hashCode() { + return pipelines.hashCode(); + } + + static class SearchPipelineMetadataDiff implements NamedDiff { + final Diff> pipelines; + + public SearchPipelineMetadataDiff(SearchPipelineMetadata before, SearchPipelineMetadata after) { + this.pipelines = DiffableUtils.diff(before.pipelines, after.pipelines, DiffableUtils.getStringKeySerializer()); + } + + public SearchPipelineMetadataDiff(StreamInput in) throws IOException { + this.pipelines = DiffableUtils.readJdkMapDiff( + in, + DiffableUtils.getStringKeySerializer(), + PipelineConfiguration::readFrom, + PipelineConfiguration::readDiffFrom + ); + } + + @Override + public Metadata.Custom apply(Metadata.Custom part) { + return new SearchPipelineMetadata(pipelines.apply(((SearchPipelineMetadata) part).pipelines)); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + pipelines.writeTo(out); + } + } +} diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java new file mode 100644 index 0000000000000..74d901085b7aa --- /dev/null +++ 
b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -0,0 +1,410 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchParseException; +import org.opensearch.ResourceNotFoundException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.search.DeleteSearchPipelineRequest; +import org.opensearch.action.search.PutSearchPipelineRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.AckedClusterStateUpdateTask; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateApplier; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterManagerTaskKeys; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.regex.Regex; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.gateway.GatewayService; +import org.opensearch.index.analysis.AnalysisRegistry; +import org.opensearch.ingest.ConfigurationUtils; +import org.opensearch.node.ReportingService; +import org.opensearch.plugins.SearchPipelinesPlugin; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; + +/** + * The main entry point for search pipelines. Handles CRUD operations and exposes the API to execute search pipelines + * against requests and responses. 
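+ * <p>
+ * A minimal usage sketch (illustrative only; {@code executeSearch} stands in for however the caller actually runs
+ * the query): a caller tags a request with a pipeline ID and runs it through the transform methods below.
+ * <pre>{@code
+ * SearchRequest request = new SearchRequest("my-index").pipeline("my-pipeline");
+ * SearchRequest transformed = searchPipelineService.transformRequest(request);
+ * SearchResponse response = executeSearch(transformed);
+ * SearchResponse rewritten = searchPipelineService.transformResponse(transformed, response);
+ * }</pre>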
+ */ +public class SearchPipelineService implements ClusterStateApplier, ReportingService { + public static final String SEARCH_PIPELINE_ORIGIN = "search_pipeline"; + + private static final Logger logger = LogManager.getLogger(SearchPipelineService.class); + private final ClusterService clusterService; + private final ScriptService scriptService; + private final Map processorFactories; + private volatile Map pipelines = Collections.emptyMap(); + private final ThreadPool threadPool; + private final List> searchPipelineClusterStateListeners = new CopyOnWriteArrayList<>(); + private final ClusterManagerTaskThrottler.ThrottlingKey putPipelineTaskKey; + private final ClusterManagerTaskThrottler.ThrottlingKey deletePipelineTaskKey; + private volatile ClusterState state; + + public SearchPipelineService( + ClusterService clusterService, + ThreadPool threadPool, + Environment env, + ScriptService scriptService, + AnalysisRegistry analysisRegistry, + NamedXContentRegistry namedXContentRegistry, + List searchPipelinesPlugins, + Client client + ) { + this.clusterService = clusterService; + this.scriptService = scriptService; + this.threadPool = threadPool; + this.processorFactories = processorFactories( + searchPipelinesPlugins, + new Processor.Parameters( + env, + scriptService, + analysisRegistry, + threadPool.getThreadContext(), + threadPool::relativeTimeInMillis, + (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC), + this, + client, + threadPool.generic()::execute, + namedXContentRegistry + ) + ); + putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SEARCH_PIPELINE_KEY, true); + deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SEARCH_PIPELINE_KEY, true); + } + + private static Map processorFactories( + List searchPipelinesPlugins, + Processor.Parameters parameters + ) { + Map processorFactories = new HashMap<>(); + for (SearchPipelinesPlugin searchPipelinesPlugin : searchPipelinesPlugins) { + Map newProcessors = searchPipelinesPlugin.getProcessors(parameters); + for (Map.Entry entry : newProcessors.entrySet()) { + if (processorFactories.put(entry.getKey(), entry.getValue()) != null) { + throw new IllegalArgumentException("Ingest processor [" + entry.getKey() + "] is already registered"); + } + } + } + return Collections.unmodifiableMap(processorFactories); + } + + @Override + public void applyClusterState(ClusterChangedEvent event) { + state = event.state(); + + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + return; + } + searchPipelineClusterStateListeners.forEach(consumer -> consumer.accept(state)); + + SearchPipelineMetadata newSearchPipelineMetadata = state.getMetadata().custom(SearchPipelineMetadata.TYPE); + if (newSearchPipelineMetadata == null) { + return; + } + + try { + innerUpdatePipelines(newSearchPipelineMetadata); + } catch (OpenSearchParseException e) { + logger.warn("failed to update search pipelines", e); + } + } + + void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) { + Map existingPipelines = this.pipelines; + + // Lazily initialize these variables in order to favour the most likely scenario that there are no pipeline changes: + Map newPipelines = null; + List exceptions = null; + // Iterate over pipeline configurations in metadata and constructs a new pipeline if there is no pipeline + // or the pipeline configuration has been modified + + for (PipelineConfiguration 
newConfiguration : newSearchPipelineMetadata.getPipelines().values()) { + PipelineHolder previous = existingPipelines.get(newConfiguration.getId()); + if (previous != null && previous.configuration.equals(newConfiguration)) { + continue; + } + if (newPipelines == null) { + newPipelines = new HashMap<>(existingPipelines); + } + try { + Pipeline newPipeline = Pipeline.create(newConfiguration.getId(), newConfiguration.getConfigAsMap(), processorFactories); + newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); + + if (previous == null) { + continue; + } + // TODO -- once we add in pipeline metrics (like in ingest pipelines), we will need to deep-copy + // the old pipeline's metrics into the new pipeline. + } catch (Exception e) { + OpenSearchParseException parseException = new OpenSearchParseException( + "Error updating pipeline with id [" + newConfiguration.getId() + "]", + e + ); + // TODO -- replace pipeline with one that throws an exception when we try to use it + if (exceptions == null) { + exceptions = new ArrayList<>(); + } + exceptions.add(parseException); + } + } + // Iterate over the current active pipelines and check whether they are missing in the pipeline configuration and + // if so delete the pipeline from new Pipelines map: + for (Map.Entry entry : existingPipelines.entrySet()) { + if (newSearchPipelineMetadata.getPipelines().get(entry.getKey()) == null) { + if (newPipelines == null) { + newPipelines = new HashMap<>(existingPipelines); + } + newPipelines.remove(entry.getKey()); + } + } + + if (newPipelines != null) { + this.pipelines = Collections.unmodifiableMap(newPipelines); + if (exceptions != null) { + ExceptionsHelper.rethrowAndSuppress(exceptions); + } + } + } + + public void putPipeline( + Map searchPipelinesInfos, + PutSearchPipelineRequest request, + ActionListener listener + ) throws Exception { + + validatePipeline(searchPipelinesInfos, request); + clusterService.submitStateUpdateTask( + "put-search-pipeline-" + request.getId(), + new AckedClusterStateUpdateTask<>(request, listener) { + @Override + public ClusterState execute(ClusterState currentState) { + return innerPut(request, currentState); + } + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return putPipelineTaskKey; + } + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + } + ); + } + + static ClusterState innerPut(PutSearchPipelineRequest request, ClusterState currentState) { + SearchPipelineMetadata currentSearchPipelineMetadata = currentState.metadata().custom(SearchPipelineMetadata.TYPE); + Map pipelines; + if (currentSearchPipelineMetadata != null) { + pipelines = new HashMap<>(currentSearchPipelineMetadata.getPipelines()); + } else { + pipelines = new HashMap<>(); + } + pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType())); + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metadata( + Metadata.builder(currentState.getMetadata()) + .putCustom(SearchPipelineMetadata.TYPE, new SearchPipelineMetadata(pipelines)) + .build() + ); + return newState.build(); + } + + void validatePipeline(Map searchPipelinesInfos, PutSearchPipelineRequest request) throws Exception { + if (searchPipelinesInfos.isEmpty()) { + throw new IllegalStateException("Search pipeline info is empty"); + } + Map pipelineConfig = 
XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); + Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories); + List exceptions = new ArrayList<>(); + for (Processor processor : pipeline.flattenAllProcessors()) { + for (Map.Entry entry : searchPipelinesInfos.entrySet()) { + String type = processor.getType(); + if (entry.getValue().containsProcessor(type) == false) { + String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; + exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)); + } + } + } + ExceptionsHelper.rethrowAndSuppress(exceptions); + } + + public void deletePipeline(DeleteSearchPipelineRequest request, ActionListener listener) throws Exception { + clusterService.submitStateUpdateTask( + "delete-search-pipeline-" + request.getId(), + new AckedClusterStateUpdateTask<>(request, listener) { + @Override + public ClusterState execute(ClusterState currentState) { + return innerDelete(request, currentState); + } + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { + return deletePipelineTaskKey; + } + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + } + ); + + } + + static ClusterState innerDelete(DeleteSearchPipelineRequest request, ClusterState currentState) { + SearchPipelineMetadata currentMetadata = currentState.metadata().custom(SearchPipelineMetadata.TYPE); + if (currentMetadata == null) { + return currentState; + } + Map pipelines = currentMetadata.getPipelines(); + Set toRemove = new HashSet<>(); + for (String pipelineKey : pipelines.keySet()) { + if (Regex.simpleMatch(request.getId(), pipelineKey)) { + toRemove.add(pipelineKey); + } + } + if (toRemove.isEmpty()) { + if (Regex.isMatchAllPattern(request.getId())) { + // Deleting all the empty state is a no-op. + return currentState; + } + throw new ResourceNotFoundException("pipeline [{}] is missing", request.getId()); + } + final Map newPipelines = new HashMap<>(pipelines); + for (String key : toRemove) { + newPipelines.remove(key); + } + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metadata( + Metadata.builder(currentState.getMetadata()).putCustom(SearchPipelineMetadata.TYPE, new SearchPipelineMetadata(newPipelines)) + ); + return newState.build(); + } + + public SearchRequest transformRequest(SearchRequest searchRequest) throws Exception { + String pipelineId = searchRequest.pipeline(); + if (pipelineId != null) { + PipelineHolder pipeline = pipelines.get(pipelineId); + if (pipeline == null) { + throw new IllegalArgumentException("Pipeline " + pipelineId + " is not defined"); + } + // Save the original request by deep cloning the existing request. 
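+            // Round-tripping through a BytesStreamOutput gives an independent copy, so request processors can mutate
+            // the copy freely without side effects on the caller's original SearchRequest.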
+ BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + searchRequest.writeTo(bytesStreamOutput); + SearchRequest clonedRequest = new SearchRequest(bytesStreamOutput.bytes().streamInput()); + return pipeline.pipeline.transformRequest(clonedRequest); + } + return searchRequest; + } + + public SearchResponse transformResponse(SearchRequest request, SearchResponse searchResponse) throws Exception { + String pipelineId = request.pipeline(); + if (pipelineId != null) { + PipelineHolder pipeline = pipelines.get(pipelineId); + if (pipeline == null) { + throw new IllegalArgumentException("Pipeline " + pipelineId + " is not defined"); + } + return pipeline.pipeline.transformResponse(request, searchResponse); + } + return searchResponse; + } + + Map getProcessorFactories() { + return processorFactories; + } + + @Override + public SearchPipelinesInfo info() { + List processorInfoList = new ArrayList<>(); + for (Map.Entry entry : processorFactories.entrySet()) { + processorInfoList.add(new ProcessorInfo(entry.getKey())); + } + return new SearchPipelinesInfo(processorInfoList); + } + + public static List getPipelines(ClusterState clusterState, String... ids) { + SearchPipelineMetadata metadata = clusterState.getMetadata().custom(SearchPipelineMetadata.TYPE); + return innerGetPipelines(metadata, ids); + } + + static List innerGetPipelines(SearchPipelineMetadata metadata, String... ids) { + if (metadata == null) { + return Collections.emptyList(); + } + + // if we didn't ask for _any_ ID, then we get them all (this is the same as if they ask for '*') + if (ids.length == 0) { + return new ArrayList<>(metadata.getPipelines().values()); + } + List result = new ArrayList<>(ids.length); + for (String id : ids) { + if (Regex.isSimpleMatchPattern(id)) { + for (Map.Entry entry : metadata.getPipelines().entrySet()) { + if (Regex.simpleMatch(id, entry.getKey())) { + result.add(entry.getValue()); + } + } + } else { + PipelineConfiguration pipeline = metadata.getPipelines().get(id); + if (pipeline != null) { + result.add(pipeline); + } + } + } + return result; + } + + public ClusterService getClusterService() { + return clusterService; + } + + Map getPipelines() { + return pipelines; + } + + static class PipelineHolder { + + final PipelineConfiguration configuration; + final Pipeline pipeline; + + PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) { + this.configuration = Objects.requireNonNull(configuration); + this.pipeline = Objects.requireNonNull(pipeline); + } + } +} diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelinesInfo.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelinesInfo.java new file mode 100644 index 0000000000000..0f4a830b6e5ef --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelinesInfo.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.search.pipeline;
+
+import org.opensearch.common.io.stream.StreamInput;
+import org.opensearch.common.io.stream.StreamOutput;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.node.ReportingService;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Information about the search pipeline processors available on a node
+ *
+ * @opensearch.internal
+ */
+public class SearchPipelinesInfo implements ReportingService.Info {
+
+    private final Set<ProcessorInfo> processors;
+
+    public SearchPipelinesInfo(List<ProcessorInfo> processors) {
+        this.processors = new TreeSet<>(processors); // we use a treeset here to have a test-able / predictable order
+    }
+
+    /**
+     * Read from a stream.
+     */
+    public SearchPipelinesInfo(StreamInput in) throws IOException {
+        processors = new TreeSet<>();
+        final int size = in.readVInt();
+        for (int i = 0; i < size; i++) {
+            processors.add(new ProcessorInfo(in));
+        }
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject("search_pipelines");
+        builder.startArray("processors");
+        for (ProcessorInfo info : processors) {
+            info.toXContent(builder, params);
+        }
+        builder.endArray();
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeVInt(processors.size());
+        for (ProcessorInfo info : processors) {
+            info.writeTo(out);
+        }
+    }
+
+    public boolean containsProcessor(String type) {
+        return processors.contains(new ProcessorInfo(type));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        SearchPipelinesInfo that = (SearchPipelinesInfo) o;
+        return Objects.equals(processors, that.processors);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(processors);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java b/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java
new file mode 100644
index 0000000000000..db20ddf596ffb
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java
@@ -0,0 +1,19 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.pipeline;
+
+import org.opensearch.action.search.SearchRequest;
+
+/**
+ * Interface for a search pipeline processor that modifies a search request.
+ */
+public interface SearchRequestProcessor extends Processor {
+    SearchRequest processRequest(SearchRequest request) throws Exception;
+
+}
diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchResponseProcessor.java b/server/src/main/java/org/opensearch/search/pipeline/SearchResponseProcessor.java
new file mode 100644
index 0000000000000..b3bc5e98453b9
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/pipeline/SearchResponseProcessor.java
@@ -0,0 +1,20 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */ + +package org.opensearch.search.pipeline; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; + +/** + * Interface for a search pipeline processor that modifies a search response. + */ +public interface SearchResponseProcessor extends Processor { + SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception; + +} diff --git a/server/src/main/java/org/opensearch/search/pipeline/package-info.java b/server/src/main/java/org/opensearch/search/pipeline/package-info.java new file mode 100644 index 0000000000000..91573d06e8f76 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Search pipelines base package. */ +package org.opensearch.search.pipeline; diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/info/NodeInfoTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/info/NodeInfoTests.java index 6710276dad0e9..cfd6fcec4bdc6 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/info/NodeInfoTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/info/NodeInfoTests.java @@ -71,6 +71,7 @@ public void testGetInfo() { null, null, null, + null, null ); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index 9c0090a15d7a5..d3a40868bc389 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -167,6 +167,7 @@ private static NodeInfo createNodeInfo(String nodeId, String transportType, Stri null, null, null, + null, null ); } diff --git a/server/src/test/java/org/opensearch/action/ingest/GetPipelineResponseTests.java b/server/src/test/java/org/opensearch/action/ingest/GetPipelineResponseTests.java index 962428a2a0930..68bb523d594c0 100644 --- a/server/src/test/java/org/opensearch/action/ingest/GetPipelineResponseTests.java +++ b/server/src/test/java/org/opensearch/action/ingest/GetPipelineResponseTests.java @@ -99,6 +99,16 @@ public void testXContentDeserialization() throws IOException { } } + public void testSubsetNotEqual() throws IOException { + PipelineConfiguration pipeline1 = createRandomPipeline("pipe1"); + PipelineConfiguration pipeline2 = createRandomPipeline("pipe2"); + + GetPipelineResponse response1 = new GetPipelineResponse(List.of(pipeline1)); + GetPipelineResponse response2 = new GetPipelineResponse(List.of(pipeline1, pipeline2)); + assertNotEquals(response1, response2); + assertNotEquals(response2, response1); + } + @Override protected GetPipelineResponse doParseInstance(XContentParser parser) throws IOException { return GetPipelineResponse.fromXContent(parser); @@ -133,4 +143,5 @@ protected GetPipelineResponse mutateInstance(GetPipelineResponse response) { throw new UncheckedIOException(e); } } + } diff --git a/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java b/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java index 6f226385896d8..341f52f28e5e0 100644 --- 
a/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java +++ b/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java @@ -54,6 +54,7 @@ import org.opensearch.plugins.PluginInfo; import org.opensearch.search.aggregations.support.AggregationInfo; import org.opensearch.search.aggregations.support.AggregationUsageService; +import org.opensearch.search.pipeline.SearchPipelinesInfo; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; @@ -242,6 +243,17 @@ private static NodeInfo createNodeInfo() { // pick a random long that sometimes exceeds an int: indexingBuffer = new ByteSizeValue(random().nextLong() & ((1L << 40) - 1)); } + + SearchPipelinesInfo searchPipelinesInfo = null; + if (randomBoolean()) { + int numProcessors = randomIntBetween(0, 5); + List processors = new ArrayList<>(numProcessors); + for (int i = 0; i < numProcessors; i++) { + processors.add(new org.opensearch.search.pipeline.ProcessorInfo(randomAlphaOfLengthBetween(3, 10))); + } + searchPipelinesInfo = new SearchPipelinesInfo(processors); + } + return new NodeInfo( VersionUtils.randomVersion(random()), build, @@ -256,7 +268,8 @@ private static NodeInfo createNodeInfo() { pluginsAndModules, ingestInfo, aggregationInfo, - indexingBuffer + indexingBuffer, + searchPipelinesInfo ); } } diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java new file mode 100644 index 0000000000000..c1074f3f41fc1 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -0,0 +1,620 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search.pipeline; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.lucene.search.TotalHits; +import org.junit.Before; +import org.opensearch.OpenSearchParseException; +import org.opensearch.ResourceNotFoundException; +import org.opensearch.Version; +import org.opensearch.action.search.DeleteSearchPipelineRequest; +import org.opensearch.action.search.PutSearchPipelineRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchResponseSections; +import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.plugins.SearchPipelinesPlugin; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.MockLogAppender; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SearchPipelineServiceTests extends OpenSearchTestCase { + private static final SearchPipelinesPlugin DUMMY_PLUGIN = new SearchPipelinesPlugin() { + @Override + public Map getProcessors(Processor.Parameters parameters) { + return Map.of("foo", (factories, tag, description, config) -> null); + } + }; + + private ThreadPool threadPool; + + @Before + public void setup() { + threadPool = mock(ThreadPool.class); + ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); + when(threadPool.generic()).thenReturn(executorService); + when(threadPool.executor(anyString())).thenReturn(executorService); + } + + public void testSearchPipelinePlugin() { + Client client = mock(Client.class); + SearchPipelineService searchPipelineService = new SearchPipelineService( + mock(ClusterService.class), + threadPool, + null, + null, + null, + this.xContentRegistry(), + List.of(DUMMY_PLUGIN), + client + ); + Map factories = searchPipelineService.getProcessorFactories(); + assertEquals(1, factories.size()); + assertTrue(factories.containsKey("foo")); + } + + public void testSearchPipelinePluginDuplicate() { + Client client = mock(Client.class); + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> new SearchPipelineService( + mock(ClusterService.class), + threadPool, + null, + null, + null, + this.xContentRegistry(), + List.of(DUMMY_PLUGIN, DUMMY_PLUGIN), + client + ) + ); + assertTrue(e.getMessage(), e.getMessage().contains(" already registered")); + } + + public void testExecuteSearchPipelineDoesNotExist() { + Client client = mock(Client.class); + SearchPipelineService searchPipelineService = new SearchPipelineService( + mock(ClusterService.class), + 
threadPool, + null, + null, + null, + this.xContentRegistry(), + List.of(DUMMY_PLUGIN), + client + ); + final SearchRequest searchRequest = new SearchRequest("_index").pipeline("bar"); + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> searchPipelineService.transformRequest(searchRequest) + ); + assertTrue(e.getMessage(), e.getMessage().contains(" not defined")); + } + + private static abstract class FakeProcessor implements Processor { + private final String type; + private final String tag; + private final String description; + + protected FakeProcessor(String type, String tag, String description) { + this.type = type; + this.tag = tag; + this.description = description; + } + + @Override + public String getType() { + return type; + } + + @Override + public String getTag() { + return tag; + } + + @Override + public String getDescription() { + return description; + } + } + + private static class FakeRequestProcessor extends FakeProcessor implements SearchRequestProcessor { + private final Consumer executor; + + public FakeRequestProcessor(String type, String tag, String description, Consumer executor) { + super(type, tag, description); + this.executor = executor; + } + + @Override + public SearchRequest processRequest(SearchRequest request) throws Exception { + executor.accept(request); + return request; + } + } + + private static class FakeResponseProcessor extends FakeProcessor implements SearchResponseProcessor { + private final Consumer executor; + + public FakeResponseProcessor(String type, String tag, String description, Consumer executor) { + super(type, tag, description); + this.executor = executor; + } + + @Override + public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception { + executor.accept(response); + return response; + } + } + + private static SearchPipelineService createWithProcessors() { + Map processors = new HashMap<>(); + processors.put("scale_request_size", (processorFactories, tag, description, config) -> { + float scale = ((Number) config.remove("scale")).floatValue(); + return new FakeRequestProcessor( + "scale_request_size", + tag, + description, + req -> req.source().size((int) (req.source().size() * scale)) + ); + }); + processors.put("fixed_score", (processorFactories, tag, description, config) -> { + float score = ((Number) config.remove("score")).floatValue(); + return new FakeResponseProcessor("fixed_score", tag, description, rsp -> rsp.getHits().forEach(h -> h.score(score))); + }); + return createWithProcessors(processors); + } + + private static SearchPipelineService createWithProcessors(Map processors) { + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); + when(threadPool.generic()).thenReturn(executorService); + when(threadPool.executor(anyString())).thenReturn(executorService); + return new SearchPipelineService( + mock(ClusterService.class), + threadPool, + null, + null, + null, + null, + Collections.singletonList(new SearchPipelinesPlugin() { + @Override + public Map getProcessors(Processor.Parameters parameters) { + return processors; + } + }), + client + ); + } + + public void testUpdatePipelines() { + SearchPipelineService searchPipelineService = createWithProcessors(); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + searchPipelineService.applyClusterState(new 
ClusterChangedEvent("", clusterState, previousClusterState)); + assertEquals(0, searchPipelineService.getPipelines().size()); + + PipelineConfiguration pipeline = new PipelineConfiguration( + "_id", + new BytesArray( + "{ " + + "\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ], " + + "\"response_processors\" : [ { \"fixed_score\" : { \"score\" : 1.0 } } ]" + + "}" + ), + XContentType.JSON + ); + SearchPipelineMetadata pipelineMetadata = new SearchPipelineMetadata(Map.of("_id", pipeline)); + clusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, pipelineMetadata).build()) + .build(); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertEquals(1, searchPipelineService.getPipelines().size()); + assertEquals("_id", searchPipelineService.getPipelines().get("_id").pipeline.getId()); + assertNull(searchPipelineService.getPipelines().get("_id").pipeline.getDescription()); + assertEquals(1, searchPipelineService.getPipelines().get("_id").pipeline.getSearchRequestProcessors().size()); + assertEquals( + "scale_request_size", + searchPipelineService.getPipelines().get("_id").pipeline.getSearchRequestProcessors().get(0).getType() + ); + assertEquals(1, searchPipelineService.getPipelines().get("_id").pipeline.getSearchResponseProcessors().size()); + assertEquals( + "fixed_score", + searchPipelineService.getPipelines().get("_id").pipeline.getSearchResponseProcessors().get(0).getType() + ); + } + + public void testPutPipeline() { + SearchPipelineService searchPipelineService = createWithProcessors(); + String id = "_id"; + SearchPipelineService.PipelineHolder pipeline = searchPipelineService.getPipelines().get(id); + assertNull(pipeline); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + + PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest(id, new BytesArray("{}"), XContentType.JSON); + ClusterState previousClusterState = clusterState; + clusterState = SearchPipelineService.innerPut(putRequest, clusterState); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + pipeline = searchPipelineService.getPipelines().get(id); + assertNotNull(pipeline); + assertEquals(id, pipeline.pipeline.getId()); + assertNull(pipeline.pipeline.getDescription()); + assertEquals(0, pipeline.pipeline.getSearchRequestProcessors().size()); + assertEquals(0, pipeline.pipeline.getSearchResponseProcessors().size()); + + // Overwrite pipeline + putRequest = new PutSearchPipelineRequest(id, new BytesArray("{ \"description\": \"empty pipeline\"}"), XContentType.JSON); + previousClusterState = clusterState; + clusterState = SearchPipelineService.innerPut(putRequest, clusterState); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + pipeline = searchPipelineService.getPipelines().get(id); + assertNotNull(pipeline); + assertEquals(id, pipeline.pipeline.getId()); + assertEquals("empty pipeline", pipeline.pipeline.getDescription()); + assertEquals(0, pipeline.pipeline.getSearchRequestProcessors().size()); + assertEquals(0, pipeline.pipeline.getSearchResponseProcessors().size()); + } + + public void testPutInvalidPipeline() throws IllegalAccessException { + SearchPipelineService searchPipelineService = createWithProcessors(); + String id = "_id"; + + ClusterState clusterState = ClusterState.builder(new 
ClusterName("_name")).build(); + ClusterState previousState = clusterState; + + PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest( + id, + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : \"foo\" } } ] }"), + XContentType.JSON + ); + clusterState = SearchPipelineService.innerPut(putRequest, clusterState); + try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(SearchPipelineService.class))) { + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test1", + SearchPipelineService.class.getCanonicalName(), + Level.WARN, + "failed to update search pipelines" + ) + ); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); + mockAppender.assertAllExpectationsMatched(); + } + assertEquals(0, searchPipelineService.getPipelines().size()); + } + + public void testDeletePipeline() { + SearchPipelineService searchPipelineService = createWithProcessors(); + PipelineConfiguration config = new PipelineConfiguration( + "_id", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + XContentType.JSON + ); + SearchPipelineMetadata searchPipelineMetadata = new SearchPipelineMetadata(Map.of("_id", config)); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + clusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, searchPipelineMetadata).build()) + .build(); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); + assertEquals(1, searchPipelineService.getPipelines().size()); + + // Delete pipeline: + DeleteSearchPipelineRequest deleteRequest = new DeleteSearchPipelineRequest("_id"); + previousState = clusterState; + clusterState = SearchPipelineService.innerDelete(deleteRequest, clusterState); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); + assertEquals(0, searchPipelineService.getPipelines().size()); + + final ClusterState finalClusterState = clusterState; + // Delete missing pipeline + ResourceNotFoundException e = expectThrows( + ResourceNotFoundException.class, + () -> SearchPipelineService.innerDelete(deleteRequest, finalClusterState) + ); + assertEquals("pipeline [_id] is missing", e.getMessage()); + } + + public void testDeletePipelinesWithWildcard() { + SearchPipelineService searchPipelineService = createWithProcessors(); + BytesArray definition = new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"); + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration("p1", definition, XContentType.JSON), + "p2", + new PipelineConfiguration("p2", definition, XContentType.JSON), + "q1", + new PipelineConfiguration("q1", definition, XContentType.JSON) + ) + ); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + clusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); + assertNotNull(searchPipelineService.getPipelines().get("p1")); + assertNotNull(searchPipelineService.getPipelines().get("p2")); + 
assertNotNull(searchPipelineService.getPipelines().get("q1")); + + // Delete all pipelines starting with "p" + DeleteSearchPipelineRequest deleteRequest = new DeleteSearchPipelineRequest("p*"); + previousState = clusterState; + clusterState = SearchPipelineService.innerDelete(deleteRequest, clusterState); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); + assertEquals(1, searchPipelineService.getPipelines().size()); + assertNotNull(searchPipelineService.getPipelines().get("q1")); + + // Prefix wildcard fails if no matches + final ClusterState finalClusterState = clusterState; + ResourceNotFoundException e = expectThrows( + ResourceNotFoundException.class, + () -> SearchPipelineService.innerDelete(deleteRequest, finalClusterState) + ); + assertEquals("pipeline [p*] is missing", e.getMessage()); + + // Delete all removes remaining pipeline + DeleteSearchPipelineRequest deleteAllRequest = new DeleteSearchPipelineRequest("*"); + previousState = clusterState; + clusterState = SearchPipelineService.innerDelete(deleteAllRequest, clusterState); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); + assertEquals(0, searchPipelineService.getPipelines().size()); + + // Delete all does not throw exception if no pipelines + SearchPipelineService.innerDelete(deleteAllRequest, clusterState); + } + + public void testTransformRequest() throws Exception { + SearchPipelineService searchPipelineService = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + XContentType.JSON + ) + ) + ); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + clusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); + + int size = 10; + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(size); + SearchRequest request = new SearchRequest("_index").source(sourceBuilder).pipeline("p1"); + + SearchRequest transformedRequest = searchPipelineService.transformRequest(request); + + assertEquals(2 * size, transformedRequest.source().size()); + assertEquals(size, request.source().size()); + + // This request doesn't specify a pipeline, it doesn't get transformed. 
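+        // Without a pipeline ID, transformRequest should hand back the very same instance (checked by assertSame below).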
+ request = new SearchRequest("_index").source(sourceBuilder); + SearchRequest notTransformedRequest = searchPipelineService.transformRequest(request); + assertEquals(size, notTransformedRequest.source().size()); + assertSame(request, notTransformedRequest); + } + + public void testTransformResponse() throws Exception { + SearchPipelineService searchPipelineService = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"response_processors\" : [ { \"fixed_score\": { \"score\" : 2 } } ] }"), + XContentType.JSON + ) + ) + ); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + clusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); + + int size = 10; + SearchHit[] hits = new SearchHit[size]; + for (int i = 0; i < size; i++) { + hits[i] = new SearchHit(i, "doc" + i, Collections.emptyMap(), Collections.emptyMap()); + hits[i].score(i); + } + SearchHits searchHits = new SearchHits(hits, new TotalHits(size * 2, TotalHits.Relation.EQUAL_TO), size); + SearchResponseSections searchResponseSections = new SearchResponseSections(searchHits, null, null, false, false, null, 0); + SearchResponse searchResponse = new SearchResponse(searchResponseSections, null, 1, 1, 0, 10, null, null); + + // First try without specifying a pipeline, which should be a no-op. + SearchRequest searchRequest = new SearchRequest(); + SearchResponse notTransformedResponse = searchPipelineService.transformResponse(searchRequest, searchResponse); + assertSame(searchResponse, notTransformedResponse); + + // Now apply a pipeline + searchRequest = new SearchRequest().pipeline("p1"); + SearchResponse transformedResponse = searchPipelineService.transformResponse(searchRequest, searchResponse); + assertEquals(size, transformedResponse.getHits().getHits().length); + for (int i = 0; i < size; i++) { + assertEquals(2.0, transformedResponse.getHits().getHits()[i].getScore(), 0.0001f); + } + + expectThrows( + IllegalArgumentException.class, + () -> searchPipelineService.transformResponse(new SearchRequest().pipeline("p2"), searchResponse) + ); + } + + public void testGetPipelines() { + // + assertEquals(0, SearchPipelineService.innerGetPipelines(null, "p1").size()); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + XContentType.JSON + ), + "p2", + new PipelineConfiguration( + "p2", + new BytesArray("{\"response_processors\" : [ { \"fixed_score\": { \"score\" : 2 } } ] }"), + XContentType.JSON + ) + ) + ); + + // Return all when no ids specified + List pipelines = SearchPipelineService.innerGetPipelines(metadata); + assertEquals(2, pipelines.size()); + pipelines.sort(Comparator.comparing(PipelineConfiguration::getId)); + assertEquals("p1", pipelines.get(0).getId()); + assertEquals("p2", pipelines.get(1).getId()); + + // Get specific pipeline + pipelines = SearchPipelineService.innerGetPipelines(metadata, "p1"); + assertEquals(1, pipelines.size()); + assertEquals("p1", pipelines.get(0).getId()); + + // Get both pipelines explicitly + pipelines = SearchPipelineService.innerGetPipelines(metadata, "p1", 
"p2"); + assertEquals(2, pipelines.size()); + pipelines.sort(Comparator.comparing(PipelineConfiguration::getId)); + assertEquals("p1", pipelines.get(0).getId()); + assertEquals("p2", pipelines.get(1).getId()); + + // Match all + pipelines = SearchPipelineService.innerGetPipelines(metadata, "*"); + assertEquals(2, pipelines.size()); + pipelines.sort(Comparator.comparing(PipelineConfiguration::getId)); + assertEquals("p1", pipelines.get(0).getId()); + assertEquals("p2", pipelines.get(1).getId()); + + // Match prefix + pipelines = SearchPipelineService.innerGetPipelines(metadata, "p*"); + assertEquals(2, pipelines.size()); + pipelines.sort(Comparator.comparing(PipelineConfiguration::getId)); + assertEquals("p1", pipelines.get(0).getId()); + assertEquals("p2", pipelines.get(1).getId()); + } + + public void testValidatePipeline() throws Exception { + SearchPipelineService searchPipelineService = createWithProcessors(); + + ProcessorInfo reqProcessor = new ProcessorInfo("scale_request_size"); + ProcessorInfo rspProcessor = new ProcessorInfo("fixed_score"); + DiscoveryNode n1 = new DiscoveryNode("n1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode n2 = new DiscoveryNode("n2", buildNewFakeTransportAddress(), Version.CURRENT); + PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest( + "p1", + new BytesArray( + "{" + + "\"request_processors\": [{ \"scale_request_size\": { \"scale\" : 2 } }]," + + "\"response_processors\": [{ \"fixed_score\": { \"score\" : 2 } }]" + + "}" + ), + XContentType.JSON + ); + + // One node is missing a processor + expectThrows( + OpenSearchParseException.class, + () -> searchPipelineService.validatePipeline( + Map.of( + n1, + new SearchPipelinesInfo(List.of(reqProcessor, rspProcessor)), + n2, + new SearchPipelinesInfo(List.of(reqProcessor)) + ), + putRequest + ) + ); + + // Discovery failed, no infos passed. 
+ expectThrows(IllegalStateException.class, () -> searchPipelineService.validatePipeline(Collections.emptyMap(), putRequest)); + + // Invalid configuration in request + PutSearchPipelineRequest badPutRequest = new PutSearchPipelineRequest( + "p1", + new BytesArray( + "{" + + "\"request_processors\": [{ \"scale_request_size\": { \"scale\" : \"banana\" } }]," + + "\"response_processors\": [{ \"fixed_score\": { \"score\" : 2 } }]" + + "}" + ), + XContentType.JSON + ); + expectThrows( + ClassCastException.class, + () -> searchPipelineService.validatePipeline( + Map.of( + n1, + new SearchPipelinesInfo(List.of(reqProcessor, rspProcessor)), + n2, + new SearchPipelinesInfo(List.of(reqProcessor, rspProcessor)) + ), + badPutRequest + ) + ); + + // Success + searchPipelineService.validatePipeline( + Map.of( + n1, + new SearchPipelinesInfo(List.of(reqProcessor, rspProcessor)), + n2, + new SearchPipelinesInfo(List.of(reqProcessor, rspProcessor)) + ), + putRequest + ); + } + + public void testInfo() { + SearchPipelineService searchPipelineService = createWithProcessors(); + SearchPipelinesInfo info = searchPipelineService.info(); + assertTrue(info.containsProcessor("scale_request_size")); + assertTrue(info.containsProcessor("fixed_score")); + } +} diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 33b7e74ad7b51..cc760b38a82f7 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -204,6 +204,7 @@ import org.opensearch.search.SearchService; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.fetch.FetchPhase; +import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.search.query.QueryPhase; import org.opensearch.snapshots.mockstore.MockEventuallyConsistentRepository; import org.opensearch.tasks.TaskResourceTrackingService; @@ -2077,7 +2078,17 @@ public void onFailure(final Exception e) { clusterService, actionFilters, indexNameExpressionResolver, - namedWriteableRegistry + namedWriteableRegistry, + new SearchPipelineService( + clusterService, + threadPool, + environment, + scriptService, + new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), + namedXContentRegistry, + List.of(), + client + ) ) ); actions.put(