From 632c6205c1a80409903dc32c31a734a00eeed336 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Sat, 14 Sep 2024 15:55:29 +0530 Subject: [PATCH 1/6] feat: finish todos of validation api --- chain/index/api.go | 59 +++++++++++++++++++++++++++++++++++------- chain/index/ddls.go | 3 +++ chain/index/indexer.go | 4 +++ 3 files changed, 57 insertions(+), 9 deletions(-) diff --git a/chain/index/api.go b/chain/index/api.go index 3674aa3046..797e20124d 100644 --- a/chain/index/api.go +++ b/chain/index/api.go @@ -57,9 +57,10 @@ func (si *SqliteIndexer) getTipsetCountsAtHeight(ctx context.Context, height abi } func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.ChainEpoch, backfill bool) (*types.IndexValidation, error) { - if !si.started { - return nil, xerrors.Errorf("ChainValidateIndex can only be called after the indexer has been started") + if !si.started && si.closed { + return nil, xerrors.Errorf("ChainValidateIndex can only be called after the indexer has been started and not closed") } + si.writerLk.Lock() defer si.writerLk.Unlock() @@ -126,17 +127,49 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain return nil, xerrors.Errorf("failed to cast tipset key cid: %w", err) } - if indexedTsKeyCid != expectedTsKeyCid { + if !indexedTsKeyCid.Equals(expectedTsKeyCid) { return nil, xerrors.Errorf("index corruption: non-reverted tipset at height %d has key %s, but canonical chain has %s", epoch, indexedTsKeyCid, expectedTsKeyCid) } + indexedNonRevertedMsgCount, indexedNonRevertedEventsCount, hasRevertedEvents, err := si.getIndexedTipSetData(ctx, expectedTs.Key()) + if err != nil { + return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", expectedTs.Height(), err) + } + return &types.IndexValidation{ - TipsetKey: expectedTs.Key().String(), - Height: uint64(expectedTs.Height()), - // TODO Other fields + TipsetKey: expectedTs.Key().String(), + Height: uint64(expectedTs.Height()), + TotalMessages: indexedNonRevertedMsgCount, + TotalEvents: indexedNonRevertedEventsCount, + EventsReverted: hasRevertedEvents, }, nil } +func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, tsKey types.TipSetKey) (messageCount uint64, eventCount uint64, hasRevertedEvents bool, err error) { + tsKeyBytes := tsKey.Bytes() + + err = withTx(ctx, si.db, func(tx *sql.Tx) error { + err := tx.Stmt(si.stmts.getNonRevertedTipsetMessageCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&messageCount) + if err != nil { + return xerrors.Errorf("failed to query non reverted message count: %w", err) + } + + err = tx.Stmt(si.stmts.getNonRevertedTipsetEventCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&eventCount) + if err != nil { + return xerrors.Errorf("failed to query non reverted event count: %w", err) + } + + err = tx.Stmt(si.stmts.hasRevertedEventsStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&hasRevertedEvents) + if err != nil { + return xerrors.Errorf("failed to check for reverted events: %w", err) + } + + return nil + }) + + return messageCount, eventCount, hasRevertedEvents, err +} + func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.TipSet, backfill bool) (*types.IndexValidation, error) { if !backfill { return nil, xerrors.Errorf("missing tipset at height %d in the chain index, set backfill to true to backfill", ts.Height()) @@ -159,9 +192,17 @@ func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.Ti return nil, xerrors.Errorf("error applying tipset: %w", err) } + indexedNonRevertedMsgCount, indexedNonRevertedEventsCount, hasRevertedEvents, err := si.getIndexedTipSetData(ctx, ts.Key()) + if err != nil { + return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", ts.Height(), err) + } + return &types.IndexValidation{ - TipsetKey: ts.Key().String(), - Height: uint64(ts.Height()), - Backfilled: true, + TipsetKey: ts.Key().String(), + Height: uint64(ts.Height()), + Backfilled: true, + TotalMessages: indexedNonRevertedMsgCount, + TotalEvents: indexedNonRevertedEventsCount, + EventsReverted: hasRevertedEvents, }, nil } diff --git a/chain/index/ddls.go b/chain/index/ddls.go index 8a1927430e..1037c2453f 100644 --- a/chain/index/ddls.go +++ b/chain/index/ddls.go @@ -82,5 +82,8 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string { &ps.hasNullRoundAtHeightStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message WHERE height = ?)", &ps.getNonRevertedTipsetAtHeightStmt: "SELECT tipset_key_cid FROM tipset_message WHERE height = ? AND reverted = 0", &ps.countTipsetsAtHeightStmt: "SELECT COUNT(CASE WHEN reverted = 1 THEN 1 END) AS reverted_count, COUNT(CASE WHEN reverted = 0 THEN 1 END) AS non_reverted_count FROM (SELECT tipset_key_cid, MAX(reverted) AS reverted FROM tipset_message WHERE height = ? GROUP BY tipset_key_cid) AS unique_tipsets", + &ps.getNonRevertedTipsetMessageCountStmt: "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0", + &ps.getNonRevertedTipsetEventCountStmt: "SELECT COUNT(*) FROM event WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)", + &ps.hasRevertedEventsStmt: "SELECT EXISTS (SELECT 1 FROM event WHERE reverted = 1 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?))", } } diff --git a/chain/index/indexer.go b/chain/index/indexer.go index 18dc71a019..06b6ecbc14 100644 --- a/chain/index/indexer.go +++ b/chain/index/indexer.go @@ -48,6 +48,10 @@ type preparedStatements struct { hasNullRoundAtHeightStmt *sql.Stmt getNonRevertedTipsetAtHeightStmt *sql.Stmt countTipsetsAtHeightStmt *sql.Stmt + + getNonRevertedTipsetMessageCountStmt *sql.Stmt + getNonRevertedTipsetEventCountStmt *sql.Stmt + hasRevertedEventsStmt *sql.Stmt } type SqliteIndexer struct { From e84a794bb932ee1a5ec12fb2e9d7a86e3c97288f Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Mon, 16 Sep 2024 13:35:16 +0530 Subject: [PATCH 2/6] feat: add indexed data verification with chain store --- chain/index/api.go | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/chain/index/api.go b/chain/index/api.go index 797e20124d..8dea97b078 100644 --- a/chain/index/api.go +++ b/chain/index/api.go @@ -136,6 +136,10 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", expectedTs.Height(), err) } + if err = si.verifyIndexedData(ctx, expectedTs, indexedNonRevertedMsgCount); err != nil { + return nil, xerrors.Errorf("failed to verify indexed data at height %d: %w", expectedTs.Height(), err) + } + return &types.IndexValidation{ TipsetKey: expectedTs.Key().String(), Height: uint64(expectedTs.Height()), @@ -145,23 +149,40 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain }, nil } +// verifyIndexedData verifies that the indexed data for a tipset is correct +// by comparing the number of messages in the chainstore to the number of messages indexed + +// TODO: verify indexed events too (to verify the events we need to load the next tipset (ts+1) and verify the events are the same) +func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet, indexedMsgCount uint64) (err error) { + msgs, err := si.cs.MessagesForTipset(ctx, ts) + if err != nil { + return xerrors.Errorf("failed to get messages for tipset: %w", err) + } + + msgCount := uint64(len(msgs)) + if msgCount != indexedMsgCount { + return xerrors.Errorf("tipset message count mismatch: chainstore has %d, index has %d", msgCount, indexedMsgCount) + } + + return nil +} + func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, tsKey types.TipSetKey) (messageCount uint64, eventCount uint64, hasRevertedEvents bool, err error) { tsKeyBytes := tsKey.Bytes() err = withTx(ctx, si.db, func(tx *sql.Tx) error { - err := tx.Stmt(si.stmts.getNonRevertedTipsetMessageCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&messageCount) - if err != nil { + if err = tx.Stmt(si.stmts.getNonRevertedTipsetMessageCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&messageCount); err != nil { return xerrors.Errorf("failed to query non reverted message count: %w", err) } - err = tx.Stmt(si.stmts.getNonRevertedTipsetEventCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&eventCount) - if err != nil { + if err = tx.Stmt(si.stmts.getNonRevertedTipsetEventCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&eventCount); err != nil { return xerrors.Errorf("failed to query non reverted event count: %w", err) } - err = tx.Stmt(si.stmts.hasRevertedEventsStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&hasRevertedEvents) - if err != nil { - return xerrors.Errorf("failed to check for reverted events: %w", err) + if eventCount > 0 { + if err = tx.Stmt(si.stmts.hasRevertedEventsStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&hasRevertedEvents); err != nil { + return xerrors.Errorf("failed to check for reverted events: %w", err) + } } return nil From 1b5767204bc61c392964978009526d815a8e3d06 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Mon, 16 Sep 2024 19:56:13 +0530 Subject: [PATCH 3/6] feat: address comments and finish TODO --- chain/index/api.go | 118 ++++++++++++++++++++++++++++----------- chain/index/gc.go | 5 +- chain/index/indexer.go | 24 ++++---- chain/index/read.go | 10 +--- chain/index/reconcile.go | 5 +- 5 files changed, 101 insertions(+), 61 deletions(-) diff --git a/chain/index/api.go b/chain/index/api.go index 8dea97b078..a904e01eed 100644 --- a/chain/index/api.go +++ b/chain/index/api.go @@ -57,7 +57,7 @@ func (si *SqliteIndexer) getTipsetCountsAtHeight(ctx context.Context, height abi } func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.ChainEpoch, backfill bool) (*types.IndexValidation, error) { - if !si.started && si.closed { + if !si.started && si.isClosed() { return nil, xerrors.Errorf("ChainValidateIndex can only be called after the indexer has been started and not closed") } @@ -131,56 +131,53 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain return nil, xerrors.Errorf("index corruption: non-reverted tipset at height %d has key %s, but canonical chain has %s", epoch, indexedTsKeyCid, expectedTsKeyCid) } - indexedNonRevertedMsgCount, indexedNonRevertedEventsCount, hasRevertedEvents, err := si.getIndexedTipSetData(ctx, expectedTs.Key()) + // indexedTsKeyCid and expectedTsKeyCid are the same, so we can use `expectedTs` to fetch the indexed data + indexedData, err := si.getIndexedTipSetData(ctx, expectedTs.Key()) if err != nil { return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", expectedTs.Height(), err) } - if err = si.verifyIndexedData(ctx, expectedTs, indexedNonRevertedMsgCount); err != nil { + if err = si.verifyIndexedData(ctx, expectedTs, indexedData); err != nil { return nil, xerrors.Errorf("failed to verify indexed data at height %d: %w", expectedTs.Height(), err) } return &types.IndexValidation{ TipsetKey: expectedTs.Key().String(), Height: uint64(expectedTs.Height()), - TotalMessages: indexedNonRevertedMsgCount, - TotalEvents: indexedNonRevertedEventsCount, - EventsReverted: hasRevertedEvents, + TotalMessages: uint64(indexedData.nonRevertedMessageCount), + TotalEvents: uint64(indexedData.nonRevertedEventCount), + EventsReverted: indexedData.hasRevertedEvents, }, nil } -// verifyIndexedData verifies that the indexed data for a tipset is correct -// by comparing the number of messages in the chainstore to the number of messages indexed +type indexedTipSetData struct { + nonRevertedMessageCount int + nonRevertedEventCount int + hasRevertedEvents bool +} -// TODO: verify indexed events too (to verify the events we need to load the next tipset (ts+1) and verify the events are the same) -func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet, indexedMsgCount uint64) (err error) { - msgs, err := si.cs.MessagesForTipset(ctx, ts) +// getIndexedTipSetData fetches the indexed tipset data for a tipset +func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, tsKey types.TipSetKey) (*indexedTipSetData, error) { + cid, err := tsKey.Cid() if err != nil { - return xerrors.Errorf("failed to get messages for tipset: %w", err) - } - - msgCount := uint64(len(msgs)) - if msgCount != indexedMsgCount { - return xerrors.Errorf("tipset message count mismatch: chainstore has %d, index has %d", msgCount, indexedMsgCount) + return nil, xerrors.Errorf("failed to get tipset key cid: %w", err) } + tsKeyBytes := cid.Bytes() - return nil -} - -func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, tsKey types.TipSetKey) (messageCount uint64, eventCount uint64, hasRevertedEvents bool, err error) { - tsKeyBytes := tsKey.Bytes() - + var data indexedTipSetData err = withTx(ctx, si.db, func(tx *sql.Tx) error { - if err = tx.Stmt(si.stmts.getNonRevertedTipsetMessageCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&messageCount); err != nil { + if err = tx.Stmt(si.stmts.getNonRevertedTipsetMessageCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&data.nonRevertedMessageCount); err != nil { return xerrors.Errorf("failed to query non reverted message count: %w", err) } - if err = tx.Stmt(si.stmts.getNonRevertedTipsetEventCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&eventCount); err != nil { + if err = tx.Stmt(si.stmts.getNonRevertedTipsetEventCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&data.nonRevertedEventCount); err != nil { return xerrors.Errorf("failed to query non reverted event count: %w", err) } - if eventCount > 0 { - if err = tx.Stmt(si.stmts.hasRevertedEventsStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&hasRevertedEvents); err != nil { + // we are only fetching non-reverted events, so if there are no non-reverted events, + // then we need to check if the events were reverted. + if data.nonRevertedEventCount == 0 { + if err = tx.Stmt(si.stmts.hasRevertedEventsStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&data.hasRevertedEvents); err != nil { return xerrors.Errorf("failed to check for reverted events: %w", err) } } @@ -188,7 +185,64 @@ func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, tsKey types.T return nil }) - return messageCount, eventCount, hasRevertedEvents, err + return &data, err +} + +// verifyIndexedData verifies that the indexed data for a tipset is correct +// by comparing the number of messages and events in the chainstore to the number of messages and events indexed. +// NOTE: Events are loaded from the executed messages of the tipset at the next epoch (ts.Height() + 1). +func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet, indexedData *indexedTipSetData) (err error) { + tsKeyCid, err := ts.Key().Cid() + if err != nil { + return xerrors.Errorf("failed to get tipset key cid: %w", err) + } + + msgs, err := si.cs.MessagesForTipset(ctx, ts) + if err != nil { + return xerrors.Errorf("failed to get messages for tipset: %w", err) + } + + msgCount := len(msgs) + if msgCount != indexedData.nonRevertedMessageCount { + return xerrors.Errorf("tipset message count mismatch: chainstore has %d, index has %d", msgCount, indexedData.nonRevertedMessageCount) + } + + // get the tipset where the messages of `ts` will be executed (deferred execution) + executionTs, err := si.cs.GetTipsetByHeight(ctx, ts.Height()+1, nil, false) + if err != nil { + return xerrors.Errorf("failed to get tipset by height: %w", err) + } + + eParentTsKeyCid, err := executionTs.Parents().Cid() + if err != nil { + return xerrors.Errorf("failed to get execution tipset parent key cid: %w", err) + } + + // the parent tipset of the execution tipset should be the same as the indexed tipset (`ts` should be the parent of `executionTs`) + if !eParentTsKeyCid.Equals(tsKeyCid) { + return xerrors.Errorf("execution tipset parent key mismatch: chainstore has %s, index has %s", eParentTsKeyCid, tsKeyCid) + } + + executedMsgs, err := si.loadExecutedMessages(ctx, ts, executionTs) + if err != nil { + return xerrors.Errorf("failed to load executed messages: %w", err) + } + + totalEventsCount := 0 + for _, emsg := range executedMsgs { + totalEventsCount += len(emsg.evs) + } + + if totalEventsCount != indexedData.nonRevertedEventCount { + return xerrors.Errorf("tipset event count mismatch: chainstore has %d, index has %d", totalEventsCount, indexedData.nonRevertedEventCount) + } + + totalExecutedMsgCount := len(executedMsgs) + if totalExecutedMsgCount != int(indexedData.nonRevertedMessageCount) { + return xerrors.Errorf("tipset executed message count mismatch: chainstore has %d, index has %d", totalExecutedMsgCount, indexedData.nonRevertedMessageCount) + } + + return nil } func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.TipSet, backfill bool) (*types.IndexValidation, error) { @@ -213,7 +267,7 @@ func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.Ti return nil, xerrors.Errorf("error applying tipset: %w", err) } - indexedNonRevertedMsgCount, indexedNonRevertedEventsCount, hasRevertedEvents, err := si.getIndexedTipSetData(ctx, ts.Key()) + indexedData, err := si.getIndexedTipSetData(ctx, ts.Key()) if err != nil { return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", ts.Height(), err) } @@ -222,8 +276,8 @@ func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.Ti TipsetKey: ts.Key().String(), Height: uint64(ts.Height()), Backfilled: true, - TotalMessages: indexedNonRevertedMsgCount, - TotalEvents: indexedNonRevertedEventsCount, - EventsReverted: hasRevertedEvents, + TotalMessages: indexedData.nonRevertedMessageCount, + TotalEvents: indexedData.nonRevertedEventCount, + EventsReverted: indexedData.hasRevertedEvents, }, nil } diff --git a/chain/index/gc.go b/chain/index/gc.go index e30d4700aa..0b34eefcfb 100644 --- a/chain/index/gc.go +++ b/chain/index/gc.go @@ -25,12 +25,9 @@ func (si *SqliteIndexer) gcLoop() { defer cleanupTicker.Stop() for si.ctx.Err() == nil { - si.closeLk.RLock() - if si.closed { - si.closeLk.RUnlock() + if si.isClosed() { return } - si.closeLk.RUnlock() select { case <-cleanupTicker.C: diff --git a/chain/index/indexer.go b/chain/index/indexer.go index 06b6ecbc14..4be5e91c41 100644 --- a/chain/index/indexer.go +++ b/chain/index/indexer.go @@ -177,12 +177,9 @@ func (si *SqliteIndexer) initStatements() error { } func (si *SqliteIndexer) IndexEthTxHash(ctx context.Context, txHash ethtypes.EthHash, msgCid cid.Cid) error { - si.closeLk.RLock() - if si.closed { - si.closeLk.RUnlock() + if si.isClosed() { return ErrClosed } - si.closeLk.RUnlock() return withTx(ctx, si.db, func(tx *sql.Tx) error { return si.indexEthTxHash(ctx, tx, txHash, msgCid) @@ -203,12 +200,10 @@ func (si *SqliteIndexer) IndexSignedMessage(ctx context.Context, msg *types.Sign if msg.Signature.Type != crypto.SigTypeDelegated { return nil } - si.closeLk.RLock() - if si.closed { - si.closeLk.RUnlock() + + if si.isClosed() { return ErrClosed } - si.closeLk.RUnlock() return withTx(ctx, si.db, func(tx *sql.Tx) error { return si.indexSignedMessage(ctx, tx, msg) @@ -231,7 +226,7 @@ func (si *SqliteIndexer) indexSignedMessage(ctx context.Context, tx *sql.Tx, msg func (si *SqliteIndexer) Apply(ctx context.Context, from, to *types.TipSet) error { si.closeLk.RLock() - if si.closed { + if si.isClosed() { si.closeLk.RUnlock() return ErrClosed } @@ -350,12 +345,9 @@ func (si *SqliteIndexer) restoreTipsetIfExists(ctx context.Context, tx *sql.Tx, } func (si *SqliteIndexer) Revert(ctx context.Context, from, to *types.TipSet) error { - si.closeLk.RLock() - if si.closed { - si.closeLk.RUnlock() + if si.isClosed() { return ErrClosed } - si.closeLk.RUnlock() si.writerLk.Lock() defer si.writerLk.Unlock() @@ -402,3 +394,9 @@ func (si *SqliteIndexer) Revert(ctx context.Context, from, to *types.TipSet) err return nil } + +func (si *SqliteIndexer) isClosed() bool { + si.closeLk.RLock() + defer si.closeLk.RUnlock() + return si.closed +} diff --git a/chain/index/read.go b/chain/index/read.go index 3d03e7957c..cee9b2c428 100644 --- a/chain/index/read.go +++ b/chain/index/read.go @@ -16,12 +16,9 @@ import ( const headIndexedWaitTimeout = 5 * time.Second func (si *SqliteIndexer) GetCidFromHash(ctx context.Context, txHash ethtypes.EthHash) (cid.Cid, error) { - si.closeLk.RLock() - if si.closed { - si.closeLk.RUnlock() + if si.isClosed() { return cid.Undef, ErrClosed } - si.closeLk.RUnlock() var msgCidBytes []byte @@ -44,12 +41,9 @@ func (si *SqliteIndexer) queryMsgCidFromEthHash(ctx context.Context, txHash etht } func (si *SqliteIndexer) GetMsgInfo(ctx context.Context, messageCid cid.Cid) (*MsgInfo, error) { - si.closeLk.RLock() - if si.closed { - si.closeLk.RUnlock() + if si.isClosed() { return nil, ErrClosed } - si.closeLk.RUnlock() var tipsetKeyCidBytes []byte var height int64 diff --git a/chain/index/reconcile.go b/chain/index/reconcile.go index c2fdb0e8f3..d0d64532f7 100644 --- a/chain/index/reconcile.go +++ b/chain/index/reconcile.go @@ -34,12 +34,9 @@ func (si *SqliteIndexer) ReconcileWithChain(ctx context.Context, head *types.Tip log.Warn("chain indexer is not storing events during reconciliation; please ensure this is intentional") } - si.closeLk.RLock() - if si.closed { - si.closeLk.RUnlock() + if si.isClosed() { return ErrClosed } - si.closeLk.RUnlock() if head == nil { return nil From 0d22bcfc3f4b1243d5ea1dfc7ceb7b61a8056ca0 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Mon, 16 Sep 2024 20:01:18 +0530 Subject: [PATCH 4/6] fix: build issue --- chain/index/api.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/index/api.go b/chain/index/api.go index a904e01eed..07cf2b770e 100644 --- a/chain/index/api.go +++ b/chain/index/api.go @@ -276,8 +276,8 @@ func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.Ti TipsetKey: ts.Key().String(), Height: uint64(ts.Height()), Backfilled: true, - TotalMessages: indexedData.nonRevertedMessageCount, - TotalEvents: indexedData.nonRevertedEventCount, + TotalMessages: uint64(indexedData.nonRevertedMessageCount), + TotalEvents: uint64(indexedData.nonRevertedEventCount), EventsReverted: indexedData.hasRevertedEvents, }, nil } From 8f2fa8116e04d2e14d745bf0535f785f463e4594 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Tue, 17 Sep 2024 12:29:33 +0530 Subject: [PATCH 5/6] address comments --- chain/index/api.go | 66 +++++++++++++++------------------------ chain/index/ddls.go | 1 - chain/index/indexer.go | 1 - chain/types/index.go | 7 ++--- itests/eth_filter_test.go | 19 +++++++++-- 5 files changed, 46 insertions(+), 48 deletions(-) diff --git a/chain/index/api.go b/chain/index/api.go index 07cf2b770e..054322e682 100644 --- a/chain/index/api.go +++ b/chain/index/api.go @@ -57,8 +57,12 @@ func (si *SqliteIndexer) getTipsetCountsAtHeight(ctx context.Context, height abi } func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.ChainEpoch, backfill bool) (*types.IndexValidation, error) { - if !si.started && si.isClosed() { - return nil, xerrors.Errorf("ChainValidateIndex can only be called after the indexer has been started and not closed") + if !si.started { + return nil, xerrors.Errorf("ChainValidateIndex can only be called after the indexer has been started") + } + + if si.isClosed() { + return nil, xerrors.Errorf("ChainValidateIndex can only be called before the indexer has been closed") } si.writerLk.Lock() @@ -132,56 +136,49 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain } // indexedTsKeyCid and expectedTsKeyCid are the same, so we can use `expectedTs` to fetch the indexed data - indexedData, err := si.getIndexedTipSetData(ctx, expectedTs.Key()) + indexedData, err := si.getIndexedTipSetData(ctx, expectedTs) if err != nil { return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", expectedTs.Height(), err) } + if indexedData == nil { + return nil, xerrors.Errorf("invalid indexed data for tipset at height %d", expectedTs.Height()) + } + if err = si.verifyIndexedData(ctx, expectedTs, indexedData); err != nil { return nil, xerrors.Errorf("failed to verify indexed data at height %d: %w", expectedTs.Height(), err) } return &types.IndexValidation{ - TipsetKey: expectedTs.Key().String(), - Height: uint64(expectedTs.Height()), - TotalMessages: uint64(indexedData.nonRevertedMessageCount), - TotalEvents: uint64(indexedData.nonRevertedEventCount), - EventsReverted: indexedData.hasRevertedEvents, + TipsetKey: expectedTs.Key().String(), + Height: uint64(expectedTs.Height()), + NonRevertedMessageCount: uint64(indexedData.nonRevertedMessageCount), + NonRevertedEventsCount: uint64(indexedData.nonRevertedEventCount), }, nil } type indexedTipSetData struct { nonRevertedMessageCount int nonRevertedEventCount int - hasRevertedEvents bool } // getIndexedTipSetData fetches the indexed tipset data for a tipset -func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, tsKey types.TipSetKey) (*indexedTipSetData, error) { - cid, err := tsKey.Cid() +func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, ts *types.TipSet) (*indexedTipSetData, error) { + tsKeyCidBytes, err := toTipsetKeyCidBytes(ts) if err != nil { return nil, xerrors.Errorf("failed to get tipset key cid: %w", err) } - tsKeyBytes := cid.Bytes() var data indexedTipSetData err = withTx(ctx, si.db, func(tx *sql.Tx) error { - if err = tx.Stmt(si.stmts.getNonRevertedTipsetMessageCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&data.nonRevertedMessageCount); err != nil { + if err = tx.Stmt(si.stmts.getNonRevertedTipsetMessageCountStmt).QueryRowContext(ctx, tsKeyCidBytes).Scan(&data.nonRevertedMessageCount); err != nil { return xerrors.Errorf("failed to query non reverted message count: %w", err) } - if err = tx.Stmt(si.stmts.getNonRevertedTipsetEventCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&data.nonRevertedEventCount); err != nil { + if err = tx.Stmt(si.stmts.getNonRevertedTipsetEventCountStmt).QueryRowContext(ctx, tsKeyCidBytes).Scan(&data.nonRevertedEventCount); err != nil { return xerrors.Errorf("failed to query non reverted event count: %w", err) } - // we are only fetching non-reverted events, so if there are no non-reverted events, - // then we need to check if the events were reverted. - if data.nonRevertedEventCount == 0 { - if err = tx.Stmt(si.stmts.hasRevertedEventsStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&data.hasRevertedEvents); err != nil { - return xerrors.Errorf("failed to check for reverted events: %w", err) - } - } - return nil }) @@ -197,16 +194,6 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet return xerrors.Errorf("failed to get tipset key cid: %w", err) } - msgs, err := si.cs.MessagesForTipset(ctx, ts) - if err != nil { - return xerrors.Errorf("failed to get messages for tipset: %w", err) - } - - msgCount := len(msgs) - if msgCount != indexedData.nonRevertedMessageCount { - return xerrors.Errorf("tipset message count mismatch: chainstore has %d, index has %d", msgCount, indexedData.nonRevertedMessageCount) - } - // get the tipset where the messages of `ts` will be executed (deferred execution) executionTs, err := si.cs.GetTipsetByHeight(ctx, ts.Height()+1, nil, false) if err != nil { @@ -238,7 +225,7 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet } totalExecutedMsgCount := len(executedMsgs) - if totalExecutedMsgCount != int(indexedData.nonRevertedMessageCount) { + if totalExecutedMsgCount != indexedData.nonRevertedMessageCount { return xerrors.Errorf("tipset executed message count mismatch: chainstore has %d, index has %d", totalExecutedMsgCount, indexedData.nonRevertedMessageCount) } @@ -267,17 +254,16 @@ func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.Ti return nil, xerrors.Errorf("error applying tipset: %w", err) } - indexedData, err := si.getIndexedTipSetData(ctx, ts.Key()) + indexedData, err := si.getIndexedTipSetData(ctx, ts) if err != nil { return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", ts.Height(), err) } return &types.IndexValidation{ - TipsetKey: ts.Key().String(), - Height: uint64(ts.Height()), - Backfilled: true, - TotalMessages: uint64(indexedData.nonRevertedMessageCount), - TotalEvents: uint64(indexedData.nonRevertedEventCount), - EventsReverted: indexedData.hasRevertedEvents, + TipsetKey: ts.Key().String(), + Height: uint64(ts.Height()), + Backfilled: true, + NonRevertedMessageCount: uint64(indexedData.nonRevertedMessageCount), + NonRevertedEventsCount: uint64(indexedData.nonRevertedEventCount), }, nil } diff --git a/chain/index/ddls.go b/chain/index/ddls.go index 1037c2453f..922c1c46e2 100644 --- a/chain/index/ddls.go +++ b/chain/index/ddls.go @@ -84,6 +84,5 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string { &ps.countTipsetsAtHeightStmt: "SELECT COUNT(CASE WHEN reverted = 1 THEN 1 END) AS reverted_count, COUNT(CASE WHEN reverted = 0 THEN 1 END) AS non_reverted_count FROM (SELECT tipset_key_cid, MAX(reverted) AS reverted FROM tipset_message WHERE height = ? GROUP BY tipset_key_cid) AS unique_tipsets", &ps.getNonRevertedTipsetMessageCountStmt: "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0", &ps.getNonRevertedTipsetEventCountStmt: "SELECT COUNT(*) FROM event WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)", - &ps.hasRevertedEventsStmt: "SELECT EXISTS (SELECT 1 FROM event WHERE reverted = 1 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?))", } } diff --git a/chain/index/indexer.go b/chain/index/indexer.go index 4be5e91c41..e6756c07bd 100644 --- a/chain/index/indexer.go +++ b/chain/index/indexer.go @@ -51,7 +51,6 @@ type preparedStatements struct { getNonRevertedTipsetMessageCountStmt *sql.Stmt getNonRevertedTipsetEventCountStmt *sql.Stmt - hasRevertedEventsStmt *sql.Stmt } type SqliteIndexer struct { diff --git a/chain/types/index.go b/chain/types/index.go index a43e2a9019..631aaebd42 100644 --- a/chain/types/index.go +++ b/chain/types/index.go @@ -4,8 +4,7 @@ type IndexValidation struct { TipsetKey string Height uint64 - TotalMessages uint64 - TotalEvents uint64 - EventsReverted bool - Backfilled bool + NonRevertedMessageCount uint64 + NonRevertedEventsCount uint64 + Backfilled bool } diff --git a/itests/eth_filter_test.go b/itests/eth_filter_test.go index ba86ab57c8..67dba8f1ef 100644 --- a/itests/eth_filter_test.go +++ b/itests/eth_filter_test.go @@ -525,16 +525,31 @@ func TestEthGetLogsBasic(t *testing.T) { AssertEthLogs(t, rctLogs, expected, received) - iv, err := client.ChainValidateIndex(ctx, abi.ChainEpoch(0), false) + epoch := uint64(0) + iv, err := client.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), false) require.NoError(err) require.NotNil(iv) fmt.Printf("index validation: %v\n", iv) - iv, err = client.ChainValidateIndex(ctx, abi.ChainEpoch(22), false) + // Add assertions for IndexValidation fields + require.NotEmpty(t, iv.TipsetKey, "TipsetKey should not be empty") + require.Equal(t, epoch, iv.Height, "Height should be 0") + require.GreaterOrEqual(t, iv.NonRevertedMessageCount, uint64(0), "NonRevertedMessageCount should be non-negative") // TODO: change according to actual number of messages in the tipset + require.GreaterOrEqual(t, iv.NonRevertedEventsCount, uint64(0), "NonRevertedEventsCount should be non-negative") // TODO: change according to actual number of messages in the tipset + require.False(iv.Backfilled, "Backfilled should be flase") + + epoch = 22 + iv, err = client.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), false) require.NoError(err) require.NotNil(iv) fmt.Printf("index validation: %v\n", iv) + + require.NotEmpty(t, iv.TipsetKey, "TipsetKey should not be empty") + require.Equal(t, epoch, iv.Height, "Height should be 22") + require.GreaterOrEqual(t, iv.NonRevertedMessageCount, uint64(0), "NonRevertedMessageCount be non-negative") // TODO: change according to actual number of messages in the tipset + require.GreaterOrEqual(t, iv.NonRevertedEventsCount, uint64(0), "NonRevertedEventsCount be non-negative") // TODO: change according to actual number of messages in the tipset + require.True(iv.Backfilled, "Backfilled should be false") } func TestEthSubscribeLogsNoTopicSpec(t *testing.T) { From f0b4739b44a75376863bb4d4e7002cc063021836 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Tue, 17 Sep 2024 12:35:45 +0530 Subject: [PATCH 6/6] fix: ci issue --- build/openrpc/full.json | 4 ---- 1 file changed, 4 deletions(-) diff --git a/build/openrpc/full.json b/build/openrpc/full.json index 8236985c92..ec52f0da4e 100644 --- a/build/openrpc/full.json +++ b/build/openrpc/full.json @@ -2059,7 +2059,6 @@ "Height": 42, "TotalMessages": 42, "TotalEvents": 42, - "EventsReverted": true, "Backfilled": true } ], @@ -2068,9 +2067,6 @@ "Backfilled": { "type": "boolean" }, - "EventsReverted": { - "type": "boolean" - }, "Height": { "title": "number", "type": "number"