[RFC] Pull-based indexing #10610

Closed
msfroh opened this issue Oct 13, 2023 · 4 comments

Labels
enhancement, Indexing, untriaged

Comments

msfroh (Collaborator) commented Oct 13, 2023

This is a very rough initial-draft RFC based on my talk (rant?) at the Unconference at OpenSearchCon 2023. Big thanks to @huyaboo and @mansu for their Unconference talks and subsequent discussion.

Problem

OpenSearch currently has a push-based indexing model. Clients push updates to OpenSearch and they are immediately routed to the appropriate primary shards.

The current push-based indexing model has several problems:

  1. Coordinators must be able to push updates to specific primary shards and receive acknowledgement. If a primary shard is temporarily unavailable (node failure, stop-the-world GC, network partition, primary under duress, etc.) the update fails, leading to a drop in availability for indexing APIs.
  2. Coordinators must be able to locate every primary shard to route documents appropriately. That routing table is shared state that every coordinator must agree on, and it relies on propagation from the cluster managers.
  3. It's essentially impossible to reshard without downtime (which is why the split operation requires a read-only index). Do you ever feel frustrated when you try to log into your online bank on a Sunday morning and they're "down for maintenance"? This isn't the 1990s. We have known how to scale without downtime for a couple of decades. I was on a team of 6 people who built a Lucene-based distributed search service that could do live resharding in 2012 (as in we started and finished building it in 2012).

Proposal

In my dreams, I'd like to just build a modern version of the system that I helped build in 2012, but that's very different from where OpenSearch is now. We need a step-by-step migration path, and I believe pull-based indexing would be a first step.

This proposal assumes the existence of an external event stream (not to be confused with the still push-based Streaming Indexing API). Possible event stream implementations could include Apache Kafka, Amazon Kinesis, or others. (Back in early 2012, we built an event stream on top of S3, because Kinesis and Kafka didn't exist yet.) A callout is that the event stream is not a message queue (where receivers "dequeue" messages, removing them from the queue such that they can't be processed by others) -- it's more of an indexable buffer where multiple clients should be able to read past events (probably with some limits as to how far back they could go). Ideally, the event stream should support sharding (and live resharding), since we would like to be able to scale up the event queue too, but that's not a hard requirement for smaller installations.

Puller API

In the short term, I would like to add a new puller service running on every indexing-capable node that pulls updates for any primary shards that the node may be hosting. Since OpenSearch should operate independently of any particular event stream implementation, I propose that we define a generic API like:

// The inclusive range of routing-hash values that a shard owns.
class ShardRange {
  final int shardStart;
  final int shardEnd;
}

// A batch of updates plus the checkpoint to pass to the next pull.
class Updates {
  final List<IndexRequest> indexUpdates;
  final byte[] newCheckpoint;
}

// Pull up to maxUpdates updates for the given shard range, starting after checkpoint.
Updates pullUpdates(ShardRange shardRange, byte[] checkpoint, int maxUpdates);

The above is rough, but probably a decent starting point.

The key ideas are:

  1. The shard (or the in-process puller service acting on behalf of the shard) should know which range of documents it should accept. This is based on the current sharding strategy. (If we split/merge shards, these ranges will be different on the new shards. The section below on live resharding goes into detail.) If we want to support pulling from event streams that don't support sharding, the shard should assume this is merely a suggestion and should still check the IndexRequests to see if they fit the current shard. (Currently, that means computing the doc ID hash or looking at an explicit _routing parameter.) Anything outside the shard's range can be discarded -- it's excess I/O, but we don't do the heavy indexing work. Alternatively, the event stream API implementation (discussed in the next section) could discard them (still probably involves pulling updates onto the node just to discard them, but you may be able to skip some serialization steps).
  2. There is a generic "checkpoint" that is essentially a pointer into the event stream. The shard asks the event stream for updates starting from one pointer, and the event stream returns a pointer that will get the next set of updates. The meaning of the checkpoint is opaque to OpenSearch. (I went with byte[] to represent this, but it's not a hard requirement.)
  3. The shard should be able to set a limit on the number of updates that it can accept in each iteration. This means that we don't need to worry about overwhelming the shard with updates, since it will only pull at a rate that it can handle.

Essentially, the puller service would fetch updates and invoke the existing in-process indexing APIs to apply the updates. Document and segment replication would do their thing to propagate the updates out to replicas.
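
To make that loop concrete, here's a minimal sketch of what the per-shard pull loop might look like. EventStreamClient, ShardWriter, and routingHash are hypothetical names (a possible connector interface is sketched in the next section); routing() and id() are existing fields on IndexRequest. The point is the shape: pull a bounded batch, drop anything outside the shard's range, apply the rest through the existing in-process indexing path, and persist the new checkpoint.

// Minimal sketch (hypothetical names throughout) of the per-shard pull loop.
void pullLoop(EventStreamClient stream, ShardRange range, ShardWriter writer) {
  byte[] checkpoint = writer.lastCheckpoint();  // e.g. recovered from snapshot metadata
  while (writer.isActivePrimary()) {
    // Bounded batch size gives us natural back-pressure.
    Updates batch = stream.pullUpdates(range, checkpoint, 500);
    for (IndexRequest request : batch.indexUpdates) {
      // The stream may not be sharded, so re-check routing and skip anything outside our range.
      String routingKey = request.routing() != null ? request.routing() : request.id();
      int hash = routingHash(routingKey);
      if (hash < range.shardStart || hash > range.shardEnd) {
        continue;  // excess I/O, but no heavy indexing work
      }
      writer.applyIndexRequest(request);  // existing in-process indexing path
    }
    checkpoint = batch.newCheckpoint;
    writer.persistCheckpoint(checkpoint);  // so snapshots/recovery know where to resume
  }
}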

How to implement the event stream API

My initial thought was that we could do something plugin-based, kind of like repositories. Then I realized this could be a shining beacon of a use-case for extensions. Essentially, users could configure an extension for whatever event stream they want to pull from and the extension could take care of fetching (either synchronously or concurrently buffering) updates from the event stream. In practice, we specifically want the event stream API to be independent of the OpenSearch version, since this model gives us a cleaner path toward zero-downtime upgrades (see below). So, extensions feel like a great fit. (Plugins wouldn't be terrible, but would still have the messy "recompile with every release" nonsense.)
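
To make the extension point concrete, the connector contract could be almost as small as the pull API above, plus whatever the resharding section below needs for comparing checkpoints. A hypothetical interface (names are illustrative, not an existing extension API):

// Hypothetical contract implemented by an event-stream extension (or plugin),
// one implementation per stream technology (Kafka, Kinesis, ...).
interface EventStreamClient {
  // Pull up to maxUpdates updates for the given shard range, starting after checkpoint.
  Updates pullUpdates(ShardRange shardRange, byte[] checkpoint, int maxUpdates);

  // Needed for resharding: given two opaque checkpoints, which is earlier in the stream?
  byte[] earlierOf(byte[] first, byte[] second);
}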

Snapshots

A key piece of this is that snapshots should record the latest checkpoint associated with the indexed data for a shard. This allows us to recover from a total outage without loss of data (and if the event stream supports sufficient buffering, without an availability drop on the clients pushing to the event stream). A shard can recover from snapshot and then start processing updates from the checkpoint encoded in the snapshot to get caught up.
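
As a rough sketch of what "record the latest checkpoint" could mean, the per-shard snapshot metadata would carry the opaque checkpoint alongside the segment listing, and recovery would replay the stream from it. All of the names below are illustrative:

// Illustrative only: per-shard snapshot metadata carries the opaque checkpoint.
class ShardSnapshotInfo {
  final String snapshotId;
  final byte[] lastAppliedCheckpoint;  // pointer into the event stream at snapshot time
}

// Recovery: restore segments from the repository, then replay the stream to catch up.
void recoverShard(ShardWriter writer, ShardSnapshotInfo info) {
  writer.restoreSegmentsFromSnapshot(info.snapshotId);  // bulk copy of segment files
  writer.persistCheckpoint(info.lastAppliedCheckpoint);
  // The pull loop sketched above then resumes from that checkpoint.
}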

This would be amazing for cross-cluster replication. You could enable replication for an index on another cluster, that new cluster could restore the latest snapshot from the original cluster, then process updates from the event stream to get caught up and stay up-to-date. Essentially, cross-cluster replication would put zero load on the original cluster.

You could also scale your cluster under that model. Bring up a new, larger cluster, have all the shards restore from snapshots written by the underscaled cluster, get caught up, then you flip search traffic over to the new cluster.

You could do the same thing to upgrade your cluster (or update your opensearch.yml) with zero downtime and no shard-copying load. Bring up a new cluster, restore from snapshot, get caught up, flip traffic, shut down the old cluster.

tl;dr: host-to-host copying of shards is sooooo early 2000s. The cool kids restore from snapshot and get caught up.

Live resharding

I promised live resharding up top, but I see that as a longer-term effort. Essentially, we need a way to bring up an additional set of primary shards for an existing index, such that the shard ranges for the new shards are different from the original shards, but cover the full range. The new shards will restore from snapshot from any overlapping original shards, delete any documents outside the range (using a Lucene deleteByQuery), and use addIndexes to merge the shards.

This may be easiest to explain with some examples.

Suppose you have 1 shard and want to split it into 2. We'll bring up 2 new shards. Each will restore from snapshot from the original shard. The first shard will delete the second half of the total shard range (i.e. [0, Integer.MAX_VALUE]), while the second shard will delete the first half (i.e. [Integer.MIN_VALUE, -1]). Now each shard has half of the documents from the original shard (assuming a statistically even distribution over the range). Both shards have the original shard's snapshot checkpoint, and they resume indexing from that point. Once they're caught up, we can flip search traffic over to the new shards and shut down the old shard.
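
In Lucene terms, the trimming step for that 1 -> 2 split could look roughly like the sketch below. IndexWriter.deleteDocuments and IntPoint range queries are real Lucene APIs, but the "_routing_hash" field is an assumption: it presumes each document's routing hash is indexed as an IntPoint, which is not how OpenSearch stores it today.

// Illustrative 1 -> 2 split: each new shard restores the original shard's snapshot,
// then deletes everything outside the half of the hash range it now owns.
// Uses org.apache.lucene.index.IndexWriter and org.apache.lucene.document.IntPoint.
void trimToRange(IndexWriter writer, int keepStart, int keepEnd) throws IOException {
  // Guard the edges to avoid integer overflow on the open ends of the range.
  if (keepStart > Integer.MIN_VALUE) {
    writer.deleteDocuments(IntPoint.newRangeQuery("_routing_hash", Integer.MIN_VALUE, keepStart - 1));
  }
  if (keepEnd < Integer.MAX_VALUE) {
    writer.deleteDocuments(IntPoint.newRangeQuery("_routing_hash", keepEnd + 1, Integer.MAX_VALUE));
  }
  writer.commit();
}

// New shard 0 keeps the first half, new shard 1 keeps the second half:
//   trimToRange(shard0Writer, Integer.MIN_VALUE, -1);
//   trimToRange(shard1Writer, 0, Integer.MAX_VALUE);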

The trickier scenario is when you have overlapping shards. Say you want to go from 2 shards to 3 shards. The first and third shards each restore from a single snapshot from the previous shards (since 0 < 1/3 < 1/2, and 1/2 < 2/3 < 1), but the middle shard needs to restore from both. This can be a slow operation, where it restores the first snapshot (which is 1/2 of the index), deletes the first 1/3 of the index (leaving 1/6 of the index), then restores the second snapshot (which is also 1/2 of the index, for a total of 2/3 of the index), deletes the last 1/3 of the index, and then merges the two 1/6 slices into the middle 1/3. Phew... Only now comes the hard part: we have two checkpoints possibly representing different points in the event stream. We should probably start from the earlier checkpoint (and need the event stream API to tell us which is the earliest), but we may go back and reprocess updates that are already in the index. In theory, they should always be the latest updates, so once we're caught up we don't go back in time, but we may introduce unnecessary updates (i.e. adds+deletes) that make log-based optimizations (i.e. if there are no deletes you can take some shortcuts) disappear.

The solution I helped build back in 2012 made this easy -- we had mandatory external versioning. So, if we reprocessed old updates for existing documents, we would simply discard them. OpenSearch currently processes updates without external versioning, in a "last one to get in wins, things are always sequential, what are distributed systems?" model.

All may not be lost, though. We can a) allow resharding for arbitrary indices as long as the new shard count is a multiple of the old shard count, since that's analogous to the 1 -> 2 scenario described above, or b) if the user can tell us that the index is inherently append-only (like logs), ignore versions entirely: if we already have a matching doc, we skip the current one. (Of course, this assumes that we can compute unique doc IDs from an IndexRequest.)
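
For the append-only case, a replayed update can be skipped with nothing more than an existence check on the document ID before applying it. A minimal sketch, assuming IDs are derivable from the IndexRequest and glossing over how OpenSearch actually encodes the _id field:

// Sketch: a replayed update for an append-only index is a duplicate if a doc
// with the same _id is already present, so it can be skipped.
boolean alreadyIndexed(IndexSearcher searcher, String docId) throws IOException {
  return searcher.count(new TermQuery(new Term("_id", docId))) > 0;
}

In practice the engine's existing get-by-id path would be the natural place for this check, but the idea is the same: replayed updates for documents that already exist are dropped rather than re-indexed.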

msfroh added the enhancement and untriaged labels on Oct 13, 2023
noCharger added the Indexing label on Oct 13, 2023

dblock (Member) commented Oct 13, 2023

This sounds "easy" :) Replace a push-based model where central control needs to have all the info by a pull-based one where pullers update central state as they pull work to do.

reta (Collaborator) commented Oct 13, 2023

This is an interesting perspective; it looks a lot like the connectors model (at least to me), where we just configure the source and the connector starts polling / pulling the data in. There are three main questions that bother me:

  • the fact that the pulling has to be collocated with the primary shard it pulls data for means it has to be part of OpenSearch core (and follow all relocation / recovery / etc.), correct?
  • you mentioned that extensions could implement all these different pulling sources (essentially, being connectors), but those are inherently out of process -- how does that play with shard placement awareness?
  • why do you think Streaming Indexing won't be an acceptable alternative here, since we would have natural back-pressure and could also open direct connections (ideally, streaming ones with gRPC) to the nodes that have the relevant primary shards?

msfroh (Collaborator, Author) commented Oct 13, 2023

Thanks @reta!

the fact that the pulling has to be collocated with the primary shard it pulls data for means it has to be part of OpenSearch core (and follow all relocation / recovery / etc.), correct?

The sneaky thing in the proposal (and what makes it probably much more complicated than it initially sounds) is that my real goal is to kill off the whole concept of primary shards and especially host-to-host shard relocation. Instead, you just have "writer" shards that pull updates from the event stream and write them to a Lucene index (where among other things, you should be able to have more than one writer for the same shard at the same time). (But that's a bigger topic, IMO.)

This hypothetical UpdatePullerService or whatever we want to call it would be part of OpenSearch Core. The UpdatePullerService would check to see what primary shards are on the current node and pull updates for those shards (by calling out to the registered connector). If a primary shard is added to a node, the UpdatePullerService starts pulling updates for that shard. If a primary shard is removed from a node, the UpdatePullerService stops pulling updates for that shard.
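
A hypothetical shape for that service, reusing the pull loop sketched in the RFC above (all names illustrative):

// Hypothetical UpdatePullerService: one pull loop per primary shard on this node.
// Relies on java.util.concurrent plus the ShardRange/ShardWriter/EventStreamClient
// sketches from earlier in this thread.
class UpdatePullerService {
  private final Map<ShardId, Future<?>> pullers = new ConcurrentHashMap<>();
  private final ExecutorService executor = Executors.newCachedThreadPool();
  private final EventStreamClient stream;  // supplied by the registered extension/connector

  UpdatePullerService(EventStreamClient stream) {
    this.stream = stream;
  }

  // Called when a primary shard is assigned to this node: start pulling for it.
  void onPrimaryStarted(ShardId shardId, ShardRange range, ShardWriter writer) {
    pullers.computeIfAbsent(shardId,
        id -> executor.submit(() -> pullLoop(stream, range, writer)));
  }

  // Called when the primary shard is removed from this node: stop pulling for it.
  void onPrimaryStopped(ShardId shardId) {
    Future<?> puller = pullers.remove(shardId);
    if (puller != null) {
      puller.cancel(true);  // interrupts the pull loop
    }
  }
}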

you mentioned that extensions could implement all these different pulling sources (essentially, being connectors), but those are inherently out of process -- how does that play with shard placement awareness?

Related to the above answer, the in-process UpdatePullerService knows what shards are on the current node and calls into the extension to retrieve updates from the event stream. The extension just fetches the updates it has been asked to fetch and returns them to the UpdatePullerService.

why do you think Streaming Indexing won't be an acceptable alternative here, since we would have natural back-pressure and could also open direct connections (ideally, streaming ones with gRPC) to the nodes that have the relevant primary shards?

That's still a push model, where the client needs to route the documents to one specific primary node. Yes -- we get backpressure. We still can't bring up a copy of the primary shard on a different box, have that get caught up, and flip over with zero downtime, though.

I might try drawing a picture of the architecture, since it is very different from what OpenSearch does now.

nknize (Collaborator) commented Oct 18, 2023

My $0.02.

This proposal assumes the existence of an external event stream (not to be confused with the still push-based Streaming Indexing API)

The in-flight Streaming Index API implementation is just the first step toward the original RFC. The goal of the Streaming Index API RFC is to do away with the _bulk push model altogether and move to a stream model (hence the name "Streaming Index API"), such that the index/search service "picks" the documents off the stream. This also removes the need for a translog, which is a massive Achilles heel for Elasticsearch / OpenSearch.

Then I realized this could be a shining beacon of a use-case for extensions.

I think this is more a shining beacon for #5910 and the need to move OpenSearch from a monolithic server implementation into a distributed Lucene API, such that your proposal could be easily implemented using the OpenSearch core artifacts as a dependency, without depending on the superfluous CORBA-like marshall/unmarshall logic of protobuf.
