Skip to content

Commit

Permalink
feat: add mempool interfaces (#13249)
Browse files Browse the repository at this point in the history
* working out interfaces

* integration to checkTx

* use struct fields directly in sz calculation

* fix typo

* nil guard on mempool

* Remove tx builder method

* impl with panic
  • Loading branch information
kocubinski committed Sep 13, 2022
1 parent c32493a commit b96bed5
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 12 deletions.
6 changes: 3 additions & 3 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
return res
}

// ProcessProposal implements the ability for the application to verify and/or modify transactions in a block proposal.
// PrepareProposal implements the ability for the application to verify and/or modify transactions in a block proposal.
func (app *BaseApp) PrepareProposal(req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
// treated as a noop until app side mempool is implemented
return abci.ResponsePrepareProposal{Txs: req.Txs}
Expand All @@ -248,7 +248,7 @@ func (app *BaseApp) ProcessProposal(req abci.RequestProcessProposal) abci.Respon
// CheckTx mode, messages are not executed. This means messages are only validated
// and only the AnteHandler is executed. State is persisted to the BaseApp's
// internal CheckTx state if the AnteHandler passes. Otherwise, the ResponseCheckTx
// will contain releveant error information. Regardless of tx execution outcome,
// will contain relevant error information. Regardless of tx execution outcome,
// the ResponseCheckTx will contain relevant gas execution context.
func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
var mode runTxMode
Expand Down Expand Up @@ -281,7 +281,7 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {

// DeliverTx implements the ABCI interface and executes a tx in DeliverTx mode.
// State only gets persisted if all messages are valid and get executed successfully.
// Otherwise, the ResponseDeliverTx will contain releveant error information.
// Otherwise, the ResponseDeliverTx will contain relevant error information.
// Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant
// gas execution context.
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
Expand Down
22 changes: 17 additions & 5 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"fmt"
"strings"

"github.com/cosmos/gogoproto/proto"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/gogoproto/proto"

codectypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/snapshots"
"github.com/cosmos/cosmos-sdk/store"
Expand Down Expand Up @@ -53,6 +54,7 @@ type BaseApp struct { //nolint: maligned
interfaceRegistry codectypes.InterfaceRegistry
txDecoder sdk.TxDecoder // unmarshal []byte into sdk.Tx

mempool sdk.Mempool // application side mempool
anteHandler sdk.AnteHandler // ante handler for fee and auth
postHandler sdk.AnteHandler // post handler, optional, e.g. for tips
initChainer sdk.InitChainer // initialize state with validators and state blob
Expand Down Expand Up @@ -484,7 +486,7 @@ func (app *BaseApp) validateHeight(req abci.RequestBeginBlock) error {
// previous commit). The height we're expecting is the initial height.
expectedHeight = app.initialHeight
} else {
// This case can means two things:
// This case can mean two things:
// - either there was already a previous commit in the store, in which
// case we increment the version from there,
// - or there was no previous commit, and initial version was not set,
Expand Down Expand Up @@ -515,7 +517,7 @@ func validateBasicTxMsgs(msgs []sdk.Msg) error {
return nil
}

// Returns the applications's deliverState if app is in runTxModeDeliver,
// Returns the application's deliverState if app is in runTxModeDeliver,
// otherwise it returns the application's checkstate.
func (app *BaseApp) getState(mode runTxMode) *state {
if mode == runTxModeDeliver {
Expand Down Expand Up @@ -573,7 +575,7 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context
func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, priority int64, err error) {
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
// determined by the GasMeter. We need access to the context to get the gas
// meter so we initialize upfront.
// meter, so we initialize upfront.
var gasWanted uint64

ctx := app.getContextForTx(mode, txBytes)
Expand All @@ -595,7 +597,7 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re

blockGasConsumed := false
// consumeBlockGas makes sure block gas is consumed at most once. It must happen after
// tx processing, and must be execute even if tx processing fails. Hence we use trick with `defer`
// tx processing, and must be executed even if tx processing fails. Hence, we use trick with `defer`
consumeBlockGas := func() {
if !blockGasConsumed {
blockGasConsumed = true
Expand Down Expand Up @@ -665,6 +667,14 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
anteEvents = events.ToABCIEvents()
}

// TODO remove nil check when implemented
if mode == runTxModeCheck && app.mempool != nil {
err = app.mempool.Insert(ctx, tx.(sdk.MempoolTx))
if err != nil {
return gInfo, nil, anteEvents, priority, err
}
}

// Create a new Context based off of the existing Context with a MultiStore branch
// in case message processing fails. At this point, the MultiStore
// is a branch of a branch.
Expand All @@ -674,7 +684,9 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
// and we're in DeliverTx. Note, runMsgs will never return a reference to a
// Result if any single message fails or does not have a registered Handler.
result, err = app.runMsgs(runMsgCtx, msgs, mode)

if err == nil {

// Run optional postHandlers.
//
// Note: If the postHandler fails, we also revert the runMsgs state.
Expand Down
2 changes: 1 addition & 1 deletion types/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (c Context) WithEventManager(em *EventManager) Context {
return c
}

// WithEventManager returns a Context with an updated tx priority
// WithPriority returns a Context with an updated tx priority
func (c Context) WithPriority(p int64) Context {
c.priority = p
return c
Expand Down
31 changes: 31 additions & 0 deletions types/mempool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package types

// MempoolTx we define an app-side mempool transaction interface that is as
// minimal as possible, only requiring applications to define the size of the
// transaction to be used when reaping and getting the transaction itself.
// Interface type casting can be used in the actual app-side mempool implementation.
type MempoolTx interface {
Tx

// Size returns the size of the transaction in bytes.
Size() int
}

type Mempool interface {
// Insert attempts to insert a MempoolTx into the app-side mempool returning
// an error upon failure.
Insert(Context, MempoolTx) error

// Select returns the next set of available transactions from the app-side
// mempool, up to maxBytes or until the mempool is empty. The application can
// decide to return transactions from its own mempool, from the incoming
// txs, or some combination of both.
Select(ctx Context, txs [][]byte, maxBytes int) ([]MempoolTx, error)

// CountTx returns the number of transactions currently in the mempool.
CountTx() int

// Remove attempts to remove a transaction from the mempool, returning an error
// upon failure.
Remove(Context, MempoolTx) error
}
6 changes: 3 additions & 3 deletions types/tx_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type (
// doesn't require access to any other information.
ValidateBasic() error

// Signers returns the addrs of signers that must sign.
// GetSigners returns the addrs of signers that must sign.
// CONTRACT: All signatures must be present to be valid.
// CONTRACT: Returns addrs in some deterministic order.
GetSigners() []AccAddress
Expand All @@ -37,7 +37,7 @@ type (

// Tx defines the interface a transaction must fulfill.
Tx interface {
// Gets the all the transaction's messages.
// GetMsgs gets the all the transaction's messages.
GetMsgs() []Msg

// ValidateBasic does a simple and lightweight validation check that doesn't
Expand All @@ -54,7 +54,7 @@ type (
FeeGranter() AccAddress
}

// Tx must have GetMemo() method to use ValidateMemoDecorator
// TxWithMemo must have GetMemo() method to use ValidateMemoDecorator
TxWithMemo interface {
Tx
GetMemo() string
Expand Down
5 changes: 5 additions & 0 deletions x/auth/tx/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
_ ante.HasExtensionOptionsTx = &wrapper{}
_ ExtensionOptionsTxBuilder = &wrapper{}
_ tx.TipTx = &wrapper{}
_ sdk.MempoolTx = &wrapper{}
)

// ExtensionOptionsTxBuilder defines a TxBuilder that can also set extensions.
Expand All @@ -62,6 +63,10 @@ func newBuilder(cdc codec.Codec) *wrapper {
}
}

func (w *wrapper) Size() int {
panic("not yet implemented")
}

func (w *wrapper) GetMsgs() []sdk.Msg {
return w.tx.GetMsgs()
}
Expand Down

0 comments on commit b96bed5

Please sign in to comment.