Skip to content

Commit

Permalink
Initial search pipelines implementation
Browse files Browse the repository at this point in the history
This commit includes the basic features of search pipelines
(see opensearch-project/search-processor#80).

Search pipelines are modeled after ingest pipelines and provide a
simple, clean API for components to modify search requests and
responses.

With this commit we can:

1. Can create, retrieve, update, and delete search pipelines.
2. Transform search requests and responses by explicitly referencing a
   pipeline.

Later work will include:

1. Adding an index setting to specify a default search pipeline.
2. Allowing search pipelines to be defined within a search request (for
   development/testing purposes, akin to simulating an ingest
   pipeline).
3. Adding a collection of search pipeline processors to support common
   useful transformations. (Suggestions welcome!)

Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh committed Mar 9, 2023
1 parent f79b4dc commit 652ea1b
Show file tree
Hide file tree
Showing 65 changed files with 3,977 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Disallow multiple data paths for search nodes ([#6427](https://github.com/opensearch-project/OpenSearch/pull/6427))
- [Segment Replication] Allocation and rebalancing based on average primary shard count per index ([#6422](https://github.com/opensearch-project/OpenSearch/pull/6422))
- Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558))
- Add initial search pipelines ([#dummy](https://github.com/opensearch-project/OpenSearch/pull/dummy))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ public class RestHighLevelClient implements Closeable {
private final IngestClient ingestClient = new IngestClient(this);
private final SnapshotClient snapshotClient = new SnapshotClient(this);
private final TasksClient tasksClient = new TasksClient(this);
private final SearchPipelinesClient searchPipelinesClient = new SearchPipelinesClient(this);

/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
Expand Down Expand Up @@ -354,6 +355,10 @@ public final TasksClient tasks() {
return tasksClient;
}

public final SearchPipelinesClient searchPipelines() {
return searchPipelinesClient;
}

/**
* Executes a bulk request using the Bulk API.
* @param bulkRequest the request
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineResponse;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;

import java.io.IOException;
import java.util.Collections;

import static java.util.Collections.emptySet;

public final class SearchPipelinesClient {
private final RestHighLevelClient restHighLevelClient;

SearchPipelinesClient(RestHighLevelClient restHighLevelClient) {
this.restHighLevelClient = restHighLevelClient;
}

/**
* Add a pipeline or update an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AcknowledgedResponse putPipeline(PutSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelinesRequestConverters::putPipeline,
options,
AcknowledgedResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously add a pipeline or update an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable putPipelineAsync(
PutSearchPipelineRequest request,
RequestOptions options,
ActionListener<AcknowledgedResponse> listener
) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelinesRequestConverters::putPipeline,
options,
AcknowledgedResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Get an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public GetSearchPipelineResponse getPipeline(GetSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelinesRequestConverters::getPipeline,
options,
GetSearchPipelineResponse::fromXContent,
Collections.singleton(404)
);
}

/**
* Asynchronously get an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable getPipelineAsync(
GetSearchPipelineRequest request,
RequestOptions options,
ActionListener<GetSearchPipelineResponse> listener
) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelinesRequestConverters::getPipeline,
options,
GetSearchPipelineResponse::fromXContent,
listener,
Collections.singleton(404)
);
}

/**
* Delete an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AcknowledgedResponse deletePipeline(DeleteSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelinesRequestConverters::deletePipeline,
options,
AcknowledgedResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously delete an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable deletePipelineAsync(
DeleteSearchPipelineRequest request,
RequestOptions options,
ActionListener<AcknowledgedResponse> listener
) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelinesRequestConverters::deletePipeline,
options,
AcknowledgedResponse::fromXContent,
listener,
emptySet()
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.apache.hc.client5.http.classic.methods.HttpDelete;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.PutSearchPipelineRequest;

import java.io.IOException;

final class SearchPipelinesRequestConverters {
private SearchPipelinesRequestConverters() {}

static Request putPipeline(PutSearchPipelineRequest putPipelineRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addPathPart(putPipelineRequest.getId())
.build();
Request request = new Request(HttpPut.METHOD_NAME, endpoint);

RequestConverters.Params params = new RequestConverters.Params();
params.withTimeout(putPipelineRequest.timeout());
params.withClusterManagerTimeout(putPipelineRequest.clusterManagerNodeTimeout());
request.addParameters(params.asMap());
request.setEntity(RequestConverters.createEntity(putPipelineRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deletePipeline(DeleteSearchPipelineRequest deletePipelineRequest) {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addPathPart(deletePipelineRequest.getId())
.build();
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);

RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withTimeout(deletePipelineRequest.timeout());
parameters.withClusterManagerTimeout(deletePipelineRequest.clusterManagerNodeTimeout());
request.addParameters(parameters.asMap());
return request;
}

static Request getPipeline(GetSearchPipelineRequest getPipelineRequest) {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addCommaSeparatedPathParts(getPipelineRequest.getIds())
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);

RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withClusterManagerTimeout(getPipelineRequest.clusterManagerNodeTimeout());
request.addParameters(parameters.asMap());
return request;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineResponse;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.search.pipeline.Pipeline;

import java.io.IOException;

public class SearchPipelinesClientIT extends OpenSearchRestHighLevelClientTestCase {

public void testPutPipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);
}

private static void createPipeline(PutSearchPipelineRequest request) throws IOException {
AcknowledgedResponse response = execute(
request,
highLevelClient().searchPipelines()::putPipeline,
highLevelClient().searchPipelines()::putPipelineAsync
);
assertTrue(response.isAcknowledged());
}

public void testGetPipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);

GetSearchPipelineRequest getRequest = new GetSearchPipelineRequest(id);
GetSearchPipelineResponse response = execute(
getRequest,
highLevelClient().searchPipelines()::getPipeline,
highLevelClient().searchPipelines()::getPipelineAsync
);
assertTrue(response.isFound());
assertEquals(1, response.pipelines().size());
assertEquals(id, response.pipelines().get(0).getId());
}

public void testDeletePipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);

DeleteSearchPipelineRequest deleteRequest = new DeleteSearchPipelineRequest(id);
AcknowledgedResponse response = execute(
deleteRequest,
highLevelClient().searchPipelines()::deletePipeline,
highLevelClient().searchPipelines()::deletePipelineAsync
);
assertTrue(response.isAcknowledged());
}

private static XContentBuilder buildSearchPipeline() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
return buildSearchPipeline(pipelineBuilder);
}

private static XContentBuilder buildSearchPipeline(XContentBuilder builder) throws IOException {
builder.startObject();
{
builder.field("description", "a pipeline description");
builder.startArray(Pipeline.REQUEST_PROCESSORS_KEY);
{
builder.startObject().startObject("filter_query");
{
builder.startObject("query");
{
builder.startObject("term");
{
builder.field("field", "value");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject().endObject();
}
builder.endArray();
}
builder.endObject();
return builder;
}
}
Loading

0 comments on commit 652ea1b

Please sign in to comment.