*: Add experimental-compaction-batch-limit flag
jpbetz authored and gyuho committed Aug 15, 2019
1 parent d57bc6e commit 9b51feb
Showing 24 changed files with 111 additions and 87 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG-3.4.md
@@ -76,6 +76,8 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.3.0...v3.4.0) and
- Improve [heartbeat send failure logging](https://github.com/etcd-io/etcd/pull/10663).
- Support [users with no password](https://github.com/etcd-io/etcd/pull/9817) to reduce the security risk introduced by leaked passwords. Such users can only be authenticated with CommonName-based auth.
- Add flag `--experimental-peer-skip-client-san-verification` to [skip verification of peer client address](https://github.com/etcd-io/etcd/pull/10524).
- Reduce the default compaction batch size from 10k revisions to 1k revisions to improve p99 latency during compactions, and reduce the wait between compaction batches from 100ms to 10ms.
- Add flag `--experimental-compaction-batch-limit` to [set the maximum revisions deleted in each compaction batch](https://github.com/etcd-io/etcd/pull/11034).

### Breaking Changes

5 changes: 5 additions & 0 deletions Documentation/op-guide/configuration.md
@@ -446,6 +446,11 @@ Follow the instructions when using these flags.
+ default: 0s
+ env variable: ETCD_EXPERIMENTAL_CORRUPT_CHECK_TIME

### --experimental-compaction-batch-limit
+ Sets the maximum revisions deleted in each compaction batch.
+ default: 1000
+ env variable: ETCD_EXPERIMENTAL_COMPACTION_BATCH_LIMIT

[build-cluster]: clustering.md#static
[reconfig]: runtime-configuration.md
[discovery]: clustering.md#discovery
2 changes: 1 addition & 1 deletion clientv3/snapshot/v3_snapshot.go
@@ -383,7 +383,7 @@ func (s *v3Manager) saveDB() error {
// a lessor that never times out leases
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})

mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit))
mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
txn := mvs.Write()
btx := be.BatchTx()
del := func(k, v []byte) error {
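Note: the restore path above passes `math.MaxInt32` as the compaction batch limit, presumably so the wholesale deletion of the old keyspace during snapshot restore runs in a single pass rather than being throttled into small batches.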
1 change: 1 addition & 0 deletions embed/config.go
@@ -280,6 +280,7 @@ type Config struct {
ExperimentalBackendFreelistType string `json:"experimental-backend-bbolt-freelist-type"`
// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
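// ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.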
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`

// ForceNewCluster starts a new cluster even if previously started; unsafe.
ForceNewCluster bool `json:"force-new-cluster"`
1 change: 1 addition & 0 deletions embed/etcd.go
@@ -205,6 +205,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
ForceNewCluster: cfg.ForceNewCluster,
EnableGRPCGateway: cfg.EnableGRPCGateway,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
}
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
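For context, here is a minimal sketch of consuming the new setting through the `embed` package, assuming only the `ExperimentalCompactionBatchLimit` field added above; the data directory and the limit value of 500 are illustrative:

```go
package main

import (
	"log"

	"go.etcd.io/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd"
	// Delete at most 500 revisions per compaction batch.
	// Leaving this at 0 falls back to the default of 1000 inside mvcc.NewStore.
	cfg.ExperimentalCompactionBatchLimit = 500

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	<-e.Server.ReadyNotify()
	log.Println("embedded etcd is serving with a compaction batch limit of 500")
	<-e.Server.StopNotify()
}
```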
1 change: 1 addition & 0 deletions etcdmain/config.go
@@ -255,6 +255,7 @@ func newConfig() *config {
fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")
fs.StringVar(&cfg.ec.ExperimentalBackendFreelistType, "experimental-backend-bbolt-freelist-type", cfg.ec.ExperimentalBackendFreelistType, "ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses (array and map are supported types)")
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")

// unsafe
fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.")
2 changes: 2 additions & 0 deletions etcdmain/help.go
@@ -204,6 +204,8 @@ Experimental feature:
ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses (array and map are supported types).
--experimental-enable-lease-checkpoint
ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
--experimental-compaction-batch-limit
ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
Unsafe feature:
--force-new-cluster 'false'
2 changes: 1 addition & 1 deletion etcdserver/backend.go
@@ -102,7 +102,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex)
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
defer kv.Close()
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
return oldbe, nil
1 change: 1 addition & 0 deletions etcdserver/config.go
@@ -112,6 +112,7 @@ type ServerConfig struct {

AutoCompactionRetention time.Duration
AutoCompactionMode string
CompactionBatchLimit int
QuotaBackendBytes int64
MaxTxnOps uint

2 changes: 1 addition & 1 deletion etcdserver/server.go
@@ -539,7 +539,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
CheckpointInterval: cfg.LeaseCheckpointInterval,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex)
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
if beExist {
kvindex := srv.kv.ConsistentIndex()
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
8 changes: 4 additions & 4 deletions etcdserver/server_test.go
@@ -984,7 +984,7 @@ func TestSnapshot(t *testing.T) {
r: *r,
v2store: st,
}
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
srv.be = be

ch := make(chan struct{}, 2)
@@ -1065,7 +1065,7 @@ func TestSnapshotOrdering(t *testing.T) {

be, tmpPath := backend.NewDefaultTmpBackend()
defer os.RemoveAll(tmpPath)
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
s.be = be

s.start()
@@ -1126,7 +1126,7 @@ func TestTriggerSnap(t *testing.T) {
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}

srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
srv.be = be

srv.start()
@@ -1198,7 +1198,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
defer func() {
os.RemoveAll(tmpPath)
}()
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
s.be = be

s.start()
2 changes: 1 addition & 1 deletion integration/v3_alarm_test.go
@@ -169,7 +169,7 @@ func TestV3CorruptAlarm(t *testing.T) {
clus.Members[0].Stop(t)
fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
be := backend.NewDefaultBackend(fp)
s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13})
s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13}, mvcc.StoreConfig{})
// NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
s.Put([]byte("abc"), []byte("def"), 0)
s.Put([]byte("xyz"), []byte("123"), 0)
38 changes: 19 additions & 19 deletions mvcc/kv_test.go
@@ -76,7 +76,7 @@ func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }

func testKVRange(t *testing.T, f rangeFunc) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

kvs := put3TestKVs(s)
@@ -142,7 +142,7 @@ func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) }

func testKVRangeRev(t *testing.T, f rangeFunc) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

kvs := put3TestKVs(s)
@@ -178,7 +178,7 @@ func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) }

func testKVRangeBadRev(t *testing.T, f rangeFunc) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

put3TestKVs(s)
@@ -211,7 +211,7 @@ func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }

func testKVRangeLimit(t *testing.T, f rangeFunc) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

kvs := put3TestKVs(s)
@@ -252,7 +252,7 @@ func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutF

func testKVPutMultipleTimes(t *testing.T, f putFunc) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

for i := 0; i < 10; i++ {
@@ -314,7 +314,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {

for i, tt := range tests {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})

s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
@@ -334,7 +334,7 @@ func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, t

func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -355,7 +355,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
// test that range, put, delete on single key in sequence repeatedly works correctly.
func TestKVOperationInSequence(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

for i := 0; i < 10; i++ {
@@ -402,7 +402,7 @@ func TestKVOperationInSequence(t *testing.T) {

func TestKVTxnBlockWriteOperations(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})

tests := []func(){
func() { s.Put([]byte("foo"), nil, lease.NoLease) },
@@ -435,7 +435,7 @@ func TestKVTxnBlockWriteOperations(t *testing.T) {

func TestKVTxnNonBlockRange(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

txn := s.Write()
@@ -456,7 +456,7 @@ func TestKVTxnNonBlockRange(t *testing.T) {
// test that txn range, put, delete on single key in sequence repeatedly works correctly.
func TestKVTxnOperationInSequence(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

for i := 0; i < 10; i++ {
@@ -506,7 +506,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {

func TestKVCompactReserveLastValue(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar0"), 1)
@@ -560,7 +560,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {

func TestKVCompactBad(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
@@ -593,7 +593,7 @@ func TestKVHash(t *testing.T) {
for i := 0; i < len(hashes); i++ {
var err error
b, tmpPath := backend.NewDefaultTmpBackend()
kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
hashes[i], _, err = kv.Hash()
@@ -631,7 +631,7 @@ func TestKVRestore(t *testing.T) {
}
for i, tt := range tests {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
tt(s)
var kvss [][]mvccpb.KeyValue
for k := int64(0); k < 10; k++ {
@@ -643,7 +643,7 @@ func TestKVRestore(t *testing.T) {
s.Close()

// ns should recover the previous state from backend.
ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})

if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore {
t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore)
@@ -675,7 +675,7 @@ func readGaugeInt(g prometheus.Gauge) int {

func TestKVSnapshot(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)

wkvs := put3TestKVs(s)
@@ -695,7 +695,7 @@ func TestKVSnapshot(t *testing.T) {
}
f.Close()

ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer ns.Close()
r, err := ns.Range([]byte("a"), []byte("z"), RangeOptions{})
if err != nil {
@@ -711,7 +711,7 @@ func TestKVSnapshot(t *testing.T) {

func TestWatchableKVWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)

w := s.NewWatchStream()
13 changes: 12 additions & 1 deletion mvcc/kvstore.go
@@ -60,6 +60,7 @@
)

var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000

// ConsistentIndexGetter is an interface that wraps the Get method.
// Consistent index is the offset of an entry in a consistent replicated log.
@@ -68,10 +69,16 @@ type ConsistentIndexGetter interface {
ConsistentIndex() uint64
}

type StoreConfig struct {
CompactionBatchLimit int
}

type store struct {
ReadView
WriteView

cfg StoreConfig

// consistentIndex caches the "consistent_index" key's value. Accessed
// through atomics so must be 64-bit aligned.
consistentIndex uint64
@@ -108,8 +115,12 @@ type store struct {

// NewStore returns a new store. It is useful to create a store inside
// mvcc pkg. It should only be used for testing externally.
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
if cfg.CompactionBatchLimit == 0 {
cfg.CompactionBatchLimit = defaultCompactBatchLimit
}
s := &store{
cfg: cfg,
b: b,
ig: ig,
kvindex: newTreeIndex(lg),
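The diff shown here is truncated (24 files changed, fewer displayed); the code that actually consumes `cfg.CompactionBatchLimit` lives in the mvcc compaction path, which is not visible above. The following is a rough, package-internal sketch of that batched compaction loop, not the verbatim source: helpers such as `bytesToRev`, `revToBytes`, `keyBucketName`, `metaBucketName`, and `finishedCompactKeyName` are assumed from the surrounding mvcc package, and the 10ms pause between batches comes from the CHANGELOG entry.

```go
// scheduleCompaction deletes revisions at or below compactMainRev in batches of
// at most s.cfg.CompactionBatchLimit keys, pausing briefly between batches so
// concurrent reads and writes are not starved while the batch tx lock is held.
func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
	end := make([]byte, 8)
	binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))

	last := make([]byte, 8+1+8)
	for {
		var rev revision

		tx := s.b.BatchTx()
		tx.Lock()
		keys, _ := tx.UnsafeRange(keyBucketName, last, end, int64(s.cfg.CompactionBatchLimit))
		for _, key := range keys {
			rev = bytesToRev(key)
			if _, ok := keep[rev]; !ok {
				tx.UnsafeDelete(keyBucketName, key)
			}
		}

		if len(keys) < s.cfg.CompactionBatchLimit {
			// Final batch: record the compacted revision so a restart can tell
			// the compaction finished, then stop.
			rbytes := make([]byte, 8+1+8)
			revToBytes(revision{main: compactMainRev}, rbytes)
			tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
			tx.Unlock()
			return true
		}

		// Resume the next batch just after the last revision processed.
		revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
		tx.Unlock()

		select {
		case <-time.After(10 * time.Millisecond):
		case <-s.stopc:
			return false
		}
	}
}
```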
