
[Segment Replication] Support shard promotion. #2212

Closed · 4 tasks done
mch2 opened this issue Feb 22, 2022 · 24 comments

@mch2 (Member) commented Feb 22, 2022

Support the failover scenario of a replica being re-assigned as a primary.

Right now this flow fails because of how we wire up the engine as read-only by setting its IndexWriter to null.

Issues remaining (updated 7/22):

@hydrogen666

How do you guys handle duplicate file names after replica shard promotion?
For example, we have an index with a 1-shard, 2-replica configuration, so it has two replicas A and A'. The primary shard goes down and the master chooses replica A to be promoted to primary. Replica A contains segments _1, _2, _3 before promotion, but the other replica A' is newer and contains segments _1, _2, _3, _4. During A's promotion it needs to run the tlog recovery process, which will generate a segment _4 that is not the same as the _4 on replica A'. How do you guys handle this scenario?

@mch2 (Member, Author) commented Apr 8, 2022

Hi @hydrogen666, thanks for the question!

In the scenario you describe, once the tlog recovery completes on A it would publish a new checkpoint notification to replicas. The published checkpoint data also includes a primaryTerm field. When this is incremented, the replicas would process the checkpoint and fetch the new segments from the primary, even if the latest sequence numbers match. The copy process starts with each replica fetching a list of the latest segment metadata available on the primary and then computing a diff against its own that includes different and missing files. In this case A' will recognize that its version of _4.si is different and replace it with the primary's version.

With that said, I haven't yet put together a failover section for the proposal describing what failover will look like with segrep. If you have suggestions on how you think this should work, please share them, thanks!
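The metadata diff described above can be pictured with a small sketch. This is plain Java with hypothetical names, not the actual OpenSearch classes; it only illustrates the idea of comparing the primary's file metadata against the replica's local copy:

```java
import java.util.*;

// Hypothetical sketch of the replica-side diff: compare the primary's
// segment metadata (file name -> checksum) against the local copy and
// decide which files must be fetched.
final class SegmentDiff {
    final List<String> missing = new ArrayList<>();   // files the replica lacks entirely
    final List<String> different = new ArrayList<>(); // same name, different content

    static SegmentDiff compute(Map<String, String> primary, Map<String, String> local) {
        SegmentDiff diff = new SegmentDiff();
        for (Map.Entry<String, String> e : primary.entrySet()) {
            String localChecksum = local.get(e.getKey());
            if (localChecksum == null) {
                diff.missing.add(e.getKey());
            } else if (!localChecksum.equals(e.getValue())) {
                // e.g. A' sees a _4.si whose content differs from the new primary's
                diff.different.add(e.getKey());
            }
        }
        return diff;
    }
}
```

Both the missing and different lists would then be fetched from the primary, which is how A' ends up replacing its conflicting _4.si.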

@hydrogen666 commented Apr 9, 2022

Hi @mch2
Yes, A' can recognize that its _4.si is not the same as the _4.si newly generated by the new primary. But how do we handle it if someone's IndexSearcher on A' is still holding the original _4.si (a scroll request can hold a segment for a long time)?

I can only think of these ways to deal with it:

  1. Switch to a new Directory and a new SearcherManager after the replica finds that the primaryTerm has been bumped?

  2. Send a _{n} (where n is larger than the largest segment counter in the current SegmentInfos) to the master node before segment replication proceeds, so that the master can tell the newly promoted primary shard not to generate any segment less than _{n}. To reduce the pressure on the master node, we don't need to send this information every time. For example, if the max segment is _4.si in the primary's current SegmentInfos, we can send _rw (or 1004 in decimal) to the master node. After the segment counter increases to _rw.si, we send _1jo (or 2004 in decimal) to the master node.

  3. Before choosing a replica for promotion, the master node must ask all replicas and pick the one whose replication state is newest.
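For context on the numbers in option 2: Lucene derives segment names from a generation counter printed in base 36, so _rw corresponds to 1004 and _1jo to 2004. A quick illustration of the naming and the "jump ahead" idea in plain Java (the 1000-generation stride mirrors the example above and is not a fixed rule):

```java
// Lucene segment names are "_" + the generation counter in base 36,
// e.g. generation 4 -> "_4", 1004 -> "_rw", 2004 -> "_1jo".
final class SegmentNames {
    static String name(long generation) {
        return "_" + Long.toString(generation, 36);
    }

    static long generation(String name) {
        return Long.parseLong(name.substring(1), 36);
    }

    // Option 2: a newly promoted primary jumps its counter well past the
    // highest generation any replica might already hold, so no file name
    // it produces can collide with a file already copied elsewhere.
    static long jumpAhead(long current) {
        return current + 1000;
    }
}
```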

@mikemccand

These are awesome questions about segment replication! This is indeed a challenging situation for segrep, but is solvable with one of the three proposed options.

Lucene is fundamentally a write-once (at the OS filesystem layer) index, so we must ensure that no node (primary or replica that was just newly promoted as primary) ever attempts to overwrite an existing index file on any of the existing/prior replicas.

Is solution 3) too hard to do? I would think distributed consensus algorithms could do the right thing here -- all replicas that are currently reachable could respond with how far they had replicated and we pick the furthest one to promote to the new primary? Any detached (network partitioning) replicas that come online too late must possibly delete "future" segment files before re-joining the cluster, which should be fine since those detached replicas are not in use...

Also note that scan/scroll is a weak implementation in ES/OS today, with its hard routing back to a single replica which might drop offline -- with segrep, any replica can accurately/precisely service the scan/scroll request (since all replicas share precise point-in-time views of the shard), holding a consistent point-in-time snapshot from the primary allocated (i.e. its files cannot be deleted, and an open IndexReader/Searcher is kept until the lease expires).

With segrep, every replica can search accurate point-in-time snapshots of the index, unlike ES with its inefficient document replication today where every replica is searching "slightly" different point-in-time views of the index. This can make for very accurate pagination, versus today where users paging through results might easily see duplicates on page N+1, or might also miss results, eroding trust.

Failing that (option 3 is too hard), I think option 2 is best -- bump the next segment number to a number clearly larger than any replica might have already copied. Lucene uses a long to increment these segment numbers so we have tremendous headroom to "jump ahead" with near zero risk of running out of the 64 bit space.

@nknize (Collaborator) commented Apr 12, 2022

With segrep, every replica can search accurate point-in-time snapshots of the index

Noting here there's a Point in Time API design issue and corresponding PR to incrementally improve the scroll shortcomings in OpenSearch using PIT but it will most certainly need to be refactored under segrep.

Thinking out loud (haven't mulled this one as much) I like option 3 but it seems like a lot of coordination and we'll certainly want to benchmark? Option 2 might be an "easier" near term implementation and we can migrate to the coordinated approach in a follow on major release.

@andrross (Member)

Also thinking out loud, I do kind of like option 2 as a good guarantee that we'll never reuse a segment number no matter what weird distributed system edge case is encountered. We could add the logic to choose a new primary based on which replica has made the most progress as an optimization while still keeping the segment high water mark logic. It would allow the "choose farthest ahead replica" logic to be best effort as opposed to being critical for correctness.

@mch2 (Member, Author) commented Apr 12, 2022

Thanks all for your thoughts here.

I like the idea of using both 2 and 3. I don't think implementing 3 will be all that hard because each replica already updates their local checkpoint and publishes an updated global checkpoint to cluster state after a copy event completes. I wonder if we could reuse that global checkpoint mechanism in reverse to fetch the furthest ahead replica.

With segrep, every replica can search accurate point-in-time snapshots of the index, unlike ES with its inefficient document replication today where every replica is searching "slightly" different point-in-time views of the index. This can make for very accurate pagination, versus today where users paging through results might easily see duplicates on page N+1, or might also miss results, eroding trust.

Thanks for calling this out. For this guarantee I'm assuming we will need to block primaries from opening a new reader until we have confirmed all replicas have received and opened up the latest copy? Our implementation isn't doing this right now and only initiates a copy event after the primary has refreshed. I wonder if this is something worth gating with an additional setting?

@anasalkouz (Member)

Is the segment number the only factor in deciding which replica to promote? Do we need to take shard allocation, node load, etc. into consideration?

@mikemccand

With segrep, every replica can search accurate point-in-time snapshots of the index, unlike ES with its inefficient document replication today where every replica is searching "slightly" different point-in-time views of the index. This can make for very accurate pagination, versus today where users paging through results might easily see duplicates on page N+1, or might also miss results, eroding trust.

Thanks for calling this out. For this guarantee I'm assuming we will need to block primaries from opening a new reader until we have confirmed all replicas have received and opened up the latest copy? Our implementation isn't doing this right now and only initiates a copy event after the primary has refreshed. I wonder if this is something worth gating with an additional setting?

Hmm no that should not be necessary?

You should instead use Lucene's SearcherLifetimeManager class to efficiently hold open any past point-in-time readers that are still leased by running scan/scroll requests. This way every replica is free to refresh after each checkpoint, while keeping the older readers still available. Each reader is keyed by a long version that increments on each checkpoint, and should be persisted into the scan/scroll state. Primary can refresh first, and handle a new incoming scan/scroll at the latest version. If that scan/scroll client comes back for page two, we will need some added (cluster state? not sure) logic/storage to record which of the replicas have that index version refreshed now. Those replicas would also need to know that there is a lease held on version N and keep that reader alive (e.g. using SearcherLifetimeManager).
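The version-keyed reader pattern described above can be sketched with a simplified, stdlib-only stand-in. This is not Lucene's actual SearcherLifetimeManager API, just an illustration of the mechanism: each refresh records a reader under an incrementing version, scroll requests re-acquire by version, and expired leases are pruned so old segments can be deleted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of the SearcherLifetimeManager idea. Reader is a
// stand-in for Lucene's IndexSearcher/IndexReader.
final class PointInTimeReaders<Reader> {
    private final Map<Long, Reader> byVersion = new ConcurrentHashMap<>();

    // Called on each refresh/checkpoint: record the new reader under its version.
    void record(long version, Reader reader) {
        byVersion.put(version, reader);
    }

    // A scroll request carries its version and re-acquires the same snapshot.
    Reader acquire(long version) {
        return byVersion.get(version); // null if the lease has already expired
    }

    // Drop readers older than the oldest still-leased version so that the
    // segments they pin can finally be deleted.
    void prune(long oldestLeasedVersion) {
        byVersion.keySet().removeIf(v -> v < oldestLeasedVersion);
    }
}
```

With this pattern each replica is free to refresh on every checkpoint while older point-in-time readers stay available until their leases lapse.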

@mikemccand

Is segment number is the only factor to decide what replica to promote? Do we need to take shard allocations, node load ..etc in our consideration?

Do Primaries handle searching too? Or are they dedicated to indexing (which would give awesome physical isolation of the JVMs that do search versus indexing, but would typically require more total cluster resources)?

If Primaries do searching too then I agree we may want to take shard allocations / indexing rate across those shards into account. We have some freedom to promote a Replica on a relatively underutilized node, instead of a Replica on a node that is already doing too much indexing/searching? In fact, this should be a powerful freedom for balancing load across the cluster ... we may want to do it (promote a different Replica as Primary for this shard) pro-actively (not just during node failure cases) to keep the cluster load balanced?

@nknize (Collaborator) commented Apr 14, 2022

use Lucene's SearcherLifetimeManager class to efficiently hold open any past point-in-time readers that are still leased by running scan/scroll requests

This sounds like a better mechanism to use within the SearchService#createPitReaderContext logic flow? Looks like the proposed PR leverages all of the inherited keep alive logic from the existing scroll API whereas SearcherLifetimeManager can take care of this for us? /cc @bharath-techie.

I think we should look at refactoring away from the old Scroll scaffolding and move toward using this mechanism in that PR?

@nknize (Collaborator) commented Apr 14, 2022

Do Primaries handle searching too?

For now, yes. This was discussed for "priority queries" (e.g., alerting use cases) where the primary would have documents available for search before replicas.

Or are they dedicated to indexing.

This is the desired next step: to isolate the reader/writer JVMs. At minimum I think we'll want to look at leveraging some of the concurrency improvements being made in Lucene to help in the decoupling of indexing- and search-time geometries?

@nknize (Collaborator) commented Apr 14, 2022

/cc @vigyasharma to join the discussion!

@vigyasharma (Contributor)

Excellent discussion here, with some really good ideas, and hard questions about segment replication and failover scenarios. I love the overall idea, and mostly agree with the approach being discussed here. My early thoughts on this are --

++ on the idea of shard promotion, not just for failure recovery, but also for load balancing.
This would require making it a first class operation, where the leader node can, atomically in a cluster state change, demote a primary to replica, and promote another replica to primary. It would be extremely useful for balancing a cluster's load, without having to move (relocate) shards around like today. This is all the more important in segment replication, since primary and replica will now have uneven resource footprint.

For example, consider a 2-node cluster with 1 primary and 1 replica per index. If one node drops, all replicas on the single remaining node get promoted to primaries. When the failed node comes back (or is replaced), it will get all the replica copies for every shard (based on current shard recovery logic). This leaves us with a 2-node cluster with all primaries on one node and all replicas on the other. This is okay in document replication, since pri and rep have roughly the same load, but it would lead to a very skewed cluster in segment replication. The only way to balance such a cluster (since there are only 2 nodes) is to demote some primaries to replicas on node 1 and promote the corresponding replicas on node 2.


How do you guys handle duplicate file names after replica shard promotion?

Re: failover protocol, I agree with @mch2 - using checkpoints to identify the replica with most progress should be doable.

For option (2), around making the newly promoted primary jump segment numbers to avoid overlap -- I wonder if it can create some confusion (either in the protocol implementation or in manual debugging) about whether some segments got dropped or missed.
Can we instead leverage the primary term to disambiguate the new primary's segments? Perhaps the files and metadata sent to a replica in response to a GET checkpoint also contain the primary term value, and the replica can use the higher (current?) primary term value to resolve conflicts between same-name segments?
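The primary-term tie-break suggested here could look roughly like the following. The types and field names are hypothetical, not the actual OpenSearch metadata classes:

```java
// Hypothetical sketch: segment metadata tagged with the primary term that
// produced it. When a replica sees two files with the same name, the copy
// from the higher (newer) primary term wins.
record SegmentMeta(String name, long primaryTerm, String checksum) {
    static SegmentMeta resolve(SegmentMeta local, SegmentMeta incoming) {
        if (!local.name().equals(incoming.name())) {
            throw new IllegalArgumentException("not the same segment name");
        }
        // Prefer the copy produced under the higher primary term.
        return incoming.primaryTerm() >= local.primaryTerm() ? incoming : local;
    }
}
```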

@hydrogen666

With segrep, CPU and IO consumption on the primary is much higher than on a replica. Do we need to provide a new balancing algorithm for the master node to take primary shard balancing into consideration? For example, if one data node goes down and comes back, all shards on this data node will be replica shards.

@Bukhtawar (Collaborator) commented Apr 15, 2022

Today there is no differentiation in the shard balancer algorithm in terms of placing primary and replica copies; both have equal weights. @hydrogen666, yes, you are correct that we would most likely need the master to load balance primaries and replicas. This, however, can be solved in general via a placement algorithm that factors in shard heat (resource consumption), also aiming at maximizing throughput.

I really like @vigyasharma's approach of swapping primary and replica atomically. However, we would need to think about a graceful mechanism to achieve it, e.g. in-flight requests might not be able to replicate if roles are changed atomically on the fly, existing peer recoveries might have to be gracefully handed off, etc.

@mikemccand

With segrep, CPU and IO consumption on the primary is much higher than on a replica. Do we need to provide a new balancing algorithm for the master node to take primary shard balancing into consideration? For example, if one data node goes down and comes back, all shards on this data node will be replica shards.

Also note that in ES (document replication), every Replica is as hot as the Primary, whereas with segrep, only Primary is that hot -- Replicas are much cooler (just doing periodic rsync + light the new segments for searching). And so the existing shard allocation system that ES uses, will be no worse in OS/segrep, it's just that many nodes will be substantially under-loaded since they are no longer doing the wasteful indexing operations.

@Bukhtawar (Collaborator)

And so the existing shard allocation system that ES uses, will be no worse in OS/segrep, it's just that many nodes will be substantially under-loaded since they are no longer doing the wasteful indexing operations.

Agree, while this is true for CPU, I/O consumption on the primary is expected to be higher than replica due to segment transfer at every refresh/merge. Network bandwidth on the node hosting primaries will eventually be constrained once there are more replicas getting assigned on other nodes.

@vigyasharma (Contributor)

Do Primaries handle searching too?

For now, yes. This was discussed for "priority queries" (e.g., alerting use cases) where the primary would have documents available for search before replicas.

Or are they dedicated to indexing.

This is the desired next step; to isolate the reader / writer jvms.

Thinking about this more, having primaries only do indexing can really simplify the whole setup, and eliminate the need for shard promotion. When a primary goes down, the leader could just allocate a new, empty primary in any available node. Since primaries won't do searches, this new shard doesn't need to be up to date with the latest segments created.

I believe it would be a rather fast recovery model - just allocate a new empty primary and connect all replicas to it. No need to find the latest and greatest replica shard. In terms of data loss, this is no worse than the current proposed model, since all search shards (replicas) were unaffected by the outage.

One downside I see is that users now have to create an indexing shard (primary) and a search shard (replica); and costs can increase for small clusters. But given the simplicity of this recovery model, and the benefits of index/search compute separation; I think it has a lot of value for larger, more serious use cases. We could even keep both models and give users the option to enable this in index settings.. like a decoupling: enabled flag.

@vigyasharma (Contributor)

In this decoupled index/search world, cluster load balancing can also be done without shard promotions and relocations.

The leader could put the current primary on standby mode, and allocate a new primary in another node. For the old primary in standby, we wait till all segments have been flushed to search shards, and then tear it down. Meanwhile, the new primary can start receiving indexing requests. This would also eliminate the transactional handoff that demoting-promoting a rep/pri pair would require.

For the above, and as a general rule, I feel that segment replication needs the ability to identify segments that come from different primaries. As mentioned above, I feel leveraging primary terms is a strong candidate here.

@mch2 (Member, Author) commented Jun 23, 2022

Reviving this issue as the first phase of segrep is nearly merged into main. Tracked with #2355. When that issue is complete we will have a basic implementation with primary shards as the source of replication (all functionality in POC branch feature/segment-replication).

Thanks for all the input here, I'd like to convert this issue to a meta issue and split the issue of supporting failover into a few smaller issues.

Here is what I'm thinking...

  1. Primary selection - We want to update the selection of a new primary to choose the furthest-ahead replica. This is defined as the replica that has the latest available segments from an outgoing primary. To do this we will need to fetch (possibly storing in cluster state?) and consider each replica's current checkpoint. With the ReplicationCheckpoint introduced, I think we can use a combination of SegmentInfos version + primary term.
  2. Engine swap - Once a new primary is chosen, we need to swap its engine implementation from the read-only NRTReplicationEngine implementation to a writeable InternalEngine. I think this is effectively a recover-from-store operation. As called out by @vigyasharma, it will be valuable for us to have the ability to swap in the other direction as well during reallocation.
  3. Replay translog ops - Once the new primary's engine has been updated, we will need to reindex all operations currently in the local xlog to ensure that any operation indexed on the former primary before it was able to push out new segments is not lost. With the recently introduced NRTReplicationEngine, the replica's xlog is purged up to a provided seqNo when a new set of segments is received. Any op remaining in the xlog during promotion would need to be replayed.
  4. Replicas should consider the primary term. This is already happening when determining whether an incoming checkpoint should be processed.
  5. Update shard allocation to avoid network congestion on nodes with multiple primary shards.
  6. Provide the option of separating Indexers & Searchers. I found the related issue Separate Reader & Writer JVM #1121. I think this is a great extension that we should consider as a next step.
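The primary-selection step (1) above could be sketched as a comparison on (primary term, SegmentInfos version). ReplicationCheckpoint exists in OpenSearch, but the fields and comparator here are illustrative, not the real class:

```java
import java.util.*;

// Hypothetical sketch of primary selection: pick the furthest-ahead replica
// by comparing checkpoints, with the primary term taking precedence over
// the SegmentInfos version.
record Checkpoint(String allocationId, long primaryTerm, long segmentInfosVersion) {}

final class PrimarySelection {
    static Optional<Checkpoint> furthestAhead(Collection<Checkpoint> replicas) {
        return replicas.stream().max(
            Comparator.comparingLong(Checkpoint::primaryTerm)
                      .thenComparingLong(Checkpoint::segmentInfosVersion));
    }
}
```

Ordering on the term first matters because a replica that already processed a checkpoint from a newer primary is ahead of one with a higher segments version from an older term.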

@Bukhtawar (Collaborator)

Primary Selection - We want to update the selection of a new primary to choose the furthest ahead replica. This is defined as a replica that has the latest available segments from an outgoing primary. To do this we will need to fetch (possibly storing in cluster state?) and consider each replica's current checkpoint. With the ReplicationCheckpoint introduced, I think we can use a combination of SegmentInfos version + primary term.

The leader does something similar today to choose which nodes have an up to date copy to assign a primary to. We might need to modify this to factor in checkpoints.

public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Set<String> ignoreNodes) {

@dreamer-89 (Member) commented Aug 2, 2022

It appears shard promotion logic is also applied eagerly by the coordinator when a node is identified as faulty by FollowersChecker. The coordinator uses the NodeRemovalClusterStateTaskExecutor task, which fails shards using RoutingNodes.failShard, which chooses the replica with the highest node version.

One option is to always return null (instead of an active replica) from the highest-node-version selection. But RoutingNodes.failShard appears to be a core piece of routing logic, and I'm not sure removing it would be a good idea. From a functionality perspective this appears fine, as AllocationService.reroute (triggered on cluster state updates; uses PrimaryShardAllocator) takes care of unassigned primary shards.
We cannot remove the RoutingNodes#failShard method altogether, as it is the first resort for the cluster manager node to promote a primary. Removing this logic fails follow-up cluster balancing operations. More details in #4131.
@Bukhtawar @vigyasharma @mch2: WDYT?

More details in #3988

@Jeevananthan-23

Hello @mikemccand / @mch2, I would like to understand, in the context of the proposals below, how shard promotion (leader election) works. Also, why not consider a distributed consensus algorithm like Raft?

  2. Send a _{n} (n is larger than the largest segment counter in current SegmentInfos) to master node before segment replication progress so that master can tell the newly promoted Primary Shard not to generate any segment less than _{n}. To reduce the pressure of master node, we don't need to send this information every time. For example, if the max segment is _4.si in primary's current SegmentInfos, we can send _rw(or 1004 in decimal) to master node. After segment increases to _rw.si, we send _1jo(or 2004 in decimal) to master node.

What happens when the primary shard dies first and the newly promoted primary shard has the same segment numbers? How does promotion happen in that case?

  3. Before choosing a replica for promotion, the master node must ask all replicas and pick the one whose replication state is newest.

Using a distributed consensus algorithm like Raft could be a great choice, since it would help with copying merged segments to replicas and would support leader election, as @mikemccand mentioned in his Segment Replication blog post on cluster state.

Status: Done · No branches or pull requests · 10 participants