Skip to content

Commit

Permalink
fix: do not skip QueueIndex 0 in worker (ethereum#439)
Browse files Browse the repository at this point in the history
fix: do not skip QueueIndex 0
  • Loading branch information
Thegaram committed Aug 4, 2023
1 parent 26eeb40 commit cf42026
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 22 deletions.
55 changes: 34 additions & 21 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,19 @@ type environment struct {
receipts []*types.Receipt

// circuit capacity check related fields
traceEnv *core.TraceEnv // env for tracing
accRows *types.RowConsumption // accumulated row consumption for a block
maxL1Index uint64 // maximum L1 index included or skipped
traceEnv *core.TraceEnv // env for tracing
accRows *types.RowConsumption // accumulated row consumption for a block
nextL1MsgIndex uint64 // next L1 queue index to be processed
}

// task contains all information for consensus engine sealing and result submitting.
type task struct {
receipts []*types.Receipt
state *state.StateDB
block *types.Block
createdAt time.Time
accRows *types.RowConsumption // accumulated row consumption in the circuit side
maxL1Index uint64 // maximum L1 index included or skipped
receipts []*types.Receipt
state *state.StateDB
block *types.Block
createdAt time.Time
accRows *types.RowConsumption // accumulated row consumption in the circuit side
nextL1MsgIndex uint64 // next L1 queue index to be processed
}

const (
Expand Down Expand Up @@ -698,10 +698,11 @@ func (w *worker) resultLoop() {
}
logs = append(logs, receipt.Logs...)
}
// Store highest L1 queue index processed by this block. This includes both
// included and skipped messages. This way, if a block only skips messages,
// we won't reprocess the same messages from the next block.
rawdb.WriteFirstQueueIndexNotInL2Block(w.eth.ChainDb(), hash, task.maxL1Index+1)
// Store first L1 queue index not processed by this block.
// Note: This accounts for both included and skipped messages. This
// way, if a block only skips messages, we won't reprocess the same
// messages from the next block.
rawdb.WriteFirstQueueIndexNotInL2Block(w.eth.ChainDb(), hash, task.nextL1MsgIndex)
// Store circuit row consumption.
log.Trace(
"Worker write block row consumption",
Expand Down Expand Up @@ -773,7 +774,7 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
env.tcount = 0
env.blockSize = 0
env.l1TxCount = 0
env.maxL1Index = 0
env.nextL1MsgIndex = 0 // initialized in commitNewWork

// Swap out the old work with the new one, terminating any leftover prefetcher
// processes in the mean time and starting a new one.
Expand Down Expand Up @@ -938,6 +939,13 @@ loop:
if tx == nil {
break
}
if tx.IsL1MessageTx() && tx.AsL1MessageTx().QueueIndex != w.current.nextL1MsgIndex {
log.Error(
"Unexpected L1 message queue index in worker",
"expected", w.current.nextL1MsgIndex,
"got", tx.AsL1MessageTx().QueueIndex,
)
}
if !w.chainConfig.Scroll.IsValidBlockSize(w.current.blockSize + tx.Size()) {
log.Trace("Block size limit reached", "have", w.current.blockSize, "want", w.chainConfig.Scroll.MaxTxPayloadBytesPerBlock, "tx", tx.Size())
txs.Pop() // skip transactions from this account
Expand Down Expand Up @@ -981,7 +989,7 @@ loop:
coalescedLogs = append(coalescedLogs, logs...)
if tx.IsL1MessageTx() {
w.current.l1TxCount++
w.current.maxL1Index = tx.AsL1MessageTx().QueueIndex
w.current.nextL1MsgIndex = tx.AsL1MessageTx().QueueIndex + 1
}
w.current.tcount++
w.current.blockSize += tx.Size()
Expand All @@ -1007,7 +1015,7 @@ loop:
// This is also useful for skipping "problematic" L1MessageTxs.
queueIndex := tx.AsL1MessageTx().QueueIndex
log.Trace("Circuit capacity limit reached for a single tx", "tx", tx.Hash(), "queueIndex", queueIndex)
w.current.maxL1Index = queueIndex
w.current.nextL1MsgIndex = queueIndex + 1
txs.Shift()

case (errors.Is(err, circuitcapacitychecker.ErrTxRowConsumptionOverflow) && !tx.IsL1MessageTx()):
Expand All @@ -1021,7 +1029,7 @@ loop:
// shift to the next from the account because we shouldn't skip the entire txs from the same account
queueIndex := tx.AsL1MessageTx().QueueIndex
log.Trace("Unknown circuit capacity checker error for L1MessageTx", "tx", tx.Hash(), "queueIndex", queueIndex)
w.current.maxL1Index = queueIndex
w.current.nextL1MsgIndex = queueIndex + 1
txs.Shift()

case (errors.Is(err, circuitcapacitychecker.ErrUnknown) && !tx.IsL1MessageTx()):
Expand Down Expand Up @@ -1060,15 +1068,15 @@ loop:
return false, circuitCapacityReached
}

func (w *worker) collectPendingL1Messages() []types.L1MessageTx {
func (w *worker) collectPendingL1Messages() (uint64, []types.L1MessageTx) {
nextQueueIndex := rawdb.ReadFirstQueueIndexNotInL2Block(w.eth.ChainDb(), w.chain.CurrentHeader().Hash())
if nextQueueIndex == nil {
// the parent (w.chain.CurrentHeader) must have been processed before we start a new mining job.
log.Crit("Failed to read last L1 message in L2 block", "l2BlockHash", w.chain.CurrentHeader().Hash())
}
startIndex := *nextQueueIndex
maxCount := w.chainConfig.Scroll.L1Config.NumL1MessagesPerBlock
return rawdb.ReadL1MessagesFrom(w.eth.ChainDb(), startIndex, maxCount)
return startIndex, rawdb.ReadL1MessagesFrom(w.eth.ChainDb(), startIndex, maxCount)
}

// commitNewWork generates several new sealing tasks based on the parent block.
Expand Down Expand Up @@ -1177,7 +1185,12 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
l1Txs := make(map[common.Address]types.Transactions)
pendingL1Txs := 0
if w.chainConfig.Scroll.ShouldIncludeL1Messages() {
l1Messages := w.collectPendingL1Messages()
nextQueueIndex, l1Messages := w.collectPendingL1Messages()

// If l1Messages is empty, we should still set this
// to the same value as the parent block.
env.nextL1MsgIndex = nextQueueIndex

pendingL1Txs = len(l1Messages)
for _, l1msg := range l1Messages {
tx := types.NewTx(&l1msg)
Expand Down Expand Up @@ -1288,7 +1301,7 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
interval()
}
select {
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now(), accRows: w.current.accRows, maxL1Index: w.current.maxL1Index}:
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now(), accRows: w.current.accRows, nextL1MsgIndex: w.current.nextL1MsgIndex}:
w.unconfirmed.Shift(block.NumberU64() - 1)
log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
"uncles", len(uncles), "txs", w.current.tcount,
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 4 // Major version component of the current release
VersionMinor = 3 // Minor version component of the current release
VersionPatch = 18 // Patch version component of the current release
VersionPatch = 19 // Patch version component of the current release
VersionMeta = "sepolia" // Version metadata to append to the version string
)

Expand Down

0 comments on commit cf42026

Please sign in to comment.