From 6486be673bef1bf2cce4fe5a9c5a97d32fd59c12 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Tue, 25 Apr 2017 18:11:32 -0700
Subject: [PATCH 1/2] integration: test Observe can read leaders set prior to compaction

---
// NOTE(review): the new test elects a leader with value "abc", then Puts an
// unrelated key ("foo") and compacts at that Put's header revision before
// calling Observe, so the leader key's mod revision is below the revision
// passed to Compact. It passes only if the first response read from Observe
// carries "abc".
// NOTE(review): Observe is called with context.TODO() and only one value is
// read; nothing in the test cancels the observer. Presumably it stops when
// the deferred clus.Terminate runs -- confirm.
 integration/v3_election_test.go | 36 +++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/integration/v3_election_test.go b/integration/v3_election_test.go
index d74e2966c80..95f5b4949b9 100644
--- a/integration/v3_election_test.go
+++ b/integration/v3_election_test.go
@@ -272,3 +272,39 @@ func TestElectionOnSessionRestart(t *testing.T) {
 		t.Errorf("expected value=%q, got response %v", "def", resp)
 	}
 }
+
+// TestElectionObserveCompacted checks that observe can tolerate
+// a leader key with a modrev less than the compaction revision.
+func TestElectionObserveCompacted(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.Client(0)
+
+	session, err := concurrency.NewSession(cli)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer session.Orphan()
+
+	e := concurrency.NewElection(session, "test-elect")
+	if cerr := e.Campaign(context.TODO(), "abc"); cerr != nil {
+		t.Fatal(cerr)
+	}
+
+	presp, perr := cli.Put(context.TODO(), "foo", "bar")
+	if perr != nil {
+		t.Fatal(perr)
+	}
+	if _, cerr := cli.Compact(context.TODO(), presp.Header.Revision); cerr != nil {
+		t.Fatal(cerr)
+	}
+
+	v, ok := <-e.Observe(context.TODO())
+	if !ok {
+		t.Fatal("failed to observe on compacted revision")
+	}
+	if string(v.Kvs[0].Value) != "abc" {
+		t.Fatalf(`expected leader value "abc", got %q`, string(v.Kvs[0].Value))
+	}
+}

From 50f29bd6618e600e00f36a98ac8969fa6dfed14b Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Tue, 25 Apr 2017 18:13:02 -0700
Subject: [PATCH 2/2] concurrency: use current revisions for election

Watching from the leader's ModRevision could cause live-locking on observe
retry loops when the ModRevision is less than the compacted revision.
Instead, start watching the leader from at least the store revision of the
linearized read used to detect the current leader.

Fixes #7815
---
// NOTE(review): observe() no longer carries lastRev across loop iterations;
// every iteration re-reads the prefix with WithFirstCreate() alone.
// NOTE(review): the watch on the leader key now starts at hdr.Revision+1
// instead of kv.ModRevision. hdr is the Get response header when a leader
// already exists; when the leader is found through the prefix watch, hdr
// points at that watch response's Header, with Revision overwritten by the
// accepted PUT's ModRevision.
// NOTE(review): a send of the current leader on ch (or return on ctx.Done())
// is added before the leader-key watch is opened.
// NOTE(review): no import hunk is included, so election.go presumably already
// imports the package aliased pb -- confirm before applying.
 clientv3/concurrency/election.go | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go
index 257cc78afca..1d75dde3d82 100644
--- a/clientv3/concurrency/election.go
+++ b/clientv3/concurrency/election.go
@@ -165,15 +165,14 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
 	client := e.session.Client()
 
 	defer close(ch)
-	lastRev := int64(0)
 	for {
-		opts := append(v3.WithFirstCreate(), v3.WithRev(lastRev))
-		resp, err := client.Get(ctx, e.keyPrefix, opts...)
+		resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
 		if err != nil {
 			return
 		}
 
 		var kv *mvccpb.KeyValue
+		var hdr *pb.ResponseHeader
 
 		if len(resp.Kvs) == 0 {
 			cctx, cancel := context.WithCancel(ctx)
@@ -189,18 +188,27 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
 				// only accept PUTs; a DELETE will make observe() spin
 				for _, ev := range wr.Events {
 					if ev.Type == mvccpb.PUT {
-						kv = ev.Kv
+						hdr, kv = &wr.Header, ev.Kv
+						// may have multiple revs; hdr.rev = the last rev
+						// set to kv's rev in case batch has multiple PUTs
+						hdr.Revision = kv.ModRevision
 						break
 					}
 				}
 			}
 			cancel()
 		} else {
-			kv = resp.Kvs[0]
+			hdr, kv = resp.Header, resp.Kvs[0]
+		}
+
+		select {
+		case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:
+		case <-ctx.Done():
+			return
 		}
 
 		cctx, cancel := context.WithCancel(ctx)
-		wch := client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision))
+		wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
 		keyDeleted := false
 		for !keyDeleted {
 			wr, ok := <-wch
@@ -209,7 +217,6 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
 			}
 			for _, ev := range wr.Events {
 				if ev.Type == mvccpb.DELETE {
-					lastRev = ev.Kv.ModRevision
 					keyDeleted = true
 					break
 				}