Skip to content

Commit

Permalink
lease: Checkpoint lease TTLs to prevent indefinite auto-renewal of lo…
Browse files Browse the repository at this point in the history
…ng lived leases
  • Loading branch information
jpbetz committed Jul 17, 2018
1 parent bbe2d77 commit c81870f
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 45 deletions.
14 changes: 14 additions & 0 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type applierV3 interface {
LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error)

Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error)

Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error)
Expand Down Expand Up @@ -130,6 +132,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
case r.LeaseRevoke != nil:
ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
case r.LeaseCheckpoint != nil:
ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
case r.Alarm != nil:
ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm)
case r.Authenticate != nil:
Expand Down Expand Up @@ -582,6 +586,16 @@ func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo
return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err
}

func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) {
for _, c := range lc.Checkpoints {
err := a.s.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL)
if err != nil {
return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, err
}
}
return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, nil
}

func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
resp := &pb.AlarmResponse{}
oldCount := len(a.s.alarmStore.Get(ar.Alarm))
Expand Down
3 changes: 3 additions & 0 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ type ServerConfig struct {
Debug bool

ForceNewCluster bool

// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
LeaseCheckpointInterval time.Duration
}

// VerifyBootstrap sanity-checks the initial config for bootstrap case
Expand Down
6 changes: 5 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {

// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.getLogger(), srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds()))})
srv.lessor = lease.NewLessor(srv.getLogger(), srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval})
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex)
if beExist {
kvindex := srv.kv.ConsistentIndex()
Expand Down Expand Up @@ -576,6 +576,10 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
return nil, err
}

srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
})

// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{
Logger: cfg.Logger,
Expand Down
9 changes: 5 additions & 4 deletions lease/lease_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ package lease

// LeaseWithTime contains lease object with expire information.
type LeaseWithTime struct {
id LeaseID
expiration int64
index int
id LeaseID
// Unix nanos timestamp.
time int64
index int
}

type LeaseQueue []*LeaseWithTime

func (pq LeaseQueue) Len() int { return len(pq) }

func (pq LeaseQueue) Less(i, j int) bool {
return pq[i].expiration < pq[j].expiration
return pq[i].time < pq[j].time
}

func (pq LeaseQueue) Swap(i, j int) {
Expand Down
2 changes: 1 addition & 1 deletion lease/lease_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestLeaseQueue(t *testing.T) {
exp = time.Now().UnixNano()
}
le.leaseMap[LeaseID(i)] = &Lease{ID: LeaseID(i)}
heap.Push(&le.leaseHeap, &LeaseWithTime{id: LeaseID(i), expiration: exp})
heap.Push(&le.leaseHeap, &LeaseWithTime{id: LeaseID(i), time: exp})
}

// first element must be front
Expand Down
Loading

0 comments on commit c81870f

Please sign in to comment.