Skip to content

Commit

Permalink
feat(server/v2/cometbft): config (#20989)
Browse files Browse the repository at this point in the history
  • Loading branch information
julienrbrt committed Jul 22, 2024
1 parent 4b3a0b0 commit 8484dc5
Show file tree
Hide file tree
Showing 24 changed files with 468 additions and 339 deletions.
5 changes: 5 additions & 0 deletions runtime/v2/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ type App[T transaction.Tx] struct {
GRPCQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error)
}

// Name returns the app name.
func (a *App[T]) Name() string {
return a.config.AppName
}

// Logger returns the app logger.
func (a *App[T]) Logger() log.Logger {
return a.logger
Expand Down
1 change: 0 additions & 1 deletion runtime/v2/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ func (a *AppBuilder[T]) Build(opts ...AppBuilderOption[T]) (*App[T], error) {
if err != nil {
return nil, fmt.Errorf("failed to create STF: %w", err)
}

a.app.stf = stf

rs, err := rootstore.CreateRootStore(a.storeOptions)
Expand Down
2 changes: 1 addition & 1 deletion server/v2/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func New[T transaction.Tx](cfgOptions ...CfgOption) *GRPCServer[T] {
func (s *GRPCServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.Logger) error {
cfg := s.Config().(*Config)
if v != nil {
if err := v.Sub(s.Name()).Unmarshal(&cfg); err != nil {
if err := serverv2.UnmarshalSubConfig(v, s.Name(), &cfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/v2/api/grpcgateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *GRPCGatewayServer[T]) Config() any {
func (s *GRPCGatewayServer[T]) Init(appI serverv2.AppI[transaction.Tx], v *viper.Viper, logger log.Logger) error {
cfg := s.Config().(*Config)
if v != nil {
if err := v.Sub(s.Name()).Unmarshal(&cfg); err != nil {
if err := serverv2.UnmarshalSubConfig(v, s.Name(), &cfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}
Expand Down
92 changes: 49 additions & 43 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,22 @@ import (
var _ abci.Application = (*Consensus[transaction.Tx])(nil)

type Consensus[T transaction.Tx] struct {
// legacy support for gRPC
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error)

app *appmanager.AppManager[T]
cfg Config
store types.Store
logger log.Logger
txCodec transaction.Codec[T]
streaming streaming.Manager
snapshotManager *snapshots.Manager
mempool mempool.Mempool[T]

logger log.Logger
appName, version string
consensusAuthority string // Set by the application to grant authority to the consensus engine to send messages to the consensus module
app *appmanager.AppManager[T]
txCodec transaction.Codec[T]
store types.Store
streaming streaming.Manager
snapshotManager *snapshots.Manager
mempool mempool.Mempool[T]
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error) // legacy support for gRPC

cfg Config
indexedEvents map[string]struct{}
chainID string

initialHeight uint64
// this is only available after this node has committed a block (in FinalizeBlock),
// otherwise it will be empty and we will need to query the app for the last
// committed block.
Expand All @@ -54,19 +58,26 @@ type Consensus[T transaction.Tx] struct {
verifyVoteExt handlers.VerifyVoteExtensionhandler
extendVote handlers.ExtendVoteHandler

chainID string
addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID
}

func NewConsensus[T transaction.Tx](
logger log.Logger,
appName string,
consensusAuthority string,
app *appmanager.AppManager[T],
mp mempool.Mempool[T],
indexedEvents map[string]struct{},
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error),
store types.Store,
cfg Config,
txCodec transaction.Codec[T],
logger log.Logger,
) *Consensus[T] {
return &Consensus[T]{
appName: appName,
version: getCometBFTServerVersion(),
consensusAuthority: consensusAuthority,
grpcQueryDecoders: grpcQueryDecoders,
app: app,
cfg: cfg,
Expand All @@ -82,27 +93,24 @@ func NewConsensus[T transaction.Tx](
verifyVoteExt: nil,
extendVote: nil,
chainID: "",
indexedEvents: indexedEvents,
initialHeight: 0,
}
}

// SetStreamingManager sets the streaming manager for the consensus module.
func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}

// SetSnapshotManager sets the snapshot manager for the Consensus.
// The snapshot manager is responsible for managing snapshots of the Consensus state.
// It allows for creating, storing, and restoring snapshots of the Consensus state.
// The provided snapshot manager will be used by the Consensus to handle snapshots.
func (c *Consensus[T]) SetSnapshotManager(sm *snapshots.Manager) {
c.snapshotManager = sm
}

// RegisterExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterExtensions(extensions ...snapshots.ExtensionSnapshotter) {
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
if err := c.snapshotManager.RegisterExtensions(extensions...); err != nil {
panic(fmt.Errorf("failed to register snapshot extensions: %w", err))
return fmt.Errorf("failed to register snapshot extensions: %w", err)
}

return nil
}

// CheckTx implements types.Application.
Expand All @@ -122,7 +130,7 @@ func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
Code: resp.Code,
GasWanted: uint64ToInt64(resp.GasWanted),
GasUsed: uint64ToInt64(resp.GasUsed),
Events: intoABCIEvents(resp.Events, c.cfg.IndexEvents),
Events: intoABCIEvents(resp.Events, c.indexedEvents),
Info: resp.Info,
Data: resp.Data,
Log: resp.Log,
Expand All @@ -144,7 +152,7 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc

// cp, err := c.GetConsensusParams(ctx)
// if err != nil {
// return nil, err
// return nil, err
// }

cid, err := c.store.LastCommitID()
Expand All @@ -153,10 +161,9 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc
}

return &abciproto.InfoResponse{
Data: c.cfg.Name,
Version: c.cfg.Version,
// AppVersion: cp.GetVersion().App,
AppVersion: 0, // TODO fetch from store?
Data: c.appName,
Version: c.version,
AppVersion: 0, // TODO fetch consensus params?
LastBlockHeight: int64(version),
LastBlockAppHash: cid.Hash,
}, nil
Expand All @@ -173,7 +180,6 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err)
}
res, err := c.app.Query(ctx, uint64(req.Height), protoRequest)

if err != nil {
resp := queryResult(err)
resp.Height = req.Height
Expand All @@ -188,7 +194,7 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
// it must be an app/p2p/store query
path := splitABCIQueryPath(req.Path)
if len(path) == 0 {
return QueryResult(errorsmod.Wrap(cometerrors.ErrUnknownRequest, "no query path provided"), c.cfg.Trace), nil
return QueryResult(errorsmod.Wrap(cometerrors.ErrUnknownRequest, "no query path provided"), c.cfg.AppTomlConfig.Trace), nil
}

switch path[0] {
Expand All @@ -202,11 +208,11 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
resp, err = c.handleQueryP2P(path)

default:
resp = QueryResult(errorsmod.Wrap(cometerrors.ErrUnknownRequest, "unknown query path"), c.cfg.Trace)
resp = QueryResult(errorsmod.Wrap(cometerrors.ErrUnknownRequest, "unknown query path"), c.cfg.AppTomlConfig.Trace)
}

if err != nil {
return QueryResult(err, c.cfg.Trace), nil
return QueryResult(err, c.cfg.AppTomlConfig.Trace), nil
}

return resp, nil
Expand All @@ -218,17 +224,17 @@ func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe

// store chainID to be used later on in execution
c.chainID = req.ChainId
// TODO: check if we need to load the config from genesis.json or config.toml
c.cfg.InitialHeight = uint64(req.InitialHeight)

// On a new chain, we consider the init chain block height as 0, even though
// req.InitialHeight is 1 by default.
// TODO
// TODO: check if we need to load the config from genesis.json or config.toml
c.initialHeight = uint64(req.InitialHeight)
if c.initialHeight == 0 { // If initial height is 0, set it to 1
c.initialHeight = 1
}

var consMessages []transaction.Msg
if req.ConsensusParams != nil {
consMessages = append(consMessages, &consensustypes.MsgUpdateParams{
Authority: c.cfg.ConsensusAuthority,
Authority: c.consensusAuthority,
Block: req.ConsensusParams.Block,
Evidence: req.ConsensusParams.Evidence,
Validator: req.ConsensusParams.Validator,
Expand Down Expand Up @@ -394,7 +400,7 @@ func (c *Consensus[T]) FinalizeBlock(

// TODO evaluate this approach vs. service using context.
// cometInfo := &consensustypes.MsgUpdateCometInfo{
// Authority: c.cfg.ConsensusAuthority,
// Authority: c.consensusAuthority,
// CometInfo: &consensustypes.CometInfo{
// Evidence: req.Misbehavior,
// ValidatorsHash: req.NextValidatorsHash,
Expand All @@ -411,7 +417,7 @@ func (c *Consensus[T]) FinalizeBlock(
// })

// we don't need to deliver the block in the genesis block
if req.Height == int64(c.cfg.InitialHeight) {
if req.Height == int64(c.initialHeight) {
appHash, err := c.store.Commit(store.NewChangeset())
if err != nil {
return nil, fmt.Errorf("unable to commit the changeset: %w", err)
Expand Down Expand Up @@ -495,7 +501,7 @@ func (c *Consensus[T]) FinalizeBlock(
return nil, err
}

return finalizeBlockResponse(resp, cp, appHash, c.cfg.IndexEvents)
return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents)
}

// Commit implements types.Application.
Expand Down
27 changes: 13 additions & 14 deletions server/v2/cometbft/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"sigs.k8s.io/yaml"

"cosmossdk.io/server/v2/cometbft/client/rpc"
auth "cosmossdk.io/x/auth/client/cli"

"github.com/cosmos/cosmos-sdk/client"
cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec"
Expand All @@ -29,7 +28,7 @@ import (
)

func (s *CometBFTServer[T]) rpcClient(cmd *cobra.Command) (rpc.CometRPC, error) {
if s.config.Standalone {
if s.config.AppTomlConfig.Standalone {
client, err := rpchttp.New(client.GetConfigFromCmd(cmd).RPC.ListenAddress)
if err != nil {
return nil, err
Expand Down Expand Up @@ -201,10 +200,10 @@ for. Each module documents its respective events under 'xx_events.md'.
return err
}

query, _ := cmd.Flags().GetString(auth.FlagQuery)
query, _ := cmd.Flags().GetString(FlagQuery)
page, _ := cmd.Flags().GetInt(FlagPage)
limit, _ := cmd.Flags().GetInt(FlagLimit)
orderBy, _ := cmd.Flags().GetString(auth.FlagOrderBy)
orderBy, _ := cmd.Flags().GetString(FlagOrderBy)

blocks, err := rpc.QueryBlocks(cmd.Context(), rpcclient, page, limit, query, orderBy)
if err != nil {
Expand All @@ -223,9 +222,9 @@ for. Each module documents its respective events under 'xx_events.md'.
AddQueryFlagsToCmd(cmd)
cmd.Flags().Int(FlagPage, query.DefaultPage, "Query a specific page of paginated results")
cmd.Flags().Int(FlagLimit, query.DefaultLimit, "Query number of transactions results per page returned")
cmd.Flags().String(auth.FlagQuery, "", "The blocks events query per CometBFT's query semantics")
cmd.Flags().String(auth.FlagOrderBy, "", "The ordering semantics (asc|dsc)")
_ = cmd.MarkFlagRequired(auth.FlagQuery)
cmd.Flags().String(FlagQuery, "", "The blocks events query per CometBFT's query semantics")
cmd.Flags().String(FlagOrderBy, "", "The ordering semantics (asc|dsc)")
_ = cmd.MarkFlagRequired(FlagQuery)

return cmd
}
Expand All @@ -240,19 +239,19 @@ func (s *CometBFTServer[T]) QueryBlockCmd() *cobra.Command {
$ %s query block --%s=%s <height>
$ %s query block --%s=%s <hash>
`,
version.AppName, auth.FlagType, auth.TypeHeight,
version.AppName, auth.FlagType, auth.TypeHash)),
version.AppName, FlagType, TypeHeight,
version.AppName, FlagType, TypeHash)),
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
typ, _ := cmd.Flags().GetString(auth.FlagType)
typ, _ := cmd.Flags().GetString(FlagType)

rpcclient, err := s.rpcClient(cmd)
if err != nil {
return err
}

switch typ {
case auth.TypeHeight:
case TypeHeight:
if args[0] == "" {
return fmt.Errorf("argument should be a block height")
}
Expand Down Expand Up @@ -282,7 +281,7 @@ $ %s query block --%s=%s <hash>

return printOutput(cmd, bz)

case auth.TypeHash:
case TypeHash:

if args[0] == "" {
return fmt.Errorf("argument should be a tx hash")
Expand All @@ -306,13 +305,13 @@ $ %s query block --%s=%s <hash>
return printOutput(cmd, bz)

default:
return fmt.Errorf("unknown --%s value %s", auth.FlagType, typ)
return fmt.Errorf("unknown --%s value %s", FlagType, typ)
}
},
}

AddQueryFlagsToCmd(cmd)
cmd.Flags().String(auth.FlagType, auth.TypeHash, fmt.Sprintf("The type to be used when querying tx, can be one of \"%s\", \"%s\"", auth.TypeHeight, auth.TypeHash))
cmd.Flags().String(FlagType, TypeHash, fmt.Sprintf("The type to be used when querying tx, can be one of \"%s\", \"%s\"", TypeHeight, TypeHash))

return cmd
}
Expand Down
Loading

0 comments on commit 8484dc5

Please sign in to comment.