Skip to content

Commit

Permalink
actpool for blobtx
Browse files Browse the repository at this point in the history
  • Loading branch information
envestcc committed Aug 21, 2024
1 parent b6b4365 commit dbdf68b
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 1 deletion.
13 changes: 13 additions & 0 deletions actpool/actpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,25 @@ func NewActPool(g genesis.Genesis, sf protocol.StateReader, cfg Config, opts ...
func (ap *actPool) Start(ctx context.Context) error {
// open action store and load all actions
if ap.store != nil {
blobs := make(SortedActions, 0)
err := ap.store.Open(func(selp *action.SealedEnvelope) error {
isBlobTx := false
if isBlobTx {
blobs = append(blobs, selp)
return nil
}
return ap.add(ctx, selp)
})
if err != nil {
return err
}
// add blob txs to actpool in nonce order
sort.Sort(blobs)
for _, selp := range blobs {
if err := ap.add(ctx, selp); err != nil {
return err
}
}
return ap.storeSync.Start(ctx)
}
return nil
Expand Down
50 changes: 49 additions & 1 deletion actpool/actqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/facebookgo/clock"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/iotexproject/iotex-address/address"
Expand Down Expand Up @@ -59,6 +60,8 @@ type actQueue struct {
clock clock.Clock
ttl time.Duration
mu sync.RWMutex
// count of blob txs in the queue
blobCount int
}

// NewActQueue create a new action queue
Expand Down Expand Up @@ -106,6 +109,31 @@ func (q *actQueue) Put(act *action.SealedEnvelope) error {
if nonce < q.pendingNonce && act.GasFeeCap().Cmp(actInPool.GasFeeCap()) != 1 {
return action.ErrReplaceUnderpriced
}
// TODO: 2x bumps in gas price are allowed for blob tx
isPrevBlobTx, isBlobTx := false, false
if isPrevBlobTx {
if !isBlobTx {
return errors.Wrap(action.ErrReplaceUnderpriced, "blob tx can only replace blob tx")
}
var (
// TODO: set the real value
prevBlobGasFeeCap = big.NewInt(0)
blobGasFeeCap = big.NewInt(0)

priceBump = big.NewInt(2)
minGasFeeCap = new(big.Int).Mul(actInPool.GasFeeCap(), priceBump)
minGasTipCap = new(big.Int).Mul(actInPool.GasTipCap(), priceBump)
minBlobGasFeeCap = new(big.Int).Mul(prevBlobGasFeeCap, priceBump)
)
switch {
case act.GasFeeCap().Cmp(minGasFeeCap) < 0:
return errors.Wrapf(action.ErrReplaceUnderpriced, "gas fee cap %s < %s", act.GasFeeCap(), minGasFeeCap)
case act.GasTipCap().Cmp(minGasTipCap) < 0:
return errors.Wrapf(action.ErrReplaceUnderpriced, "gas tip cap %s < %s", act.GasTipCap(), minGasTipCap)
case blobGasFeeCap.Cmp(minBlobGasFeeCap) < 0:
return errors.Wrapf(action.ErrReplaceUnderpriced, "blob gas fee cap %s < %s", blobGasFeeCap, minBlobGasFeeCap)
}
}
// update action in q.items and q.index
q.items[nonce] = act
for i := range q.ascQueue {
Expand All @@ -117,13 +145,21 @@ func (q *actQueue) Put(act *action.SealedEnvelope) error {
q.updateFromNonce(nonce)
return nil
}
// check max number of blob txs per account
isBlobTx := false
if isBlobTx && q.blobCount >= int(q.ap.cfg.MaxNumBlobsPerAcct) {
return errors.Wrap(action.ErrNonceTooHigh, "too many blob txs in the queue")
}
nttl := &nonceWithTTL{nonce: nonce, deadline: q.clock.Now().Add(q.ttl)}
heap.Push(&q.ascQueue, nttl)
heap.Push(&q.descQueue, nttl)
q.items[nonce] = act
if nonce == q.pendingNonce {
q.updateFromNonce(q.pendingNonce)
}
if isBlobTx {
q.blobCount++
}
return nil
}

Expand Down Expand Up @@ -188,6 +224,10 @@ func (q *actQueue) cleanTimeout() []*action.SealedEnvelope {
delete(q.pendingBalance, nonce)
q.ascQueue[i] = q.ascQueue[size-1]
size--
isBlobTx := false
if isBlobTx {
q.blobCount--
}
continue
}
i++
Expand Down Expand Up @@ -221,6 +261,10 @@ func (q *actQueue) UpdateAccountState(nonce uint64, balance *big.Int) []*action.
nonce := nttl.nonce
removed = append(removed, q.items[nonce])
delete(q.items, nonce)
isBlobTx := false
if isBlobTx {
q.blobCount--
}
}
return removed
}
Expand Down Expand Up @@ -264,6 +308,7 @@ func (q *actQueue) Reset() {
q.pendingBalance = make(map[uint64]*big.Int)
q.accountNonce = 0
q.accountBalance = big.NewInt(0)
q.blobCount = 0
}

// PendingActs creates a consecutive nonce-sorted slice of actions
Expand Down Expand Up @@ -337,6 +382,9 @@ func (q *actQueue) PopActionWithLargestNonce() *action.SealedEnvelope {
item := q.items[itemMeta.nonce]
delete(q.items, itemMeta.nonce)
q.updateFromNonce(itemMeta.nonce)

isBlobTx := false
if isBlobTx {
q.blobCount--
}
return item
}
3 changes: 3 additions & 0 deletions actpool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var (
ActionExpiry: 10 * time.Minute,
MinGasPriceStr: big.NewInt(unit.Qev).String(),
BlackList: []string{},
MaxNumBlobsPerAcct: 16,
}
)

Expand All @@ -39,6 +40,8 @@ type Config struct {
BlackList []string `yaml:"blackList"`
// Store defines the config for persistent cache
Store *StoreConfig `yaml:"store"`
// MaxNumBlobsPerAcct defines the maximum number of blob txs an account can have
MaxNumBlobsPerAcct uint64 `yaml:"maxNumBlobsPerAcct"`
}

// MinGasPrice returns the minimal gas price threshold
Expand Down
13 changes: 13 additions & 0 deletions actpool/queueworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,19 @@ func (worker *queueWorker) checkSelpWithState(act *action.SealedEnvelope, pendin
return action.ErrNonceTooHigh
}

// TODO: Nonce must be continuous for blob tx
isBlobTx := false
if isBlobTx {
pendingNonceInPool, ok := worker.PendingNonce(act.SenderAddress())
if !ok {
pendingNonceInPool = pendingNonce
}
if act.Nonce() > pendingNonceInPool {
_actpoolMtc.WithLabelValues("nonceTooLarge").Inc()
return action.ErrNonceTooHigh
}
}

if cost, _ := act.Cost(); balance.Cmp(cost) < 0 {
_actpoolMtc.WithLabelValues("insufficientBalance").Inc()
sender := act.SenderAddress().String()
Expand Down

0 comments on commit dbdf68b

Please sign in to comment.