diff --git a/lease/lessor.go b/lease/lessor.go index 47e84a24181a..fa1635767e62 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -144,6 +144,9 @@ type lessor struct { stopC chan struct{} // doneC is a channel whose closure indicates that the lessor is stopped. doneC chan struct{} + + // when the lease pile-up reduction is done this is true + Ready bool } func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor { @@ -348,54 +351,70 @@ func (le *lessor) Leases() []*Lease { func (le *lessor) Promote(extend time.Duration) { le.mu.Lock() - defer le.mu.Unlock() + le.Ready = false + le.mu.Unlock() + go func() { + le.mu.Lock() + le.demotec = make(chan struct{}) + leaseCopy := le.unsafeLeases() + le.mu.Unlock() + var updateList []*LeaseWithTime + defer func() { + le.mu.Lock() + defer le.mu.Unlock() + for _, item := range updateList { + heap.Push(&le.leaseHeap, item) + } + le.Ready = true + }() + + // refresh the expiries of all leases. + for _, l := range leaseCopy { + l.refresh(extend) + // check that the lease hasn't been revoked + item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} + updateList = append(updateList, item) + } - le.demotec = make(chan struct{}) + if len(le.leaseMap) < leaseRevokeRate { + // no possibility of lease pile-up + return + } - // refresh the expiries of all leases. - for _, l := range le.leaseMap { - l.refresh(extend) - item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} - heap.Push(&le.leaseHeap, item) - } + // adjust expiries in case of overlap - if len(le.leaseMap) < leaseRevokeRate { - // no possibility of lease pile-up - return - } + sort.Sort(leasesByExpiry(leaseCopy)) - // adjust expiries in case of overlap - leases := le.unsafeLeases() - sort.Sort(leasesByExpiry(leases)) - - baseWindow := leases[0].Remaining() - nextWindow := baseWindow + time.Second - expires := 0 - // have fewer expires than the total revoke rate so piled up leases - // don't consume the entire revoke limit - targetExpiresPerSecond := (3 * leaseRevokeRate) / 4 - for _, l := range leases { - remaining := l.Remaining() - if remaining > nextWindow { - baseWindow = remaining - nextWindow = baseWindow + time.Second - expires = 1 - continue - } - expires++ - if expires <= targetExpiresPerSecond { - continue + baseWindow := leaseCopy[0].Remaining() + nextWindow := baseWindow + time.Second + expires := 0 + // have fewer expires than the total revoke rate so piled up leases + // don't consume the entire revoke limit + targetExpiresPerSecond := (3 * leaseRevokeRate) / 4 + + for _, l := range leaseCopy { + remaining := l.Remaining() + if remaining > nextWindow { + baseWindow = remaining + nextWindow = baseWindow + time.Second + expires = 1 + continue + } + expires++ + if expires <= targetExpiresPerSecond { + continue + } + rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond)) + // If leases are extended by n seconds, leases n seconds ahead of the + // base window should be extended by only one second. + rateDelay -= float64(remaining - baseWindow) + delay := time.Duration(rateDelay) + nextWindow = baseWindow + delay + l.refresh(delay + extend) + item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} + updateList = append(updateList, item) } - rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond)) - // If leases are extended by n seconds, leases n seconds ahead of the - // base window should be extended by only one second. - rateDelay -= float64(remaining - baseWindow) - delay := time.Duration(rateDelay) - nextWindow = baseWindow + delay - l.refresh(delay + extend) - item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} - heap.Push(&le.leaseHeap, item) - } + }() } type leasesByExpiry []*Lease @@ -497,7 +516,7 @@ func (le *lessor) runLoop() { revokeLimit := leaseRevokeRate / 2 le.mu.RLock() - if le.isPrimary() { + if le.isPrimary() && le.Ready { ls = le.findExpiredLeases(revokeLimit) } le.mu.RUnlock() diff --git a/lease/lessor_bench_test.go b/lease/lessor_bench_test.go index a3be6aa95b25..7a9638fc95a2 100644 --- a/lease/lessor_bench_test.go +++ b/lease/lessor_bench_test.go @@ -17,6 +17,7 @@ package lease import ( "os" "testing" + "time" "github.com/coreos/etcd/mvcc/backend" ) @@ -53,6 +54,14 @@ func BenchmarkLessorRevoke10000(b *testing.B) { benchmarkLessorRevoke(10000, b func BenchmarkLessorRevoke100000(b *testing.B) { benchmarkLessorRevoke(100000, b) } func BenchmarkLessorRevoke1000000(b *testing.B) { benchmarkLessorRevoke(1000000, b) } +func BenchmarkLessorPromote1(b *testing.B) { benchmarkLessorPromote(1, b) } +func BenchmarkLessorPromote10(b *testing.B) { benchmarkLessorPromote(10, b) } +func BenchmarkLessorPromote100(b *testing.B) { benchmarkLessorPromote(100, b) } +func BenchmarkLessorPromote1000(b *testing.B) { benchmarkLessorPromote(1000, b) } +func BenchmarkLessorPromote10000(b *testing.B) { benchmarkLessorPromote(10000, b) } +func BenchmarkLessorPromote100000(b *testing.B) { benchmarkLessorPromote(100000, b) } +func BenchmarkLessorPromote1000000(b *testing.B) { benchmarkLessorPromote(1000000, b) } + func benchmarkLessorFindExpired(size int, b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() le := newLessor(be, minLeaseTTL) @@ -115,6 +124,23 @@ func benchmarkLessorRenew(size int, b *testing.B) { } } +func benchmarkLessorPromote(size int, b *testing.B) { + be, tmpPath := backend.NewDefaultTmpBackend() + le := newLessor(be, minLeaseTTL) + defer le.Stop() + defer cleanup(be, tmpPath) + for i := 0; i < size; i++ { + le.Grant(LeaseID(i), int64(100+i)) + } + + b.ResetTimer() + + go func() { le.Promote(100 * time.Second) }() + for i := 0; i < b.N; i++ { + le.Grant(LeaseID(i+size), int64(100+i+size)) + } +} + func cleanup(b backend.Backend, path string) { b.Close() os.Remove(path) diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 3a39e846f729..4e995fac286d 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -20,6 +20,7 @@ import ( "os" "path/filepath" "reflect" + "runtime" "sort" "sync" "testing" @@ -44,6 +45,7 @@ func TestLessorGrant(t *testing.T) { le := newLessor(be, minLeaseTTL) defer le.Stop() le.Promote(0) + waitForPromotion(le) l, err := le.Grant(1, 1) if err != nil { @@ -205,7 +207,7 @@ func TestLessorRenew(t *testing.T) { le := newLessor(be, minLeaseTTL) defer le.Stop() le.Promote(0) - + waitForPromotion(le) l, err := le.Grant(1, minLeaseTTL) if err != nil { t.Fatalf("failed to grant lease (%v)", err) @@ -263,6 +265,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { // extend after recovery should extend expiration on lease pile-up le.Promote(0) + waitForPromotion(le) windowCounts := make(map[int64]int) for _, l := range le.leaseMap { @@ -360,6 +363,7 @@ func TestLessorExpire(t *testing.T) { defer le.Stop() le.Promote(1 * time.Second) + waitForPromotion(le) l, err := le.Grant(1, testMinTTL) if err != nil { t.Fatalf("failed to create lease: %v", err) @@ -412,6 +416,7 @@ func TestLessorExpireAndDemote(t *testing.T) { defer le.Stop() le.Promote(1 * time.Second) + waitForPromotion(le) l, err := le.Grant(1, testMinTTL) if err != nil { t.Fatalf("failed to create lease: %v", err) @@ -492,3 +497,15 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) { bcfg.Path = filepath.Join(tmpPath, "be") return tmpPath, backend.New(bcfg) } +func waitForPromotion(le *lessor) { + for { + le.mu.RLock() + ready := le.Ready + le.mu.RUnlock() + if ready { + return + } else { + runtime.Gosched() + } + } +}