
[RFC] Parallel+Batch Ingestion for Neural Search #598

Closed
chishui opened this issue Feb 8, 2024 · 2 comments
Labels: Features (Introduces a new unit of functionality that satisfies a requirement)
chishui commented Feb 8, 2024

Problem Statements

When users use the bulk API to ingest multiple documents in a single request, the OpenSearch ingest pipeline only handles one document at a time, in sequential order (ref). An ingest pipeline is a collection of processors, and the processor is the computing unit of a pipeline. Most processors are lightweight, such as append, uppercase, and lowercase, so processing documents one after another versus in parallel makes no observable difference. But for time-consuming processors such as the neural search processors, which by nature require more compute time, being able to run them in parallel could save users valuable time. Beyond ingestion time, processors like the neural search processors can also benefit from processing documents in batches: batching reduces the number of requests to remote ML services via their batch APIs, which helps avoid hitting rate limit restrictions. (Feature request: opensearch-project/ml-commons#1840; rate limit example from OpenAI: https://platform.openai.com/docs/guides/rate-limits)

Because the data ingestion flow currently lacks both parallel and batch ingestion capabilities, this doc proposes two solutions to address them.

Solutions

Option 1: A Batchable Ingest Pipeline (Preferred)

Use Case Walk Through

The user creates a batch pipeline with a request like the example shown below:

PUT _ingest/pipeline/my-pipeline
{
  "description": "This pipeline processes student data",
  "settings": {
      "batch_enable": true, # enable batch processing in this pipeline
      "maximum_batch_size": 3 # maximum size of documents in a batch
  },
  "processors": [
    {
      "set": {
        "description": "Sets the graduation year to 2023",
        "field": "grad_year",
        "value": 2023
      }
    },
    {
      "text_embedding": {
        "model_id": "bQ1J8ooBpBj3wT4HVUsb",
        "field_map": {
          "passage_text": "passage_embedding"
        },
        "document_batching": {
          "enable": true, # false will use default iterative batch behavior
          "maximum_batch_size": 2 # processor defines it's own maximum batch size which can't be large than one defined in pipeline, if not defined, will use pipeline's value
        }
      }
    }
  ]
}

Later, the user uses the bulk API to ingest multiple documents through the batch pipeline. Since the text_embedding processor has batch capability enabled, it can send the documents from the bulk API to an ML node in one request. The set processor doesn't have a customized batch operation, so its documents are processed one by one, the default behavior.
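For reference, a bulk request through this pipeline might look like the following (the index name and document bodies are illustrative, not part of the proposal):

POST _bulk?pipeline=my-pipeline
{ "index": { "_index": "students" } }
{ "passage_text": "Alice studies distributed systems." }
{ "index": { "_index": "students" } }
{ "passage_text": "Bob studies information retrieval." }
{ "index": { "_index": "students" } }
{ "passage_text": "Carol studies machine learning." }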

Proposal

An ingest pipeline is constructed from a list of processors, and a single document flows through each processor one by one before it can be stored in an index. Currently, both the pipeline and the processor can only handle one document at a time; even with the bulk API, documents are iterated and handled in sequential order. As shown in figure 1, to ingest doc1, it would first flow through ingest pipeline 1, then through pipeline 2. Only then would the next document go through both pipelines.
Figure 1

To support batch processing of documents, we'll create a batchable pipeline in which multiple documents, instead of a single document, can flow through the pipeline and its processors. The batch option should be configurable and users need to enable it explicitly. We will provide a default implementation in the Processor interface that iterates over the documents, so most processors don't need to change; only when there is a reason to process multiple documents together (e.g. the text embedding processor) does a processor need its own implementation. Otherwise, even when receiving documents all together, processors default to processing them one by one.

Figure 2

To batch process documents, users need to use the bulk API to pass multiple documents. If the document count exceeds the maximum_batch_size value, the documents are split into subsets so that each subset satisfies the maximum batch size requirement. If the document count is less than maximum_batch_size, all documents are batched and processed together.
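A minimal sketch of that splitting step (a plain List partition; the class and method names are illustrative, not part of the proposal):

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split the bulk documents into sub-batches of at most
// maxBatchSize documents each.
final class BatchSplitter {
    static <T> List<List<T>> splitIntoBatches(List<T> docs, int maxBatchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < docs.size(); i += maxBatchSize) {
            batches.add(new ArrayList<>(docs.subList(i, Math.min(i + maxBatchSize, docs.size()))));
        }
        return batches;
    }
}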

Work that needs to be done:

Support new settings in the pipeline API

Add new pipeline fields settings.batch_enabled and settings.maximum_batch_size to the pipeline API. Support the batch settings in the neural search processors.

New batch function in both Pipeline and Processor

In the Pipeline class, add batchExecute to consume a collection of IngestDocument. In the Processor class, add batchExecute to consume a collection of IngestDocument; its default behavior is to iterate over the documents and execute them one by one. The neural search processors need to implement batchExecute to combine text from multiple documents and send it to ml-commons for inference.
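A rough sketch of that default behavior, assuming a simplified, synchronous form of the Processor interface (the real OpenSearch interface is callback-based, and IngestDocument stands in for the core ingest document type; this is only for illustration):

import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the proposed interface change; not the final OpenSearch API.
interface BatchableProcessor {

    // Existing single-document contract.
    IngestDocument execute(IngestDocument document) throws Exception;

    // Proposed default: iterate and delegate to execute(), so existing processors
    // need no changes. Processors like text_embedding would override this to send
    // one combined inference request for the whole batch.
    default List<IngestDocument> batchExecute(List<IngestDocument> documents) throws Exception {
        List<IngestDocument> results = new ArrayList<>(documents.size());
        for (IngestDocument document : documents) {
            results.add(execute(document));
        }
        return results;
    }
}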

Ingest flow logic change

The current document ingestion flow can be illustrated with the simplified code below:

for (int i = 0; i < documents.size(); i++) {
    IngestDocument document = documents.get(i);
    for (Pipeline pipeline : pipelines) {
        for (Processor processor : pipeline.processors) {
            document = processor.execute(document);
        }
    }
    documents.set(i, document);
}

If the pipeline has the batch option enabled, we'll change the flow to the logic shown below:

for (Pipeline pipeline : pipelines) {
    for (Processor processor : pipeline.processors) {
        documents = processor.batchExecute(documents);
    }
}

Pros and Cons

Pros

  1. Batch ingestion capability is enabled for all processors. Each processor can determine whether it benefits from batching and provide its own implementation; otherwise it can simply use the default behavior.
  2. Better performance. Processors can process documents in parallel to minimize latency, without the extra latency overhead of the other option.
  3. The user has full control. The user can simply enable or disable the batch option on a pipeline, and can control which documents, and how many of them, are batched through the bulk API.
  4. Risk is under control because the user can fall back to the original non-batch option. The batch flow is only used when the pipeline has the batch option enabled; if it's disabled, we fall back to the previous logic.

Cons

  1. Requires a larger effort.
  2. It's still risky as it might impact all pipelines and processors. The proposal changes the processor interface and the ingest flow, which could affect all processors and all ingestion logic, so we need to be very careful to keep the original flow untouched and let users fall back to it.
  3. Works only with the bulk API.

Option 2: Caching Documents in Neural Search Processors

Use Case Walk Through

The user creates a pipeline whose text_embedding processor enables document caching with the parameters shown below:

PUT _ingest/pipeline/my-pipeline
{
  "description": "This pipeline processes student data",
  "processors": [
    {
      "set": {
        "description": "Sets the graduation year to 2023",
        "field": "grad_year",
        "value": 2023
      }
    },
    {
      "text_embedding": {
        "model_id": "bQ1J8ooBpBj3wT4HVUsb",
        "field_map": {
          "passage_text": "passage_embedding"
        },
        "document_batching": {
          "enable": true,
          "batch_size": 3,
          "timeout": 500
        }
      }
    }
  ]
}

After the pipeline is created, the user can start ingesting documents using the single-document ingest API or the bulk API. Say the user ingests three documents in separate requests, all handled by the same OpenSearch node, the same pipeline, and the same model: if they are sent within 500 ms, then after the third request is sent the user receives responses for all three.
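For example, the three single-document requests could look like the following (the index name, document IDs, and fields are illustrative):

PUT my-index/_doc/1?pipeline=my-pipeline
{ "passage_text": "Alice studies distributed systems." }

PUT my-index/_doc/2?pipeline=my-pipeline
{ "passage_text": "Bob studies information retrieval." }

PUT my-index/_doc/3?pipeline=my-pipeline
{ "passage_text": "Carol studies machine learning." }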

Proposal

To limit the impact of the change and focus only on the inference processors, which benefit the most from batching, this proposal solves the problem from within the neural search processors.

A processor can process only one document at a time. To batch documents without changing interfaces, we need to cache them. We'll create a BatchManager which holds documents in a cache queue until one of these conditions is met:

  1. the number of documents in the queue reaches the "batch_size" limit, or
  2. the timeout window has passed.

Then the queue is flushed and the cached documents are batched and sent to an ML node for inference.

A cache queue is created per unique "pipeline_id + model_id", so documents are batched in the neural search processors per model and per pipeline. Note that, although rare, if a user configures multiple neural search processors in the same pipeline that point to the same model but set different batch sizes, they might see a mix of batch sizes in the requests to ML nodes.

Cache in Neural Search vs in ml-commons

In Neural Search

In this option, the document caching logic is maintained in the neural search package and triggered by the neural search ingest processors.

Pros:
  1. It only impacts ingestion flow.
  2. Fine-grained control on batch size.
Cons:
  1. Can’t batch requests from other flows.
  2. If other flows need to adopt the same caching solution, additional work is needed.
In ml-commons

Since ml-commons can be invoked not only by the ingestion flow but also by OpenSearch core for other flows, caching data here is not purely a batch ingestion solution but more of an ML client-side caching solution. It can ensure data is always batched before being sent to an ML node for inference.

Pros:
  1. Can maximally batch data.
  2. Future ML logic can benefit from this batch logic for free.
Cons:
  1. The cache solution might not be ideal for some flows, such as search, which are usually latency sensitive; the additional latency overhead can be unbearable there.
  2. It could impact all ML logic.

Caching Solution

Local Cache

In the local cache solution, each OpenSearch node caches the data in memory, so only documents that reach the same node can be batched together.

Distributed Cache

In this solution, we need a dedicated cache platform hosted separately on a cache fleet. All OpenSearch nodes reach out to this cache fleet to store and fetch data. This ensures that even if documents are routed to different OpenSearch nodes for processing, they can be stored in the same cache entry and batched globally. It also decouples the OpenSearch nodes from the ML nodes. The two most popular cache solutions are Memcached and Redis; Redis is more favourable as it offers advanced data structures and provides durability through its cluster.

However, a distributed cache also brings significant complexity to our logic. Firstly, nodes would now compete to process cached documents, so we might need a distributed lock to ensure documents are only processed once (similar to SQS). Secondly, the inference results must also be stored in the cache, and nodes need to keep polling for results so that they can continue the ingest process.
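Purely for illustration, a naive version of that enqueue-and-drain step with a Redis client (Jedis) could look like the sketch below; the host, key scheme, and lock handling are assumptions, and the result storage/polling side is omitted:

import redis.clients.jedis.Jedis;

// Illustrative only: one Redis list per "pipeline_id + model_id", drained by whichever
// node grabs a naive SETNX-based lock once the batch size is reached.
public class DistributedBatchQueueSketch {
    private final Jedis jedis = new Jedis("cache-host", 6379);   // hypothetical cache fleet endpoint

    public void enqueue(String pipelineId, String modelId, String docJson, int batchSize) {
        String queueKey = "batch:" + pipelineId + ":" + modelId;
        jedis.rpush(queueKey, docJson);
        if (jedis.llen(queueKey) >= batchSize
                && jedis.setnx(queueKey + ":lock", "1") == 1) {   // ensure only one node drains the batch
            try {
                // Drain up to batchSize documents and send them for inference (not shown).
                for (int i = 0; i < batchSize; i++) {
                    String doc = jedis.lpop(queueKey);
                    if (doc == null) break;
                }
            } finally {
                jedis.del(queueKey + ":lock");
            }
        }
    }
}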

Which Caching Solution to Use

Due to the complexity of a distributed cache and the additional infrastructure it requires, we can start with the local cache solution and reconsider the distributed cache option later.

Work that needs to be done:

Support new settings in neural search processors

Users can set batch parameters for the inference processors when they create a pipeline:

{
    "document_batching": {
        "enable": true,
        "batch_size": 3,
        "timeout": 500
    }
}

The new settings need to be passed to processors to consume.

Create BatchManager

Create a new BatchManager class which can (see the sketch after this list):

  1. Manage the data queues
  2. Enqueue data and call the data consumer when the batch count condition is met
  3. Track how long data has been in the queue and call the data consumer when the timeout is reached
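A minimal sketch of such a BatchManager, assuming a simple in-memory queue per "pipelineId + modelId" key (class and method names are illustrative; a real implementation needs more careful concurrency and error handling):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative BatchManager: one queue per "pipelineId + modelId", flushed either when
// batch_size is reached or when the timeout fires, whichever comes first.
public class BatchManager<T> {
    private final int batchSize;
    private final long timeoutMillis;
    private final Consumer<List<T>> consumer;   // e.g. sends a batched inference request
    private final Map<String, List<T>> queues = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public BatchManager(int batchSize, long timeoutMillis, Consumer<List<T>> consumer) {
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
        this.consumer = consumer;
    }

    public synchronized void add(String pipelineId, String modelId, T document) {
        String key = pipelineId + ":" + modelId;   // one queue per pipeline + model
        List<T> queue = queues.computeIfAbsent(key, k -> {
            // Start the timeout clock when the first document of a new batch arrives.
            scheduler.schedule(() -> flush(k), timeoutMillis, TimeUnit.MILLISECONDS);
            return new ArrayList<>();
        });
        queue.add(document);
        if (queue.size() >= batchSize) {
            flush(key);
        }
    }

    private synchronized void flush(String key) {
        List<T> queue = queues.remove(key);
        if (queue != null && !queue.isEmpty()) {
            consumer.accept(queue);   // hand the batch to the inference caller
        }
    }
}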

Processors update

Instead of calling ml-commons for inference directly, the processors call the BatchManager to cache the document, and provide the batched inference and result-dispatch logic.
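A hedged sketch of that processor-side change, reusing the hypothetical BatchManager above and ignoring the asynchronous callback plumbing a real processor would need:

// Illustrative processor-side change (names are hypothetical): the processor no longer
// calls ml-commons directly; it enqueues the document, and the BatchManager's flush
// consumer performs the batched inference and dispatches the results.
public class BatchingTextEmbeddingSketch {
    private final BatchManager<IngestDocument> batchManager;   // from the sketch above
    private final String pipelineId;
    private final String modelId;

    public BatchingTextEmbeddingSketch(BatchManager<IngestDocument> batchManager,
                                       String pipelineId, String modelId) {
        this.batchManager = batchManager;
        this.pipelineId = pipelineId;
        this.modelId = modelId;
    }

    public void process(IngestDocument document) {
        // Cached for later batched inference; a real processor must hold the request open
        // (e.g. via a listener) until the flushed batch's embeddings come back.
        batchManager.add(pipelineId, modelId, document);
    }
}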

Pros and Cons

Pros

  1. It requires less effort and touches less code.
  2. If done properly, BatchManager could be used as a general batching solution for all processors.
  3. It is less risky as it only impacts the neural search processors.
  4. It works with both the single-document ingest API and the bulk API. However, if the user has many OpenSearch nodes, the bulk API is recommended for maximum throughput.

Cons

  1. Higher latency and lower throughput. Users will see higher latency for ingestion requests because some requests are not processed immediately but cached to be processed later. This disadvantage can be mitigated if the user sends bulk requests whose document counts always match the batch_size setting.
  2. In-memory data outside the request lifecycle. We'd need to store data beyond the request lifecycle, so we must be careful to clean the data up on time and not cause memory leaks.
@model-collapse (Collaborator)

We would like to hear more voices from the neural search audience since some options may require contributions to OpenSearch core. If the first option is chosen, we will move the RFC to the opensearch repo and continue the discussion there.

vamshin added the Features label (Introduces a new unit of functionality that satisfies a requirement) and removed the untriaged label on Mar 21, 2024

chishui commented May 28, 2024

The batch ingestion feature has been completed. Parallel ingestion is out of scope for now.

chishui closed this as completed May 28, 2024