Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ABCI): New Proposal Struct with Associated Metadata (backport #126) #133

Merged
merged 1 commit into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ use-integration:
go work edit -dropuse .
go work edit -use ./tests/integration

tidy:
go mod tidy
gofmt -s -w ./

.PHONY: docker-build docker-build-integration
###############################################################################
## Docker ##
Expand Down
255 changes: 146 additions & 109 deletions abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/skip-mev/block-sdk/block"
"github.com/skip-mev/block-sdk/block/utils"
"github.com/skip-mev/block-sdk/lanes/terminator"
"github.com/skip-mev/block-sdk/block/proposals"
"github.com/skip-mev/block-sdk/block/proposals/types"
)

const (
// ProposalInfoIndex is the index of the proposal metadata in the proposal.
ProposalInfoIndex = 0
)

type (
Expand All @@ -18,66 +23,92 @@ type (
ProposalHandler struct {
logger log.Logger
txDecoder sdk.TxDecoder
txEncoder sdk.TxEncoder
prepareLanesHandler block.PrepareLanesHandler
processLanesHandler block.ProcessLanesHandler
mempool block.Mempool
}
)

// NewProposalHandler returns a new abci++ proposal handler. This proposal handler will
// NewProposalHandler returns a new ABCI++ proposal handler. This proposal handler will
// iteratively call each of the lanes in the chain to prepare and process the proposal.
func NewProposalHandler(logger log.Logger, txDecoder sdk.TxDecoder, mempool block.Mempool) *ProposalHandler {
func NewProposalHandler(
logger log.Logger,
txDecoder sdk.TxDecoder,
txEncoder sdk.TxEncoder,
mempool block.Mempool,
) *ProposalHandler {
return &ProposalHandler{
logger: logger,
txDecoder: txDecoder,
prepareLanesHandler: ChainPrepareLanes(mempool.Registry()...),
processLanesHandler: ChainProcessLanes(mempool.Registry()...),
txEncoder: txEncoder,
prepareLanesHandler: ChainPrepareLanes(mempool.Registry()),
mempool: mempool,
}
}

// PrepareProposalHandler prepares the proposal by selecting transactions from each lane
// according to each lane's selection logic. We select transactions in a greedy fashion. Note that
// each lane has an boundary on the number of bytes that can be included in the proposal. By default,
// the default lane will not have a boundary on the number of bytes that can be included in the proposal and
// will include all valid transactions in the proposal (up to MaxTxBytes).
// according to each lane's selection logic. We select transactions in the order in which the
// lanes are configured on the chain. Note that each lane has an boundary on the number of
// bytes/gas that can be included in the proposal. By default, the default lane will not have
// a boundary on the number of bytes that can be included in the proposal and will include all
// valid transactions in the proposal (up to MaxBlockSize, MaxGasLimit).
func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
return func(ctx sdk.Context, req *abci.RequestPrepareProposal) (resp *abci.ResponsePrepareProposal, err error) {
// In the case where there is a panic, we recover here and return an empty proposal.
defer func() {
if err := recover(); err != nil {
if rec := recover(); rec != nil {
h.logger.Error("failed to prepare proposal", "err", err)

// TODO: Should we attempt to return a empty proposal here with empty proposal info?
resp = &abci.ResponsePrepareProposal{Txs: make([][]byte, 0)}
err = fmt.Errorf("failed to prepare proposal: %v", rec)
}
}()

h.logger.Info("mempool distribution before proposal creation", "distribution", h.mempool.GetTxDistribution())

proposal, err := h.prepareLanesHandler(ctx, block.NewProposal(req.MaxTxBytes))
// Build an empty placeholder proposal with the maximum block size and gas limit.
maxBlockSize, maxGasLimit := proposals.GetBlockLimits(ctx)
emptyProposal := proposals.NewProposal(h.txEncoder, maxBlockSize, maxGasLimit)

// Fill the proposal with transactions from each lane.
finalProposal, err := h.prepareLanesHandler(ctx, emptyProposal)
if err != nil {
h.logger.Error("failed to prepare proposal", "err", err)
return &abci.ResponsePrepareProposal{Txs: make([][]byte, 0)}, err
}

// Retrieve the proposal with metadata and transactions.
txs, err := finalProposal.GetProposalWithInfo()
if err != nil {
h.logger.Error("failed to get proposal with metadata", "err", err)
return &abci.ResponsePrepareProposal{Txs: make([][]byte, 0)}, err
}

h.logger.Info(
"prepared proposal",
"num_txs", proposal.GetNumTxs(),
"total_tx_bytes", proposal.GetTotalTxBytes(),
"num_txs", len(txs),
"total_tx_bytes", finalProposal.Info.BlockSize,
"max_tx_bytes", maxBlockSize,
"total_gas_limit", finalProposal.Info.GasLimit,
"max_gas_limit", maxGasLimit,
"height", req.Height,
)

h.logger.Info("mempool distribution after proposal creation", "distribution", h.mempool.GetTxDistribution())

return &abci.ResponsePrepareProposal{
Txs: proposal.GetProposal(),
Txs: txs,
}, nil
}
}

// ProcessProposalHandler processes the proposal by verifying all transactions in the proposal
// according to each lane's verification logic. We verify proposals in a greedy fashion.
// If a lane's portion of the proposal is invalid, we reject the proposal. After a lane's portion
// of the proposal is verified, we pass the remaining transactions to the next lane in the chain.
// according to each lane's verification logic. Proposals are verified similar to how they are
// constructed. After a proposal is processed, it should amount to the same proposal that was prepared.
// Each proposal will first be broken down by the lanes that prepared each partial proposal. Then, each
// lane will iteratively verify the transactions that it belong to it. If any lane fails to verify the
// transactions, then the proposal is rejected.
func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
return func(ctx sdk.Context, req *abci.RequestProcessProposal) (resp *abci.ResponseProcessProposal, err error) {
// In the case where any of the lanes panic, we recover here and return a reject status.
Expand All @@ -90,126 +121,132 @@ func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
}
}()

txs := req.Txs
if len(txs) == 0 {
h.logger.Info("accepted empty proposal")
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil
// Extract all of the lanes and their corresponding transactions from the proposal.
proposalInfo, partialProposals, err := h.ExtractLanes(req.Txs)
if err != nil {
h.logger.Error("failed to validate proposal", "err", err)
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, err
}

// Decode the transactions from the proposal.
decodedTxs, err := utils.GetDecodedTxs(h.txDecoder, txs)
// Build handler that will verify the partial proposals according to each lane's verification logic.
processLanesHandler := ChainProcessLanes(partialProposals, h.mempool.Registry())

// Build an empty placeholder proposal.
maxBlockSize, maxGasLimit := proposals.GetBlockLimits(ctx)
emptyProposal := proposals.NewProposal(h.txEncoder, maxBlockSize, maxGasLimit)

// Verify the proposal according to the verification logic from each lane.
finalProposal, err := processLanesHandler(ctx, emptyProposal)
if err != nil {
h.logger.Error("failed to decode transactions", "err", err)
h.logger.Error("failed to validate the proposal", "err", err)
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, err
}

// Verify the proposal using the verification logic from each lane.
if _, err := h.processLanesHandler(ctx, decodedTxs); err != nil {
// Ensure block size and gas limit are correct.
if err := h.ValidateBlockLimits(finalProposal, proposalInfo); err != nil {
h.logger.Error("failed to validate the proposal", "err", err)
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, err
}

h.logger.Info("validated proposal", "num_txs", len(txs))
h.logger.Info(
"processed proposal",
"num_txs", len(req.Txs),
"total_tx_bytes", finalProposal.Info.BlockSize,
"max_tx_bytes", maxBlockSize,
"total_gas_limit", finalProposal.Info.GasLimit,
"max_gas_limit", maxGasLimit,
"height", req.Height,
)

return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil
}
}

// ChainPrepareLanes chains together the proposal preparation logic from each lane
// into a single function. The first lane in the chain is the first lane to be prepared and
// the last lane in the chain is the last lane to be prepared.
//
// In the case where any of the lanes fail to prepare the partial proposal, the lane that failed
// will be skipped and the next lane in the chain will be called to prepare the proposal.
func ChainPrepareLanes(chain ...block.Lane) block.PrepareLanesHandler {
if len(chain) == 0 {
return nil
// ExtractLanes validates the proposal against the basic invariants that are required
// for the proposal to be valid. This includes:
// 1. The proposal must contain the proposal information and must be valid.
// 2. The proposal must contain the correct number of transactions for each lane.
func (h *ProposalHandler) ExtractLanes(proposal [][]byte) (types.ProposalInfo, [][][]byte, error) {
// If the proposal is empty, then the metadata was not included.
if len(proposal) == 0 {
return types.ProposalInfo{}, nil, fmt.Errorf("proposal does not contain proposal metadata")
}

// Handle non-terminated decorators chain
if (chain[len(chain)-1] != terminator.Terminator{}) {
chain = append(chain, terminator.Terminator{})
metaDataBz, txs := proposal[ProposalInfoIndex], proposal[ProposalInfoIndex+1:]

// Retrieve the metadata from the proposal.
var metaData types.ProposalInfo
if err := metaData.Unmarshal(metaDataBz); err != nil {
return types.ProposalInfo{}, nil, fmt.Errorf("failed to unmarshal proposal metadata: %w", err)
}

return func(ctx sdk.Context, partialProposal block.BlockProposal) (finalProposal block.BlockProposal, err error) {
lane := chain[0]
lane.Logger().Info("preparing lane", "lane", lane.Name())
lanes := h.mempool.Registry()
partialProposals := make([][][]byte, len(lanes))

// Cache the context in the case where any of the lanes fail to prepare the proposal.
cacheCtx, write := ctx.CacheContext()
if metaData.TxsByLane == nil {
if len(txs) > 0 {
return types.ProposalInfo{}, nil, fmt.Errorf("proposal contains invalid number of transactions")
}

// We utilize a recover to handle any panics or errors that occur during the preparation
// of a lane's transactions. This defer will first check if there was a panic or error
// thrown from the lane's preparation logic. If there was, we log the error, skip the lane,
// and call the next lane in the chain to the prepare the proposal.
defer func() {
if rec := recover(); rec != nil || err != nil {
lane.Logger().Error("failed to prepare lane", "lane", lane.Name(), "err", err, "recover_error", rec)
lane.Logger().Info("skipping lane", "lane", lane.Name())

if len(chain) <= 2 {
// If there are only two lanes remaining, then the first lane in the chain
// is the lane that failed to prepare the partial proposal and the second lane in the
// chain is the terminator lane. We return the proposal as is.
finalProposal, err = partialProposal, nil
} else {
// If there are more than two lanes remaining, then the first lane in the chain
// is the lane that failed to prepare the proposal but the second lane in the
// chain is not the terminator lane so there could potentially be more transactions
// added to the proposal
finalProposal, err = ChainPrepareLanes(chain[1:]...)(ctx, partialProposal)
}
} else {
// Write the cache to the context since we know that the lane successfully prepared
// the partial proposal. State is written to in a backwards, cascading fashion. This means
// that the final context will only be updated after all other lanes have successfully
// prepared the partial proposal.
write()
}
}()
return types.ProposalInfo{}, partialProposals, nil
}

// Get the maximum number of bytes that can be included in the proposal for this lane.
maxTxBytesForLane := utils.GetMaxTxBytesForLane(
partialProposal.GetMaxTxBytes(),
partialProposal.GetTotalTxBytes(),
lane.GetMaxBlockSpace(),
)
h.logger.Info(
"received proposal with metadata",
"max_block_size", metaData.MaxBlockSize,
"max_gas_limit", metaData.MaxGasLimit,
"gas_limit", metaData.GasLimit,
"block_size", metaData.BlockSize,
"lanes_with_txs", metaData.TxsByLane,
)

// Iterate through all of the lanes and match the corresponding transactions to the lane.
for index, lane := range lanes {
numTxs := metaData.TxsByLane[lane.Name()]
if numTxs > uint64(len(txs)) {
return types.ProposalInfo{}, nil, fmt.Errorf(
"proposal metadata contains invalid number of transactions for lane %s; got %d, expected %d",
lane.Name(),
len(txs),
numTxs,
)
}

return lane.PrepareLane(
cacheCtx,
partialProposal,
maxTxBytesForLane,
ChainPrepareLanes(chain[1:]...),
)
partialProposals[index] = txs[:numTxs]
txs = txs[numTxs:]
}
}

// ChainProcessLanes chains together the proposal verification logic from each lane
// into a single function. The first lane in the chain is the first lane to be verified and
// the last lane in the chain is the last lane to be verified.
func ChainProcessLanes(chain ...block.Lane) block.ProcessLanesHandler {
if len(chain) == 0 {
return nil
// If there are any transactions remaining in the proposal, then the proposal is invalid.
if len(txs) > 0 {
return types.ProposalInfo{}, nil, fmt.Errorf("proposal contains invalid number of transactions")
}

// Handle non-terminated decorators chain
if (chain[len(chain)-1] != terminator.Terminator{}) {
chain = append(chain, terminator.Terminator{})
}
return metaData, partialProposals, nil
}

return func(ctx sdk.Context, proposalTxs []sdk.Tx) (sdk.Context, error) {
// Short circuit if there are no transactions to process.
if len(proposalTxs) == 0 {
return ctx, nil
}
// ValidateBlockLimits validates the block limits of the proposal against the block limits
// of the chain.
func (h *ProposalHandler) ValidateBlockLimits(finalProposal proposals.Proposal, proposalInfo types.ProposalInfo) error {
// Conduct final checks on block size and gas limit.
if finalProposal.Info.BlockSize != proposalInfo.BlockSize {
h.logger.Error(
"proposal block size does not match",
"expected", proposalInfo.BlockSize,
"got", finalProposal.Info.BlockSize,
)

chain[0].Logger().Info("processing lane", "lane", chain[0].Name())
return fmt.Errorf("proposal block size does not match")
}

if err := chain[0].CheckOrder(ctx, proposalTxs); err != nil {
chain[0].Logger().Error("failed to process lane", "lane", chain[0].Name(), "err", err)
return ctx, err
}
if finalProposal.Info.GasLimit != proposalInfo.GasLimit {
h.logger.Error(
"proposal gas limit does not match",
"expected", proposalInfo.GasLimit,
"got", finalProposal.Info.GasLimit,
)

return chain[0].ProcessLane(ctx, proposalTxs, ChainProcessLanes(chain[1:]...))
return fmt.Errorf("proposal gas limit does not match")
}

return nil
}
Loading
Loading