mvcc: Optimize compaction for short commit pauses #11034

Merged 4 commits on Aug 15, 2019
2 changes: 2 additions & 0 deletions CHANGELOG-3.4.md
@@ -76,6 +76,8 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.3.0...v3.4.0) and
- Improve [heartbeat send failure logging](https://github.com/etcd-io/etcd/pull/10663).
- Support [users with no password](https://github.com/etcd-io/etcd/pull/9817) to reduce the security risk of leaked passwords. Such users can only be authenticated with CommonName-based auth.
- Add flag `--experimental-peer-skip-client-san-verification` to [skip verification of peer client address](https://github.com/etcd-io/etcd/pull/10524)
- Reduce the default compaction batch size from 10k revisions to 1k revisions to improve p99 latency during compaction, and reduce the wait between compaction batches from 100ms to 10ms
- Add flag `--experimental-compaction-batch-limit` to [set the maximum number of revisions deleted in each compaction batch](https://github.com/etcd-io/etcd/pull/11034)

### Breaking Changes

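The two changelog entries above capture the heart of this PR: rather than deleting every compacted revision inside one long-running backend transaction, the compactor works in bounded batches and commits between them, so concurrent writes only ever wait on one short batch. A minimal sketch of the pattern, assuming a hypothetical `deleteBatch` helper (etcd's real loop lives in `mvcc/kvstore_compaction.go`, which this diff view does not show):

```go
package main

import "time"

// compactBatched removes compacted revisions in bounded batches.
// deleteBatch is a hypothetical stand-in for one bounded backend
// transaction: it deletes up to limit revisions and reports how
// many it actually removed.
func compactBatched(deleteBatch func(limit int) int, batchLimit int, pause time.Duration) {
	for {
		if deleteBatch(batchLimit) < batchLimit {
			return // final, partial batch: compaction is complete
		}
		// Commit boundary: pause briefly so queued writes can commit
		// between batches (100ms before this PR, 10ms after).
		time.Sleep(pause)
	}
}
```

With the new defaults, each iteration deletes at most 1,000 revisions and then pauses 10ms, which bounds the commit pause that client writes can observe during compaction.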
5 changes: 5 additions & 0 deletions Documentation/op-guide/configuration.md
@@ -446,6 +446,11 @@ Follow the instructions when using these flags.
+ default: 0s
+ env variable: ETCD_EXPERIMENTAL_CORRUPT_CHECK_TIME

### --experimental-compaction-batch-limit
+ Sets the maximum number of revisions deleted in each compaction batch.
+ default: 1000
+ env variable: ETCD_EXPERIMENTAL_COMPACTION_BATCH_LIMIT

[build-cluster]: clustering.md#static
[reconfig]: runtime-configuration.md
[discovery]: clustering.md#discovery
3 changes: 2 additions & 1 deletion clientv3/integration/maintenance_test.go
@@ -20,6 +20,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"path/filepath"
"testing"
"time"
@@ -149,7 +150,7 @@ func TestMaintenanceSnapshotErrorInflight(t *testing.T) {
clus.Members[0].Stop(t)
dpath := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
b := backend.NewDefaultBackend(dpath)
s := mvcc.NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := mvcc.NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
rev := 100000
for i := 2; i <= rev; i++ {
s.Put([]byte(fmt.Sprintf("%10d", i)), bytes.Repeat([]byte("a"), 1024), lease.NoLease)
2 changes: 1 addition & 1 deletion clientv3/snapshot/v3_snapshot.go
@@ -383,7 +383,7 @@ func (s *v3Manager) saveDB() error {
// a lessor that never times out leases
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})

mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit))
mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
txn := mvs.Write()
btx := be.BatchTx()
del := func(k, v []byte) error {
1 change: 1 addition & 0 deletions embed/config.go
@@ -280,6 +280,7 @@ type Config struct {
ExperimentalBackendFreelistType string `json:"experimental-backend-bbolt-freelist-type"`
// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`

// ForceNewCluster starts a new cluster even if previously started; unsafe.
ForceNewCluster bool `json:"force-new-cluster"`
1 change: 1 addition & 0 deletions embed/etcd.go
@@ -205,6 +205,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
ForceNewCluster: cfg.ForceNewCluster,
EnableGRPCGateway: cfg.EnableGRPCGateway,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
}
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
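For embedded-etcd users, the plumbing above exposes the limit as the `ExperimentalCompactionBatchLimit` field on `embed.Config`. A usage sketch, assuming the v3.4 import path `go.etcd.io/etcd/embed` and a placeholder data directory:

```go
package main

import (
	"log"

	"go.etcd.io/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // placeholder data directory
	// Cap each compaction batch at 1000 revision deletions; leaving the
	// field at 0 selects the same default inside mvcc.NewStore.
	cfg.ExperimentalCompactionBatchLimit = 1000

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify() // block until the server is ready to serve
	log.Println("etcd is ready")
}
```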
1 change: 1 addition & 0 deletions etcdmain/config.go
@@ -255,6 +255,7 @@ func newConfig() *config {
fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")
fs.StringVar(&cfg.ec.ExperimentalBackendFreelistType, "experimental-backend-bbolt-freelist-type", cfg.ec.ExperimentalBackendFreelistType, "ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses (array and map are supported types)")
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")

// unsafe
fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.")
2 changes: 2 additions & 0 deletions etcdmain/help.go
@@ -204,6 +204,8 @@ Experimental feature:
ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses (array and map are supported types).
--experimental-enable-lease-checkpoint
ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
--experimental-compaction-batch-limit
ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.

Unsafe feature:
--force-new-cluster 'false'
2 changes: 1 addition & 1 deletion etcdserver/backend.go
@@ -102,7 +102,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex)
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
defer kv.Close()
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
return oldbe, nil
1 change: 1 addition & 0 deletions etcdserver/config.go
@@ -112,6 +112,7 @@ type ServerConfig struct {

AutoCompactionRetention time.Duration
AutoCompactionMode string
CompactionBatchLimit int
QuotaBackendBytes int64
MaxTxnOps uint

2 changes: 1 addition & 1 deletion etcdserver/server.go
@@ -539,7 +539,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
CheckpointInterval: cfg.LeaseCheckpointInterval,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex)
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
if beExist {
kvindex := srv.kv.ConsistentIndex()
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
8 changes: 4 additions & 4 deletions etcdserver/server_test.go
@@ -984,7 +984,7 @@ func TestSnapshot(t *testing.T) {
r: *r,
v2store: st,
}
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
srv.be = be

ch := make(chan struct{}, 2)
@@ -1065,7 +1065,7 @@ func TestSnapshotOrdering(t *testing.T) {

be, tmpPath := backend.NewDefaultTmpBackend()
defer os.RemoveAll(tmpPath)
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
s.be = be

s.start()
@@ -1126,7 +1126,7 @@ func TestTriggerSnap(t *testing.T) {
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}

srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
srv.be = be

srv.start()
@@ -1198,7 +1198,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
defer func() {
os.RemoveAll(tmpPath)
}()
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
s.be = be

s.start()
2 changes: 1 addition & 1 deletion integration/v3_alarm_test.go
@@ -169,7 +169,7 @@ func TestV3CorruptAlarm(t *testing.T) {
clus.Members[0].Stop(t)
fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
be := backend.NewDefaultBackend(fp)
s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13})
s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13}, mvcc.StoreConfig{})
// NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
s.Put([]byte("abc"), []byte("def"), 0)
s.Put([]byte("xyz"), []byte("123"), 0)
38 changes: 19 additions & 19 deletions mvcc/kv_test.go
@@ -76,7 +76,7 @@ func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }

func testKVRange(t *testing.T, f rangeFunc) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

kvs := put3TestKVs(s)
@@ -142,7 +142,7 @@ func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) }

func testKVRangeRev(t *testing.T, f rangeFunc) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

kvs := put3TestKVs(s)
@@ -178,7 +178,7 @@ func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) }

func testKVRangeBadRev(t *testing.T, f rangeFunc) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

put3TestKVs(s)
@@ -211,7 +211,7 @@ func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }

func testKVRangeLimit(t *testing.T, f rangeFunc) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

kvs := put3TestKVs(s)
@@ -252,7 +252,7 @@ func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutFunc) }

func testKVPutMultipleTimes(t *testing.T, f putFunc) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

for i := 0; i < 10; i++ {
@@ -314,7 +314,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {

for i, tt := range tests {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})

s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
@@ -334,7 +334,7 @@ func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, txnDeleteRangeFunc) }

func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -355,7 +355,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
// test that range, put, delete on single key in sequence repeatedly works correctly.
func TestKVOperationInSequence(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

for i := 0; i < 10; i++ {
@@ -402,7 +402,7 @@ func TestKVOperationInSequence(t *testing.T) {

func TestKVTxnBlockWriteOperations(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})

tests := []func(){
func() { s.Put([]byte("foo"), nil, lease.NoLease) },
@@ -435,7 +435,7 @@ func TestKVTxnBlockWriteOperations(t *testing.T) {

func TestKVTxnNonBlockRange(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

txn := s.Write()
@@ -456,7 +456,7 @@ func TestKVTxnNonBlockRange(t *testing.T) {
// test that txn range, put, delete on single key in sequence repeatedly works correctly.
func TestKVTxnOperationInSequence(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

for i := 0; i < 10; i++ {
@@ -506,7 +506,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {

func TestKVCompactReserveLastValue(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar0"), 1)
@@ -560,7 +560,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {

func TestKVCompactBad(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
@@ -593,7 +593,7 @@ func TestKVHash(t *testing.T) {
for i := 0; i < len(hashes); i++ {
var err error
b, tmpPath := backend.NewDefaultTmpBackend()
kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
hashes[i], _, err = kv.Hash()
@@ -631,7 +631,7 @@ func TestKVRestore(t *testing.T) {
}
for i, tt := range tests {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
tt(s)
var kvss [][]mvccpb.KeyValue
for k := int64(0); k < 10; k++ {
@@ -643,7 +643,7 @@
s.Close()

// ns should recover the previous state from the backend.
ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})

if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore {
t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore)
@@ -675,7 +675,7 @@ func readGaugeInt(g prometheus.Gauge) int {

func TestKVSnapshot(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

wkvs := put3TestKVs(s)
@@ -695,7 +695,7 @@
}
f.Close()

ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer ns.Close()
r, err := ns.Range([]byte("a"), []byte("z"), RangeOptions{})
if err != nil {
@@ -711,7 +711,7 @@

func TestWatchableKVWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)

w := s.NewWatchStream()
13 changes: 12 additions & 1 deletion mvcc/kvstore.go
@@ -60,6 +60,7 @@ const (
)

var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000

// ConsistentIndexGetter is an interface that wraps the Get method.
// Consistent index is the offset of an entry in a consistent replicated log.
@@ -68,6 +69,10 @@ type ConsistentIndexGetter interface {
ConsistentIndex() uint64
}

type StoreConfig struct {
CompactionBatchLimit int
}

type store struct {
ReadView
WriteView
Expand All @@ -76,6 +81,8 @@ type store struct {
// through atomics so must be 64-bit aligned.
consistentIndex uint64

cfg StoreConfig

// mu read locks for txns and write locks for non-txn store changes.
mu sync.RWMutex

@@ -108,8 +115,12 @@

// NewStore returns a new store. It is useful to create a store inside
// mvcc pkg. It should only be used for testing externally.
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
if cfg.CompactionBatchLimit == 0 {
cfg.CompactionBatchLimit = defaultCompactBatchLimit
}
s := &store{
cfg: cfg,
b: b,
ig: ig,
kvindex: newTreeIndex(lg),
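The zero-value check above is what lets most call sites in this diff pass `StoreConfig{}` unchanged, while paths that must compact in a single pass, such as the snapshot code earlier in the diff, pass `math.MaxInt32`. A sketch of both call styles (for testing only, as the `NewStore` comment says), assuming the v3.4 import paths:

```go
package main

import (
	"math"

	"go.etcd.io/etcd/lease"
	"go.etcd.io/etcd/mvcc"
	"go.etcd.io/etcd/mvcc/backend"
	"go.uber.org/zap"
)

func main() {
	// Zero-value config: NewStore fills in the default of 1000
	// revisions per compaction batch.
	b1, _ := backend.NewDefaultTmpBackend()
	s := mvcc.NewStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, mvcc.StoreConfig{})
	defer s.Close()

	// Effectively unbatched: math.MaxInt32 keeps an entire compaction
	// inside one transaction, matching the snapshot restore path.
	b2, _ := backend.NewDefaultTmpBackend()
	u := mvcc.NewStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil,
		mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
	defer u.Close()
}
```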