diff --git a/go.mod b/go.mod index 015fb8b90a86..f5b96a256267 100644 --- a/go.mod +++ b/go.mod @@ -56,7 +56,7 @@ require ( github.com/hashicorp/go-memdb v1.0.2 github.com/hashicorp/go-msgpack v0.5.5 github.com/hashicorp/go-multierror v1.0.0 - github.com/hashicorp/go-raftchunking v0.6.2 + github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a github.com/hashicorp/go-rootcerts v1.0.1 github.com/hashicorp/go-sockaddr v1.0.2 github.com/hashicorp/go-syslog v1.0.0 @@ -64,7 +64,7 @@ require ( github.com/hashicorp/golang-lru v0.5.3 github.com/hashicorp/hcl v1.0.0 github.com/hashicorp/nomad/api v0.0.0-20190412184103-1c38ced33adf - github.com/hashicorp/raft v1.1.1 + github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17 github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab github.com/hashicorp/vault-plugin-auth-alicloud v0.5.2-0.20190814210027-93970f08f2ec github.com/hashicorp/vault-plugin-auth-azure v0.5.2-0.20190814210035-08e00d801115 diff --git a/go.sum b/go.sum index dd9da86cd9ce..d60e1d489ca7 100644 --- a/go.sum +++ b/go.sum @@ -73,6 +73,7 @@ github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYE github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/boombuler/barcode v1.0.0 h1:s1TvRnXwL2xJRaccrdcBQMZxq6X7DvsMogtmJeHDdrc= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= @@ -291,6 +292,8 @@ github.com/hashicorp/go-plugin v1.0.1 h1:4OtAfUGbnKC6yS48p0CtMX2oFYtzFZVv6rok3cR github.com/hashicorp/go-plugin v1.0.1/go.mod h1:++UyYGoz3o5w9ZzAdZxtQKrWWP+iqPBn3cQptSMzBuY= github.com/hashicorp/go-raftchunking v0.6.2 h1:imj6CVkwXj6VzgXZQvzS+fSrkbFCzlJ2t00F3PacnuU= github.com/hashicorp/go-raftchunking v0.6.2/go.mod h1:cGlg3JtDy7qy6c/3Bu660Mic1JF+7lWqIwCFSb08fX0= +github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a h1:FmnBDwGwlTgugDGbVxwV8UavqSMACbGrUpfc98yFLR4= +github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a/go.mod h1:xbXnmKqX9/+RhPkJ4zrEx4738HacP72aaUPlT2RZ4sU= github.com/hashicorp/go-retryablehttp v0.5.3 h1:QlWt0KvWT0lq8MFppF9tsJGF+ynG7ztc2KIPhzRGk7s= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-retryablehttp v0.5.4 h1:1BZvpawXoJCWX6pNtow9+rpEj+3itIlutiqnntI6jOE= @@ -330,6 +333,8 @@ github.com/hashicorp/nomad/api v0.0.0-20190412184103-1c38ced33adf/go.mod h1:BDng github.com/hashicorp/raft v1.0.1/go.mod h1:DVSAWItjLjTOkVbSpWQ0j0kUADIvDaCtBxIcbNAQLkI= github.com/hashicorp/raft v1.1.1 h1:HJr7UE1x/JrJSc9Oy6aDBHtNHUUBHjcQjTgvUVihoZs= github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= +github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17 h1:p+2EISNdFCnD9R+B4xCiqSn429MCFtvM41aHJDJ6qW4= +github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/hashicorp/raft-snapshot 
v1.0.2-0.20190827162939-8117efcc5aab h1:WzGMwlO1DvaC93SvVOBOKtn+nXGEDXapyJuaRV3/VaY= github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab/go.mod h1:5sL9eUn72lH5DzsFIJ9jaysITbHksSSszImWSOTC8Ic= diff --git a/physical/raft/fsm.go b/physical/raft/fsm.go index 12330a9f21f4..22f525624f15 100644 --- a/physical/raft/fsm.go +++ b/physical/raft/fsm.go @@ -48,7 +48,7 @@ var ( var _ physical.Backend = (*FSM)(nil) var _ physical.Transactional = (*FSM)(nil) var _ raft.FSM = (*FSM)(nil) -var _ raft.ConfigurationStore = (*FSM)(nil) +var _ raft.BatchingFSM = (*FSM)(nil) type restoreCallback func(context.Context) error @@ -75,7 +75,6 @@ type FSM struct { l sync.RWMutex path string logger log.Logger - permitPool *physical.PermitPool noopRestore bool db *bolt.DB @@ -88,7 +87,7 @@ type FSM struct { // additional state in the backend. storeLatestState bool - chunker *raftchunking.ChunkingConfigurationStore + chunker *raftchunking.ChunkingBatchingFSM } // NewFSM constructs a FSM using the given directory @@ -159,9 +158,8 @@ func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) { } f := &FSM{ - path: conf["path"], - logger: logger, - permitPool: physical.NewPermitPool(physical.DefaultParallelOperations), + path: conf["path"], + logger: logger, db: boltDB, latestTerm: latestTerm, @@ -170,7 +168,7 @@ func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) { storeLatestState: storeLatestState, } - f.chunker = raftchunking.NewChunkingConfigurationStore(f, &FSMChunkStorage{ + f.chunker = raftchunking.NewChunkingBatchingFSM(f, &FSMChunkStorage{ f: f, ctx: context.Background(), }) @@ -245,9 +243,6 @@ func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configurat func (f *FSM) Delete(ctx context.Context, path string) error { defer metrics.MeasureSince([]string{"raft", "delete"}, time.Now()) - f.permitPool.Acquire() - defer f.permitPool.Release() - f.l.RLock() defer f.l.RUnlock() @@ -260,9 +255,6 @@ func (f *FSM) Delete(ctx context.Context, path string) error { func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error { defer metrics.MeasureSince([]string{"raft", "delete_prefix"}, time.Now()) - f.permitPool.Acquire() - defer f.permitPool.Release() - f.l.RLock() defer f.l.RUnlock() @@ -287,9 +279,6 @@ func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error { func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) { defer metrics.MeasureSince([]string{"raft", "get"}, time.Now()) - f.permitPool.Acquire() - defer f.permitPool.Release() - f.l.RLock() defer f.l.RUnlock() @@ -324,9 +313,6 @@ func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) { func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error { defer metrics.MeasureSince([]string{"raft", "put"}, time.Now()) - f.permitPool.Acquire() - defer f.permitPool.Release() - f.l.RLock() defer f.l.RUnlock() @@ -340,9 +326,6 @@ func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error { func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) { defer metrics.MeasureSince([]string{"raft", "list"}, time.Now()) - f.permitPool.Acquire() - defer f.permitPool.Release() - f.l.RLock() defer f.l.RUnlock() @@ -374,9 +357,6 @@ func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) { // Transaction writes all the operations in the provided transaction to the bolt // file. 
func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error { - f.permitPool.Acquire() - defer f.permitPool.Release() - f.l.RLock() defer f.l.RUnlock() @@ -404,27 +384,51 @@ func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error return err } -// Apply will apply a log value to the FSM. This is called from the raft +// ApplyBatch will apply a set of logs to the FSM. This is called from the raft // library. -func (f *FSM) Apply(log *raft.Log) interface{} { - command := &LogData{} - err := proto.Unmarshal(log.Data, command) - if err != nil { - f.logger.Error("error proto unmarshaling log data", "error", err) - panic("error proto unmarshaling log data") - } +func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} { + if len(logs) == 0 { + return []interface{}{} + } + + // Do the unmarshalling first so we don't hold locks + var latestConfiguration *ConfigurationValue + commands := make([]interface{}, 0, len(logs)) + for _, log := range logs { + switch log.Type { + case raft.LogCommand: + command := &LogData{} + err := proto.Unmarshal(log.Data, command) + if err != nil { + f.logger.Error("error proto unmarshaling log data", "error", err) + panic("error proto unmarshaling log data") + } + commands = append(commands, command) + case raft.LogConfiguration: + configuration := raft.DecodeConfiguration(log.Data) + config := raftConfigurationToProtoConfiguration(log.Index, configuration) - f.l.RLock() - defer f.l.RUnlock() + commands = append(commands, config) + + // Update the latest configuration the fsm has received; we will + // store this after it has been committed to storage. + latestConfiguration = config + + default: + panic(fmt.Sprintf("got unexpected log type: %d", log.Type)) + } + } // Only advance latest pointer if this log has a higher index value than // what we have seen in the past. 
var logIndex []byte + var err error latestIndex, _ := f.LatestState() - if latestIndex.Index < log.Index { + lastLog := logs[len(logs)-1] + if latestIndex.Index < lastLog.Index { logIndex, err = proto.Marshal(&IndexValue{ - Term: log.Term, - Index: log.Index, + Term: lastLog.Term, + Index: lastLog.Index, }) if err != nil { f.logger.Error("unable to marshal latest index", "error", err) @@ -432,29 +436,46 @@ func (f *FSM) Apply(log *raft.Log) interface{} { } } + f.l.RLock() + defer f.l.RUnlock() + err = f.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(dataBucketName) - for _, op := range command.Operations { - var err error - switch op.OpType { - case putOp: - err = b.Put([]byte(op.Key), op.Value) - case deleteOp: - err = b.Delete([]byte(op.Key)) - case restoreCallbackOp: - if f.restoreCb != nil { - // Kick off the restore callback function in a go routine - go f.restoreCb(context.Background()) + for _, commandRaw := range commands { + switch command := commandRaw.(type) { + case *LogData: + for _, op := range command.Operations { + var err error + switch op.OpType { + case putOp: + err = b.Put([]byte(op.Key), op.Value) + case deleteOp: + err = b.Delete([]byte(op.Key)) + case restoreCallbackOp: + if f.restoreCb != nil { + // Kick off the restore callback function in a goroutine + go f.restoreCb(context.Background()) + } + default: + return fmt.Errorf("%q is not a supported transaction operation", op.OpType) + } + if err != nil { + return err + } + } + + case *ConfigurationValue: + b := tx.Bucket(configBucketName) + configBytes, err := proto.Marshal(command) + if err != nil { + return err + } + if err := b.Put(latestConfigKey, configBytes); err != nil { + return err } - default: - return fmt.Errorf("%q is not a supported transaction operation", op.OpType) - } - if err != nil { - return err - } } } - // TODO: benchmark so we can know how much time this adds if f.storeLatestState && len(logIndex) > 0 { b := tx.Bucket(configBucketName) err = b.Put(latestIndexKey, logIndex) @@ -472,13 +493,32 @@ func (f *FSM) Apply(log *raft.Log) interface{} { // If we advanced the latest value, update the in-memory representation too. if len(logIndex) > 0 { - atomic.StoreUint64(f.latestTerm, log.Term) - atomic.StoreUint64(f.latestIndex, log.Index) + atomic.StoreUint64(f.latestTerm, lastLog.Term) + atomic.StoreUint64(f.latestIndex, lastLog.Index) + } + + // If one or more configuration changes were processed, store the latest one. + if latestConfiguration != nil { + f.latestConfig.Store(latestConfiguration) } - return &FSMApplyResponse{ - Success: true, + // Build the responses. The logs array is used here to ensure we reply to + // all command values, even if they are not of the types we expect. This + // should future-proof this function against new log types being added. + resp := make([]interface{}, len(logs)) + for i := range logs { + resp[i] = &FSMApplyResponse{ + Success: true, + } } + + return resp +} + +// Apply will apply a log value to the FSM. This is called from the raft +// library. +func (f *FSM) Apply(log *raft.Log) interface{} { + return f.ApplyBatch([]*raft.Log{log})[0] } type writeErrorCloser interface { @@ -609,61 +649,6 @@ func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error { // Release doesn't do anything. func (s *noopSnapshotter) Release() {} -// StoreConfig satisfies the raft.ConfigurationStore interface and persists the -// latest raft server configuration to the bolt file. 
-func (f *FSM) StoreConfiguration(index uint64, configuration raft.Configuration) { - f.l.RLock() - defer f.l.RUnlock() - - var indexBytes []byte - latestIndex, _ := f.LatestState() - // Only write the new index if we are advancing the pointer - if index > latestIndex.Index { - latestIndex.Index = index - - var err error - indexBytes, err = proto.Marshal(latestIndex) - if err != nil { - f.logger.Error("unable to marshal latest index", "error", err) - panic(fmt.Sprintf("unable to marshal latest index: %v", err)) - } - } - - protoConfig := raftConfigurationToProtoConfiguration(index, configuration) - configBytes, err := proto.Marshal(protoConfig) - if err != nil { - f.logger.Error("unable to marshal config", "error", err) - panic(fmt.Sprintf("unable to marshal config: %v", err)) - } - - if f.storeLatestState { - err = f.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(configBucketName) - err := b.Put(latestConfigKey, configBytes) - if err != nil { - return err - } - - // TODO: benchmark so we can know how much time this adds - if len(indexBytes) > 0 { - err = b.Put(latestIndexKey, indexBytes) - if err != nil { - return err - } - } - - return nil - }) - if err != nil { - f.logger.Error("unable to store latest configuration", "error", err) - panic(fmt.Sprintf("unable to store latest configuration: %v", err)) - } - } - - f.witnessIndex(latestIndex) - f.latestConfig.Store(protoConfig) -} - // raftConfigurationToProtoConfiguration converts a raft configuration object to // a proto value. func raftConfigurationToProtoConfiguration(index uint64, configuration raft.Configuration) *ConfigurationValue { @@ -722,9 +707,6 @@ func (f *FSMChunkStorage) StoreChunk(chunk *raftchunking.ChunkInfo) (bool, error Value: b, } - f.f.permitPool.Acquire() - defer f.f.permitPool.Release() - f.f.l.RLock() defer f.f.l.RUnlock() diff --git a/physical/raft/fsm_test.go b/physical/raft/fsm_test.go new file mode 100644 index 000000000000..7caeeb41c6e4 --- /dev/null +++ b/physical/raft/fsm_test.go @@ -0,0 +1,129 @@ +package raft + +import ( + "context" + fmt "fmt" + "io/ioutil" + "math/rand" + "os" + "testing" + + proto "github.com/golang/protobuf/proto" + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/raft" +) + +func getFSM(t testing.TB) (*FSM, string) { + raftDir, err := ioutil.TempDir("", "vault-raft-") + if err != nil { + t.Fatal(err) + } + t.Logf("raft dir: %s", raftDir) + + logger := hclog.New(&hclog.LoggerOptions{ + Name: "raft", + Level: hclog.Trace, + }) + + fsm, err := NewFSM(map[string]string{ + "path": raftDir, + }, logger) + if err != nil { + t.Fatal(err) + } + + return fsm, raftDir +} + +func TestFSM_Batching(t *testing.T) { + fsm, dir := getFSM(t) + defer os.RemoveAll(dir) + + var index uint64 + var term uint64 = 1 + + getLog := func(i uint64) (int, *raft.Log) { + if rand.Intn(10) >= 8 { + term += 1 + return 0, &raft.Log{ + Index: i, + Term: term, + Type: raft.LogConfiguration, + Data: raft.EncodeConfiguration(raft.Configuration{ + Servers: []raft.Server{ + raft.Server{ + Address: raft.ServerAddress("test"), + ID: raft.ServerID("test"), + }, + }, + }), + } + } + + command := &LogData{ + Operations: make([]*LogOperation, rand.Intn(10)), + } + + for j := range command.Operations { + command.Operations[j] = &LogOperation{ + OpType: putOp, + Key: fmt.Sprintf("key-%d-%d", i, j), + Value: []byte(fmt.Sprintf("value-%d-%d", i, j)), + } + } + commandBytes, err := proto.Marshal(command) + if err != nil { + t.Fatal(err) + } + return len(command.Operations), &raft.Log{ + Index: i, + Term: term, + Type: 
raft.LogCommand, + Data: commandBytes, + } + } + + totalKeys := 0 + for i := 0; i < 100; i++ { + batchSize := rand.Intn(64) + batch := make([]*raft.Log, batchSize) + for j := 0; j < batchSize; j++ { + var keys int + index++ + keys, batch[j] = getLog(index) + totalKeys += keys + } + + resp := fsm.ApplyBatch(batch) + if len(resp) != batchSize { + t.Fatalf("incorrect response length: got %d expected %d", len(resp), batchSize) + } + + for _, r := range resp { + if _, ok := r.(*FSMApplyResponse); !ok { + t.Fatal("bad response type") + } + } + } + + keys, err := fsm.List(context.Background(), "") + if err != nil { + t.Fatal(err) + } + + if len(keys) != totalKeys { + t.Fatalf("incorrect number of keys: got %d expected %d", len(keys), totalKeys) + } + + latestIndex, latestConfig := fsm.LatestState() + if latestIndex.Index != index { + t.Fatalf("bad latest index: got %d expected %d", latestIndex.Index, index) + } + if latestIndex.Term != term { + t.Fatalf("bad latest term: got %d expected %d", latestIndex.Term, term) + } + + if latestConfig == nil && term > 1 { + t.Fatal("config wasn't updated") + } +} diff --git a/physical/raft/raft.go b/physical/raft/raft.go index 329d61e0e56c..f2e0a2b20ddf 100644 --- a/physical/raft/raft.go +++ b/physical/raft/raft.go @@ -12,6 +12,7 @@ import ( "sync" "time" + metrics "github.com/armon/go-metrics" proto "github.com/golang/protobuf/proto" "github.com/hashicorp/errwrap" log "github.com/hashicorp/go-hclog" @@ -94,6 +95,9 @@ type RaftBackend struct { // serverAddressProvider is used to map server IDs to addresses. serverAddressProvider raft.ServerAddressProvider + + // permitPool is used to limit the number of concurrent storage calls. + permitPool *physical.PermitPool } // EnsurePath is used to make sure a path exists @@ -201,6 +205,7 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend snapStore: snap, dataDir: path, localID: localID, + permitPool: physical.NewPermitPool(physical.DefaultParallelOperations), }, nil } @@ -344,6 +349,9 @@ func (b *RaftBackend) applyConfigSettings(config *raft.Config) error { config.TrailingLogs = uint64(trailingLogs) } + config.NoSnapshotRestoreOnStart = true + config.MaxAppendEntries = 64 + return nil } @@ -709,6 +717,7 @@ func (b *RaftBackend) RestoreSnapshot(ctx context.Context, metadata raft.Snapsho // Delete inserts an entry in the log to delete the given path func (b *RaftBackend) Delete(ctx context.Context, path string) error { + defer metrics.MeasureSince([]string{"raft-storage", "delete"}, time.Now()) command := &LogData{ Operations: []*LogOperation{ &LogOperation{ @@ -717,6 +726,8 @@ func (b *RaftBackend) Delete(ctx context.Context, path string) error { }, }, } + b.permitPool.Acquire() + defer b.permitPool.Release() b.l.RLock() err := b.applyLog(ctx, command) @@ -726,15 +737,20 @@ func (b *RaftBackend) Delete(ctx context.Context, path string) error { // Get returns the value corresponding to the given path from the fsm func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, error) { + defer metrics.MeasureSince([]string{"raft-storage", "get"}, time.Now()) if b.fsm == nil { return nil, errors.New("raft: fsm not configured") } + b.permitPool.Acquire() + defer b.permitPool.Release() + return b.fsm.Get(ctx, path) } // Put inserts an entry in the log for the put operation func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error { + defer metrics.MeasureSince([]string{"raft-storage", "put"}, time.Now()) command := &LogData{ Operations: []*LogOperation{ 
&LogOperation{ @@ -745,6 +761,9 @@ func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error { }, } + b.permitPool.Acquire() + defer b.permitPool.Release() + b.l.RLock() err := b.applyLog(ctx, command) b.l.RUnlock() @@ -753,16 +772,21 @@ func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error { // List enumerates all the items under the prefix from the fsm func (b *RaftBackend) List(ctx context.Context, prefix string) ([]string, error) { + defer metrics.MeasureSince([]string{"raft-storage", "list"}, time.Now()) if b.fsm == nil { return nil, errors.New("raft: fsm not configured") } + b.permitPool.Acquire() + defer b.permitPool.Release() + return b.fsm.List(ctx, prefix) } // Transaction applies all the given operations into a single log and // applies it. func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error { + defer metrics.MeasureSince([]string{"raft-storage", "transaction"}, time.Now()) command := &LogData{ Operations: make([]*LogOperation, len(txns)), } @@ -783,6 +807,9 @@ func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry command.Operations[i] = op } + b.permitPool.Acquire() + defer b.permitPool.Release() + b.l.RLock() err := b.applyLog(ctx, command) b.l.RUnlock() diff --git a/vault/external_tests/raft/raft_test.go b/vault/external_tests/raft/raft_test.go index b4bc8d8109c0..79f8bb406c8f 100644 --- a/vault/external_tests/raft/raft_test.go +++ b/vault/external_tests/raft/raft_test.go @@ -2,6 +2,7 @@ package vault import ( "bytes" + "crypto/md5" "fmt" "io/ioutil" "net/http" @@ -11,6 +12,7 @@ import ( "time" "github.com/hashicorp/go-cleanhttp" + uuid "github.com/hashicorp/go-uuid" "github.com/hashicorp/vault/api" "github.com/hashicorp/vault/helper/testhelpers" "github.com/hashicorp/vault/helper/testhelpers/teststorage" @@ -20,7 +22,7 @@ import ( "golang.org/x/net/http2" ) -func raftCluster(t *testing.T) *vault.TestCluster { +func raftCluster(t testing.TB) *vault.TestCluster { var conf vault.CoreConfig var opts = vault.TestClusterOptions{HandlerFunc: vaulthttp.Handler} teststorage.RaftBackendSetup(&conf, &opts) @@ -731,3 +733,32 @@ func TestRaft_SnapshotAPI_DifferentCluster(t *testing.T) { testhelpers.WaitForNCoresSealed(t, cluster2, 3) } } + +func BenchmarkRaft_SingleNode(b *testing.B) { + cluster := raftCluster(b) + defer cluster.Cleanup() + + leaderClient := cluster.Cores[0].Client + + bench := func(b *testing.B, dataSize int) { + data, err := uuid.GenerateRandomBytes(dataSize) + if err != nil { + b.Fatal(err) + } + + testName := b.Name() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + key := fmt.Sprintf("secret/%x", md5.Sum([]byte(fmt.Sprintf("%s-%d", testName, i)))) + _, err := leaderClient.Logical().Write(key, map[string]interface{}{ + "test": data, + }) + if err != nil { + b.Fatal(err) + } + } + } + + b.Run("256b", func(b *testing.B) { bench(b, 25) }) +} diff --git a/vendor/github.com/hashicorp/go-raftchunking/fsm.go b/vendor/github.com/hashicorp/go-raftchunking/fsm.go index 53e0e00efeae..374b477370d3 100644 --- a/vendor/github.com/hashicorp/go-raftchunking/fsm.go +++ b/vendor/github.com/hashicorp/go-raftchunking/fsm.go @@ -11,6 +11,7 @@ import ( var _ raft.FSM = (*ChunkingFSM)(nil) var _ raft.ConfigurationStore = (*ChunkingConfigurationStore)(nil) +var _ raft.BatchingFSM = (*ChunkingBatchingFSM)(nil) type ChunkingSuccess struct { Response interface{} @@ -28,6 +29,11 @@ type ChunkingFSM struct { lastTerm uint64 } +type ChunkingBatchingFSM struct { + *ChunkingFSM + 
underlyingBatchingFSM raft.BatchingFSM +} + type ChunkingConfigurationStore struct { *ChunkingFSM underlyingConfigurationStore raft.ConfigurationStore @@ -44,6 +50,20 @@ func NewChunkingFSM(underlying raft.FSM, store ChunkStorage) *ChunkingFSM { return ret } +func NewChunkingBatchingFSM(underlying raft.BatchingFSM, store ChunkStorage) *ChunkingBatchingFSM { + ret := &ChunkingBatchingFSM{ + ChunkingFSM: &ChunkingFSM{ + underlying: underlying, + store: store, + }, + underlyingBatchingFSM: underlying, + } + if store == nil { + ret.ChunkingFSM.store = NewInmemChunkStorage() + } + return ret +} + func NewChunkingConfigurationStore(underlying raft.ConfigurationStore, store ChunkStorage) *ChunkingConfigurationStore { ret := &ChunkingConfigurationStore{ ChunkingFSM: &ChunkingFSM{ @@ -58,14 +78,7 @@ func NewChunkingConfigurationStore(underlying raft.ConfigurationStore, store Chu return ret } -// Apply applies the log, handling chunking as needed. The return value will -// either be an error or whatever is returned from the underlying Apply. -func (c *ChunkingFSM) Apply(l *raft.Log) interface{} { - // Not chunking or wrong type, pass through - if l.Type != raft.LogCommand || l.Extensions == nil { - return c.underlying.Apply(l) - } - +func (c *ChunkingFSM) applyChunk(l *raft.Log) (*raft.Log, error) { if l.Term != c.lastTerm { // Term has changed. A raft library client that was applying chunks // should get an error that it's no longer the leader and bail, and @@ -73,7 +86,7 @@ func (c *ChunkingFSM) Apply(l *raft.Log) interface{} { // chunking operation automatically, which will be under a different // opnum. So it should be safe in this case to clear the map. if err := c.store.RestoreChunks(nil); err != nil { - return err + return nil, err } c.lastTerm = l.Term } @@ -81,7 +94,7 @@ func (c *ChunkingFSM) Apply(l *raft.Log) interface{} { // Get chunk info from extensions var ci types.ChunkInfo if err := proto.Unmarshal(l.Extensions, &ci); err != nil { - return errwrap.Wrapf("error unmarshaling chunk info: {{err}}", err) + return nil, errwrap.Wrapf("error unmarshaling chunk info: {{err}}", err) } // Store the current chunk and find out if all chunks have arrived @@ -93,19 +106,20 @@ func (c *ChunkingFSM) Apply(l *raft.Log) interface{} { Data: l.Data, }) if err != nil { - return err + return nil, err } if !done { - return nil + return nil, nil } // All chunks are here; get the full set and clear storage of the op chunks, err := c.store.FinalizeOp(ci.OpNum) if err != nil { - return err + return nil, err } finalData := make([]byte, 0, len(chunks)*raft.SuggestedMaxDataSize) + for _, chunk := range chunks { finalData = append(finalData, chunk.Data...) } @@ -119,7 +133,27 @@ func (c *ChunkingFSM) Apply(l *raft.Log) interface{} { Extensions: ci.NextExtensions, } - return ChunkingSuccess{Response: c.underlying.Apply(logToApply)} + return logToApply, nil +} + +// Apply applies the log, handling chunking as needed. The return value will +// either be an error or whatever is returned from the underlying Apply. 
+func (c *ChunkingFSM) Apply(l *raft.Log) interface{} { + // Not chunking or wrong type, pass through + if l.Type != raft.LogCommand || l.Extensions == nil { + return c.underlying.Apply(l) + } + + logToApply, err := c.applyChunk(l) + if err != nil { + return err + } + + if logToApply != nil { + return ChunkingSuccess{Response: c.underlying.Apply(logToApply)} + } + + return nil } func (c *ChunkingFSM) Snapshot() (raft.FSMSnapshot, error) { @@ -157,3 +191,68 @@ func (c *ChunkingFSM) RestoreState(state *State) error { func (c *ChunkingConfigurationStore) StoreConfiguration(index uint64, configuration raft.Configuration) { c.underlyingConfigurationStore.StoreConfiguration(index, configuration) } + +// ApplyBatch applies the logs, handling chunking as needed. The return value will +// be an array containing an error or whatever is returned from the underlying +// Apply for each log. +func (c *ChunkingBatchingFSM) ApplyBatch(logs []*raft.Log) []interface{} { + // responses has a response for each log; their slice index should match. + responses := make([]interface{}, len(logs)) + + // sentLogs keeps track of which logs we sent. The key is the raft Index + // associated with the log and the value is true if this is a finalized set + // of chunks. + sentLogs := make(map[uint64]bool) + + // sendLogs is the subset of logs that we need to pass onto the underlying + // FSM. + sendLogs := make([]*raft.Log, 0, len(logs)) + + for i, l := range logs { + // Not chunking or wrong type, pass through + if l.Type != raft.LogCommand || l.Extensions == nil { + sendLogs = append(sendLogs, l) + sentLogs[l.Index] = false + continue + } + + logToApply, err := c.applyChunk(l) + if err != nil { + responses[i] = err + continue + } + + if logToApply != nil { + sendLogs = append(sendLogs, logToApply) + sentLogs[l.Index] = true + } + } + + // Send remaining logs to the underlying FSM. + var sentResponses []interface{} + if len(sendLogs) > 0 { + sentResponses = c.underlyingBatchingFSM.ApplyBatch(sendLogs) + } + + var sentCounter int + for j, l := range logs { + // If the response is already set we errored above and should continue + // onto the next. 
+ if responses[j] != nil { + continue + } + + var resp interface{} + if chunked, ok := sentLogs[l.Index]; ok { + resp = sentResponses[sentCounter] + if chunked { + resp = ChunkingSuccess{Response: sentResponses[sentCounter]} + } + sentCounter++ + } + + responses[j] = resp + } + + return responses +} diff --git a/vendor/github.com/hashicorp/go-raftchunking/go.mod b/vendor/github.com/hashicorp/go-raftchunking/go.mod index ff3c15b98c57..8e126cdb9ea6 100644 --- a/vendor/github.com/hashicorp/go-raftchunking/go.mod +++ b/vendor/github.com/hashicorp/go-raftchunking/go.mod @@ -6,7 +6,7 @@ require ( github.com/go-test/deep v1.0.2 github.com/golang/protobuf v1.3.1 github.com/hashicorp/errwrap v1.0.0 - github.com/hashicorp/raft v1.1.1 + github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17 github.com/kr/pretty v0.1.0 github.com/mitchellh/copystructure v1.0.0 ) diff --git a/vendor/github.com/hashicorp/go-raftchunking/go.sum b/vendor/github.com/hashicorp/go-raftchunking/go.sum index 9494f6cb0f41..94980752bce7 100644 --- a/vendor/github.com/hashicorp/go-raftchunking/go.sum +++ b/vendor/github.com/hashicorp/go-raftchunking/go.sum @@ -30,6 +30,8 @@ github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCO github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/raft v1.1.1 h1:HJr7UE1x/JrJSc9Oy6aDBHtNHUUBHjcQjTgvUVihoZs= github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= +github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17 h1:p+2EISNdFCnD9R+B4xCiqSn429MCFtvM41aHJDJ6qW4= +github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= diff --git a/vendor/github.com/hashicorp/raft/CHANGELOG.md b/vendor/github.com/hashicorp/raft/CHANGELOG.md index efd56b0fed19..56b4443df53f 100644 --- a/vendor/github.com/hashicorp/raft/CHANGELOG.md +++ b/vendor/github.com/hashicorp/raft/CHANGELOG.md @@ -1,5 +1,18 @@ # UNRELEASED +FEATURES + +* Improve FSM apply performance through batching. Implementing the `BatchingFSM` interface enables this new feature [[GH-364](https://github.com/hashicorp/raft/pull/364)] + +IMPROVEMENTS + +* Replace logger with hclog [[GH-360](https://github.com/hashicorp/raft/pull/360)] + +BUG FIXES + +* Export the leader field in LeaderObservation [[GH-357](https://github.com/hashicorp/raft/pull/357)] +* Fix snapshot to not attempt to truncate a negative range [[GH-358](https://github.com/hashicorp/raft/pull/358)] + # 1.1.1 (July 23rd, 2019) FEATURES diff --git a/vendor/github.com/hashicorp/raft/Makefile b/vendor/github.com/hashicorp/raft/Makefile index b4ef1a94df64..ec95881bda8f 100644 --- a/vendor/github.com/hashicorp/raft/Makefile +++ b/vendor/github.com/hashicorp/raft/Makefile @@ -2,22 +2,28 @@ DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...) TEST_RESULTS_DIR?=/tmp/test-results test: - go test -timeout=60s -race . + go test $(TESTARGS) -timeout=60s -race . + go test $(TESTARGS) -timeout=60s -tags batchtest -race . integ: test - INTEG_TESTS=yes go test -timeout=25s -run=Integ . + INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -run=Integ . + INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -tags batchtest -run=Integ . 
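Illustrative sketch (not part of this diff): the Makefile targets above now run each suite twice, the second time compiled with the new batchtest build tag. One conventional way such a tag is consumed is a pair of mutually exclusive tag-gated Go files; the file names and the batchingEnabled constant below are hypothetical, not taken from this change.

    // batch_enabled.go (hypothetical)
    // +build batchtest

    package raft

    // batchingEnabled lets shared tests branch onto the ApplyBatch
    // code path when the suite is compiled with -tags batchtest.
    const batchingEnabled = true

    // batch_disabled.go (hypothetical)
    // +build !batchtest

    package raft

    // Default build: tests exercise the single-log Apply path.
    const batchingEnabled = false

Extra flags pass through via the new $(TESTARGS) variable, e.g. make test TESTARGS='-run TestRaft -v'.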
ci.test-norace: gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s + gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -tags batchtest ci.test: gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race . + gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race -tags batchtest . ci.integ: ci.test INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ . + INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ -tags batchtest . fuzz: - go test -timeout=300s ./fuzzy + go test $(TESTARGS) -timeout=500s ./fuzzy + go test $(TESTARGS) -timeout=500s -tags batchtest ./fuzzy deps: go get -t -d -v ./... diff --git a/vendor/github.com/hashicorp/raft/api.go b/vendor/github.com/hashicorp/raft/api.go index 9c61fefbee7d..17ca6556ee0d 100644 --- a/vendor/github.com/hashicorp/raft/api.go +++ b/vendor/github.com/hashicorp/raft/api.go @@ -234,7 +234,7 @@ func BootstrapCluster(conf *Config, logs LogStore, stable StableStore, entry.Data = encodePeers(configuration, trans) } else { entry.Type = LogConfiguration - entry.Data = encodeConfiguration(configuration) + entry.Data = EncodeConfiguration(configuration) } if err := logs.StoreLog(entry); err != nil { return fmt.Errorf("failed to append configuration entry to log: %v", err) @@ -528,13 +528,14 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna for index := snapshotIndex + 1; index <= lastLog.Index; index++ { var entry Log if err := r.logs.GetLog(index, &entry); err != nil { - r.logger.Error(fmt.Sprintf("Failed to get log at %d: %v", index, err)) + r.logger.Error("failed to get log", "index", index, "error", err) panic(err) } r.processConfigurationLogEntry(&entry) } - r.logger.Info(fmt.Sprintf("Initial configuration (index=%d): %+v", - r.configurations.latestIndex, r.configurations.latest.Servers)) + r.logger.Info("initial configuration", + "index", r.configurations.latestIndex, + "servers", hclog.Fmt("%+v", r.configurations.latest.Servers)) // Setup a heartbeat fast-path to avoid head-of-line // blocking where possible. 
It MUST be safe for this @@ -554,7 +555,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna func (r *Raft) restoreSnapshot() error { snapshots, err := r.snapshots.List() if err != nil { - r.logger.Error(fmt.Sprintf("Failed to list snapshots: %v", err)) + r.logger.Error("failed to list snapshots", "error", err) return err } @@ -563,7 +564,7 @@ func (r *Raft) restoreSnapshot() error { if !r.conf.NoSnapshotRestoreOnStart { _, source, err := r.snapshots.Open(snapshot.ID) if err != nil { - r.logger.Error(fmt.Sprintf("Failed to open snapshot %v: %v", snapshot.ID, err)) + r.logger.Error("failed to open snapshot", "id", snapshot.ID, "error", err) continue } @@ -571,11 +572,11 @@ func (r *Raft) restoreSnapshot() error { // Close the source after the restore has completed source.Close() if err != nil { - r.logger.Error(fmt.Sprintf("Failed to restore snapshot %v: %v", snapshot.ID, err)) + r.logger.Error("failed to restore snapshot", "id", snapshot.ID, "error", err) continue } - r.logger.Info(fmt.Sprintf("Restored from snapshot %v", snapshot.ID)) + r.logger.Info("restored from snapshot", "id", snapshot.ID) } // Update the lastApplied so we don't replay old logs r.setLastApplied(snapshot.Index) @@ -1013,7 +1014,7 @@ func (r *Raft) Stats() map[string]string { future := r.GetConfiguration() if err := future.Error(); err != nil { - r.logger.Warn(fmt.Sprintf("could not get configuration for Stats: %v", err)) + r.logger.Warn("could not get configuration for stats", "error", err) } else { configuration := future.Configuration() s["latest_configuration_index"] = toString(future.Index()) diff --git a/vendor/github.com/hashicorp/raft/commands.go b/vendor/github.com/hashicorp/raft/commands.go index 17416311d8a0..3358a3284703 100644 --- a/vendor/github.com/hashicorp/raft/commands.go +++ b/vendor/github.com/hashicorp/raft/commands.go @@ -35,7 +35,7 @@ type AppendEntriesRequest struct { LeaderCommitIndex uint64 } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *AppendEntriesRequest) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -59,7 +59,7 @@ type AppendEntriesResponse struct { NoRetryBackoff bool } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *AppendEntriesResponse) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -83,7 +83,7 @@ type RequestVoteRequest struct { LeadershipTransfer bool } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *RequestVoteRequest) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -104,7 +104,7 @@ type RequestVoteResponse struct { Granted bool } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *RequestVoteResponse) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -136,7 +136,7 @@ type InstallSnapshotRequest struct { Size int64 } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *InstallSnapshotRequest) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -150,7 +150,7 @@ type InstallSnapshotResponse struct { Success bool } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *InstallSnapshotResponse) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -161,7 +161,7 @@ type TimeoutNowRequest struct { RPCHeader } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *TimeoutNowRequest) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -171,7 +171,7 @@ type TimeoutNowResponse struct { RPCHeader } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. 
func (r *TimeoutNowResponse) GetRPCHeader() RPCHeader { return r.RPCHeader } diff --git a/vendor/github.com/hashicorp/raft/config.go b/vendor/github.com/hashicorp/raft/config.go index e43ba544964b..272761b795c5 100644 --- a/vendor/github.com/hashicorp/raft/config.go +++ b/vendor/github.com/hashicorp/raft/config.go @@ -8,8 +8,8 @@ import ( "github.com/hashicorp/go-hclog" ) -// These are the versions of the protocol (which includes RPC messages as -// well as Raft-specific log entries) that this server can _understand_. Use +// ProtocolVersion is the version of the protocol (which includes RPC messages +// as well as Raft-specific log entries) that this server can _understand_. Use // the ProtocolVersion member of the Config object to control the version of // the protocol to use when _speaking_ to other servers. Note that depending on // the protocol version being spoken, some otherwise understood RPC messages @@ -88,13 +88,15 @@ import ( type ProtocolVersion int const ( + // ProtocolVersionMin is the minimum protocol version ProtocolVersionMin ProtocolVersion = 0 - ProtocolVersionMax = 3 + // ProtocolVersionMax is the maximum protocol version + ProtocolVersionMax = 3 ) -// These are versions of snapshots that this server can _understand_. Currently, -// it is always assumed that this server generates the latest version, though -// this may be changed in the future to include a configurable version. +// SnapshotVersion is the version of snapshots that this server can understand. +// Currently, it is always assumed that the server generates the latest version, +// though this may be changed in the future to include a configurable version. // // Version History // @@ -112,8 +114,10 @@ const ( type SnapshotVersion int const ( + // SnapshotVersionMin is the minimum snapshot version SnapshotVersionMin SnapshotVersion = 0 - SnapshotVersionMax = 1 + // SnapshotVersionMax is the maximum snapshot version + SnapshotVersionMax = 1 ) // Config provides any necessary configuration for the Raft server. diff --git a/vendor/github.com/hashicorp/raft/configuration.go b/vendor/github.com/hashicorp/raft/configuration.go index 4902fb1e97a1..bf19997b650f 100644 --- a/vendor/github.com/hashicorp/raft/configuration.go +++ b/vendor/github.com/hashicorp/raft/configuration.go @@ -342,9 +342,9 @@ func decodePeers(buf []byte, trans Transport) Configuration { } } -// encodeConfiguration serializes a Configuration using MsgPack, or panics on +// EncodeConfiguration serializes a Configuration using MsgPack, or panics on // errors. -func encodeConfiguration(configuration Configuration) []byte { +func EncodeConfiguration(configuration Configuration) []byte { buf, err := encodeMsgPack(configuration) if err != nil { panic(fmt.Errorf("failed to encode configuration: %v", err)) @@ -352,9 +352,9 @@ func encodeConfiguration(configuration Configuration) []byte { return buf.Bytes() } -// decodeConfiguration deserializes a Configuration using MsgPack, or panics on +// DecodeConfiguration deserializes a Configuration using MsgPack, or panics on // errors. 
-func decodeConfiguration(buf []byte) Configuration { +func DecodeConfiguration(buf []byte) Configuration { var configuration Configuration if err := decodeMsgPack(buf, &configuration); err != nil { panic(fmt.Errorf("failed to decode configuration: %v", err)) diff --git a/vendor/github.com/hashicorp/raft/discard_snapshot.go b/vendor/github.com/hashicorp/raft/discard_snapshot.go index 5e93a9fe01fc..187b8e7fb1d6 100644 --- a/vendor/github.com/hashicorp/raft/discard_snapshot.go +++ b/vendor/github.com/hashicorp/raft/discard_snapshot.go @@ -12,6 +12,11 @@ import ( // suitable for testing. type DiscardSnapshotStore struct{} +// DiscardSnapshotSink is used to fulfill the SnapshotSink interface +// while always discarding the snapshot. This is useful when the log +// should be truncated but no snapshot should be retained. This +// should never be used in production, and is only suitable +// for testing. type DiscardSnapshotSink struct{} // NewDiscardSnapshotStore is used to create a new DiscardSnapshotStore. @@ -19,31 +24,41 @@ func NewDiscardSnapshotStore() *DiscardSnapshotStore { return &DiscardSnapshotStore{} } +// Create returns a valid type implementing the SnapshotSink which +// always discards the snapshot. func (d *DiscardSnapshotStore) Create(version SnapshotVersion, index, term uint64, configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) { return &DiscardSnapshotSink{}, nil } +// List returns successfully with a nil []*SnapshotMeta. func (d *DiscardSnapshotStore) List() ([]*SnapshotMeta, error) { return nil, nil } +// Open returns an error since the DiscardSnapshotStore does not +// support opening snapshots. func (d *DiscardSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) { return nil, nil, fmt.Errorf("open is not supported") } +// Write returns successfully with the length of the input byte slice +// to satisfy the WriteCloser interface. func (d *DiscardSnapshotSink) Write(b []byte) (int, error) { return len(b), nil } +// Close returns a nil error func (d *DiscardSnapshotSink) Close() error { return nil } +// ID returns "discard" for DiscardSnapshotSink func (d *DiscardSnapshotSink) ID() string { return "discard" } +// Cancel returns successfully with a nil error func (d *DiscardSnapshotSink) Cancel() error { return nil } diff --git a/vendor/github.com/hashicorp/raft/file_snapshot.go b/vendor/github.com/hashicorp/raft/file_snapshot.go index ffc9414542f0..d59f5d3559e8 100644 --- a/vendor/github.com/hashicorp/raft/file_snapshot.go +++ b/vendor/github.com/hashicorp/raft/file_snapshot.go @@ -5,11 +5,11 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/hashicorp/go-hclog" "hash" "hash/crc64" "io" "io/ioutil" - "log" "os" "path/filepath" "runtime" @@ -31,7 +31,7 @@ const ( type FileSnapshotStore struct { path string retain int - logger *log.Logger + logger hclog.Logger } type snapMetaSlice []*fileSnapshotMeta // FileSnapshotSink implements SnapshotSink with a file. type FileSnapshotSink struct { store *FileSnapshotStore - logger *log.Logger + logger hclog.Logger dir string parentDir string meta fileSnapshotMeta @@ -76,12 +76,16 @@ func (b *bufferedFile) Close() error { // NewFileSnapshotStoreWithLogger creates a new FileSnapshotStore based // on a base directory. The `retain` parameter controls how many // snapshots are retained. Must be at least 1. 
-func NewFileSnapshotStoreWithLogger(base string, retain int, logger *log.Logger) (*FileSnapshotStore, error) { +func NewFileSnapshotStoreWithLogger(base string, retain int, logger hclog.Logger) (*FileSnapshotStore, error) { if retain < 1 { return nil, fmt.Errorf("must retain at least one snapshot") } if logger == nil { - logger = log.New(os.Stderr, "", log.LstdFlags) + logger = hclog.New(&hclog.LoggerOptions{ + Name: "snapshot", + Output: hclog.DefaultOutput, + Level: hclog.DefaultLevel, + }) } // Ensure our path exists @@ -111,7 +115,11 @@ func NewFileSnapshotStore(base string, retain int, logOutput io.Writer) (*FileSn if logOutput == nil { logOutput = os.Stderr } - return NewFileSnapshotStoreWithLogger(base, retain, log.New(logOutput, "", log.LstdFlags)) + return NewFileSnapshotStoreWithLogger(base, retain, hclog.New(&hclog.LoggerOptions{ + Name: "snapshot", + Output: logOutput, + Level: hclog.DefaultLevel, + })) } // testPermissions tries to touch a file in our path to see if it works. @@ -150,11 +158,11 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64, // Create a new path name := snapshotName(term, index) path := filepath.Join(f.path, name+tmpSuffix) - f.logger.Printf("[INFO] snapshot: Creating new snapshot at %s", path) + f.logger.Info("creating new snapshot", "path", path) // Make the directory if err := os.MkdirAll(path, 0755); err != nil { - f.logger.Printf("[ERR] snapshot: Failed to make snapshot directory: %v", err) + f.logger.Error("failed to make snapshot directory", "error", err) return nil, err } @@ -180,7 +188,7 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64, // Write out the meta data if err := sink.writeMeta(); err != nil { - f.logger.Printf("[ERR] snapshot: Failed to write metadata: %v", err) + f.logger.Error("failed to write metadata", "error", err) return nil, err } @@ -188,7 +196,7 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64, statePath := filepath.Join(path, stateFilePath) fh, err := os.Create(statePath) if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to create state file: %v", err) + f.logger.Error("failed to create state file", "error", err) return nil, err } sink.stateFile = fh @@ -209,7 +217,7 @@ func (f *FileSnapshotStore) List() ([]*SnapshotMeta, error) { // Get the eligible snapshots snapshots, err := f.getSnapshots() if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to get snapshots: %v", err) + f.logger.Error("failed to get snapshots", "error", err) return nil, err } @@ -228,7 +236,7 @@ func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) { // Get the eligible snapshots snapshots, err := ioutil.ReadDir(f.path) if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to scan snapshot dir: %v", err) + f.logger.Error("failed to scan snapshot directory", "error", err) return nil, err } @@ -243,20 +251,20 @@ func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) { // Ignore any temporary snapshots dirName := snap.Name() if strings.HasSuffix(dirName, tmpSuffix) { - f.logger.Printf("[WARN] snapshot: Found temporary snapshot: %v", dirName) + f.logger.Warn("found temporary snapshot", "name", dirName) continue } // Try to read the meta data meta, err := f.readMeta(dirName) if err != nil { - f.logger.Printf("[WARN] snapshot: Failed to read metadata for %v: %v", dirName, err) + f.logger.Warn("failed to read metadata", "name", dirName, "error", err) continue } // Make sure we can understand this version. 
if meta.Version < SnapshotVersionMin || meta.Version > SnapshotVersionMax { - f.logger.Printf("[WARN] snapshot: Snapshot version for %v not supported: %d", dirName, meta.Version) + f.logger.Warn("snapshot version not supported", "name", dirName, "version", meta.Version) continue } @@ -297,7 +305,7 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error // Get the metadata meta, err := f.readMeta(id) if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to get meta data to open snapshot: %v", err) + f.logger.Error("failed to get meta data to open snapshot", "error", err) return nil, nil, err } @@ -305,7 +313,7 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error statePath := filepath.Join(f.path, id, stateFilePath) fh, err := os.Open(statePath) if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to open state file: %v", err) + f.logger.Error("failed to open state file", "error", err) return nil, nil, err } @@ -315,7 +323,7 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error // Compute the hash _, err = io.Copy(stateHash, fh) if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to read state file: %v", err) + f.logger.Error("failed to read state file", "error", err) fh.Close() return nil, nil, err } @@ -323,15 +331,14 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error // Verify the hash computed := stateHash.Sum(nil) if bytes.Compare(meta.CRC, computed) != 0 { - f.logger.Printf("[ERR] snapshot: CRC checksum failed (stored: %v computed: %v)", - meta.CRC, computed) + f.logger.Error("CRC checksum failed", "stored", meta.CRC, "computed", computed) fh.Close() return nil, nil, fmt.Errorf("CRC mismatch") } // Seek to the start if _, err := fh.Seek(0, 0); err != nil { - f.logger.Printf("[ERR] snapshot: State file seek failed: %v", err) + f.logger.Error("state file seek failed", "error", err) fh.Close() return nil, nil, err } @@ -349,15 +356,15 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error func (f *FileSnapshotStore) ReapSnapshots() error { snapshots, err := f.getSnapshots() if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to get snapshots: %v", err) + f.logger.Error("failed to get snapshots", "error", err) return err } for i := f.retain; i < len(snapshots); i++ { path := filepath.Join(f.path, snapshots[i].ID) - f.logger.Printf("[INFO] snapshot: reaping snapshot %v", path) + f.logger.Info("reaping snapshot", "path", path) if err := os.RemoveAll(path); err != nil { - f.logger.Printf("[ERR] snapshot: Failed to reap snapshot %v: %v", path, err) + f.logger.Error("failed to reap snapshot", "path", path, "error", err) return err } } @@ -386,9 +393,9 @@ func (s *FileSnapshotSink) Close() error { // Close the open handles if err := s.finalize(); err != nil { - s.logger.Printf("[ERR] snapshot: Failed to finalize snapshot: %v", err) + s.logger.Error("failed to finalize snapshot", "error", err) if delErr := os.RemoveAll(s.dir); delErr != nil { - s.logger.Printf("[ERR] snapshot: Failed to delete temporary snapshot directory at path %v: %v", s.dir, delErr) + s.logger.Error("failed to delete temporary snapshot directory", "path", s.dir, "error", delErr) return delErr } return err @@ -396,27 +403,27 @@ func (s *FileSnapshotSink) Close() error { // Write out the meta data if err := s.writeMeta(); err != nil { - s.logger.Printf("[ERR] snapshot: Failed to write metadata: %v", err) + s.logger.Error("failed to write metadata", "error", 
err) return err } // Move the directory into place newPath := strings.TrimSuffix(s.dir, tmpSuffix) if err := os.Rename(s.dir, newPath); err != nil { - s.logger.Printf("[ERR] snapshot: Failed to move snapshot into place: %v", err) + s.logger.Error("failed to move snapshot into place", "error", err) return err } - if runtime.GOOS != "windows" { //skipping fsync for directory entry edits on Windows, only needed for *nix style file systems + if runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems parentFH, err := os.Open(s.parentDir) defer parentFH.Close() if err != nil { - s.logger.Printf("[ERR] snapshot: Failed to open snapshot parent directory %v, error: %v", s.parentDir, err) + s.logger.Error("failed to open snapshot parent directory", "path", s.parentDir, "error", err) return err } if err = parentFH.Sync(); err != nil { - s.logger.Printf("[ERR] snapshot: Failed syncing parent directory %v, error: %v", s.parentDir, err) + s.logger.Error("failed syncing parent directory", "path", s.parentDir, "error", err) return err } } @@ -439,7 +446,7 @@ func (s *FileSnapshotSink) Cancel() error { // Close the open handles if err := s.finalize(); err != nil { - s.logger.Printf("[ERR] snapshot: Failed to finalize snapshot: %v", err) + s.logger.Error("failed to finalize snapshot", "error", err) return err } diff --git a/vendor/github.com/hashicorp/raft/fsm.go b/vendor/github.com/hashicorp/raft/fsm.go index 8ad9b5995a7e..5622ebf82892 100644 --- a/vendor/github.com/hashicorp/raft/fsm.go +++ b/vendor/github.com/hashicorp/raft/fsm.go @@ -31,6 +31,26 @@ type FSM interface { Restore(io.ReadCloser) error } +// BatchingFSM extends the FSM interface to add an ApplyBatch function. This can +// optionally be implemented by clients to enable multiple logs to be applied to +// the FSM in batches. Up to MaxAppendEntries could be sent in a batch. +type BatchingFSM interface { + // ApplyBatch is invoked once a batch of log entries has been committed and + // is ready to be applied to the FSM. ApplyBatch will take in an array of + // log entries. These log entries will be in the order they were committed, + // will not have gaps, and could be of a few log types. Clients should check + // the log type prior to attempting to decode the data attached. Presently + // the LogCommand and LogConfiguration types will be sent. + // + // The returned slice must be the same length as the input and each response + // should correlate to the log at the same index of the input. The returned + // values will be made available in the ApplyFuture returned by the Raft.Apply + // method if that method was called on the same Raft node as the FSM. + ApplyBatch([]*Log) []interface{} + + FSM +} + // FSMSnapshot is returned by an FSM in response to a Snapshot // It must be safe to invoke FSMSnapshot methods with concurrent // calls to Apply. 
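Illustrative sketch (not part of this diff): a minimal client FSM satisfying the new BatchingFSM interface. The kvFSM type, its fields, and its storage behavior are hypothetical; only the interface shape comes from the change above.

    package kvstore

    import (
        "errors"
        "io"
        "strconv"
        "sync"

        "github.com/hashicorp/raft"
    )

    var _ raft.BatchingFSM = (*kvFSM)(nil)

    // kvFSM applies a whole batch under a single lock acquisition,
    // which is where the batching win comes from.
    type kvFSM struct {
        mu   sync.Mutex
        data map[string][]byte
    }

    // Apply handles a single log by delegating to ApplyBatch.
    func (f *kvFSM) Apply(l *raft.Log) interface{} {
        return f.ApplyBatch([]*raft.Log{l})[0]
    }

    // ApplyBatch must return one response per input log, index-aligned.
    func (f *kvFSM) ApplyBatch(logs []*raft.Log) []interface{} {
        f.mu.Lock()
        defer f.mu.Unlock()

        resps := make([]interface{}, len(logs))
        for i, l := range logs {
            switch l.Type {
            case raft.LogCommand:
                if f.data == nil {
                    f.data = make(map[string][]byte)
                }
                // A real FSM would decode l.Data into operations here;
                // this sketch just stores the raw bytes keyed by index.
                key := strconv.FormatUint(l.Index, 10)
                f.data[key] = l.Data
                resps[i] = key
            case raft.LogConfiguration:
                // Membership update; nothing to store in this sketch,
                // so the response for this log stays nil.
            }
        }
        return resps
    }

    func (f *kvFSM) Snapshot() (raft.FSMSnapshot, error) { return nil, errors.New("not implemented") }

    func (f *kvFSM) Restore(rc io.ReadCloser) error {
        rc.Close()
        return errors.New("not implemented")
    }

When the FSM does not implement BatchingFSM, the runFSM changes below fall back to applying each commitTuple individually, so batching remains opt-in.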
@@ -49,7 +69,10 @@ type FSMSnapshot interface { func (r *Raft) runFSM() { var lastIndex, lastTerm uint64 - commit := func(req *commitTuple) { + batchingFSM, batchingEnabled := r.fsm.(BatchingFSM) + configStore, configStoreEnabled := r.fsm.(ConfigurationStore) + + commitSingle := func(req *commitTuple) { // Apply the log if a command or config change var resp interface{} // Make sure we send a response @@ -68,15 +91,14 @@ func (r *Raft) runFSM() { metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start) case LogConfiguration: - configStore, ok := r.fsm.(ConfigurationStore) - if !ok { + if !configStoreEnabled { // Return early to avoid incrementing the index and term for // an unimplemented operation. return } start := time.Now() - configStore.StoreConfiguration(req.log.Index, decodeConfiguration(req.log.Data)) + configStore.StoreConfiguration(req.log.Index, DecodeConfiguration(req.log.Data)) metrics.MeasureSince([]string{"raft", "fsm", "store_config"}, start) } @@ -85,6 +107,67 @@ func (r *Raft) runFSM() { lastTerm = req.log.Term } + commitBatch := func(reqs []*commitTuple) { + if !batchingEnabled { + for _, ct := range reqs { + commitSingle(ct) + } + return + } + + // Only send LogCommand and LogConfiguration log types. LogBarrier types + // will not be sent to the FSM. + shouldSend := func(l *Log) bool { + switch l.Type { + case LogCommand, LogConfiguration: + return true + } + return false + } + + var lastBatchIndex, lastBatchTerm uint64 + sendLogs := make([]*Log, 0, len(reqs)) + for _, req := range reqs { + if shouldSend(req.log) { + sendLogs = append(sendLogs, req.log) + } + lastBatchIndex = req.log.Index + lastBatchTerm = req.log.Term + } + + var responses []interface{} + if len(sendLogs) > 0 { + start := time.Now() + responses = batchingFSM.ApplyBatch(sendLogs) + metrics.MeasureSince([]string{"raft", "fsm", "applyBatch"}, start) + metrics.AddSample([]string{"raft", "fsm", "applyBatchNum"}, float32(len(reqs))) + + // Ensure we get the expected responses + if len(sendLogs) != len(responses) { + panic("invalid number of responses") + } + } + + // Update the indexes + lastIndex = lastBatchIndex + lastTerm = lastBatchTerm + + var i int + for _, req := range reqs { + var resp interface{} + // If the log was sent to the FSM, retrieve the response. + if shouldSend(req.log) { + resp = responses[i] + i++ + } + + if req.future != nil { + req.future.response = resp + req.future.respond(nil) + } + } + } + restore := func(req *restoreFuture) { // Open the snapshot meta, source, err := r.snapshots.Open(req.ID) @@ -132,8 +215,8 @@ func (r *Raft) runFSM() { select { case ptr := <-r.fsmMutateCh: switch req := ptr.(type) { - case *commitTuple: - commit(req) + case []*commitTuple: + commitBatch(req) case *restoreFuture: restore(req) diff --git a/vendor/github.com/hashicorp/raft/future.go b/vendor/github.com/hashicorp/raft/future.go index cc1e905ef010..6346b453b19c 100644 --- a/vendor/github.com/hashicorp/raft/future.go +++ b/vendor/github.com/hashicorp/raft/future.go @@ -183,14 +183,13 @@ type userSnapshotFuture struct { func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) { if u.opener == nil { return nil, nil, fmt.Errorf("no snapshot available") - } else { - // Invalidate the opener so it can't get called multiple times, - // which isn't generally safe. - defer func() { - u.opener = nil - }() - return u.opener() } + // Invalidate the opener so it can't get called multiple times, + // which isn't generally safe. 
+ defer func() { + u.opener = nil + }() + return u.opener() } // userRestoreFuture is used for waiting on a user-triggered restore of an diff --git a/vendor/github.com/hashicorp/raft/inmem_snapshot.go b/vendor/github.com/hashicorp/raft/inmem_snapshot.go index ad52f93aeffd..641d9d8172ae 100644 --- a/vendor/github.com/hashicorp/raft/inmem_snapshot.go +++ b/vendor/github.com/hashicorp/raft/inmem_snapshot.go @@ -100,10 +100,12 @@ func (s *InmemSnapshotSink) Close() error { return nil } +// ID returns the ID of the SnapshotMeta func (s *InmemSnapshotSink) ID() string { return s.meta.ID } +// Cancel returns successfully with a nil error func (s *InmemSnapshotSink) Cancel() error { return nil } diff --git a/vendor/github.com/hashicorp/raft/log.go b/vendor/github.com/hashicorp/raft/log.go index c15a84472fde..ad3bf0f09d5b 100644 --- a/vendor/github.com/hashicorp/raft/log.go +++ b/vendor/github.com/hashicorp/raft/log.go @@ -10,12 +10,12 @@ const ( // LogNoop is used to assert leadership. LogNoop - // LogAddPeer is used to add a new peer. This should only be used with + // LogAddPeerDeprecated is used to add a new peer. This should only be used with // older protocol versions designed to be compatible with unversioned // Raft servers. See comments in config.go for details. LogAddPeerDeprecated - // LogRemovePeer is used to remove an existing peer. This should only be + // LogRemovePeerDeprecated is used to remove an existing peer. This should only be // used with older protocol versions designed to be compatible with // unversioned Raft servers. See comments in config.go for details. LogRemovePeerDeprecated diff --git a/vendor/github.com/hashicorp/raft/net_transport.go b/vendor/github.com/hashicorp/raft/net_transport.go index 523fa698e552..6092cafbcfef 100644 --- a/vendor/github.com/hashicorp/raft/net_transport.go +++ b/vendor/github.com/hashicorp/raft/net_transport.go @@ -5,8 +5,8 @@ import ( "context" "errors" "fmt" + "github.com/hashicorp/go-hclog" "io" - "log" "net" "os" "sync" @@ -66,7 +66,7 @@ type NetworkTransport struct { heartbeatFn func(RPC) heartbeatFnLock sync.Mutex - logger *log.Logger + logger hclog.Logger maxPool int @@ -92,7 +92,7 @@ type NetworkTransportConfig struct { // ServerAddressProvider is used to override the target address when establishing a connection to invoke an RPC ServerAddressProvider ServerAddressProvider - Logger *log.Logger + Logger hclog.Logger // Dialer Stream StreamLayer @@ -105,6 +105,7 @@ type NetworkTransportConfig struct { Timeout time.Duration } +// ServerAddressProvider looks up the target address used when establishing a connection to invoke an RPC on a given server type ServerAddressProvider interface { ServerAddr(id ServerID) (ServerAddress, error) } @@ -148,7 +149,11 @@ func NewNetworkTransportWithConfig( config *NetworkTransportConfig, ) *NetworkTransport { if config.Logger == nil { - config.Logger = log.New(os.Stderr, "", log.LstdFlags) + config.Logger = hclog.New(&hclog.LoggerOptions{ + Name: "raft-net", + Output: hclog.DefaultOutput, + Level: hclog.DefaultLevel, + }) } trans := &NetworkTransport{ connPool: make(map[ServerAddress][]*netConn), @@ -182,7 +187,11 @@ func NewNetworkTransport( if logOutput == nil { logOutput = os.Stderr } - logger := log.New(logOutput, "", log.LstdFlags) + logger := hclog.New(&hclog.LoggerOptions{ + Name: "raft-net", + Output: logOutput, + Level: hclog.DefaultLevel, + }) config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger} return NewNetworkTransportWithConfig(config) } @@ -195,7 +204,7 @@ func
NewNetworkTransportWithLogger( stream StreamLayer, maxPool int, timeout time.Duration, - logger *log.Logger, + logger hclog.Logger, ) *NetworkTransport { config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger} return NewNetworkTransportWithConfig(config) } @@ -310,7 +319,7 @@ func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target Serv if n.serverAddressProvider != nil { serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id) if err != nil { - n.logger.Printf("[WARN] raft: Unable to get address for server id %v, using fallback address %v: %v", id, target, err) + n.logger.Warn("unable to get address for server, using fallback address", "id", id, "fallback", target, "error", err) } else { return serverAddressOverride } @@ -486,7 +495,7 @@ func (n *NetworkTransport) listen() { } if !n.IsShutdown() { - n.logger.Printf("[ERR] raft-net: Failed to accept connection: %v", err) + n.logger.Error("failed to accept connection", "error", err) } select { @@ -499,7 +508,7 @@ // No error, reset loop delay loopDelay = 0 - n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr()) + n.logger.Debug("accepted connection", "local-address", n.LocalAddr(), "remote-address", conn.RemoteAddr().String()) // Handle the connection in dedicated routine go n.handleConn(n.getStreamContext(), conn) @@ -519,19 +528,19 @@ func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) { for { select { case <-connCtx.Done(): - n.logger.Println("[DEBUG] raft-net: stream layer is closed") + n.logger.Debug("stream layer is closed") return default: } if err := n.handleCommand(r, dec, enc); err != nil { if err != io.EOF { - n.logger.Printf("[ERR] raft-net: Failed to decode incoming command: %v", err) + n.logger.Error("failed to decode incoming command", "error", err) } return } if err := w.Flush(); err != nil { - n.logger.Printf("[ERR] raft-net: Failed to flush response: %v", err) + n.logger.Error("failed to flush response", "error", err) return } } diff --git a/vendor/github.com/hashicorp/raft/observer.go b/vendor/github.com/hashicorp/raft/observer.go index 2d4f37db12d1..1611d6b44b11 100644 --- a/vendor/github.com/hashicorp/raft/observer.go +++ b/vendor/github.com/hashicorp/raft/observer.go @@ -18,7 +18,7 @@ type Observation struct { // LeaderObservation is used for the data when leadership changes. type LeaderObservation struct { - leader ServerAddress + Leader ServerAddress } // PeerObservation is sent to observers when peers change. diff --git a/vendor/github.com/hashicorp/raft/raft.go b/vendor/github.com/hashicorp/raft/raft.go index e1f6a760f757..af60c75914a1 100644 --- a/vendor/github.com/hashicorp/raft/raft.go +++ b/vendor/github.com/hashicorp/raft/raft.go @@ -9,6 +9,8 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/go-hclog" + "github.com/armon/go-metrics" ) @@ -94,7 +96,7 @@ func (r *Raft) setLeader(leader ServerAddress) { r.leader = leader r.leaderLock.Unlock() if oldLeader != leader { - r.observe(LeaderObservation{leader: leader}) + r.observe(LeaderObservation{Leader: leader}) } } @@ -147,7 +149,7 @@ func (r *Raft) run() { // runFollower runs the FSM for a follower.
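Exporting LeaderObservation's Leader field (previously the unexported leader) makes leadership-change observations usable outside the raft package. Here is a rough sketch of consuming it through the existing observer API; the function name, channel size, and log output are illustrative choices, and r is assumed to be an already-configured node.

```go
package fsmexample

import (
	"log"

	"github.com/hashicorp/raft"
)

// watchLeader streams leadership changes from an existing *raft.Raft node.
func watchLeader(r *raft.Raft) {
	obsCh := make(chan raft.Observation, 16)
	filter := func(o *raft.Observation) bool {
		_, ok := o.Data.(raft.LeaderObservation)
		return ok // deliver leadership observations only
	}
	r.RegisterObserver(raft.NewObserver(obsCh, false, filter))

	go func() {
		for o := range obsCh {
			lo := o.Data.(raft.LeaderObservation)
			// Reading lo.Leader from importing code is exactly what the
			// newly exported field enables.
			log.Printf("new leader: %q", lo.Leader)
		}
	}()
}
```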
func (r *Raft) runFollower() { didWarn := false - r.logger.Info(fmt.Sprintf("%v entering Follower state (Leader: %q)", r, r.Leader())) + r.logger.Info("entering follower state", "follower", r, "leader", r.Leader()) metrics.IncrCounter([]string{"raft", "state", "follower"}, 1) heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout) @@ -209,7 +211,7 @@ func (r *Raft) runFollower() { didWarn = true } } else { - r.logger.Warn(fmt.Sprintf("Heartbeat timeout from %q reached, starting election", lastLeader)) + r.logger.Warn("heartbeat timeout reached, starting election", "last-leader", lastLeader) metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1) r.setState(Candidate) return @@ -245,7 +247,7 @@ func (r *Raft) liveBootstrap(configuration Configuration) error { // runCandidate runs the FSM for a candidate. func (r *Raft) runCandidate() { - r.logger.Info(fmt.Sprintf("%v entering Candidate state in term %v", r, r.getCurrentTerm()+1)) + r.logger.Info("entering candidate state", "node", r, "term", r.getCurrentTerm()+1) metrics.IncrCounter([]string{"raft", "state", "candidate"}, 1) // Start vote for us, and set a timeout @@ -263,7 +265,7 @@ func (r *Raft) runCandidate() { // Tally the votes, need a simple majority grantedVotes := 0 votesNeeded := r.quorumSize() - r.logger.Debug(fmt.Sprintf("Votes needed: %d", votesNeeded)) + r.logger.Debug("votes", "needed", votesNeeded) for r.getState() == Candidate { select { @@ -273,7 +275,7 @@ func (r *Raft) runCandidate() { case vote := <-voteCh: // Check if the term is greater than ours, bail if vote.Term > r.getCurrentTerm() { - r.logger.Debug("Newer term discovered, fallback to follower") + r.logger.Debug("newer term discovered, fallback to follower") r.setState(Follower) r.setCurrentTerm(vote.Term) return @@ -282,13 +284,12 @@ func (r *Raft) runCandidate() { // Check if the vote is granted if vote.Granted { grantedVotes++ - r.logger.Debug(fmt.Sprintf("Vote granted from %s in term %v. Tally: %d", - vote.voterID, vote.Term, grantedVotes)) + r.logger.Debug("vote granted", "from", vote.voterID, "term", vote.Term, "tally", grantedVotes) } // Check if we've become the leader if grantedVotes >= votesNeeded { - r.logger.Info(fmt.Sprintf("Election won. Tally: %d", grantedVotes)) + r.logger.Info("election won", "tally", grantedVotes) r.setState(Leader) r.setLeader(r.localAddr) return @@ -359,7 +360,7 @@ func (r *Raft) setupLeaderState() { // runLeader runs the FSM for a leader. Do the setup here and drop into // the leaderLoop for the hot loop. 
func (r *Raft) runLeader() { - r.logger.Info(fmt.Sprintf("%v entering Leader state", r)) + r.logger.Info("entering leader state", "leader", r) metrics.IncrCounter([]string{"raft", "state", "leader"}, 1) // Notify that we are the leader @@ -470,7 +471,7 @@ func (r *Raft) startStopReplication() { } inConfig[server.ID] = true if _, ok := r.leaderState.replState[server.ID]; !ok { - r.logger.Info(fmt.Sprintf("Added peer %v, starting replication", server.ID)) + r.logger.Info("added peer, starting replication", "peer", server.ID) s := &followerReplication{ peer: server, commitment: r.leaderState.commitment, @@ -497,7 +498,7 @@ continue } // Replicate up to lastIdx and stop - r.logger.Info(fmt.Sprintf("Removed peer %v, stopping replication after %v", serverID, lastIdx)) + r.logger.Info("removed peer, stopping replication", "peer", serverID, "last-index", lastIdx) repl.stopCh <- lastIdx close(repl.stopCh) delete(r.leaderState.replState, serverID) @@ -618,6 +619,8 @@ func (r *Raft) leaderLoop() { commitIndex := r.leaderState.commitment.getCommitIndex() r.setCommitIndex(commitIndex) + // New configuration has been committed, set it as the committed + // value. if r.configurations.latestIndex > oldCommitIndex && r.configurations.latestIndex <= commitIndex { r.configurations.committed = r.configurations.latest @@ -627,40 +630,48 @@ } } - var numProcessed int start := time.Now() + var groupReady []*list.Element + var groupFutures = make(map[uint64]*logFuture) + var lastIdxInGroup uint64 - for { - e := r.leaderState.inflight.Front() - if e == nil { - break - } + // Pull all inflight logs that are committed off the queue. + for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() { commitLog := e.Value.(*logFuture) idx := commitLog.log.Index if idx > commitIndex { + // Don't go past the committed index break } + // Measure the commit time metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch) + groupReady = append(groupReady, e) + groupFutures[idx] = commitLog + lastIdxInGroup = idx } - r.processLogs(idx, commitLog) + // Process the group + if len(groupReady) != 0 { + r.processLogs(lastIdxInGroup, groupFutures) - r.leaderState.inflight.Remove(e) - numProcessed++ + for _, e := range groupReady { + r.leaderState.inflight.Remove(e) + } } // Measure the time to enqueue batch of logs for FSM to apply metrics.MeasureSince([]string{"raft", "fsm", "enqueue"}, start) // Count the number of logs enqueued - metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(numProcessed)) + metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(len(groupReady))) if stepDown { if r.conf.ShutdownOnRemove { - r.logger.Info("Removed ourself, shutting down") + r.logger.Info("removed ourself, shutting down") r.Shutdown() } else { - r.logger.Info("Removed ourself, transitioning to follower") + r.logger.Info("removed ourself, transitioning to follower") r.setState(Follower) } } @@ -672,7 +683,7 @@ func (r *Raft) leaderLoop() { } else if v.votes < v.quorumSize { // Early return, means there must be a new leader - r.logger.Warn("New leader elected, stepping down") + r.logger.Warn("new leader elected, stepping down") r.setState(Follower) delete(r.leaderState.notify, v) for _, repl := range r.leaderState.replState { @@ -867,9 +878,9 @@ func (r *Raft) checkLeaderLease() time.Duration { } else { // Log at least once at high value, then debug. Otherwise it gets very verbose.
if diff <= 3*r.conf.LeaderLeaseTimeout { - r.logger.Warn(fmt.Sprintf("Failed to contact %v in %v", server.ID, diff)) + r.logger.Warn("failed to contact", "server-id", server.ID, "time", diff) } else { - r.logger.Debug(fmt.Sprintf("Failed to contact %v in %v", server.ID, diff)) + r.logger.Debug("failed to contact", "server-id", server.ID, "time", diff) } } metrics.AddSample([]string{"raft", "leader", "lastContact"}, float32(diff/time.Millisecond)) @@ -879,7 +890,7 @@ // Verify we can contact a quorum quorum := r.quorumSize() if contacted < quorum { - r.logger.Warn("Failed to contact quorum of nodes, stepping down") + r.logger.Warn("failed to contact quorum of nodes, stepping down") r.setState(Follower) metrics.IncrCounter([]string{"raft", "transition", "leader_lease_timeout"}, 1) } @@ -967,7 +978,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error { if err := sink.Close(); err != nil { return fmt.Errorf("failed to close snapshot: %v", err) } - r.logger.Info(fmt.Sprintf("Copied %d bytes to local snapshot", n)) + r.logger.Info("copied to local snapshot", "bytes", n) // Restore the snapshot into the FSM. If this fails we are in a // bad state so we panic to take ourselves out. @@ -991,7 +1002,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error { r.setLastApplied(lastIndex) r.setLastSnapshot(lastIndex, term) - r.logger.Info(fmt.Sprintf("Restored user snapshot (index %d)", lastIndex)) + r.logger.Info("restored user snapshot", "index", lastIndex) return nil } @@ -1005,8 +1016,11 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) { return } - r.logger.Info(fmt.Sprintf("Updating configuration with %s (%v, %v) to %+v", - future.req.command, future.req.serverID, future.req.serverAddress, configuration.Servers)) + r.logger.Info("updating configuration", + "command", future.req.command, + "server-id", future.req.serverID, + "server-addr", future.req.serverAddress, + "servers", hclog.Fmt("%+v", configuration.Servers)) // In pre-ID compatibility mode we translate all configuration changes // in to an old remove peer message, which can handle all supported @@ -1023,7 +1037,7 @@ } else { future.log = Log{ Type: LogConfiguration, - Data: encodeConfiguration(configuration), + Data: EncodeConfiguration(configuration), } } @@ -1059,7 +1073,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { // Write the log entry locally if err := r.logs.StoreLogs(logs); err != nil { - r.logger.Error(fmt.Sprintf("Failed to commit logs: %v", err)) + r.logger.Error("failed to commit logs", "error", err) for _, applyLog := range applyLogs { applyLog.respond(err) } @@ -1081,72 +1095,88 @@ // applied up to the given index limit. // This can be called from both leaders and followers. // Followers call this from AppendEntries, for n entries at a time, and always -// pass future=nil. -// Leaders call this once per inflight when entries are committed. They pass -// the future from inflights. +// pass futures=nil. +// Leaders call this when entries are committed. They pass the futures from any +// inflight logs.
+func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) { // Reject logs we've applied already lastApplied := r.getLastApplied() if index <= lastApplied { - r.logger.Warn(fmt.Sprintf("Skipping application of old log: %d", index)) + r.logger.Warn("skipping application of old log", "index", index) return } + applyBatch := func(batch []*commitTuple) { + select { + case r.fsmMutateCh <- batch: + case <-r.shutdownCh: + for _, cl := range batch { + if cl.future != nil { + cl.future.respond(ErrRaftShutdown) + } + } + } + } + + batch := make([]*commitTuple, 0, r.conf.MaxAppendEntries) + // Apply all the preceding logs - for idx := r.getLastApplied() + 1; idx <= index; idx++ { + for idx := lastApplied + 1; idx <= index; idx++ { + var preparedLog *commitTuple // Get the log, either from the future or from our log store - if future != nil && future.log.Index == idx { - r.processLog(&future.log, future) + future, futureOk := futures[idx] + if futureOk { + preparedLog = r.prepareLog(&future.log, future) } else { l := new(Log) if err := r.logs.GetLog(idx, l); err != nil { - r.logger.Error(fmt.Sprintf("Failed to get log at %d: %v", idx, err)) + r.logger.Error("failed to get log", "index", idx, "error", err) panic(err) } - r.processLog(l, nil) + preparedLog = r.prepareLog(l, nil) + } + + switch { + case preparedLog != nil: + // If we have a log ready to send to the FSM add it to the batch. + // The FSM thread will respond to the future. + batch = append(batch, preparedLog) + + // If we have filled up a batch, send it to the FSM + if len(batch) >= r.conf.MaxAppendEntries { + applyBatch(batch) + batch = make([]*commitTuple, 0, r.conf.MaxAppendEntries) + } + + case futureOk: + // Invoke the future if given. + future.respond(nil) } + } - // Update the lastApplied index and term - r.setLastApplied(idx) + // If there are any remaining logs in the batch, apply them + if len(batch) != 0 { + applyBatch(batch) } + + // Update the lastApplied index and term + r.setLastApplied(index) } -// processLog is invoked to process the application of a single committed log entry. -func (r *Raft) processLog(l *Log, future *logFuture) { +// prepareLog is invoked to prepare the application of a single committed log entry, returning a commitTuple for the FSM thread or nil when there is nothing to apply. +func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple { switch l.Type { case LogBarrier: // Barrier is handled by the FSM fallthrough case LogCommand: - // Forward to the fsm handler - select { - case r.fsmMutateCh <- &commitTuple{l, future}: - case <-r.shutdownCh: - if future != nil { - future.respond(ErrRaftShutdown) - } - } - - // Return so that the future is only responded to - // by the FSM handler when the application is done - return + return &commitTuple{l, future} case LogConfiguration: // Only support this with the v2 configuration format if r.protocolVersion > 2 { - // Forward to the fsm handler - select { - case r.fsmMutateCh <- &commitTuple{l, future}: - case <-r.shutdownCh: - if future != nil { - future.respond(ErrRaftShutdown) - } - } - - // Return so that the future is only responded to - // by the FSM handler when the application is done - return + return &commitTuple{l, future} } case LogAddPeerDeprecated: case LogRemovePeerDeprecated: @@ -1157,10 +1187,7 @@ func (r *Raft) processLog(l *Log, future *logFuture) { panic(fmt.Errorf("unrecognized log type: %#v", l)) } - // Invoke the future if given - if future != nil { - future.respond(nil) - } + return nil } // processRPC is called to handle an incoming RPC request.
This must only be @@ -1181,7 +1208,8 @@ func (r *Raft) processRPC(rpc RPC) { case *TimeoutNowRequest: r.timeoutNow(rpc, cmd) default: - r.logger.Error(fmt.Sprintf("Got unexpected command: %#v", rpc.Command)) + r.logger.Error("got unexpected command", + "command", hclog.Fmt("%#v", rpc.Command)) rpc.Respond(nil, fmt.Errorf("unexpected command")) } } @@ -1204,7 +1232,7 @@ func (r *Raft) processHeartbeat(rpc RPC) { case *AppendEntriesRequest: r.appendEntries(rpc, cmd) default: - r.logger.Error(fmt.Sprintf("Expected heartbeat, got command: %#v", rpc.Command)) + r.logger.Error("expected heartbeat, got", "command", hclog.Fmt("%#v", rpc.Command)) rpc.Respond(nil, fmt.Errorf("unexpected command")) } } @@ -1254,8 +1282,10 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { } else { var prevLog Log if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil { - r.logger.Warn(fmt.Sprintf("Failed to get previous log: %d %v (last: %d)", - a.PrevLogEntry, err, lastIdx)) + r.logger.Warn("failed to get previous log", + "previous-index", a.PrevLogEntry, + "last-index", lastIdx, + "error", err) resp.NoRetryBackoff = true return } @@ -1263,8 +1293,9 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { } if a.PrevLogTerm != prevLogTerm { - r.logger.Warn(fmt.Sprintf("Previous log term mis-match: ours: %d remote: %d", - prevLogTerm, a.PrevLogTerm)) + r.logger.Warn("previous log term mis-match", + "ours", prevLogTerm, + "remote", a.PrevLogTerm) resp.NoRetryBackoff = true return } @@ -1284,14 +1315,17 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { } var storeEntry Log if err := r.logs.GetLog(entry.Index, &storeEntry); err != nil { - r.logger.Warn(fmt.Sprintf("Failed to get log entry %d: %v", - entry.Index, err)) + r.logger.Warn("failed to get log entry", + "index", entry.Index, + "error", err) return } if entry.Term != storeEntry.Term { - r.logger.Warn(fmt.Sprintf("Clearing log suffix from %d to %d", entry.Index, lastLogIdx)) + r.logger.Warn("clearing log suffix", + "from", entry.Index, + "to", lastLogIdx) if err := r.logs.DeleteRange(entry.Index, lastLogIdx); err != nil { - r.logger.Error(fmt.Sprintf("Failed to clear log suffix: %v", err)) + r.logger.Error("failed to clear log suffix", "error", err) return } if entry.Index <= r.configurations.latestIndex { @@ -1306,7 +1340,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { if n := len(newEntries); n > 0 { // Append the new entries if err := r.logs.StoreLogs(newEntries); err != nil { - r.logger.Error(fmt.Sprintf("Failed to append to logs: %v", err)) + r.logger.Error("failed to append to logs", "error", err) // TODO: leaving r.getLastLog() in the wrong // state if there was a truncation above return @@ -1351,7 +1385,7 @@ func (r *Raft) processConfigurationLogEntry(entry *Log) { if entry.Type == LogConfiguration { r.configurations.committed = r.configurations.latest r.configurations.committedIndex = r.configurations.latestIndex - r.configurations.latest = decodeConfiguration(entry.Data) + r.configurations.latest = DecodeConfiguration(entry.Data) r.configurations.latestIndex = entry.Index } else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated { r.configurations.committed = r.configurations.latest @@ -1389,8 +1423,9 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // vote! 
candidate := r.trans.DecodePeer(req.Candidate) if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer { - r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since we have a leader: %v", - candidate, leader)) + r.logger.Warn("rejecting vote request since we have a leader", + "from", candidate, + "leader", leader) return } @@ -1402,7 +1437,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // Increase the term if we see a newer one if req.Term > r.getCurrentTerm() { // Ensure transition to follower - r.logger.Debug("lost leadership because received a requestvote with newer term") + r.logger.Debug("lost leadership because received a requestVote with a newer term") r.setState(Follower) r.setCurrentTerm(req.Term) resp.Term = req.Term @@ -1411,20 +1446,20 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // Check if we have voted yet lastVoteTerm, err := r.stable.GetUint64(keyLastVoteTerm) if err != nil && err.Error() != "not found" { - r.logger.Error(fmt.Sprintf("Failed to get last vote term: %v", err)) + r.logger.Error("failed to get last vote term", "error", err) return } lastVoteCandBytes, err := r.stable.Get(keyLastVoteCand) if err != nil && err.Error() != "not found" { - r.logger.Error(fmt.Sprintf("Failed to get last vote candidate: %v", err)) + r.logger.Error("failed to get last vote candidate", "error", err) return } // Check if we've voted in this election before if lastVoteTerm == req.Term && lastVoteCandBytes != nil { - r.logger.Info(fmt.Sprintf("Duplicate RequestVote for same term: %d", req.Term)) + r.logger.Info("duplicate requestVote for same term", "term", req.Term) if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 { - r.logger.Warn(fmt.Sprintf("Duplicate RequestVote from candidate: %s", req.Candidate)) + r.logger.Warn("duplicate requestVote from", "candidate", req.Candidate) resp.Granted = true } return @@ -1433,20 +1468,24 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // Reject if their term is older lastIdx, lastTerm := r.getLastEntry() if lastTerm > req.LastLogTerm { - r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since our last term is greater (%d, %d)", - candidate, lastTerm, req.LastLogTerm)) + r.logger.Warn("rejecting vote request since our last term is greater", + "candidate", candidate, + "last-term", lastTerm, + "last-candidate-term", req.LastLogTerm) return } if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex { - r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since our last index is greater (%d, %d)", - candidate, lastIdx, req.LastLogIndex)) + r.logger.Warn("rejecting vote request since our last index is greater", + "candidate", candidate, + "last-index", lastIdx, + "last-candidate-index", req.LastLogIndex) return } // Persist a vote for safety if err := r.persistVote(req.Term, req.Candidate); err != nil { - r.logger.Error(fmt.Sprintf("Failed to persist vote: %v", err)) + r.logger.Error("failed to persist vote", "error", err) return } @@ -1481,8 +1520,9 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { // Ignore an older term if req.Term < r.getCurrentTerm() { - r.logger.Info(fmt.Sprintf("Ignoring installSnapshot request with older term of %d vs currentTerm %d", - req.Term, r.getCurrentTerm())) + r.logger.Info("ignoring installSnapshot request with older term than current term", + "request-term", req.Term, + "current-term", r.getCurrentTerm()) return } @@ -1501,7 +1541,7 @@ func (r *Raft) installSnapshot(rpc RPC, 
req *InstallSnapshotRequest) { var reqConfiguration Configuration var reqConfigurationIndex uint64 if req.SnapshotVersion > 0 { - reqConfiguration = decodeConfiguration(req.Configuration) + reqConfiguration = DecodeConfiguration(req.Configuration) reqConfigurationIndex = req.ConfigurationIndex } else { reqConfiguration = decodePeers(req.Peers, r.trans) @@ -1511,7 +1551,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { sink, err := r.snapshots.Create(version, req.LastLogIndex, req.LastLogTerm, reqConfiguration, reqConfigurationIndex, r.trans) if err != nil { - r.logger.Error(fmt.Sprintf("Failed to create snapshot to install: %v", err)) + r.logger.Error("failed to create snapshot to install", "error", err) rpcErr = fmt.Errorf("failed to create snapshot: %v", err) return } @@ -1520,7 +1560,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { n, err := io.Copy(sink, rpc.Reader) if err != nil { sink.Cancel() - r.logger.Error(fmt.Sprintf("Failed to copy snapshot: %v", err)) + r.logger.Error("failed to copy snapshot", "error", err) rpcErr = err return } @@ -1528,18 +1568,19 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { // Check that we received it all if n != req.Size { sink.Cancel() - r.logger.Error(fmt.Sprintf("Failed to receive whole snapshot: %d / %d", n, req.Size)) + r.logger.Error("failed to receive whole snapshot", + "received", hclog.Fmt("%d / %d", n, req.Size)) rpcErr = fmt.Errorf("short read") return } // Finalize the snapshot if err := sink.Close(); err != nil { - r.logger.Error(fmt.Sprintf("Failed to finalize snapshot: %v", err)) + r.logger.Error("failed to finalize snapshot", "error", err) rpcErr = err return } - r.logger.Info(fmt.Sprintf("Copied %d bytes to local snapshot", n)) + r.logger.Info("copied to local snapshot", "bytes", n) // Restore snapshot future := &restoreFuture{ID: sink.ID()} @@ -1553,7 +1594,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { // Wait for the restore to happen if err := future.Error(); err != nil { - r.logger.Error(fmt.Sprintf("Failed to restore snapshot: %v", err)) + r.logger.Error("failed to restore snapshot", "error", err) rpcErr = err return } @@ -1572,7 +1613,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { // Compact logs, continue even if this fails if err := r.compactLogs(req.LastLogIndex); err != nil { - r.logger.Error(fmt.Sprintf("Failed to compact logs: %v", err)) + r.logger.Error("failed to compact logs", "error", err) } r.logger.Info("Installed remote snapshot") @@ -1622,7 +1663,9 @@ func (r *Raft) electSelf() <-chan *voteResult { resp := &voteResult{voterID: peer.ID} err := r.trans.RequestVote(peer.ID, peer.Address, req, &resp.RequestVoteResponse) if err != nil { - r.logger.Error(fmt.Sprintf("Failed to make RequestVote RPC to %v: %v", peer, err)) + r.logger.Error("failed to make requestVote RPC", + "target", peer, + "error", err) resp.Term = req.Term resp.Granted = false } @@ -1636,7 +1679,7 @@ func (r *Raft) electSelf() <-chan *voteResult { if server.ID == r.localID { // Persist a vote for ourselves if err := r.persistVote(req.Term, req.Candidate); err != nil { - r.logger.Error(fmt.Sprintf("Failed to persist vote : %v", err)) + r.logger.Error("failed to persist vote", "error", err) return nil } // Include our own vote diff --git a/vendor/github.com/hashicorp/raft/replication.go b/vendor/github.com/hashicorp/raft/replication.go index 900afc4c24ac..1e2f2db701ea 100644 --- 
a/vendor/github.com/hashicorp/raft/replication.go +++ b/vendor/github.com/hashicorp/raft/replication.go @@ -100,7 +100,7 @@ func (s *followerReplication) notifyAll(leader bool) { s.notifyLock.Unlock() // Submit our votes - for v, _ := range n { + for v := range n { v.vote(leader) } } @@ -182,7 +182,7 @@ PIPELINE: // to standard mode on failure. if err := r.pipelineReplicate(s); err != nil { if err != ErrPipelineReplicationNotSupported { - r.logger.Error(fmt.Sprintf("Failed to start pipeline replication to %s: %s", s.peer, err)) + r.logger.Error("failed to start pipeline replication to", "peer", s.peer, "error", err) } } goto RPC @@ -215,7 +215,7 @@ START: // Make the RPC call start = time.Now() if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil { - r.logger.Error(fmt.Sprintf("Failed to AppendEntries to %v: %v", s.peer, err)) + r.logger.Error("failed to appendEntries to", "peer", s.peer, "error", err) s.failures++ return } @@ -245,7 +245,7 @@ START: } else { s.failures++ } - r.logger.Warn(fmt.Sprintf("AppendEntries to %v rejected, sending older logs (next: %d)", s.peer, atomic.LoadUint64(&s.nextIndex))) + r.logger.Warn("appendEntries rejected, sending older logs", "peer", s.peer, "next", atomic.LoadUint64(&s.nextIndex)) } CHECK_MORE: @@ -272,7 +272,7 @@ SEND_SNAP: if stop, err := r.sendLatestSnapshot(s); stop { return true } else if err != nil { - r.logger.Error(fmt.Sprintf("Failed to send snapshot to %v: %v", s.peer, err)) + r.logger.Error("failed to send snapshot to", "peer", s.peer, "error", err) return } @@ -286,7 +286,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { // Get the snapshots snapshots, err := r.snapshots.List() if err != nil { - r.logger.Error(fmt.Sprintf("Failed to list snapshots: %v", err)) + r.logger.Error("failed to list snapshots", "error", err) return false, err } @@ -299,7 +299,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { snapID := snapshots[0].ID meta, snapshot, err := r.snapshots.Open(snapID) if err != nil { - r.logger.Error(fmt.Sprintf("Failed to open snapshot %v: %v", snapID, err)) + r.logger.Error("failed to open snapshot", "id", snapID, "error", err) return false, err } defer snapshot.Close() @@ -314,7 +314,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { LastLogTerm: meta.Term, Peers: meta.Peers, Size: meta.Size, - Configuration: encodeConfiguration(meta.Configuration), + Configuration: EncodeConfiguration(meta.Configuration), ConfigurationIndex: meta.ConfigurationIndex, } @@ -322,7 +322,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { start := time.Now() var resp InstallSnapshotResponse if err := r.trans.InstallSnapshot(s.peer.ID, s.peer.Address, &req, &resp, snapshot); err != nil { - r.logger.Error(fmt.Sprintf("Failed to install snapshot %v: %v", snapID, err)) + r.logger.Error("failed to install snapshot", "id", snapID, "error", err) s.failures++ return false, err } @@ -350,7 +350,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { s.notifyAll(true) } else { s.failures++ - r.logger.Warn(fmt.Sprintf("InstallSnapshot to %v rejected", s.peer)) + r.logger.Warn("installSnapshot rejected to", "peer", s.peer) } return false, nil } @@ -377,7 +377,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { start := time.Now() if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil { - r.logger.Error(fmt.Sprintf("Failed to heartbeat to %v: 
%v", s.peer.Address, err)) + r.logger.Error("failed to heartbeat to", "peer", s.peer.Address, "error", err) failures++ select { case <-time.After(backoff(failureWait, failures, maxFailureScale)): @@ -405,8 +405,8 @@ func (r *Raft) pipelineReplicate(s *followerReplication) error { defer pipeline.Close() // Log start and stop of pipeline - r.logger.Info(fmt.Sprintf("pipelining replication to peer %v", s.peer)) - defer r.logger.Info(fmt.Sprintf("aborting pipeline replication to peer %v", s.peer)) + r.logger.Info("pipelining replication", "peer", s.peer) + defer r.logger.Info("aborting pipeline replication", "peer", s.peer) // Create a shutdown and finish channel stopCh := make(chan struct{}) @@ -467,7 +467,7 @@ func (r *Raft) pipelineSend(s *followerReplication, p AppendPipeline, nextIdx *u // Pipeline the append entries if _, err := p.AppendEntries(req, new(AppendEntriesResponse)); err != nil { - r.logger.Error(fmt.Sprintf("Failed to pipeline AppendEntries to %v: %v", s.peer, err)) + r.logger.Error("failed to pipeline appendEntries", "peer", s.peer, "error", err) return true } @@ -543,7 +543,7 @@ func (r *Raft) setPreviousLog(req *AppendEntriesRequest, nextIndex uint64) error } else { var l Log if err := r.logs.GetLog(nextIndex-1, &l); err != nil { - r.logger.Error(fmt.Sprintf("Failed to get log at index %d: %v", nextIndex-1, err)) + r.logger.Error("failed to get log", "index", nextIndex-1, "error", err) return err } @@ -562,7 +562,7 @@ func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64 for i := nextIndex; i <= maxIndex; i++ { oldLog := new(Log) if err := r.logs.GetLog(i, oldLog); err != nil { - r.logger.Error(fmt.Sprintf("Failed to get log at index %d: %v", i, err)) + r.logger.Error("failed to get log", "index", i, "error", err) return err } req.Entries = append(req.Entries, oldLog) @@ -578,7 +578,7 @@ func appendStats(peer string, start time.Time, logs float32) { // handleStaleTerm is used when a follower indicates that we have a stale term. func (r *Raft) handleStaleTerm(s *followerReplication) { - r.logger.Error(fmt.Sprintf("peer %v has newer term, stopping replication", s.peer)) + r.logger.Error("peer has newer term, stopping replication", "peer", s.peer) s.notifyAll(false) // No longer leader asyncNotifyCh(s.stepDown) } diff --git a/vendor/github.com/hashicorp/raft/snapshot.go b/vendor/github.com/hashicorp/raft/snapshot.go index 2e0f77a5dd7f..f4c39451453c 100644 --- a/vendor/github.com/hashicorp/raft/snapshot.go +++ b/vendor/github.com/hashicorp/raft/snapshot.go @@ -77,14 +77,14 @@ func (r *Raft) runSnapshots() { // Trigger a snapshot if _, err := r.takeSnapshot(); err != nil { - r.logger.Error(fmt.Sprintf("Failed to take snapshot: %v", err)) + r.logger.Error("failed to take snapshot", "error", err) } case future := <-r.userSnapshotCh: // User-triggered, run immediately id, err := r.takeSnapshot() if err != nil { - r.logger.Error(fmt.Sprintf("Failed to take snapshot: %v", err)) + r.logger.Error("failed to take snapshot", "error", err) } else { future.opener = func() (*SnapshotMeta, io.ReadCloser, error) { return r.snapshots.Open(id) @@ -107,7 +107,7 @@ func (r *Raft) shouldSnapshot() bool { // Check the last log index lastIdx, err := r.logs.LastIndex() if err != nil { - r.logger.Error(fmt.Sprintf("Failed to get last log index: %v", err)) + r.logger.Error("failed to get last log index", "error", err) return false } @@ -172,7 +172,7 @@ func (r *Raft) takeSnapshot() (string, error) { } // Create a new snapshot. 
- r.logger.Info(fmt.Sprintf("Starting snapshot up to %d", snapReq.index)) + r.logger.Info("starting snapshot up to", "index", snapReq.index) start := time.Now() version := getSnapshotVersion(r.protocolVersion) sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans) @@ -202,7 +202,7 @@ func (r *Raft) takeSnapshot() (string, error) { return "", err } - r.logger.Info(fmt.Sprintf("Snapshot to %d complete", snapReq.index)) + r.logger.Info("snapshot complete up to", "index", snapReq.index) return sink.ID(), nil } @@ -228,8 +228,12 @@ func (r *Raft) compactLogs(snapIdx uint64) error { // after the snapshot to be removed. maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs) - // Log this - r.logger.Info(fmt.Sprintf("Compacting logs from %d to %d", minLog, maxLog)) + if minLog > maxLog { + r.logger.Info("no logs to truncate") + return nil + } + + r.logger.Info("compacting logs", "from", minLog, "to", maxLog) // Compact the logs if err := r.logs.DeleteRange(minLog, maxLog); err != nil { diff --git a/vendor/github.com/hashicorp/raft/tcp_transport.go b/vendor/github.com/hashicorp/raft/tcp_transport.go index 69c928ed9284..ff40a57bcd2a 100644 --- a/vendor/github.com/hashicorp/raft/tcp_transport.go +++ b/vendor/github.com/hashicorp/raft/tcp_transport.go @@ -2,8 +2,8 @@ package raft import ( "errors" + "github.com/hashicorp/go-hclog" "io" - "log" "net" "time" ) @@ -40,7 +40,7 @@ func NewTCPTransportWithLogger( advertise net.Addr, maxPool int, timeout time.Duration, - logger *log.Logger, + logger hclog.Logger, ) (*NetworkTransport, error) { return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport { return NewNetworkTransportWithLogger(stream, maxPool, timeout, logger) diff --git a/vendor/github.com/hashicorp/raft/testing.go b/vendor/github.com/hashicorp/raft/testing.go index 913070b0694d..70fd7b795521 100644 --- a/vendor/github.com/hashicorp/raft/testing.go +++ b/vendor/github.com/hashicorp/raft/testing.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "io/ioutil" - "log" "os" "reflect" "sync" @@ -16,6 +15,10 @@ import ( "github.com/hashicorp/go-msgpack/codec" ) +var ( + userSnapshotErrorsOnNoData = true +) + // Return configurations optimized for in-memory func inmemConfig(t *testing.T) *Config { conf := DefaultConfig() @@ -151,12 +154,18 @@ func (a *testLoggerAdapter) Write(d []byte) (int, error) { return len(d), nil } -func newTestLogger(t *testing.T) *log.Logger { - return log.New(&testLoggerAdapter{t: t}, "", log.Lmicroseconds) +func newTestLogger(t *testing.T) hclog.Logger { + return hclog.New(&hclog.LoggerOptions{ + Output: &testLoggerAdapter{t: t}, + Level: hclog.DefaultLevel, + }) } -func newTestLoggerWithPrefix(t *testing.T, prefix string) *log.Logger { - return log.New(&testLoggerAdapter{t: t, prefix: prefix}, "", log.Lmicroseconds) +func newTestLoggerWithPrefix(t *testing.T, prefix string) hclog.Logger { + return hclog.New(&hclog.LoggerOptions{ + Output: &testLoggerAdapter{t: t, prefix: prefix}, + Level: hclog.DefaultLevel, + }) } func newTestLeveledLogger(t *testing.T) hclog.Logger { @@ -185,7 +194,7 @@ type cluster struct { conf *Config propagateTimeout time.Duration longstopTimeout time.Duration - logger *log.Logger + logger hclog.Logger startTime time.Time failedLock sync.Mutex @@ -222,7 +231,7 @@ func (c *cluster) notifyFailed() { // thread to block until all goroutines have completed in order to reliably // fail tests using this function. 
func (c *cluster) Failf(format string, args ...interface{}) { - c.logger.Printf(format, args...) + c.logger.Error(fmt.Sprintf(format, args...)) c.t.Fail() c.notifyFailed() } @@ -233,7 +242,7 @@ func (c *cluster) Failf(format string, args ...interface{}) { // other goroutines created during the test. Calling FailNowf does not stop // those other goroutines. func (c *cluster) FailNowf(format string, args ...interface{}) { - c.logger.Printf(format, args...) + c.logger.Error(fmt.Sprintf(format, args...)) c.t.FailNow() } @@ -254,7 +263,7 @@ func (c *cluster) Close() { for _, f := range futures { if err := f.Error(); err != nil { - c.FailNowf("[ERR] shutdown future err: %v", err) + c.FailNowf("shutdown future err: %v", err) } } @@ -316,7 +325,7 @@ CHECK: c.t.FailNow() case <-limitCh: - c.FailNowf("[ERR] Timeout waiting for replication") + c.FailNowf("timeout waiting for replication") case <-ch: for _, fsmRaw := range c.fsms { @@ -354,7 +363,7 @@ func (c *cluster) pollState(s RaftState) ([]*Raft, uint64) { // GetInState polls the state of the cluster and attempts to identify when it has // settled into the given state. func (c *cluster) GetInState(s RaftState) []*Raft { - c.logger.Printf("[INFO] Starting stability test for raft state: %+v", s) + c.logger.Info("starting stability test", "raft-state", s) limitCh := time.After(c.longstopTimeout) // An election should complete after 2 * max(HeartbeatTimeout, ElectionTimeout) @@ -411,17 +420,18 @@ func (c *cluster) GetInState(s RaftState) []*Raft { c.t.FailNow() case <-limitCh: - c.FailNowf("[ERR] Timeout waiting for stable %s state", s) + c.FailNowf("timeout waiting for stable %s state", s) case <-c.WaitEventChan(filter, 0): - c.logger.Printf("[DEBUG] Resetting stability timeout") + c.logger.Debug("resetting stability timeout") case t, ok := <-timer.C: if !ok { - c.FailNowf("[ERR] Timer channel errored") + c.FailNowf("timer channel errored") } - c.logger.Printf("[INFO] Stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability", - s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime)) + + c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability", + s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime))) return inState } } @@ -431,7 +441,7 @@ func (c *cluster) GetInState(s RaftState) []*Raft { func (c *cluster) Leader() *Raft { leaders := c.GetInState(Leader) if len(leaders) != 1 { - c.FailNowf("[ERR] expected one leader: %v", leaders) + c.FailNowf("expected one leader: %v", leaders) } return leaders[0] } @@ -442,14 +452,14 @@ func (c *cluster) Followers() []*Raft { expFollowers := len(c.rafts) - 1 followers := c.GetInState(Follower) if len(followers) != expFollowers { - c.FailNowf("[ERR] timeout waiting for %d followers (followers are %v)", expFollowers, followers) + c.FailNowf("timeout waiting for %d followers (followers are %v)", expFollowers, followers) } return followers } // FullyConnect connects all the transports together. func (c *cluster) FullyConnect() { - c.logger.Printf("[DEBUG] Fully Connecting") + c.logger.Debug("fully connecting") for i, t1 := range c.trans { for j, t2 := range c.trans { if i != j { @@ -462,7 +472,7 @@ func (c *cluster) FullyConnect() { // Disconnect disconnects all transports from the given address. 
func (c *cluster) Disconnect(a ServerAddress) { - c.logger.Printf("[DEBUG] Disconnecting %v", a) + c.logger.Debug("disconnecting", "address", a) for _, t := range c.trans { if t.LocalAddr() == a { t.DisconnectAll() @@ -475,7 +485,7 @@ func (c *cluster) Disconnect(a ServerAddress) { // Partition keeps the given list of addresses connected but isolates them // from the other members of the cluster. func (c *cluster) Partition(far []ServerAddress) { - c.logger.Printf("[DEBUG] Partitioning %v", far) + c.logger.Debug("partitioning", "addresses", far) // Gather the set of nodes on the "near" side of the partition (we // will call the supplied list of nodes the "far" side). @@ -500,7 +510,7 @@ OUTER: t.Disconnect(a) } } else { - for a, _ := range near { + for a := range near { t.Disconnect(a) } } @@ -530,15 +540,15 @@ func (c *cluster) EnsureLeader(t *testing.T, expect ServerAddress) { leader = "[none]" } if expect == "" { - c.logger.Printf("[ERR] Peer %s sees leader %v expected [none]", r, leader) + c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leader, "expected-leader", "[none]") } else { - c.logger.Printf("[ERR] Peer %s sees leader %v expected %v", r, leader, expect) + c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leader, "expected-leader", expect) } fail = true } } if fail { - c.FailNowf("[ERR] At least one peer has the wrong notion of leader") + c.FailNowf("at least one peer has the wrong notion of leader") } } @@ -559,7 +569,7 @@ CHECK: if len(first.logs) != len(fsm.logs) { fsm.Unlock() if time.Now().After(limit) { - c.FailNowf("[ERR] FSM log length mismatch: %d %d", + c.FailNowf("FSM log length mismatch: %d %d", len(first.logs), len(fsm.logs)) } else { goto WAIT @@ -570,7 +580,7 @@ CHECK: if bytes.Compare(first.logs[idx], fsm.logs[idx]) != 0 { fsm.Unlock() if time.Now().After(limit) { - c.FailNowf("[ERR] FSM log mismatch at index %d", idx) + c.FailNowf("FSM log mismatch at index %d", idx) } else { goto WAIT } @@ -579,7 +589,7 @@ CHECK: if len(first.configurations) != len(fsm.configurations) { fsm.Unlock() if time.Now().After(limit) { - c.FailNowf("[ERR] FSM configuration length mismatch: %d %d", + c.FailNowf("FSM configuration length mismatch: %d %d", len(first.logs), len(fsm.logs)) } else { goto WAIT @@ -590,7 +600,7 @@ CHECK: if !reflect.DeepEqual(first.configurations[idx], fsm.configurations[idx]) { fsm.Unlock() if time.Now().After(limit) { - c.FailNowf("[ERR] FSM configuration mismatch at index %d: %v, %v", idx, first.configurations[idx], fsm.configurations[idx]) + c.FailNowf("FSM configuration mismatch at index %d: %v, %v", idx, first.configurations[idx], fsm.configurations[idx]) } else { goto WAIT } @@ -613,7 +623,7 @@ WAIT: func (c *cluster) getConfiguration(r *Raft) Configuration { future := r.GetConfiguration() if err := future.Error(); err != nil { - c.FailNowf("[ERR] failed to get configuration: %v", err) + c.FailNowf("failed to get configuration: %v", err) return Configuration{} } @@ -634,7 +644,7 @@ CHECK: otherSet := c.getConfiguration(raft) if !reflect.DeepEqual(peerSet, otherSet) { if time.Now().After(limit) { - c.FailNowf("[ERR] peer mismatch: %+v %+v", peerSet, otherSet) + c.FailNowf("peer mismatch: %+v %+v", peerSet, otherSet) } else { goto WAIT } @@ -687,7 +697,7 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster { for i := 0; i < opts.Peers; i++ { dir, err := ioutil.TempDir("", "raft") if err != nil { - c.FailNowf("[ERR] err: %v ", err) + c.FailNowf("err: %v", err) } store := NewInmemStore() @@ -742,18 +752,18 @@ 
func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster { if opts.Bootstrap { err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration) if err != nil { - c.FailNowf("[ERR] BootstrapCluster failed: %v", err) + c.FailNowf("BootstrapCluster failed: %v", err) } } raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans) if err != nil { - c.FailNowf("[ERR] NewRaft failed: %v", err) + c.FailNowf("NewRaft failed: %v", err) } raft.RegisterObserver(NewObserver(c.observationCh, false, nil)) if err != nil { - c.FailNowf("[ERR] RegisterObserver failed: %v", err) + c.FailNowf("RegisterObserver failed: %v", err) } c.rafts = append(c.rafts, raft) } diff --git a/vendor/github.com/hashicorp/raft/testing_batch.go b/vendor/github.com/hashicorp/raft/testing_batch.go new file mode 100644 index 000000000000..afb22856144f --- /dev/null +++ b/vendor/github.com/hashicorp/raft/testing_batch.go @@ -0,0 +1,29 @@ +// +build batchtest + +package raft + +func init() { + userSnapshotErrorsOnNoData = false +} + +// ApplyBatch enables MockFSM to satisfy the BatchingFSM interface. This +// function is gated by the batchtest build flag. +// +// NOTE: This is exposed for middleware testing purposes and is not a stable API +func (m *MockFSM) ApplyBatch(logs []*Log) []interface{} { + m.Lock() + defer m.Unlock() + + ret := make([]interface{}, len(logs)) + for i, log := range logs { + switch log.Type { + case LogCommand: + m.logs = append(m.logs, log.Data) + ret[i] = len(m.logs) + default: + ret[i] = nil + } + } + + return ret +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 2d3b0b699fc0..ae297db88bdc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -298,7 +298,7 @@ github.com/hashicorp/go-multierror # github.com/hashicorp/go-plugin v1.0.1 github.com/hashicorp/go-plugin github.com/hashicorp/go-plugin/internal/plugin -# github.com/hashicorp/go-raftchunking v0.6.2 +# github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a github.com/hashicorp/go-raftchunking github.com/hashicorp/go-raftchunking/types # github.com/hashicorp/go-retryablehttp v0.6.2 @@ -330,7 +330,7 @@ github.com/hashicorp/hcl/json/token # github.com/hashicorp/nomad/api v0.0.0-20190412184103-1c38ced33adf github.com/hashicorp/nomad/api github.com/hashicorp/nomad/api/contexts -# github.com/hashicorp/raft v1.1.1 +# github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17 github.com/hashicorp/raft # github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab github.com/hashicorp/raft-snapshot
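Two usage notes on the batching changes for reviewers pulling this into other consumers. First, per the new testing_batch.go file, MockFSM only gains ApplyBatch when the batchtest build tag is set (for example, go test -tags batchtest ./...), so the raft test suite can exercise both the single-apply and batch paths in runFSM. Second, opting in from application code only requires the FSM to satisfy BatchingFSM; a rough single-node sketch using raft's in-memory helpers follows, where newKVFSM is the hypothetical constructor from the earlier sketch and the node ID is arbitrary.

```go
package fsmexample

import (
	"log"

	"github.com/hashicorp/raft"
)

func startNode() *raft.Raft {
	// MaxAppendEntries bounds replication batches and therefore also the
	// number of logs handed to a single ApplyBatch call.
	conf := raft.DefaultConfig()
	conf.LocalID = raft.ServerID("node1")
	conf.MaxAppendEntries = 64

	store := raft.NewInmemStore() // serves as both LogStore and StableStore
	snaps := raft.NewInmemSnapshotStore()
	addr, trans := raft.NewInmemTransport("") // "" generates an address

	servers := raft.Configuration{Servers: []raft.Server{
		{ID: conf.LocalID, Address: addr},
	}}
	if err := raft.BootstrapCluster(conf, store, store, snaps, trans, servers); err != nil {
		log.Fatal(err)
	}

	// runFSM type-asserts for BatchingFSM at runtime, so passing a batching
	// implementation is all that is needed to enable the batch path.
	var fsm raft.BatchingFSM = newKVFSM()
	r, err := raft.NewRaft(conf, fsm, store, store, snaps, trans)
	if err != nil {
		log.Fatal(err)
	}
	return r
}
```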