Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tx hash count in chain validation index API [skip changelog] #12492

Open
wants to merge 2 commits into
base: feat/implement-index-validation-api
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build/openrpc/full.json
Original file line number Diff line number Diff line change
Expand Up @@ -2066,6 +2066,7 @@
"Height": 42,
"IndexedMessagesCount": 42,
"IndexedEventsCount": 42,
"IndexedTxHashCount": 42,
"Backfilled": true,
"IsNullRound": true
}
Expand All @@ -2087,6 +2088,10 @@
"title": "number",
"type": "number"
},
"IndexedTxHashCount": {
"title": "number",
"type": "number"
},
"IsNullRound": {
"type": "boolean"
},
Expand Down
82 changes: 72 additions & 10 deletions chain/index/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
)

var ErrChainForked = xerrors.New("chain forked")
Expand Down Expand Up @@ -125,6 +127,7 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain
Height: uint64(expectedTs.Height()),
IndexedMessagesCount: uint64(indexedData.nonRevertedMessageCount),
IndexedEventsCount: uint64(indexedData.nonRevertedEventCount),
IndexedTxHashCount: uint64(indexedData.txHashCount),
}, nil
}

Expand Down Expand Up @@ -161,6 +164,7 @@ func (si *SqliteIndexer) getTipsetCountsAtHeight(ctx context.Context, height abi
type indexedTipSetData struct {
nonRevertedMessageCount int
nonRevertedEventCount int
txHashCount int
}

// getIndexedTipSetData fetches the indexed tipset data for a tipset
Expand All @@ -180,6 +184,10 @@ func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, ts *types.Tip
return xerrors.Errorf("failed to query non reverted event count: %w", err)
}

if err = tx.Stmt(si.stmts.getTxHashCountStmt).QueryRowContext(ctx, tsKeyCidBytes).Scan(&data.txHashCount); err != nil {
return xerrors.Errorf("failed to query tx hash count: %w", err)
}

return nil
})

Expand All @@ -200,23 +208,21 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet
return xerrors.Errorf("failed to get next tipset for height %d: %w", ts.Height(), err)
}

executedMsgs, err := si.loadExecutedMessages(ctx, ts, executionTs)
chainStoreData, err := si.getChainStoreTipsetData(ctx, ts, executionTs)
if err != nil {
return xerrors.Errorf("failed to load executed messages for height %d: %w", ts.Height(), err)
return xerrors.Errorf("failed to get chain store data for tipset at height %d: %w", ts.Height(), err)
}

totalEventsCount := 0
for _, emsg := range executedMsgs {
totalEventsCount += len(emsg.evs)
if chainStoreData == nil {
return xerrors.Errorf("invalid chain store data for tipset at height %d", ts.Height())
}

if totalEventsCount != indexedData.nonRevertedEventCount {
return xerrors.Errorf("event count mismatch for height %d: chainstore has %d, index has %d", ts.Height(), totalEventsCount, indexedData.nonRevertedEventCount)
if chainStoreData.totalMsgCount != indexedData.nonRevertedMessageCount {
return xerrors.Errorf("message count mismatch: chainstore has %d, index has %d", chainStoreData.totalMsgCount, indexedData.nonRevertedMessageCount)
}

totalExecutedMsgCount := len(executedMsgs)
if totalExecutedMsgCount != indexedData.nonRevertedMessageCount {
return xerrors.Errorf("message count mismatch for height %d: chainstore has %d, index has %d", ts.Height(), totalExecutedMsgCount, indexedData.nonRevertedMessageCount)
if chainStoreData.totalEventsCount != indexedData.nonRevertedEventCount {
return xerrors.Errorf("event count mismatch: chainstore has %d, index has %d", chainStoreData.totalEventsCount, indexedData.nonRevertedEventCount)
}

// if non-reverted events exist which means that tipset `ts` has been executed, there should be 0 reverted events in the DB
Expand All @@ -229,9 +235,64 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet
return xerrors.Errorf("index corruption: reverted events found for an executed tipset %s at height %d", tsKeyCid, ts.Height())
}

if chainStoreData.totalTxHashCount != indexedData.txHashCount {
return xerrors.Errorf("tx hash count mismatch: chainstore has %d, index has %d", chainStoreData.totalTxHashCount, indexedData.txHashCount)
}

return nil
}

type chainStoreTipsetData struct {
totalMsgCount int
totalEventsCount int
totalTxHashCount int
}

func (si *SqliteIndexer) getChainStoreTipsetData(ctx context.Context, ts *types.TipSet, executionTs *types.TipSet) (*chainStoreTipsetData, error) {
executedMsgs, err := si.loadExecutedMessages(ctx, ts, executionTs)
if err != nil {
return nil, xerrors.Errorf("failed to load executed messages: %w", err)
}

msgCount := len(executedMsgs)
eventsCount := 0
txHashCount := 0

for _, emsg := range executedMsgs {
eventsCount += len(emsg.evs)
}

for _, header := range ts.Blocks() {
_, smsgs, err := si.cs.MessagesForBlock(ctx, header)
if err != nil {
return nil, xerrors.Errorf("failed to get messages for block: %w", err)
}

for _, smsg := range smsgs {
if smsg.Signature.Type != crypto.SigTypeDelegated {
continue
}

tx, err := ethtypes.EthTransactionFromSignedFilecoinMessage(smsg)
if err != nil {
return nil, xerrors.Errorf("failed to convert from signed message: %w at epoch: %d", err, ts.Height())
}

if _, err = tx.TxHash(); err != nil {
return nil, xerrors.Errorf("failed to calculate hash for ethTx: %w at epoch: %d", err, ts.Height())
}

txHashCount++
}
}

return &chainStoreTipsetData{
totalMsgCount: msgCount,
totalEventsCount: eventsCount,
totalTxHashCount: txHashCount,
}, nil
}

func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.TipSet, backfill bool) (*types.IndexValidation, error) {
if !backfill {
return nil, xerrors.Errorf("missing tipset at height %d in the chain index, set backfill flag to true to fix", ts.Height())
Expand Down Expand Up @@ -274,6 +335,7 @@ func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.Ti
Backfilled: true,
IndexedMessagesCount: uint64(indexedData.nonRevertedMessageCount),
IndexedEventsCount: uint64(indexedData.nonRevertedEventCount),
IndexedTxHashCount: uint64(indexedData.txHashCount),
}, nil
}

Expand Down
5 changes: 4 additions & 1 deletion chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ var ddls = []string{

`CREATE INDEX IF NOT EXISTS idx_height ON tipset_message (height)`,

`CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id)`,
`CREATE INDEX IF NOT EXISTS idx_eth_tx_hash_message_cid ON eth_tx_hash (message_cid);`,

`CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id);`,
}

// preparedStatementMapping returns a map of fields of the preparedStatements struct to the SQL
Expand Down Expand Up @@ -84,5 +86,6 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.getNonRevertedTipsetMessageCountStmt: "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0 AND message_cid IS NOT NULL",
&ps.getNonRevertedTipsetEventCountStmt: "SELECT COUNT(*) FROM event WHERE reverted = 0 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
&ps.hasRevertedEventsInTipsetStmt: "SELECT EXISTS(SELECT 1 FROM event WHERE reverted = 1 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?))",
&ps.getTxHashCountStmt: "SELECT COUNT(e.tx_hash) AS tx_hash_count FROM tipset_message t INNER JOIN eth_tx_hash e ON t.message_cid = e.message_cid WHERE t.tipset_key_cid = ? AND t.reverted = 0 AND t.message_cid IS NOT NULL",
}
}
1 change: 1 addition & 0 deletions chain/index/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type preparedStatements struct {

getNonRevertedTipsetMessageCountStmt *sql.Stmt
getNonRevertedTipsetEventCountStmt *sql.Stmt
getTxHashCountStmt *sql.Stmt
hasRevertedEventsInTipsetStmt *sql.Stmt
}

Expand Down
1 change: 1 addition & 0 deletions chain/types/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type IndexValidation struct {

IndexedMessagesCount uint64
IndexedEventsCount uint64
IndexedTxHashCount uint64
Backfilled bool
IsNullRound bool
}
1 change: 1 addition & 0 deletions documentation/en/api-v1-unstable-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,7 @@ Response:
"Height": 42,
"IndexedMessagesCount": 42,
"IndexedEventsCount": 42,
"IndexedTxHashCount": 42,
"Backfilled": true,
"IsNullRound": true
}
Expand Down
Loading