[RFC]: Search Phase Results Processor #152

Closed
navneet1v opened this issue Apr 6, 2023 · 9 comments
Labels
Enhancements Increases software capabilities beyond original client specifications Features Introduces a new unit of functionality that satisfies a requirement knn neural-search RFC vector search

Comments

@navneet1v
Collaborator

navneet1v commented Apr 6, 2023

Introduction

This issue proposes a new Processor interface for Search Pipelines that runs between the phases of a search. It will allow plugins to transform the results retrieved from one phase before they are passed to the next phase, at the coordinator node level.

Background [TL;DR]

The Search Pipeline RFC (see References) proposes a set of APIs to manage processors that transform search requests and responses in OpenSearch. A search pipeline is used to create and define these processors. Example:

Creating Search Pipeline

// Create/update a search pipeline.
PUT /_search/pipeline/mypipeline
{
  "description": "A pipeline to apply custom synonyms, result post-filtering, an ML ranking model",
  "request_processors" : [
    {
      "external_synonyms" : {
        "service_url" : "https://my-synonym-service/"
      }
    },
    {
      "ml_ranker_bracket" : {
        "result_oversampling" : 2, // Request 2 * size results.
        "model_id" : "doc-features-20230109",
        "id" : "ml_ranker_identifier"
      }
    }
  ],
  "response_processors" : [
    {
      "result_blocker" : {
        "service_url" : "https://result-blocklist-service/"
      },
      "ml_ranker_bracket" : {
        // Placed here to indicate that it should run after result_blocker.
        // If not part of response_processors, it will run before result_blocker.
        "id" : "ml_ranker_identifier" 
      }
    }
  ]
}

// Return identifiers for all search pipelines.
GET /_search/pipeline

// Return a single search pipeline definition.
GET /_search/pipeline/mypipeline

// Delete a search pipeline.
DELETE /_search/pipeline/mypipeline

Search API Changes

// Apply a search pipeline to a search request.
POST /my-index/_search?search_pipeline=mypipeline
{
  "query" : {
    "match" : {
      "text_field" : "some search text"
    }
  }
}

// Specify an ad hoc search pipeline as part of a search request.
POST /my-index/_search

{
  "query" : {
    "match" : {
      "text_field" : "some search text"
    }
  },
  "pipeline" : {
    "request_processors" : [
      {
        "external_synonyms" : {
          "service_url" : "https://my-synonym-service/"
        }
      },
      {
        "ml_ranker_bracket" : {
          "result_oversampling" : 2, // Request 2 * size results
          "model_id" : "doc-features-20230109",
          "id" : "ml_ranker_identifier"
        }
      }
    ],
    "response_processors" : [
      {
        "result_blocker" : {
          "service_url" : "https://result-blocklist-service/"
        },
        "ml_ranker_bracket" : {
          // Placed here to indicate that it should run after result_blocker.
          // If not part of response_processors, it will run before result_blocker.
          "id" : "ml_ranker_identifier" 
        }
      }
    ]
  }
}

Index Settings

// Set default search pipeline for an existing index.
PUT /my-index/_settings
{
  "index" : {
    "default_search_pipeline" : "mypipeline"
  }
}

// Remove default search pipeline for an index.
PUT /my-index/_settings
{
  "index" : {
    "default_search_pipeline" : "_none"
  }
}

// Create a new index with a default search pipeline.
PUT my-index
{
  "mappings" : {
    // ...index mappings...
  },
  "settings" : {
    "index" : {
      "default_search_pipeline" : "mypipeline",
      // ... other settings ...
    }
  }
}
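
The examples above show three ways a pipeline can reach a search request: the search_pipeline request parameter, the index-level default_search_pipeline setting, and the special value "_none". A minimal sketch of how these could be resolved follows. The precedence (request value over index default) is an assumption, not something stated above, and SearchPipelineResolver is a hypothetical name rather than an OpenSearch class.

```java
import java.util.Optional;

final class SearchPipelineResolver {
    // Special value taken from the index settings example above.
    static final String NONE = "_none";

    /**
     * Picks the pipeline ID for a request.
     * Assumption: an explicit request value wins over the index default,
     * and "_none" in either place means "run no pipeline".
     */
    static Optional<String> resolve(String requestPipeline, String indexDefaultPipeline) {
        String chosen = requestPipeline != null ? requestPipeline : indexDefaultPipeline;
        if (chosen == null || NONE.equals(chosen)) {
            return Optional.empty();
        }
        return Optional.of(chosen);
    }
}
```

For example, resolve(null, "mypipeline") would pick the index default, while resolve(null, "_none") would run no pipeline at all.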

Requirement

For the Normalization and Score Combination feature, we need a way to normalize the scores received for different sub-queries from different shards at the coordinator node before we can start combining them. This needs to happen after the Query phase completes and before the Fetch phase starts in a _search API call.

Solution

The proposed solution is to extend the Search Pipeline Processor interface with a new Processor interface that can run between phases. Normalization will be onboarded as the first use case for this processor; it will run after the Query phase and before the Fetch phase of a search request.

Proposed Flow Chart

The flow chart below assumes that the processor runs after the Query phase and before the Fetch phase for search_type=query_then_fetch, which is the default search type. However, nothing in the interface assumes that these are the only two phases in OpenSearch.

[Figure: NormalizationInNeuralSearch - Search Processor Flow]

Note: Above diagram can be updated via this link.

Proposed Interface (Recommended)

interface SearchPhaseResultsProcessor extends Processor {

    <Result extends SearchPhaseResult> void process(
        final SearchPhaseResults<Result> results,
        final SearchPhaseContext context);

    /**
     * This function is called by the Search Pipeline Service before invoking the processor.
     */
    <Result extends SearchPhaseResult> boolean shouldRunProcessor(
        final SearchPhaseResults<Result> results,
        final SearchPhaseContext context,
        final SearchPhaseNames beforePhase,
        final SearchPhaseNames nextPhase);
}

/**
 * Currently when we create phases we pass a string as the phase name. This enum
 * will allow us to define the phase names and use them in different places when required.
 */
// mark internal
public enum SearchPhaseNames {
    // There are many more; only a few are listed here.
    QUERY_PHASE("query"), FETCH_PHASE("fetch"), DFS_QUERY_PHASE("dfs_query");

    private final String name;

    SearchPhaseNames(final String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

The SearchPhaseNames enum provides the necessary abstraction and a consistent naming convention for OpenSearch phase names.
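
To illustrate how a processor might use shouldRunProcessor, here is a minimal sketch of a gate that only lets the processor run in the gap between the Query and Fetch phases. PhaseName, PhaseGate and NormalizationGate are hypothetical stand-ins, not OpenSearch classes, and the results and context parameters are dropped for brevity. The sketch also assumes that beforePhase is the phase that has just completed.

```java
// Simplified stand-in for the SearchPhaseNames enum proposed above.
enum PhaseName { DFS_QUERY, QUERY, FETCH }

// Simplified stand-in for the shouldRunProcessor part of the proposed interface.
interface PhaseGate {
    boolean shouldRunProcessor(PhaseName beforePhase, PhaseName nextPhase);
}

final class NormalizationGate implements PhaseGate {
    @Override
    public boolean shouldRunProcessor(PhaseName beforePhase, PhaseName nextPhase) {
        // Assumption: beforePhase is the phase that just finished.
        // Run only between the query phase and the fetch phase.
        return beforePhase == PhaseName.QUERY && nextPhase == PhaseName.FETCH;
    }
}
```

With this shape, the decision about when to run stays inside the processor, and the pipeline service only needs to pass in the two phase names.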

Pros:

  1. The logic that decides whether to run a processor is encapsulated in the processor itself and exposed via a well-defined interface.

Cons:

  1. I don’t see specific cons of this approach. However, we would have to make the SearchPhase class publicly accessible, which it currently is not. We can get around this by creating a SearchPhaseNames enum that can be passed instead.

Proposed API

// Create/update a search pipeline.
PUT /_search/pipeline/my_pipeline
{
  "description": "A pipeline that adds a Normalization and Combination Transformer",
  "phase_results_processors" : [
    {
      "normalization-processor" : {
        "technique": "min-max" // other techniques are possible; this is the only one defined for now
      }
    }
  ]
}

// All other APIs, such as the one that makes this pipeline the default pipeline, remain the same.
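
The RFC names the "min-max" technique but does not spell out a formula. As a rough sketch, the standard min-max definition rescales each score to (score - min) / (max - min), computed over the scores of one sub-query gathered from all shards. MinMaxNormalizer is a hypothetical name, and mapping equal scores to 1.0 is an assumption made here to avoid dividing by zero.

```java
import java.util.Arrays;

final class MinMaxNormalizer {
    /** Rescales scores into [0, 1] using (score - min) / (max - min). */
    static double[] normalize(double[] scores) {
        if (scores.length == 0) {
            return new double[0];
        }
        double min = Arrays.stream(scores).min().getAsDouble();
        double max = Arrays.stream(scores).max().getAsDouble();
        double range = max - min;
        double[] normalized = new double[scores.length];
        for (int i = 0; i < scores.length; i++) {
            // Assumption: when all scores are equal, map each one to 1.0.
            normalized[i] = range == 0.0 ? 1.0 : (scores[i] - min) / range;
        }
        return normalized;
    }
}
```

The minimum and maximum have to be taken over the results of all shards together, which is why this step belongs at the coordinator node rather than at the shard level.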

Alternative Interface:

The idea here is that the SearchPipeline Service, before invoking the process function of a processor, checks whether the condition to run the processor is met. It does this by calling the getBeforePhases and getAfterPhases functions and validating that the phase which just completed is in the list returned by getAfterPhases and that the next phase to run is in the list returned by getBeforePhases.

interface SearchPhaseResultsProcessor extends Processor {

    <Result extends SearchPhaseResult> void process(
        final SearchPhaseResults<Result> results,
        final SearchPhaseContext context);

    /**
     * Returns a list of phases before which this processor should be run.
     */
    List<SearchPhaseNames> getBeforePhases();

    /**
     * Returns a list of phases after which this processor should be run.
     */
    List<SearchPhaseNames> getAfterPhases();
}

/**
 * Currently when we create phases we pass a string as the phase name. This enum
 * will allow us to define the phase names and use them in different places when required.
 */
public enum SearchPhaseNames {
    // There are many more; only a few are listed here.
    QUERY_PHASE("query"), FETCH_PHASE("fetch"), DFS_QUERY_PHASE("dfs_query");

    private final String name;

    SearchPhaseNames(final String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}
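
For comparison, the check that the service would perform with this alternative interface could look roughly like the sketch below. It reads the Javadoc literally: the completed phase must be in getAfterPhases and the next phase must be in getBeforePhases. AltPhaseName, AltPhaseProcessor, AltNormalizationProcessor and AltPhaseCheck are hypothetical names used only for illustration.

```java
import java.util.List;

// Simplified stand-in for the SearchPhaseNames enum.
enum AltPhaseName { DFS_QUERY, QUERY, FETCH }

// Simplified stand-in for the phase-list part of the alternative interface.
interface AltPhaseProcessor {
    List<AltPhaseName> getBeforePhases(); // phases this processor runs before
    List<AltPhaseName> getAfterPhases();  // phases this processor runs after
}

final class AltNormalizationProcessor implements AltPhaseProcessor {
    @Override
    public List<AltPhaseName> getBeforePhases() {
        return List.of(AltPhaseName.FETCH);
    }

    @Override
    public List<AltPhaseName> getAfterPhases() {
        return List.of(AltPhaseName.QUERY);
    }
}

final class AltPhaseCheck {
    /** The service, not the processor, decides whether the processor runs. */
    static boolean shouldRun(AltPhaseProcessor processor, AltPhaseName completed, AltPhaseName next) {
        return processor.getAfterPhases().contains(completed)
            && processor.getBeforePhases().contains(next);
    }
}
```

The trade-off relative to the recommended interface is that the processor can only declare phase lists here; it cannot look at the results or the context when deciding whether to run.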

Alternatives To Search Pipeline:

Use Search Operation Listener to run code after Query phase is completed

Search Operation Listeners work at the shard level, not at the coordinator node level. We need to do the normalization at the coordinator node, hence this solution is rejected. Please check this, and this code reference. The listener sits in the code path where the query is executed at the shard level.

Create a new Phase between Query and Fetch Phase

The high level idea here is to create a phase which runs in between the Query and Fetch phase which will do the Normalization.
Pros:

  1. No specific pros that I can think of for this approach.

Cons:

  1. Currently there are no extension points in OpenSearch for creating a phase, so we would need to build everything from scratch.
  2. Problems will arise during implementation, because the code needs to identify which queries this new phase should run for, and we would need to implement some sophisticated logic to do that.

Create a new Fetch Sub-phase

OpenSearch provides an extension point where plugins can add Fetch subphases, which run at the end after all core subphases have executed. We can create a Fetch subphase that does the normalization and score combination. The problem is that, because we have multiple sub-queries, we would need to change the interfaces so that all the information required for normalization is passed in. This will result in duplicated information and multiple hacks to pass it through the earlier fetch phases.
Pros:

  1. No new extension point needs to be created, as the ability to add new subphases to the Fetch phase already exists.

Cons:

  1. The order of execution of fetch subphases is not consistent. It depends on which plugin was registered first; the fetch subphases of that plugin run first. This will create inconsistency across clusters with different sets of plugins. Code reference.
  2. There is a source subphase which gets the source info for all the docIds. Running it before normalization will make OpenSearch fetch sources for document IDs that we will not send in the response, which is wasted computation.

References

  1. Feature Request: [FEATURE] : Search Phase Results Processor in Search Pipeline #139
  2. Search Pipeline RFC: [RFC] Search pipelines search-processor#80
  3. Search Pipeline PR: Initial search pipelines implementation OpenSearch#6587

Feedback Requested:

  1. We originally named this new Processor interface SearchPhase Processor, because it runs during the phases of a search. Based on the comments in the PR "Adding the SearchPhaseResultsProcessor interface in Search Pipeline" (OpenSearch#7283), we renamed the interface to SearchPhaseResultsProcessor, as it operates on SearchPhaseResults. Please let us know your thoughts and any better name for it.
  2. Any other comments to improve the interface or functionality.
@jmazanec15
Member

@navneet1v This is very interesting!

A couple questions:

For normalization, can we associate a query type with the processor so that users don't have to manually specify the normalization pipeline anywhere in their query request? How would this look?

Will only one pipeline be allowed to run for each query? Like would it be possible to run one pipeline after query phase and then one pipeline after the fetch phase?

@jmazanec15
Member

In:

// Create/update a search pipeline.
PUT /_search/pipeline/mypipeline
{
  "description": "A pipeline to apply custom synonyms, result post-filtering, an ML ranking model",
  "request_processors" : [
    {
      "external_synonyms" : {
        "service_url" : "https://my-synonym-service/"
      }
    },
    {
      "ml_ranker_bracket" : {
        "result_oversampling" : 2, // Request 2 * size results.
        "model_id" : "doc-features-20230109",
        "id" : "ml_ranker_identifier"
      }
    }
  ],

Why does ml_ranker_bracket appear in request and result processors?

@navneet1v
Collaborator Author

navneet1v commented Apr 11, 2023

Why does ml_ranker_bracket appear in request and result processors?

@jmazanec15
This is an example I copied from the Search Pipelines proposal here: opensearch-project/search-processor#80. I think it is meant as an illustration rather than an actual use case.

@navneet1v
Collaborator Author

navneet1v commented Apr 12, 2023

@jmazanec15

For normalization, can we associate a query type with the processor so that users don't have to manually specify the normalization pipeline anywhere in their query request? How would this look?

Processors are part of a pipeline; without processors, a pipeline does nothing. For the normalization processor we will define checks, such as a query type check and a phases check, and the processor will run only once those checks pass.

Now, should we attach a default pipeline (which contains a normalization processor) to a search request containing a specific query type? In my opinion, the answer is no. Here are some reasons behind that:

  1. Given that the normalization processor will evolve and can have attributes, we would need to come up with default values for every attribute. That leads to debates about what the right default value should or should not be. Avoiding that is a great win for the project.
  2. While parsing the search request, we would need to update the search context, and the logic there can be complex. Consider the case where a customer has already attached a pipeline to a request with this query type: we would need to parse the pipeline to see whether the normalization processor is present and, if it is not, update the pipeline that the customer created. I don't think this is the right behavior. It also has the potential to add latency.

Will only one pipeline be allowed to run for each query?

Yes, one pipeline can run on each _search request. I think your question is about the _search API call.

Like would it be possible to run one pipeline after query phase and then one pipeline after the fetch phase?
I think you are asking whether you can run one processor after the query phase and another after the fetch phase. Please correct me if that is not the case.

If you are asking about processors and not pipelines, the answer to this question is yes.

@jmazanec15
Member

  1. Given that the normalization processor will evolve and can have attributes, we would need to come up with default values for every attribute. That leads to debates about what the right default value should or should not be. Avoiding that is a great win for the project.

I think good defaults make a feature easier to use, which makes the project easier to use, which is a win. The fewer actions the user needs to take to meet their requirements, the better.

  2. While parsing the search request, we would need to update the search context, and the logic there can be complex. Consider the case where a customer has already attached a pipeline to a request with this query type: we would need to parse the pipeline to see whether the normalization processor is present and, if it is not, update the pipeline that the customer created. I don't think this is the right behavior. It also has the potential to add latency.

Yes, this might be complex. But I still think we should avoid requiring a user to maintain a pipeline in order for normalization to work. In other words, normalization should be an out-of-the-box feature: a user shouldn't have to create a pipeline to use it.

Between the following 2 options, I think option 1 is a much more intuitive user experience.

Option 1:

POST <index-name>/_search
{
    "query": {
        "<new-compound-query-clause>": {
            "queries": [
                { /* neural query */ }, // this is added as an example
                { /* standard text search */ } // If a user wants to boost or update
               // the scores, they need to do it in this query clause
            ],
            "normalization-technique" : "min-max", // min-max etc., optional object
            "combination" : {
                "algorithm" : "harmonic-mean", // all the values defined in #3 above, interleave, harmonic mean etc
                "parameters" : {
                    // list of all the parameters that can be required for above algo
                    "weights" : [0.4, 0.7] // a floating point array which can be used in the algorithm
                }
            }
        }
    }
}

Option 2:

PUT /_search_processing/pipeline/my_pipeline
{
  "description": "A pipeline that adds a Normalization and Combination Transformer",
  "searchphase_processors" : [
    {
      "normalization-processor" : {
        "technique": "min-max"
      }
    }
  ]
}

POST <index-name>/_search?pipeline=my_pipeline
{
    "query": {
        "<new-compound-query-clause>": {
            "queries": [
                { /* neural query */ }, // this is added as an example
                { /* standard text search */ } // If a user wants to boost or update
               // the scores, they need to do it in this query clause
            ]
        }
    }
}

I'm wondering if we could somehow add parameters in the query that would automatically add this pipeline to the request.

If you are asking about processors and not pipelines, the answer to this question is yes.

Got it. As a followup, if I have a pipeline where I want a couple processors to run after the query phase and a couple to run after the fetch phase, what would the required ordering be in the pipeline?

@navneet1v
Collaborator Author

@jmazanec15

I think good defaults make a feature easier to use, which makes the project easier to use, which is a win. The fewer actions the user needs to take to meet their requirements, the better.

This is not always the case, and I would like to understand, from the standpoint of an exact customer requirement, which defaults are needed. Do we need a default pipeline to be added when the new query type is present or not? One reason not to provide a default pipeline or processor is that we don't want the new query clause we are creating to be usable only for normalization. The output of the query clause gives customers a way to get the separate scores of the queries at the coordinator node level, and they can then apply any transformation on top of that.

On choosing between options 1 and 2: customers already have a way to specify the pipeline in the _search request, by making a small change to Option 1.

POST /my-index/_search
{
  "query": {
    "<new-compound-query-clause>": {
      "queries": [
        { /* neural query */ }, // this is added as an example
        { /* standard text search */ } // If a user wants to boost or update
        // the scores, they need to do it in this query clause
      ]
    }
  },
  "pipeline" : {
    "searchphase_processors" : [
      {
        "normalization-processor" : {
          "technique": "min-max",
          "combination" : {
            "algorithm" : "harmonic-mean", // all the values defined in #3 above, interleave, harmonic mean etc
            "parameters" : {
              // list of all the parameters that can be required for above algo
              "weights" : [0.4, 0.7] // a floating point array which can be used in the algorithm
            }
          }
        }
      }
    ]
  }
}
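
The "harmonic-mean" algorithm with "weights" is only named in the example above, not defined. One plausible reading is the weighted harmonic mean, sum(w_i) / sum(w_i / s_i), applied to the normalized scores of one document across sub-queries. The sketch below follows that reading. HarmonicMeanCombiner is a hypothetical name, and skipping non-positive scores is an assumption, made because min-max normalization can produce 0 and the harmonic mean is undefined there.

```java
final class HarmonicMeanCombiner {
    /**
     * Weighted harmonic mean of one document's per-sub-query scores:
     * sum(w_i) / sum(w_i / s_i).
     */
    static double combine(double[] scores, double[] weights) {
        if (scores.length != weights.length) {
            throw new IllegalArgumentException("scores and weights must have the same length");
        }
        double weightSum = 0.0;
        double weightedInverseSum = 0.0;
        for (int i = 0; i < scores.length; i++) {
            // Assumption: a non-positive score means the sub-query did not
            // contribute, so it is left out of both sums.
            if (scores[i] <= 0.0) {
                continue;
            }
            weightSum += weights[i];
            weightedInverseSum += weights[i] / scores[i];
        }
        return weightedInverseSum == 0.0 ? 0.0 : weightSum / weightedInverseSum;
    }
}
```

With the weights from the example, combine(new double[] {0.5, 1.0}, new double[] {0.4, 0.7}) evaluates 1.1 / (0.8 + 0.7). The weights do not need to sum to 1, because the formula divides by their sum.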



@navneet1v
Collaborator Author

Got it. As a followup, if I have a pipeline where I want a couple processors to run after the query phase and a couple to run after the fetch phase, what would the required ordering be in the pipeline?

PUT /_search_processing/pipeline/my_pipeline
{
  "description": "A pipeline that adds a Normalization and Combination Transformer",
  "searchphase_processors" : [
    {
      "processor1" : { // runs after fetch phase
      }
    },
    {
      "processor2" : { // runs after query phase and before fetch phase
      }
    },
    {
      "processor3" : { // runs after dfs phase and before query phase
      }
    }

  ]
}

Consider a pipeline like this. After every phase, the SearchPipeline Service invokes the shouldRunProcessor function of each processor to see whether it can run that processor. These functions are invoked in the order in which the processors are defined in the searchphase_processors array of the pipeline.

processor1: will run only once the fetch phase is completed. This is just for the sake of example; a customer might want to use search response processors instead.
processor2: will run when the query phase is completed and the next phase is the fetch phase.
processor3: will run once the dfs phase is done and the query phase is the next phase.
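
The dispatch described above can be sketched as a loop that the service runs after every phase. This is a simplified illustration: Phase, NamedProcessor and PipelineDispatch are hypothetical names, each processor's check is reduced to a fixed (completed, next) pair, and null stands for "no next phase" after fetch.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the search phase names.
enum Phase { DFS_QUERY, QUERY, FETCH }

final class NamedProcessor {
    final String name;
    final Phase runsAfter;   // phase that must have just completed
    final Phase runsBefore;  // phase that must come next; null means "no next phase"

    NamedProcessor(String name, Phase runsAfter, Phase runsBefore) {
        this.name = name;
        this.runsAfter = runsAfter;
        this.runsBefore = runsBefore;
    }

    boolean shouldRunProcessor(Phase completed, Phase next) {
        return completed == runsAfter && next == runsBefore;
    }
}

final class PipelineDispatch {
    /** Returns, in pipeline order, the names of the processors that run at this phase boundary. */
    static List<String> processorsToRun(List<NamedProcessor> pipeline, Phase completed, Phase next) {
        List<String> selected = new ArrayList<>();
        for (NamedProcessor processor : pipeline) {
            if (processor.shouldRunProcessor(completed, next)) {
                selected.add(processor.name);
            }
        }
        return selected;
    }
}
```

Because every processor is asked at every phase boundary, the position of a processor in the array only matters relative to other processors that run at the same boundary.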

I hope this answers the question.

@navneet1v
Collaborator Author

Updated the description. Based on the comments on the PR, the processor interface is renamed to SearchPhaseResultsProcessor. The idea is that this processor operates on SearchPhaseResults, hence SearchPhaseResultsProcessor.

@navneet1v
Collaborator Author

Resolving this issue as the feature is released in version 2.10 of OpenSearch.
