Skip to content

Commit

Permalink
Merge pull request #7819 from heyitsanthony/fix-elect-compact
Browse files Browse the repository at this point in the history
concurrency: use current revisions for election
  • Loading branch information
Anthony Romano committed Apr 27, 2017
2 parents 2a3229c + 50f29bd commit c309d74
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 7 deletions.
21 changes: 14 additions & 7 deletions clientv3/concurrency/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,14 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
client := e.session.Client()

defer close(ch)
lastRev := int64(0)
for {
opts := append(v3.WithFirstCreate(), v3.WithRev(lastRev))
resp, err := client.Get(ctx, e.keyPrefix, opts...)
resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
if err != nil {
return
}

var kv *mvccpb.KeyValue
var hdr *pb.ResponseHeader

if len(resp.Kvs) == 0 {
cctx, cancel := context.WithCancel(ctx)
Expand All @@ -189,18 +188,27 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
// only accept PUTs; a DELETE will make observe() spin
for _, ev := range wr.Events {
if ev.Type == mvccpb.PUT {
kv = ev.Kv
hdr, kv = &wr.Header, ev.Kv
// may have multiple revs; hdr.rev = the last rev
// set to kv's rev in case batch has multiple PUTs
hdr.Revision = kv.ModRevision
break
}
}
}
cancel()
} else {
kv = resp.Kvs[0]
hdr, kv = resp.Header, resp.Kvs[0]
}

select {
case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:
case <-ctx.Done():
return
}

cctx, cancel := context.WithCancel(ctx)
wch := client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision))
wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
keyDeleted := false
for !keyDeleted {
wr, ok := <-wch
Expand All @@ -209,7 +217,6 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
}
for _, ev := range wr.Events {
if ev.Type == mvccpb.DELETE {
lastRev = ev.Kv.ModRevision
keyDeleted = true
break
}
Expand Down
36 changes: 36 additions & 0 deletions integration/v3_election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,39 @@ func TestElectionOnSessionRestart(t *testing.T) {
t.Errorf("expected value=%q, got response %v", "def", resp)
}
}

// TestElectionObserveCompacted checks that observe can tolerate
// a leader key with a modrev less than the compaction revision.
func TestElectionObserveCompacted(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)

cli := clus.Client(0)

session, err := concurrency.NewSession(cli)
if err != nil {
t.Fatal(err)
}
defer session.Orphan()

e := concurrency.NewElection(session, "test-elect")
if cerr := e.Campaign(context.TODO(), "abc"); cerr != nil {
t.Fatal(cerr)
}

presp, perr := cli.Put(context.TODO(), "foo", "bar")
if perr != nil {
t.Fatal(perr)
}
if _, cerr := cli.Compact(context.TODO(), presp.Header.Revision); cerr != nil {
t.Fatal(cerr)
}

v, ok := <-e.Observe(context.TODO())
if !ok {
t.Fatal("failed to observe on compacted revision")
}
if string(v.Kvs[0].Value) != "abc" {
t.Fatalf(`expected leader value "abc", got %q`, string(v.Kvs[0].Value))
}
}

0 comments on commit c309d74

Please sign in to comment.