Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: chain indexer todos [skip changelog] #12462

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions build/openrpc/full.json
Original file line number Diff line number Diff line change
Expand Up @@ -2059,7 +2059,6 @@
"Height": 42,
"TotalMessages": 42,
"TotalEvents": 42,
"EventsReverted": true,
"Backfilled": true
}
],
Expand All @@ -2068,9 +2067,6 @@
"Backfilled": {
"type": "boolean"
},
"EventsReverted": {
"type": "boolean"
},
"Height": {
"title": "number",
"type": "number"
Expand Down
116 changes: 109 additions & 7 deletions chain/index/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain
if !si.started {
return nil, xerrors.Errorf("ChainValidateIndex can only be called after the indexer has been started")
}

if si.isClosed() {
return nil, xerrors.Errorf("ChainValidateIndex can only be called before the indexer has been closed")
}

si.writerLk.Lock()
defer si.writerLk.Unlock()

Expand Down Expand Up @@ -126,17 +131,107 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain
return nil, xerrors.Errorf("failed to cast tipset key cid: %w", err)
}

if indexedTsKeyCid != expectedTsKeyCid {
if !indexedTsKeyCid.Equals(expectedTsKeyCid) {
return nil, xerrors.Errorf("index corruption: non-reverted tipset at height %d has key %s, but canonical chain has %s", epoch, indexedTsKeyCid, expectedTsKeyCid)
}

// indexedTsKeyCid and expectedTsKeyCid are the same, so we can use `expectedTs` to fetch the indexed data
indexedData, err := si.getIndexedTipSetData(ctx, expectedTs)
if err != nil {
return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", expectedTs.Height(), err)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensure indexedData is not nil before proceeding here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, it will only be nil if there is an error; if there is an error and indexedData is not nil, then the issue is in the DB, so we should return.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but there is no harm in checking it and returning an error if it is nil here, so that the user's node doesn't crash if there is a bug that causes a nil value here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if indexedData == nil {
return nil, xerrors.Errorf("invalid indexed data for tipset at height %d", expectedTs.Height())
}

if err = si.verifyIndexedData(ctx, expectedTs, indexedData); err != nil {
return nil, xerrors.Errorf("failed to verify indexed data at height %d: %w", expectedTs.Height(), err)
}

return &types.IndexValidation{
TipsetKey: expectedTs.Key().String(),
Height: uint64(expectedTs.Height()),
// TODO Other fields
TipsetKey: expectedTs.Key().String(),
Height: uint64(expectedTs.Height()),
NonRevertedMessageCount: uint64(indexedData.nonRevertedMessageCount),
NonRevertedEventsCount: uint64(indexedData.nonRevertedEventCount),
}, nil
}

// indexedTipSetData carries the two counts read back from the index for a
// single tipset: how many non-reverted messages and how many non-reverted
// events are recorded against it.
type indexedTipSetData struct {
	nonRevertedMessageCount, nonRevertedEventCount int
}

// getIndexedTipSetData fetches the indexed tipset data for a tipset
func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, ts *types.TipSet) (*indexedTipSetData, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about you just return a mostly-filled IndexValidation from this that can be built on or merged from here? alternatively just 2 ints since that's all its doing

tsKeyCidBytes, err := toTipsetKeyCidBytes(ts)
if err != nil {
return nil, xerrors.Errorf("failed to get tipset key cid: %w", err)
}

var data indexedTipSetData
err = withTx(ctx, si.db, func(tx *sql.Tx) error {
if err = tx.Stmt(si.stmts.getNonRevertedTipsetMessageCountStmt).QueryRowContext(ctx, tsKeyCidBytes).Scan(&data.nonRevertedMessageCount); err != nil {
return xerrors.Errorf("failed to query non reverted message count: %w", err)
}

if err = tx.Stmt(si.stmts.getNonRevertedTipsetEventCountStmt).QueryRowContext(ctx, tsKeyCidBytes).Scan(&data.nonRevertedEventCount); err != nil {
return xerrors.Errorf("failed to query non reverted event count: %w", err)
}

return nil
})

return &data, err
}

// verifyIndexedData verifies that the indexed data for a tipset is correct
// by comparing the number of messages and events in the chainstore to the number of messages and events indexed.
// NOTE: Events are loaded from the executed messages of the tipset at the next epoch (ts.Height() + 1).
func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet, indexedData *indexedTipSetData) (err error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I see why you want a struct for this, but it's still just 2 ints and it might even be clearer to provide them as named arguments here

tsKeyCid, err := ts.Key().Cid()
if err != nil {
return xerrors.Errorf("failed to get tipset key cid: %w", err)
}

// get the tipset where the messages of `ts` will be executed (deferred execution)
executionTs, err := si.cs.GetTipsetByHeight(ctx, ts.Height()+1, nil, false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see discussion in #12447 (comment), same problem here I think, particularly as you get close to the tip of the chain

Copy link
Contributor

@aarshkshah1992 aarshkshah1992 Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rvagg Which is why we error out here if the parent of this tipset is not the original tipset. We can then ask the user to retry as that means that the chain has forked between the two tipsets.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to re-execute the tipset/regenerate events here.

if err != nil {
return xerrors.Errorf("failed to get tipset by height: %w", err)
}

eParentTsKeyCid, err := executionTs.Parents().Cid()
if err != nil {
return xerrors.Errorf("failed to get execution tipset parent key cid: %w", err)
}

// the parent tipset of the execution tipset should be the same as the indexed tipset (`ts` should be the parent of `executionTs`)
if !eParentTsKeyCid.Equals(tsKeyCid) {
return xerrors.Errorf("execution tipset parent key mismatch: chainstore has %s, index has %s", eParentTsKeyCid, tsKeyCid)
}

executedMsgs, err := si.loadExecutedMessages(ctx, ts, executionTs)
if err != nil {
return xerrors.Errorf("failed to load executed messages: %w", err)
}

totalEventsCount := 0
for _, emsg := range executedMsgs {
totalEventsCount += len(emsg.evs)
}

if totalEventsCount != indexedData.nonRevertedEventCount {
return xerrors.Errorf("tipset event count mismatch: chainstore has %d, index has %d", totalEventsCount, indexedData.nonRevertedEventCount)
}

totalExecutedMsgCount := len(executedMsgs)
if totalExecutedMsgCount != indexedData.nonRevertedMessageCount {
return xerrors.Errorf("tipset executed message count mismatch: chainstore has %d, index has %d", totalExecutedMsgCount, indexedData.nonRevertedMessageCount)
}

return nil
}

func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.TipSet, backfill bool) (*types.IndexValidation, error) {
if !backfill {
return nil, xerrors.Errorf("missing tipset at height %d in the chain index, set backfill to true to backfill", ts.Height())
Expand All @@ -159,9 +254,16 @@ func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.Ti
return nil, xerrors.Errorf("error applying tipset: %w", err)
}

indexedData, err := si.getIndexedTipSetData(ctx, ts)
if err != nil {
return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", ts.Height(), err)
}

return &types.IndexValidation{
TipsetKey: ts.Key().String(),
Height: uint64(ts.Height()),
Backfilled: true,
TipsetKey: ts.Key().String(),
Height: uint64(ts.Height()),
Backfilled: true,
NonRevertedMessageCount: uint64(indexedData.nonRevertedMessageCount),
NonRevertedEventsCount: uint64(indexedData.nonRevertedEventCount),
}, nil
}
2 changes: 2 additions & 0 deletions chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,7 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.hasNullRoundAtHeightStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message WHERE height = ?)",
&ps.getNonRevertedTipsetAtHeightStmt: "SELECT tipset_key_cid FROM tipset_message WHERE height = ? AND reverted = 0",
&ps.countTipsetsAtHeightStmt: "SELECT COUNT(CASE WHEN reverted = 1 THEN 1 END) AS reverted_count, COUNT(CASE WHEN reverted = 0 THEN 1 END) AS non_reverted_count FROM (SELECT tipset_key_cid, MAX(reverted) AS reverted FROM tipset_message WHERE height = ? GROUP BY tipset_key_cid) AS unique_tipsets",
&ps.getNonRevertedTipsetMessageCountStmt: "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0",
&ps.getNonRevertedTipsetEventCountStmt: "SELECT COUNT(*) FROM event WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akaladarshi What if the tipset is NOT reverted but the events are reverted? This query only ensures that the tipset is NOT reverted. Does it have any implications for the correctness of the code/API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you say events are reverted, that must mean that their corresponding message is also reverted.
Shouldn't we mark message inside that tipset reverted as well?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be corruption if one were reverted and the other not? is that something we should also be checking for?

}
}
5 changes: 1 addition & 4 deletions chain/index/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@ func (si *SqliteIndexer) gcLoop() {
defer cleanupTicker.Stop()

for si.ctx.Err() == nil {
si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()
if si.isClosed() {
return
}
si.closeLk.RUnlock()

select {
case <-cleanupTicker.C:
Expand Down
27 changes: 14 additions & 13 deletions chain/index/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type preparedStatements struct {
hasNullRoundAtHeightStmt *sql.Stmt
getNonRevertedTipsetAtHeightStmt *sql.Stmt
countTipsetsAtHeightStmt *sql.Stmt

getNonRevertedTipsetMessageCountStmt *sql.Stmt
getNonRevertedTipsetEventCountStmt *sql.Stmt
}

type SqliteIndexer struct {
Expand Down Expand Up @@ -173,12 +176,9 @@ func (si *SqliteIndexer) initStatements() error {
}

func (si *SqliteIndexer) IndexEthTxHash(ctx context.Context, txHash ethtypes.EthHash, msgCid cid.Cid) error {
si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()
if si.isClosed() {
return ErrClosed
}
si.closeLk.RUnlock()

return withTx(ctx, si.db, func(tx *sql.Tx) error {
return si.indexEthTxHash(ctx, tx, txHash, msgCid)
Expand All @@ -199,12 +199,10 @@ func (si *SqliteIndexer) IndexSignedMessage(ctx context.Context, msg *types.Sign
if msg.Signature.Type != crypto.SigTypeDelegated {
return nil
}
si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()

if si.isClosed() {
return ErrClosed
}
si.closeLk.RUnlock()

return withTx(ctx, si.db, func(tx *sql.Tx) error {
return si.indexSignedMessage(ctx, tx, msg)
Expand All @@ -227,7 +225,7 @@ func (si *SqliteIndexer) indexSignedMessage(ctx context.Context, tx *sql.Tx, msg

func (si *SqliteIndexer) Apply(ctx context.Context, from, to *types.TipSet) error {
si.closeLk.RLock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double lock here because you do it in isClosed too

if si.closed {
if si.isClosed() {
si.closeLk.RUnlock()
return ErrClosed
}
Expand Down Expand Up @@ -346,12 +344,9 @@ func (si *SqliteIndexer) restoreTipsetIfExists(ctx context.Context, tx *sql.Tx,
}

func (si *SqliteIndexer) Revert(ctx context.Context, from, to *types.TipSet) error {
si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()
if si.isClosed() {
return ErrClosed
}
si.closeLk.RUnlock()

si.writerLk.Lock()
defer si.writerLk.Unlock()
Expand Down Expand Up @@ -398,3 +393,9 @@ func (si *SqliteIndexer) Revert(ctx context.Context, from, to *types.TipSet) err

return nil
}

// isClosed reports the current value of si.closed, reading it while holding
// closeLk's read lock.
//
// NOTE(review): callers should not already hold closeLk when calling this;
// assuming closeLk is a sync.RWMutex, recursive read locking is prohibited
// and can deadlock if a writer is waiting in between — confirm at call sites.
func (si *SqliteIndexer) isClosed() bool {
	si.closeLk.RLock()
	closed := si.closed
	si.closeLk.RUnlock()
	return closed
}
10 changes: 2 additions & 8 deletions chain/index/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ import (
const headIndexedWaitTimeout = 5 * time.Second

func (si *SqliteIndexer) GetCidFromHash(ctx context.Context, txHash ethtypes.EthHash) (cid.Cid, error) {
si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()
if si.isClosed() {
return cid.Undef, ErrClosed
}
si.closeLk.RUnlock()

var msgCidBytes []byte

Expand All @@ -44,12 +41,9 @@ func (si *SqliteIndexer) queryMsgCidFromEthHash(ctx context.Context, txHash etht
}

func (si *SqliteIndexer) GetMsgInfo(ctx context.Context, messageCid cid.Cid) (*MsgInfo, error) {
si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()
if si.isClosed() {
return nil, ErrClosed
}
si.closeLk.RUnlock()

var tipsetKeyCidBytes []byte
var height int64
Expand Down
5 changes: 1 addition & 4 deletions chain/index/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ func (si *SqliteIndexer) ReconcileWithChain(ctx context.Context, head *types.Tip
log.Warn("chain indexer is not storing events during reconciliation; please ensure this is intentional")
}

si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()
if si.isClosed() {
return ErrClosed
}
si.closeLk.RUnlock()

if head == nil {
return nil
Expand Down
7 changes: 3 additions & 4 deletions chain/types/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ type IndexValidation struct {
TipsetKey string
Height uint64

TotalMessages uint64
TotalEvents uint64
EventsReverted bool
Backfilled bool
NonRevertedMessageCount uint64
NonRevertedEventsCount uint64
Backfilled bool
}
19 changes: 17 additions & 2 deletions itests/eth_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,16 +525,31 @@ func TestEthGetLogsBasic(t *testing.T) {

AssertEthLogs(t, rctLogs, expected, received)

iv, err := client.ChainValidateIndex(ctx, abi.ChainEpoch(0), false)
epoch := uint64(0)
iv, err := client.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), false)
require.NoError(err)
require.NotNil(iv)

fmt.Printf("index validation: %v\n", iv)

iv, err = client.ChainValidateIndex(ctx, abi.ChainEpoch(22), false)
// Add assertions for IndexValidation fields
require.NotEmpty(t, iv.TipsetKey, "TipsetKey should not be empty")
require.Equal(t, epoch, iv.Height, "Height should be 0")
require.GreaterOrEqual(t, iv.NonRevertedMessageCount, uint64(0), "NonRevertedMessageCount should be non-negative") // TODO: change according to actual number of messages in the tipset
require.GreaterOrEqual(t, iv.NonRevertedEventsCount, uint64(0), "NonRevertedEventsCount should be non-negative") // TODO: change according to actual number of messages in the tipset
require.False(iv.Backfilled, "Backfilled should be flase")

epoch = 22
iv, err = client.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), false)
require.NoError(err)
require.NotNil(iv)
fmt.Printf("index validation: %v\n", iv)

require.NotEmpty(t, iv.TipsetKey, "TipsetKey should not be empty")
require.Equal(t, epoch, iv.Height, "Height should be 22")
require.GreaterOrEqual(t, iv.NonRevertedMessageCount, uint64(0), "NonRevertedMessageCount be non-negative") // TODO: change according to actual number of messages in the tipset
require.GreaterOrEqual(t, iv.NonRevertedEventsCount, uint64(0), "NonRevertedEventsCount be non-negative") // TODO: change according to actual number of messages in the tipset
require.True(iv.Backfilled, "Backfilled should be false")
}

func TestEthSubscribeLogsNoTopicSpec(t *testing.T) {
Expand Down
Loading