Skip to content

Commit

Permalink
feat(ABCI): New Proposal Struct with Associated Metadata (#126) (#133)
Browse files Browse the repository at this point in the history
* new proto types for proposal info

* new proposal type

* nits

* lane input

* lint

* feat(ABCI): Deprecating `CheckOrderHandler` with new Proposal MetaData (#127)

* refactor without checkorder

* nits

* more nits

* lint

* nits

* feat(ABCI): Updating MEV lane to have no `CheckOrder` handler + testing (#128)

* updating mev lane

* nits

* preventing adding multiple bid txs in prepare

* update

(cherry picked from commit b9d6761)

Co-authored-by: David Terpay <35130517+davidterpay@users.noreply.github.com>
  • Loading branch information
mergify[bot] and davidterpay committed Sep 28, 2023
1 parent 9849b30 commit 18e8e25
Show file tree
Hide file tree
Showing 38 changed files with 3,666 additions and 1,060 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ use-integration:
go work edit -dropuse .
go work edit -use ./tests/integration

tidy:
go mod tidy
gofmt -s -w ./

.PHONY: docker-build docker-build-integration
###############################################################################
## Docker ##
Expand Down
255 changes: 146 additions & 109 deletions abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/skip-mev/block-sdk/block"
"github.com/skip-mev/block-sdk/block/utils"
"github.com/skip-mev/block-sdk/lanes/terminator"
"github.com/skip-mev/block-sdk/block/proposals"
"github.com/skip-mev/block-sdk/block/proposals/types"
)

const (
// ProposalInfoIndex is the index of the proposal metadata in the proposal.
ProposalInfoIndex = 0
)

type (
Expand All @@ -18,66 +23,92 @@ type (
ProposalHandler struct {
logger log.Logger
txDecoder sdk.TxDecoder
txEncoder sdk.TxEncoder
prepareLanesHandler block.PrepareLanesHandler
processLanesHandler block.ProcessLanesHandler
mempool block.Mempool
}
)

// NewProposalHandler returns a new abci++ proposal handler. This proposal handler will
// NewProposalHandler returns a new ABCI++ proposal handler. This proposal handler will
// iteratively call each of the lanes in the chain to prepare and process the proposal.
func NewProposalHandler(logger log.Logger, txDecoder sdk.TxDecoder, mempool block.Mempool) *ProposalHandler {
func NewProposalHandler(
logger log.Logger,
txDecoder sdk.TxDecoder,
txEncoder sdk.TxEncoder,
mempool block.Mempool,
) *ProposalHandler {
return &ProposalHandler{
logger: logger,
txDecoder: txDecoder,
prepareLanesHandler: ChainPrepareLanes(mempool.Registry()...),
processLanesHandler: ChainProcessLanes(mempool.Registry()...),
txEncoder: txEncoder,
prepareLanesHandler: ChainPrepareLanes(mempool.Registry()),
mempool: mempool,
}
}

// PrepareProposalHandler prepares the proposal by selecting transactions from each lane
// according to each lane's selection logic. We select transactions in a greedy fashion. Note that
// each lane has an boundary on the number of bytes that can be included in the proposal. By default,
// the default lane will not have a boundary on the number of bytes that can be included in the proposal and
// will include all valid transactions in the proposal (up to MaxTxBytes).
// according to each lane's selection logic. We select transactions in the order in which the
// lanes are configured on the chain. Note that each lane has an boundary on the number of
// bytes/gas that can be included in the proposal. By default, the default lane will not have
// a boundary on the number of bytes that can be included in the proposal and will include all
// valid transactions in the proposal (up to MaxBlockSize, MaxGasLimit).
func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
return func(ctx sdk.Context, req *abci.RequestPrepareProposal) (resp *abci.ResponsePrepareProposal, err error) {
// In the case where there is a panic, we recover here and return an empty proposal.
defer func() {
if err := recover(); err != nil {
if rec := recover(); rec != nil {
h.logger.Error("failed to prepare proposal", "err", err)

// TODO: Should we attempt to return a empty proposal here with empty proposal info?
resp = &abci.ResponsePrepareProposal{Txs: make([][]byte, 0)}
err = fmt.Errorf("failed to prepare proposal: %v", rec)
}
}()

h.logger.Info("mempool distribution before proposal creation", "distribution", h.mempool.GetTxDistribution())

proposal, err := h.prepareLanesHandler(ctx, block.NewProposal(req.MaxTxBytes))
// Build an empty placeholder proposal with the maximum block size and gas limit.
maxBlockSize, maxGasLimit := proposals.GetBlockLimits(ctx)
emptyProposal := proposals.NewProposal(h.txEncoder, maxBlockSize, maxGasLimit)

// Fill the proposal with transactions from each lane.
finalProposal, err := h.prepareLanesHandler(ctx, emptyProposal)
if err != nil {
h.logger.Error("failed to prepare proposal", "err", err)
return &abci.ResponsePrepareProposal{Txs: make([][]byte, 0)}, err
}

// Retrieve the proposal with metadata and transactions.
txs, err := finalProposal.GetProposalWithInfo()
if err != nil {
h.logger.Error("failed to get proposal with metadata", "err", err)
return &abci.ResponsePrepareProposal{Txs: make([][]byte, 0)}, err
}

h.logger.Info(
"prepared proposal",
"num_txs", proposal.GetNumTxs(),
"total_tx_bytes", proposal.GetTotalTxBytes(),
"num_txs", len(txs),
"total_tx_bytes", finalProposal.Info.BlockSize,
"max_tx_bytes", maxBlockSize,
"total_gas_limit", finalProposal.Info.GasLimit,
"max_gas_limit", maxGasLimit,
"height", req.Height,
)

h.logger.Info("mempool distribution after proposal creation", "distribution", h.mempool.GetTxDistribution())

return &abci.ResponsePrepareProposal{
Txs: proposal.GetProposal(),
Txs: txs,
}, nil
}
}

// ProcessProposalHandler processes the proposal by verifying all transactions in the proposal
// according to each lane's verification logic. We verify proposals in a greedy fashion.
// If a lane's portion of the proposal is invalid, we reject the proposal. After a lane's portion
// of the proposal is verified, we pass the remaining transactions to the next lane in the chain.
// according to each lane's verification logic. Proposals are verified similar to how they are
// constructed. After a proposal is processed, it should amount to the same proposal that was prepared.
// Each proposal will first be broken down by the lanes that prepared each partial proposal. Then, each
// lane will iteratively verify the transactions that it belong to it. If any lane fails to verify the
// transactions, then the proposal is rejected.
func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
return func(ctx sdk.Context, req *abci.RequestProcessProposal) (resp *abci.ResponseProcessProposal, err error) {
// In the case where any of the lanes panic, we recover here and return a reject status.
Expand All @@ -90,126 +121,132 @@ func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
}
}()

txs := req.Txs
if len(txs) == 0 {
h.logger.Info("accepted empty proposal")
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil
// Extract all of the lanes and their corresponding transactions from the proposal.
proposalInfo, partialProposals, err := h.ExtractLanes(req.Txs)
if err != nil {
h.logger.Error("failed to validate proposal", "err", err)
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, err
}

// Decode the transactions from the proposal.
decodedTxs, err := utils.GetDecodedTxs(h.txDecoder, txs)
// Build handler that will verify the partial proposals according to each lane's verification logic.
processLanesHandler := ChainProcessLanes(partialProposals, h.mempool.Registry())

// Build an empty placeholder proposal.
maxBlockSize, maxGasLimit := proposals.GetBlockLimits(ctx)
emptyProposal := proposals.NewProposal(h.txEncoder, maxBlockSize, maxGasLimit)

// Verify the proposal according to the verification logic from each lane.
finalProposal, err := processLanesHandler(ctx, emptyProposal)
if err != nil {
h.logger.Error("failed to decode transactions", "err", err)
h.logger.Error("failed to validate the proposal", "err", err)
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, err
}

// Verify the proposal using the verification logic from each lane.
if _, err := h.processLanesHandler(ctx, decodedTxs); err != nil {
// Ensure block size and gas limit are correct.
if err := h.ValidateBlockLimits(finalProposal, proposalInfo); err != nil {
h.logger.Error("failed to validate the proposal", "err", err)
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, err
}

h.logger.Info("validated proposal", "num_txs", len(txs))
h.logger.Info(
"processed proposal",
"num_txs", len(req.Txs),
"total_tx_bytes", finalProposal.Info.BlockSize,
"max_tx_bytes", maxBlockSize,
"total_gas_limit", finalProposal.Info.GasLimit,
"max_gas_limit", maxGasLimit,
"height", req.Height,
)

return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil
}
}

// ChainPrepareLanes chains together the proposal preparation logic from each lane
// into a single function. The first lane in the chain is the first lane to be prepared and
// the last lane in the chain is the last lane to be prepared.
//
// In the case where any of the lanes fail to prepare the partial proposal, the lane that failed
// will be skipped and the next lane in the chain will be called to prepare the proposal.
func ChainPrepareLanes(chain ...block.Lane) block.PrepareLanesHandler {
if len(chain) == 0 {
return nil
// ExtractLanes validates the proposal against the basic invariants that are required
// for the proposal to be valid. This includes:
// 1. The proposal must contain the proposal information and must be valid.
// 2. The proposal must contain the correct number of transactions for each lane.
func (h *ProposalHandler) ExtractLanes(proposal [][]byte) (types.ProposalInfo, [][][]byte, error) {
// If the proposal is empty, then the metadata was not included.
if len(proposal) == 0 {
return types.ProposalInfo{}, nil, fmt.Errorf("proposal does not contain proposal metadata")
}

// Handle non-terminated decorators chain
if (chain[len(chain)-1] != terminator.Terminator{}) {
chain = append(chain, terminator.Terminator{})
metaDataBz, txs := proposal[ProposalInfoIndex], proposal[ProposalInfoIndex+1:]

// Retrieve the metadata from the proposal.
var metaData types.ProposalInfo
if err := metaData.Unmarshal(metaDataBz); err != nil {
return types.ProposalInfo{}, nil, fmt.Errorf("failed to unmarshal proposal metadata: %w", err)
}

return func(ctx sdk.Context, partialProposal block.BlockProposal) (finalProposal block.BlockProposal, err error) {
lane := chain[0]
lane.Logger().Info("preparing lane", "lane", lane.Name())
lanes := h.mempool.Registry()
partialProposals := make([][][]byte, len(lanes))

// Cache the context in the case where any of the lanes fail to prepare the proposal.
cacheCtx, write := ctx.CacheContext()
if metaData.TxsByLane == nil {
if len(txs) > 0 {
return types.ProposalInfo{}, nil, fmt.Errorf("proposal contains invalid number of transactions")
}

// We utilize a recover to handle any panics or errors that occur during the preparation
// of a lane's transactions. This defer will first check if there was a panic or error
// thrown from the lane's preparation logic. If there was, we log the error, skip the lane,
// and call the next lane in the chain to the prepare the proposal.
defer func() {
if rec := recover(); rec != nil || err != nil {
lane.Logger().Error("failed to prepare lane", "lane", lane.Name(), "err", err, "recover_error", rec)
lane.Logger().Info("skipping lane", "lane", lane.Name())

if len(chain) <= 2 {
// If there are only two lanes remaining, then the first lane in the chain
// is the lane that failed to prepare the partial proposal and the second lane in the
// chain is the terminator lane. We return the proposal as is.
finalProposal, err = partialProposal, nil
} else {
// If there are more than two lanes remaining, then the first lane in the chain
// is the lane that failed to prepare the proposal but the second lane in the
// chain is not the terminator lane so there could potentially be more transactions
// added to the proposal
finalProposal, err = ChainPrepareLanes(chain[1:]...)(ctx, partialProposal)
}
} else {
// Write the cache to the context since we know that the lane successfully prepared
// the partial proposal. State is written to in a backwards, cascading fashion. This means
// that the final context will only be updated after all other lanes have successfully
// prepared the partial proposal.
write()
}
}()
return types.ProposalInfo{}, partialProposals, nil
}

// Get the maximum number of bytes that can be included in the proposal for this lane.
maxTxBytesForLane := utils.GetMaxTxBytesForLane(
partialProposal.GetMaxTxBytes(),
partialProposal.GetTotalTxBytes(),
lane.GetMaxBlockSpace(),
)
h.logger.Info(
"received proposal with metadata",
"max_block_size", metaData.MaxBlockSize,
"max_gas_limit", metaData.MaxGasLimit,
"gas_limit", metaData.GasLimit,
"block_size", metaData.BlockSize,
"lanes_with_txs", metaData.TxsByLane,
)

// Iterate through all of the lanes and match the corresponding transactions to the lane.
for index, lane := range lanes {
numTxs := metaData.TxsByLane[lane.Name()]
if numTxs > uint64(len(txs)) {
return types.ProposalInfo{}, nil, fmt.Errorf(
"proposal metadata contains invalid number of transactions for lane %s; got %d, expected %d",
lane.Name(),
len(txs),
numTxs,
)
}

return lane.PrepareLane(
cacheCtx,
partialProposal,
maxTxBytesForLane,
ChainPrepareLanes(chain[1:]...),
)
partialProposals[index] = txs[:numTxs]
txs = txs[numTxs:]
}
}

// ChainProcessLanes chains together the proposal verification logic from each lane
// into a single function. The first lane in the chain is the first lane to be verified and
// the last lane in the chain is the last lane to be verified.
func ChainProcessLanes(chain ...block.Lane) block.ProcessLanesHandler {
if len(chain) == 0 {
return nil
// If there are any transactions remaining in the proposal, then the proposal is invalid.
if len(txs) > 0 {
return types.ProposalInfo{}, nil, fmt.Errorf("proposal contains invalid number of transactions")
}

// Handle non-terminated decorators chain
if (chain[len(chain)-1] != terminator.Terminator{}) {
chain = append(chain, terminator.Terminator{})
}
return metaData, partialProposals, nil
}

return func(ctx sdk.Context, proposalTxs []sdk.Tx) (sdk.Context, error) {
// Short circuit if there are no transactions to process.
if len(proposalTxs) == 0 {
return ctx, nil
}
// ValidateBlockLimits validates the block limits of the proposal against the block limits
// of the chain.
func (h *ProposalHandler) ValidateBlockLimits(finalProposal proposals.Proposal, proposalInfo types.ProposalInfo) error {
// Conduct final checks on block size and gas limit.
if finalProposal.Info.BlockSize != proposalInfo.BlockSize {
h.logger.Error(
"proposal block size does not match",
"expected", proposalInfo.BlockSize,
"got", finalProposal.Info.BlockSize,
)

chain[0].Logger().Info("processing lane", "lane", chain[0].Name())
return fmt.Errorf("proposal block size does not match")
}

if err := chain[0].CheckOrder(ctx, proposalTxs); err != nil {
chain[0].Logger().Error("failed to process lane", "lane", chain[0].Name(), "err", err)
return ctx, err
}
if finalProposal.Info.GasLimit != proposalInfo.GasLimit {
h.logger.Error(
"proposal gas limit does not match",
"expected", proposalInfo.GasLimit,
"got", finalProposal.Info.GasLimit,
)

return chain[0].ProcessLane(ctx, proposalTxs, ChainProcessLanes(chain[1:]...))
return fmt.Errorf("proposal gas limit does not match")
}

return nil
}
Loading

0 comments on commit 18e8e25

Please sign in to comment.