From 9f7bf2b10998d57f19cada73cfb45107e74eda0d Mon Sep 17 00:00:00 2001 From: micagates Date: Fri, 27 Apr 2018 19:47:33 +0000 Subject: [PATCH] lease: do lease pile-up reduction in the background This moves lease pile-up reduction into a goroutine which mostly operates on a copy of the lease list, to avoid locking. This prevents timeouts when the lessor is locked for a long time (when there are a lot of leases, mostly). This should solve https://github.com/coreos/etcd/issues/9496. Before: ``` BenchmarkLessorPromote1-16 500000 4036 ns/op BenchmarkLessorPromote10-16 500000 3932 ns/op BenchmarkLessorPromote100-16 500000 3954 ns/op BenchmarkLessorPromote1000-16 300000 3906 ns/op BenchmarkLessorPromote10000-16 300000 4639 ns/op BenchmarkLessorPromote100000-16 100 27216481 ns/op BenchmarkLessorPromote1000000-16 100 325164684 ns/op ``` After: ``` BenchmarkLessorPromote1-16 500000 3769 ns/op BenchmarkLessorPromote10-16 500000 3835 ns/op BenchmarkLessorPromote100-16 500000 3829 ns/op BenchmarkLessorPromote1000-16 500000 3665 ns/op BenchmarkLessorPromote10000-16 500000 3800 ns/op BenchmarkLessorPromote100000-16 300000 4114 ns/op BenchmarkLessorPromote1000000-16 300000 5143 ns/op ``` --- lease/lessor.go | 109 ++++++++++++++++++++++--------------- lease/lessor_bench_test.go | 26 +++++++++ lease/lessor_test.go | 19 ++++++- 3 files changed, 108 insertions(+), 46 deletions(-) diff --git a/lease/lessor.go b/lease/lessor.go index db5b34cc27b6..f3c8449bb547 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -144,6 +144,9 @@ type lessor struct { stopC chan struct{} // doneC is a channel whose closure indicates that the lessor is stopped. doneC chan struct{} + + // when the lease pile-up reduction is done this is true + Ready bool } func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor { @@ -329,7 +332,6 @@ func (le *lessor) unsafeLeases() []*Lease { for _, l := range le.leaseMap { leases = append(leases, l) } - sort.Sort(leasesByExpiry(leases)) return leases } @@ -337,58 +339,75 @@ func (le *lessor) Leases() []*Lease { le.mu.RLock() ls := le.unsafeLeases() le.mu.RUnlock() + + sort.Sort(leasesByExpiry(ls)) return ls } func (le *lessor) Promote(extend time.Duration) { - le.mu.Lock() - defer le.mu.Unlock() + le.Ready = false + go func() { + le.mu.RLock() + le.demotec = make(chan struct{}) + leaseCopy := le.unsafeLeases() + le.mu.RUnlock() + var updateList []*LeaseWithTime + defer func() { + le.mu.Lock() + defer le.mu.Unlock() + for _, item := range updateList { + heap.Push(&le.leaseHeap, item) + } + le.Ready = true + }() + + // refresh the expiries of all leases. + for _, l := range leaseCopy { + l.refresh(extend) + // check that the lease hasn't been revoked + item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} + updateList = append(updateList, item) + } + + if len(le.leaseMap) < leaseRevokeRate { + // no possibility of lease pile-up + return + } - le.demotec = make(chan struct{}) + // adjust expiries in case of overlap - // refresh the expiries of all leases. - for _, l := range le.leaseMap { - l.refresh(extend) - item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} - heap.Push(&le.leaseHeap, item) - } + sort.Sort(leasesByExpiry(leaseCopy)) - if len(le.leaseMap) < leaseRevokeRate { - // no possibility of lease pile-up - return - } + baseWindow := leaseCopy[0].Remaining() + nextWindow := baseWindow + time.Second + expires := 0 + // have fewer expires than the total revoke rate so piled up leases + // don't consume the entire revoke limit + targetExpiresPerSecond := (3 * leaseRevokeRate) / 4 - // adjust expiries in case of overlap - leases := le.unsafeLeases() - - baseWindow := leases[0].Remaining() - nextWindow := baseWindow + time.Second - expires := 0 - // have fewer expires than the total revoke rate so piled up leases - // don't consume the entire revoke limit - targetExpiresPerSecond := (3 * leaseRevokeRate) / 4 - for _, l := range leases { - remaining := l.Remaining() - if remaining > nextWindow { - baseWindow = remaining - nextWindow = baseWindow + time.Second - expires = 1 - continue - } - expires++ - if expires <= targetExpiresPerSecond { - continue + for _, l := range leaseCopy { + remaining := l.Remaining() + if remaining > nextWindow { + baseWindow = remaining + nextWindow = baseWindow + time.Second + expires = 1 + continue + } + expires++ + if expires <= targetExpiresPerSecond { + continue + } + rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond)) + // If leases are extended by n seconds, leases n seconds ahead of the + // base window should be extended by only one second. + rateDelay -= float64(remaining - baseWindow) + delay := time.Duration(rateDelay) + nextWindow = baseWindow + delay + l.refresh(delay + extend) + item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} + updateList = append(updateList, item) } - rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond)) - // If leases are extended by n seconds, leases n seconds ahead of the - // base window should be extended by only one second. - rateDelay -= float64(remaining - baseWindow) - delay := time.Duration(rateDelay) - nextWindow = baseWindow + delay - l.refresh(delay + extend) - item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} - heap.Push(&le.leaseHeap, item) - } + }() } type leasesByExpiry []*Lease @@ -490,7 +509,7 @@ func (le *lessor) runLoop() { revokeLimit := leaseRevokeRate / 2 le.mu.RLock() - if le.isPrimary() { + if le.isPrimary() && le.Ready { ls = le.findExpiredLeases(revokeLimit) } le.mu.RUnlock() diff --git a/lease/lessor_bench_test.go b/lease/lessor_bench_test.go index a3be6aa95b25..7a9638fc95a2 100644 --- a/lease/lessor_bench_test.go +++ b/lease/lessor_bench_test.go @@ -17,6 +17,7 @@ package lease import ( "os" "testing" + "time" "github.com/coreos/etcd/mvcc/backend" ) @@ -53,6 +54,14 @@ func BenchmarkLessorRevoke10000(b *testing.B) { benchmarkLessorRevoke(10000, b func BenchmarkLessorRevoke100000(b *testing.B) { benchmarkLessorRevoke(100000, b) } func BenchmarkLessorRevoke1000000(b *testing.B) { benchmarkLessorRevoke(1000000, b) } +func BenchmarkLessorPromote1(b *testing.B) { benchmarkLessorPromote(1, b) } +func BenchmarkLessorPromote10(b *testing.B) { benchmarkLessorPromote(10, b) } +func BenchmarkLessorPromote100(b *testing.B) { benchmarkLessorPromote(100, b) } +func BenchmarkLessorPromote1000(b *testing.B) { benchmarkLessorPromote(1000, b) } +func BenchmarkLessorPromote10000(b *testing.B) { benchmarkLessorPromote(10000, b) } +func BenchmarkLessorPromote100000(b *testing.B) { benchmarkLessorPromote(100000, b) } +func BenchmarkLessorPromote1000000(b *testing.B) { benchmarkLessorPromote(1000000, b) } + func benchmarkLessorFindExpired(size int, b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() le := newLessor(be, minLeaseTTL) @@ -115,6 +124,23 @@ func benchmarkLessorRenew(size int, b *testing.B) { } } +func benchmarkLessorPromote(size int, b *testing.B) { + be, tmpPath := backend.NewDefaultTmpBackend() + le := newLessor(be, minLeaseTTL) + defer le.Stop() + defer cleanup(be, tmpPath) + for i := 0; i < size; i++ { + le.Grant(LeaseID(i), int64(100+i)) + } + + b.ResetTimer() + + go func() { le.Promote(100 * time.Second) }() + for i := 0; i < b.N; i++ { + le.Grant(LeaseID(i+size), int64(100+i+size)) + } +} + func cleanup(b backend.Backend, path string) { b.Close() os.Remove(path) diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 3a39e846f729..4e995fac286d 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -20,6 +20,7 @@ import ( "os" "path/filepath" "reflect" + "runtime" "sort" "sync" "testing" @@ -44,6 +45,7 @@ func TestLessorGrant(t *testing.T) { le := newLessor(be, minLeaseTTL) defer le.Stop() le.Promote(0) + waitForPromotion(le) l, err := le.Grant(1, 1) if err != nil { @@ -205,7 +207,7 @@ func TestLessorRenew(t *testing.T) { le := newLessor(be, minLeaseTTL) defer le.Stop() le.Promote(0) - + waitForPromotion(le) l, err := le.Grant(1, minLeaseTTL) if err != nil { t.Fatalf("failed to grant lease (%v)", err) @@ -263,6 +265,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { // extend after recovery should extend expiration on lease pile-up le.Promote(0) + waitForPromotion(le) windowCounts := make(map[int64]int) for _, l := range le.leaseMap { @@ -360,6 +363,7 @@ func TestLessorExpire(t *testing.T) { defer le.Stop() le.Promote(1 * time.Second) + waitForPromotion(le) l, err := le.Grant(1, testMinTTL) if err != nil { t.Fatalf("failed to create lease: %v", err) @@ -412,6 +416,7 @@ func TestLessorExpireAndDemote(t *testing.T) { defer le.Stop() le.Promote(1 * time.Second) + waitForPromotion(le) l, err := le.Grant(1, testMinTTL) if err != nil { t.Fatalf("failed to create lease: %v", err) @@ -492,3 +497,15 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) { bcfg.Path = filepath.Join(tmpPath, "be") return tmpPath, backend.New(bcfg) } +func waitForPromotion(le *lessor) { + for { + le.mu.RLock() + ready := le.Ready + le.mu.RUnlock() + if ready { + return + } else { + runtime.Gosched() + } + } +}