Skip to content

Commit

Permalink
Merge pull request #6929 from heyitsanthony/ctx-lease-renew
Browse files Browse the repository at this point in the history
etcdserver: use context for Renew
  • Loading branch information
Anthony Romano committed Dec 7, 2016
2 parents 96626d0 + be1f36d commit da3b71b
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 52 deletions.
43 changes: 43 additions & 0 deletions clientv3/integration/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,3 +510,46 @@ func TestLeaseTimeToLive(t *testing.T) {
t.Fatalf("unexpected keys %+v", lresp.Keys)
}
}

// TestLeaseRenewLostQuorum ensures keepalives work after losing quorum
// for a while.
func TestLeaseRenewLostQuorum(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

cli := clus.Client(0)
r, err := cli.Grant(context.TODO(), 4)
if err != nil {
t.Fatal(err)
}

kctx, kcancel := context.WithCancel(context.Background())
defer kcancel()
ka, err := cli.KeepAlive(kctx, r.ID)
if err != nil {
t.Fatal(err)
}
// consume first keepalive so next message sends when cluster is down
<-ka

// force keepalive stream message to timeout
clus.Members[1].Stop(t)
clus.Members[2].Stop(t)
// Use TTL-1 since the client closes the keepalive channel if no
// keepalive arrives before the lease deadline.
// The cluster has 1 second to recover and reply to the keepalive.
time.Sleep(time.Duration(r.TTL-1) * time.Second)
clus.Members[1].Restart(t)
clus.Members[2].Restart(t)

select {
case _, ok := <-ka:
if !ok {
t.Fatalf("keepalive closed")
}
case <-time.After(time.Duration(r.TTL) * time.Second):
t.Fatalf("timed out waiting for keepalive")
}
}
4 changes: 2 additions & 2 deletions etcdserver/api/v3rpc/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
ls.hdr.fill(resp.Header)

ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID))
ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
if err == lease.ErrLeaseNotFound {
err = nil
ttl = 0
}

if err != nil {
return err
return togRPCError(err)
}

resp.TTL = ttl
Expand Down
82 changes: 39 additions & 43 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package etcdserver
import (
"bytes"
"encoding/binary"
"io"
"strconv"
"strings"
"time"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/lease/leasepb"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/raft"

Expand Down Expand Up @@ -70,7 +68,7 @@ type Lessor interface {

// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
// is returned.
LeaseRenew(id lease.LeaseID) (int64, error)
LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)

// LeaseTimeToLive retrieves lease information.
LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
Expand Down Expand Up @@ -306,7 +304,7 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
return result.resp.(*pb.LeaseRevokeResponse), nil
}

func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
Expand All @@ -315,21 +313,24 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
return -1, err
}

// renewals don't go through raft; forward to leader manually
leader, err := s.waitLeader()
if err != nil {
return -1, err
}
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()

for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout())
if err == nil {
break
// renewals don't go through raft; forward to leader manually
for cctx.Err() == nil && err != nil {
leader, lerr := s.waitLeader(cctx)
if lerr != nil {
return -1, lerr
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
}
}
err = convertEOFToNoLeader(err)
}
return ttl, err
return -1, ErrTimeout
}

func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
Expand All @@ -352,46 +353,41 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
return resp, nil
}

// manually request to leader
leader, err := s.waitLeader()
if err != nil {
return nil, err
}
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()

for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeaseInternalPrefix
var iresp *leasepb.LeaseInternalResponse
iresp, err = leasehttp.TimeToLiveHTTP(ctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
if err == nil {
return iresp.LeaseTimeToLiveResponse, nil
// forward to leader
for cctx.Err() == nil {
leader, err := s.waitLeader(cctx)
if err != nil {
return nil, err
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeaseInternalPrefix
resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
if err == nil {
return resp.LeaseTimeToLiveResponse, nil
}
if err == lease.ErrLeaseNotFound {
return nil, err
}
}
err = convertEOFToNoLeader(err)
}
return nil, err
}

// convertEOFToNoLeader converts EOF erros to ErrNoLeader because
// lease renew, timetolive requests to followers are forwarded to leader,
// and follower might not be able to reach leader from transient network
// errors (often EOF errors). By returning ErrNoLeader, signal clients
// to retry its requests.
func convertEOFToNoLeader(err error) error {
if err == io.EOF || err == io.ErrUnexpectedEOF {
return ErrNoLeader
}
return err
return nil, ErrTimeout
}

func (s *EtcdServer) waitLeader() (*membership.Member, error) {
func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
leader := s.cluster.Member(s.Leader())
for i := 0; i < 5 && leader == nil; i++ {
for leader == nil {
// wait an election
dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
select {
case <-time.After(dur):
leader = s.cluster.Member(s.Leader())
case <-s.stopping:
return nil, ErrStopped
case <-ctx.Done():
return nil, ErrNoLeader
}
}
if leader == nil || len(leader.PeerURLs) == 0 {
Expand Down
8 changes: 6 additions & 2 deletions integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"testing"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc/metadata"

"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/testutil"
"golang.org/x/net/context"
)

// TestV3LeasePrmote ensures the newly elected leader can promote itself
Expand Down Expand Up @@ -356,7 +358,9 @@ func TestV3LeaseFailover(t *testing.T) {

lreq := &pb.LeaseKeepAliveRequest{ID: lresp.ID}

ctx, cancel := context.WithCancel(context.Background())
md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
mctx := metadata.NewContext(context.Background(), md)
ctx, cancel := context.WithCancel(mctx)
defer cancel()
lac, err := lc.LeaseKeepAlive(ctx)
if err != nil {
Expand Down
14 changes: 10 additions & 4 deletions lease/leasehttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"io/ioutil"
"net/http"
"time"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
Expand Down Expand Up @@ -125,15 +124,22 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// RenewHTTP renews a lease at a given primary server.
// TODO: Batch request in future?
func RenewHTTP(id lease.LeaseID, url string, rt http.RoundTripper, timeout time.Duration) (int64, error) {
func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) {
// will post lreq protobuf to leader
lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
if err != nil {
return -1, err
}

cc := &http.Client{Transport: rt, Timeout: timeout}
resp, err := cc.Post(url, "application/protobuf", bytes.NewReader(lreq))
cc := &http.Client{Transport: rt}
req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
if err != nil {
return -1, err
}
req.Header.Set("Content-Type", "application/protobuf")
req.Cancel = ctx.Done()

resp, err := cc.Do(req)
if err != nil {
return -1, err
}
Expand Down
2 changes: 1 addition & 1 deletion lease/leasehttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestRenewHTTP(t *testing.T) {
ts := httptest.NewServer(NewHandler(le))
defer ts.Close()

ttl, err := RenewHTTP(l.ID, ts.URL+LeasePrefix, http.DefaultTransport, time.Second)
ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit da3b71b

Please sign in to comment.