diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go index 257cc78afca..1d75dde3d82 100644 --- a/clientv3/concurrency/election.go +++ b/clientv3/concurrency/election.go @@ -165,15 +165,14 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { client := e.session.Client() defer close(ch) - lastRev := int64(0) for { - opts := append(v3.WithFirstCreate(), v3.WithRev(lastRev)) - resp, err := client.Get(ctx, e.keyPrefix, opts...) + resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...) if err != nil { return } var kv *mvccpb.KeyValue + var hdr *pb.ResponseHeader if len(resp.Kvs) == 0 { cctx, cancel := context.WithCancel(ctx) @@ -189,18 +188,27 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { // only accept PUTs; a DELETE will make observe() spin for _, ev := range wr.Events { if ev.Type == mvccpb.PUT { - kv = ev.Kv + hdr, kv = &wr.Header, ev.Kv + // may have multiple revs; hdr.rev = the last rev + // set to kv's rev in case batch has multiple PUTs + hdr.Revision = kv.ModRevision break } } } cancel() } else { - kv = resp.Kvs[0] + hdr, kv = resp.Header, resp.Kvs[0] + } + + select { + case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}: + case <-ctx.Done(): + return } cctx, cancel := context.WithCancel(ctx) - wch := client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision)) + wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1)) keyDeleted := false for !keyDeleted { wr, ok := <-wch @@ -209,7 +217,6 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { } for _, ev := range wr.Events { if ev.Type == mvccpb.DELETE { - lastRev = ev.Kv.ModRevision keyDeleted = true break } diff --git a/integration/v3_election_test.go b/integration/v3_election_test.go index d74e2966c80..95f5b4949b9 100644 --- a/integration/v3_election_test.go +++ b/integration/v3_election_test.go @@ -272,3 +272,39 @@ func TestElectionOnSessionRestart(t *testing.T) { t.Errorf("expected value=%q, got response %v", "def", resp) } } + +// TestElectionObserveCompacted checks that observe can tolerate +// a leader key with a modrev less than the compaction revision. +func TestElectionObserveCompacted(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + cli := clus.Client(0) + + session, err := concurrency.NewSession(cli) + if err != nil { + t.Fatal(err) + } + defer session.Orphan() + + e := concurrency.NewElection(session, "test-elect") + if cerr := e.Campaign(context.TODO(), "abc"); cerr != nil { + t.Fatal(cerr) + } + + presp, perr := cli.Put(context.TODO(), "foo", "bar") + if perr != nil { + t.Fatal(perr) + } + if _, cerr := cli.Compact(context.TODO(), presp.Header.Revision); cerr != nil { + t.Fatal(cerr) + } + + v, ok := <-e.Observe(context.TODO()) + if !ok { + t.Fatal("failed to observe on compacted revision") + } + if string(v.Kvs[0].Value) != "abc" { + t.Fatalf(`expected leader value "abc", got %q`, string(v.Kvs[0].Value)) + } +}