Skip to content

Commit

Permalink
Add state sync support (#7166)
Browse files Browse the repository at this point in the history
* Add state sync support

* fix incorrect test tempdir

* proto: move and update Protobuf schemas

* proto: lint fixes

* comment tweaks

* don't use type aliasing

* don't call .Error() when logging errors

* use create terminology instead of take for snapshots

* reuse chunk hasher

* simplify key encoding code

* track chunk index in Manager

* add restoreDone message for Manager

* add a ready channel to Snapshotter.Restore()

* add comment on streaming IO API

* use sdkerrors for error handling

* fix incorrect error

* tweak changelog

* syntax fix

* update test code after merge
  • Loading branch information
erikgrinaker committed Sep 8, 2020
1 parent d61fa43 commit 4faeefe
Show file tree
Hide file tree
Showing 30 changed files with 4,518 additions and 16 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ be used to retrieve the actual proposal `Content`. Also the `NewMsgSubmitProposa
* (modules) [\#6734](https://github.com/cosmos/cosmos-sdk/issues/6834) Add `TxEncodingConfig` parameter to `AppModuleBasic.ValidateGenesis` command to support JSON tx decoding in `genutil`.
* (genesis) [\#7000](https://github.com/cosmos/cosmos-sdk/pull/7000) The root `GenesisState` is now decoded using `encoding/json` instead of amino so `int64` and `uint64` types are now encoded as integers as opposed to strings.
* (types) [\#7032](https://github.com/cosmos/cosmos-sdk/pull/7032) All types ending with `ID` (e.g. `ProposalID`) now end with `Id` (e.g. `ProposalId`), to match default Protobuf generated format. Also see [\#7033](https://github.com/cosmos/cosmos-sdk/pull/7033) for more details.
* (store) [\#5803](https://github.com/cosmos/cosmos-sdk/pull/5803) The `store.CommitMultiStore` interface now includes the new `snapshots.Snapshotter` interface as well.

### Features

Expand All @@ -166,6 +167,8 @@ be used to retrieve the actual proposal `Content`. Also the `NewMsgSubmitProposa
* (crypto/multisig) [\#6241](https://github.com/cosmos/cosmos-sdk/pull/6241) Add Multisig type directly to the repo. Previously this was in tendermint.
* (rest) [\#6167](https://github.com/cosmos/cosmos-sdk/pull/6167) Support `max-body-bytes` CLI flag for the REST service.
* (x/ibc) [\#5588](https://github.com/cosmos/cosmos-sdk/pull/5588) Add [ICS 024 - Host State Machine Requirements](https://github.com/cosmos/ics/tree/master/spec/ics-024-host-requirements) subpackage to `x/ibc` module.
* (baseapp) [\#5803](https://github.com/cosmos/cosmos-sdk/pull/5803) Added support for taking state snapshots at regular height intervals, via options `snapshot-interval` and `snapshot-keep-recent`.
* (store) [\#5803](https://github.com/cosmos/cosmos-sdk/pull/5803) Added `rootmulti.Store` methods for taking and restoring snapshots, based on `iavl.Store` export/import.
* (x/ibc) [\#5277](https://github.com/cosmos/cosmos-sdk/pull/5277) `x/ibc` changes from IBC alpha. For more details check the [`x/ibc/spec`](https://github.com/cosmos/cosmos-sdk/tree/master/x/ibc/spec) directory:
* [ICS 002 - Client Semantics](https://github.com/cosmos/ics/tree/master/spec/ics-002-client-semantics) subpackage
* [ICS 003 - Connection Semantics](https://github.com/cosmos/ics/blob/master/spec/ics-003-connection-semantics) subpackage
Expand Down
137 changes: 121 additions & 16 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package baseapp

import (
"errors"
"fmt"
"os"
"sort"
Expand All @@ -15,6 +16,7 @@ import (
grpcstatus "google.golang.org/grpc/status"

"github.com/cosmos/cosmos-sdk/codec"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
Expand Down Expand Up @@ -310,6 +312,10 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) {
app.halt()
}

if app.snapshotInterval > 0 && uint64(header.Height)%app.snapshotInterval == 0 {
go app.snapshot(header.Height)
}

return abci.ResponseCommit{
Data: commitID.Hash,
}
Expand Down Expand Up @@ -337,6 +343,27 @@ func (app *BaseApp) halt() {
os.Exit(0)
}

// snapshot creates a state snapshot at the given height and, when
// snapshotKeepRecent is non-zero, prunes older snapshots so that only that
// many are kept. Commit launches it in its own goroutine, so failures are
// logged instead of being returned.
//
// If no snapshot manager is configured the call is a logged no-op rather than
// a nil-pointer panic inside the background goroutine.
func (app *BaseApp) snapshot(height int64) {
	if app.snapshotManager == nil {
		app.logger.Error("Snapshot manager not configured, skipping state snapshot", "height", height)
		return
	}

	app.logger.Info("Creating state snapshot", "height", height)
	snapshot, err := app.snapshotManager.Create(uint64(height))
	if err != nil {
		app.logger.Error("Failed to create state snapshot", "height", height, "err", err)
		return
	}
	app.logger.Info("Completed state snapshot", "height", height, "format", snapshot.Format)

	// A keep-recent value of zero disables pruning entirely.
	if app.snapshotKeepRecent > 0 {
		app.logger.Debug("Pruning state snapshots")
		pruned, err := app.snapshotManager.Prune(app.snapshotKeepRecent)
		if err != nil {
			app.logger.Error("Failed to prune state snapshots", "err", err)
			return
		}
		app.logger.Debug("Pruned state snapshots", "pruned", pruned)
	}
}

// Query implements the ABCI interface. It delegates to CommitMultiStore if it
// implements Queryable.
func (app *BaseApp) Query(req abci.RequestQuery) abci.ResponseQuery {
Expand Down Expand Up @@ -371,6 +398,100 @@ func (app *BaseApp) Query(req abci.RequestQuery) abci.ResponseQuery {
return sdkerrors.QueryResult(sdkerrors.Wrap(sdkerrors.ErrUnknownRequest, "unknown query path"))
}

// ListSnapshots implements the ABCI interface. When a snapshot manager is
// configured it returns the manager's snapshots converted to their ABCI form;
// otherwise the response carries an empty (non-nil) list.
//
// Errors are logged, not returned: a listing failure yields an empty list, and
// a conversion failure yields whatever snapshots were converted before it.
func (app *BaseApp) ListSnapshots(req abci.RequestListSnapshots) abci.ResponseListSnapshots {
	out := abci.ResponseListSnapshots{Snapshots: []*abci.Snapshot{}}

	if app.snapshotManager != nil {
		stored, listErr := app.snapshotManager.List()
		if listErr != nil {
			app.logger.Error("Failed to list snapshots", "err", listErr)
			return out
		}

		for _, s := range stored {
			converted, convErr := s.ToABCI()
			if convErr != nil {
				app.logger.Error("Failed to list snapshots", "err", convErr)
				return out
			}
			// converted is declared per iteration, so each pointer is distinct.
			out.Snapshots = append(out.Snapshots, &converted)
		}
	}

	return out
}

// LoadSnapshotChunk implements the ABCI interface. It delegates to
// app.snapshotManager if set, returning the chunk identified by the request's
// height, format and chunk index.
//
// An empty response is returned when no snapshot manager is configured or when
// loading the chunk fails; the failure is logged together with its error.
func (app *BaseApp) LoadSnapshotChunk(req abci.RequestLoadSnapshotChunk) abci.ResponseLoadSnapshotChunk {
	if app.snapshotManager == nil {
		return abci.ResponseLoadSnapshotChunk{}
	}
	chunk, err := app.snapshotManager.LoadChunk(req.Height, req.Format, req.Chunk)
	if err != nil {
		// The "err" key must be followed by its value; the logger takes
		// alternating key/value pairs.
		app.logger.Error("Failed to load snapshot chunk", "height", req.Height, "format", req.Format,
			"chunk", req.Chunk, "err", err)
		return abci.ResponseLoadSnapshotChunk{}
	}
	return abci.ResponseLoadSnapshotChunk{Chunk: chunk}
}

// OfferSnapshot implements the ABCI interface. It delegates to
// app.snapshotManager if set, asking it to begin restoring the offered
// snapshot.
//
// Results:
//   - ABORT when no snapshot manager is configured, or when the restore fails
//     for any reason not listed below.
//   - REJECT when the request carries no snapshot, its metadata cannot be
//     decoded, or the manager reports ErrInvalidMetadata.
//   - REJECT_FORMAT when the manager reports ErrUnknownFormat.
//   - ACCEPT when the manager starts the restore without error.
func (app *BaseApp) OfferSnapshot(req abci.RequestOfferSnapshot) abci.ResponseOfferSnapshot {
	// Without a manager no snapshot can ever be restored, so abort instead of
	// dereferencing a nil pointer below.
	if app.snapshotManager == nil {
		app.logger.Error("Snapshot manager not configured")
		return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}
	}

	if req.Snapshot == nil {
		app.logger.Error("Received nil snapshot")
		return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}
	}

	snapshot, err := snapshottypes.SnapshotFromABCI(req.Snapshot)
	if err != nil {
		app.logger.Error("Failed to decode snapshot metadata", "err", err)
		return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}
	}

	err = app.snapshotManager.Restore(snapshot)
	switch {
	case err == nil:
		return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}

	case errors.Is(err, snapshottypes.ErrUnknownFormat):
		return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}

	case errors.Is(err, snapshottypes.ErrInvalidMetadata):
		app.logger.Error("Rejecting invalid snapshot", "height", req.Snapshot.Height,
			"format", req.Snapshot.Format, "err", err)
		return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}

	default:
		app.logger.Error("Failed to restore snapshot", "height", req.Snapshot.Height,
			"format", req.Snapshot.Format, "err", err)
		// We currently don't support resetting the IAVL stores and retrying a different snapshot,
		// so we ask Tendermint to abort all snapshot restoration.
		return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}
	}
}

// ApplySnapshotChunk implements the ABCI interface. It delegates to
// app.snapshotManager if set, handing it the next chunk of the snapshot that
// is being restored.
//
// Results:
//   - ABORT when no snapshot manager is configured, or when restoring the
//     chunk fails for any reason other than a hash mismatch.
//   - RETRY on ErrChunkHashMismatch, asking for the chunk to be refetched and
//     the sender to be rejected.
//   - ACCEPT when the chunk is applied without error.
func (app *BaseApp) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) abci.ResponseApplySnapshotChunk {
	// Without a manager there is no restore in progress, so abort instead of
	// dereferencing a nil pointer below.
	if app.snapshotManager == nil {
		app.logger.Error("Snapshot manager not configured")
		return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ABORT}
	}

	// The first return value of RestoreChunk is not needed here.
	_, err := app.snapshotManager.RestoreChunk(req.Chunk)
	switch {
	case err == nil:
		return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}

	case errors.Is(err, snapshottypes.ErrChunkHashMismatch):
		app.logger.Error("Chunk checksum mismatch, rejecting sender and requesting refetch",
			"chunk", req.Index, "sender", req.Sender, "err", err)
		return abci.ResponseApplySnapshotChunk{
			Result:        abci.ResponseApplySnapshotChunk_RETRY,
			RefetchChunks: []uint32{req.Index},
			RejectSenders: []string{req.Sender},
		}

	default:
		app.logger.Error("Failed to restore snapshot", "err", err)
		return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ABORT}
	}
}

func (app *BaseApp) handleQueryGRPC(handler GRPCQueryHandler, req abci.RequestQuery) abci.ResponseQuery {
ctx, err := app.createQueryContext(req.Height, req.Prove)
if err != nil {
Expand Down Expand Up @@ -597,19 +718,3 @@ func splitPath(requestPath string) (path []string) {

return path
}

// ListSnapshots is a no-op stub of the ABCI method: it ignores the request and
// returns an empty response.
func (app *BaseApp) ListSnapshots(abci.RequestListSnapshots) abci.ResponseListSnapshots {
return abci.ResponseListSnapshots{}
}

// OfferSnapshot is a no-op stub of the ABCI method: it ignores the request and
// returns an empty response.
func (app *BaseApp) OfferSnapshot(abci.RequestOfferSnapshot) abci.ResponseOfferSnapshot {
return abci.ResponseOfferSnapshot{}
}

// LoadSnapshotChunk is a no-op stub of the ABCI method: it ignores the request
// and returns an empty response.
func (app *BaseApp) LoadSnapshotChunk(abci.RequestLoadSnapshotChunk) abci.ResponseLoadSnapshotChunk {
return abci.ResponseLoadSnapshotChunk{}
}

// ApplySnapshotChunk is a no-op stub of the ABCI method: it ignores the
// request and returns an empty response.
func (app *BaseApp) ApplySnapshotChunk(abci.RequestApplySnapshotChunk) abci.ResponseApplySnapshotChunk {
return abci.ResponseApplySnapshotChunk{}
}
22 changes: 22 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package baseapp

import (
"errors"
"fmt"
"reflect"
"strings"
Expand All @@ -12,7 +13,9 @@ import (
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/snapshots"
"github.com/cosmos/cosmos-sdk/store"
"github.com/cosmos/cosmos-sdk/store/rootmulti"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)
Expand Down Expand Up @@ -60,6 +63,11 @@ type BaseApp struct { // nolint: maligned
idPeerFilter sdk.PeerFilter // filter peers by node ID
fauxMerkleMode bool // if true, IAVL MountStores uses MountStoresDB for simulation speed.

// manages snapshots, i.e. dumps of app state at certain intervals
snapshotManager *snapshots.Manager
snapshotInterval uint64 // block interval between state sync snapshots
snapshotKeepRecent uint32 // recent state sync snapshots to keep

// volatile states:
//
// checkState is set on InitChain and reset on Commit
Expand Down Expand Up @@ -261,6 +269,20 @@ func (app *BaseApp) init() error {
app.setCheckState(tmproto.Header{})
app.Seal()

// make sure the snapshot interval is a multiple of the pruning KeepEvery interval
if app.snapshotManager != nil && app.snapshotInterval > 0 {
rms, ok := app.cms.(*rootmulti.Store)
if !ok {
return errors.New("state sync snapshots require a rootmulti store")
}
pruningOpts := rms.GetPruning()
if pruningOpts.KeepEvery > 0 && app.snapshotInterval%pruningOpts.KeepEvery != 0 {
return fmt.Errorf(
"state sync snapshot interval %v must be a multiple of pruning keep every interval %v",
app.snapshotInterval, pruningOpts.KeepEvery)
}
}

return nil
}

Expand Down
Loading

0 comments on commit 4faeefe

Please sign in to comment.