Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
akaladarshi committed Sep 17, 2024
1 parent 0d22bcf commit 8f2fa81
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 48 deletions.
66 changes: 26 additions & 40 deletions chain/index/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@ func (si *SqliteIndexer) getTipsetCountsAtHeight(ctx context.Context, height abi
}

func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.ChainEpoch, backfill bool) (*types.IndexValidation, error) {
if !si.started && si.isClosed() {
return nil, xerrors.Errorf("ChainValidateIndex can only be called after the indexer has been started and not closed")
if !si.started {
return nil, xerrors.Errorf("ChainValidateIndex can only be called after the indexer has been started")
}

if si.isClosed() {
return nil, xerrors.Errorf("ChainValidateIndex can only be called before the indexer has been closed")
}

si.writerLk.Lock()
Expand Down Expand Up @@ -132,56 +136,49 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain
}

// indexedTsKeyCid and expectedTsKeyCid are the same, so we can use `expectedTs` to fetch the indexed data
indexedData, err := si.getIndexedTipSetData(ctx, expectedTs.Key())
indexedData, err := si.getIndexedTipSetData(ctx, expectedTs)
if err != nil {
return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", expectedTs.Height(), err)
}

if indexedData == nil {
return nil, xerrors.Errorf("invalid indexed data for tipset at height %d", expectedTs.Height())
}

if err = si.verifyIndexedData(ctx, expectedTs, indexedData); err != nil {
return nil, xerrors.Errorf("failed to verify indexed data at height %d: %w", expectedTs.Height(), err)
}

return &types.IndexValidation{
TipsetKey: expectedTs.Key().String(),
Height: uint64(expectedTs.Height()),
TotalMessages: uint64(indexedData.nonRevertedMessageCount),
TotalEvents: uint64(indexedData.nonRevertedEventCount),
EventsReverted: indexedData.hasRevertedEvents,
TipsetKey: expectedTs.Key().String(),
Height: uint64(expectedTs.Height()),
NonRevertedMessageCount: uint64(indexedData.nonRevertedMessageCount),
NonRevertedEventsCount: uint64(indexedData.nonRevertedEventCount),
}, nil
}

type indexedTipSetData struct {
nonRevertedMessageCount int
nonRevertedEventCount int
hasRevertedEvents bool
}

// getIndexedTipSetData fetches the indexed tipset data for a tipset
func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, tsKey types.TipSetKey) (*indexedTipSetData, error) {
cid, err := tsKey.Cid()
func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, ts *types.TipSet) (*indexedTipSetData, error) {
tsKeyCidBytes, err := toTipsetKeyCidBytes(ts)
if err != nil {
return nil, xerrors.Errorf("failed to get tipset key cid: %w", err)
}
tsKeyBytes := cid.Bytes()

var data indexedTipSetData
err = withTx(ctx, si.db, func(tx *sql.Tx) error {
if err = tx.Stmt(si.stmts.getNonRevertedTipsetMessageCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&data.nonRevertedMessageCount); err != nil {
if err = tx.Stmt(si.stmts.getNonRevertedTipsetMessageCountStmt).QueryRowContext(ctx, tsKeyCidBytes).Scan(&data.nonRevertedMessageCount); err != nil {
return xerrors.Errorf("failed to query non reverted message count: %w", err)
}

if err = tx.Stmt(si.stmts.getNonRevertedTipsetEventCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&data.nonRevertedEventCount); err != nil {
if err = tx.Stmt(si.stmts.getNonRevertedTipsetEventCountStmt).QueryRowContext(ctx, tsKeyCidBytes).Scan(&data.nonRevertedEventCount); err != nil {
return xerrors.Errorf("failed to query non reverted event count: %w", err)
}

// we are only fetching non-reverted events, so if there are no non-reverted events,
// then we need to check if the events were reverted.
if data.nonRevertedEventCount == 0 {
if err = tx.Stmt(si.stmts.hasRevertedEventsStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&data.hasRevertedEvents); err != nil {
return xerrors.Errorf("failed to check for reverted events: %w", err)
}
}

return nil
})

Expand All @@ -197,16 +194,6 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet
return xerrors.Errorf("failed to get tipset key cid: %w", err)
}

msgs, err := si.cs.MessagesForTipset(ctx, ts)
if err != nil {
return xerrors.Errorf("failed to get messages for tipset: %w", err)
}

msgCount := len(msgs)
if msgCount != indexedData.nonRevertedMessageCount {
return xerrors.Errorf("tipset message count mismatch: chainstore has %d, index has %d", msgCount, indexedData.nonRevertedMessageCount)
}

// get the tipset where the messages of `ts` will be executed (deferred execution)
executionTs, err := si.cs.GetTipsetByHeight(ctx, ts.Height()+1, nil, false)
if err != nil {
Expand Down Expand Up @@ -238,7 +225,7 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet
}

totalExecutedMsgCount := len(executedMsgs)
if totalExecutedMsgCount != int(indexedData.nonRevertedMessageCount) {
if totalExecutedMsgCount != indexedData.nonRevertedMessageCount {
return xerrors.Errorf("tipset executed message count mismatch: chainstore has %d, index has %d", totalExecutedMsgCount, indexedData.nonRevertedMessageCount)
}

Expand Down Expand Up @@ -267,17 +254,16 @@ func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.Ti
return nil, xerrors.Errorf("error applying tipset: %w", err)
}

indexedData, err := si.getIndexedTipSetData(ctx, ts.Key())
indexedData, err := si.getIndexedTipSetData(ctx, ts)
if err != nil {
return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", ts.Height(), err)
}

return &types.IndexValidation{
TipsetKey: ts.Key().String(),
Height: uint64(ts.Height()),
Backfilled: true,
TotalMessages: uint64(indexedData.nonRevertedMessageCount),
TotalEvents: uint64(indexedData.nonRevertedEventCount),
EventsReverted: indexedData.hasRevertedEvents,
TipsetKey: ts.Key().String(),
Height: uint64(ts.Height()),
Backfilled: true,
NonRevertedMessageCount: uint64(indexedData.nonRevertedMessageCount),
NonRevertedEventsCount: uint64(indexedData.nonRevertedEventCount),
}, nil
}
1 change: 0 additions & 1 deletion chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,5 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.countTipsetsAtHeightStmt: "SELECT COUNT(CASE WHEN reverted = 1 THEN 1 END) AS reverted_count, COUNT(CASE WHEN reverted = 0 THEN 1 END) AS non_reverted_count FROM (SELECT tipset_key_cid, MAX(reverted) AS reverted FROM tipset_message WHERE height = ? GROUP BY tipset_key_cid) AS unique_tipsets",
&ps.getNonRevertedTipsetMessageCountStmt: "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0",
&ps.getNonRevertedTipsetEventCountStmt: "SELECT COUNT(*) FROM event WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
&ps.hasRevertedEventsStmt: "SELECT EXISTS (SELECT 1 FROM event WHERE reverted = 1 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?))",
}
}
1 change: 0 additions & 1 deletion chain/index/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ type preparedStatements struct {

getNonRevertedTipsetMessageCountStmt *sql.Stmt
getNonRevertedTipsetEventCountStmt *sql.Stmt
hasRevertedEventsStmt *sql.Stmt
}

type SqliteIndexer struct {
Expand Down
7 changes: 3 additions & 4 deletions chain/types/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ type IndexValidation struct {
TipsetKey string
Height uint64

TotalMessages uint64
TotalEvents uint64
EventsReverted bool
Backfilled bool
NonRevertedMessageCount uint64
NonRevertedEventsCount uint64
Backfilled bool
}
19 changes: 17 additions & 2 deletions itests/eth_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,16 +525,31 @@ func TestEthGetLogsBasic(t *testing.T) {

AssertEthLogs(t, rctLogs, expected, received)

iv, err := client.ChainValidateIndex(ctx, abi.ChainEpoch(0), false)
epoch := uint64(0)
iv, err := client.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), false)
require.NoError(err)
require.NotNil(iv)

fmt.Printf("index validation: %v\n", iv)

iv, err = client.ChainValidateIndex(ctx, abi.ChainEpoch(22), false)
// Add assertions for IndexValidation fields
require.NotEmpty(t, iv.TipsetKey, "TipsetKey should not be empty")
require.Equal(t, epoch, iv.Height, "Height should be 0")
require.GreaterOrEqual(t, iv.NonRevertedMessageCount, uint64(0), "NonRevertedMessageCount should be non-negative") // TODO: change according to actual number of messages in the tipset
require.GreaterOrEqual(t, iv.NonRevertedEventsCount, uint64(0), "NonRevertedEventsCount should be non-negative") // TODO: change according to actual number of messages in the tipset
require.False(iv.Backfilled, "Backfilled should be flase")

epoch = 22
iv, err = client.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), false)
require.NoError(err)
require.NotNil(iv)
fmt.Printf("index validation: %v\n", iv)

require.NotEmpty(t, iv.TipsetKey, "TipsetKey should not be empty")
require.Equal(t, epoch, iv.Height, "Height should be 22")
require.GreaterOrEqual(t, iv.NonRevertedMessageCount, uint64(0), "NonRevertedMessageCount be non-negative") // TODO: change according to actual number of messages in the tipset
require.GreaterOrEqual(t, iv.NonRevertedEventsCount, uint64(0), "NonRevertedEventsCount be non-negative") // TODO: change according to actual number of messages in the tipset
require.True(iv.Backfilled, "Backfilled should be false")
}

func TestEthSubscribeLogsNoTopicSpec(t *testing.T) {
Expand Down

0 comments on commit 8f2fa81

Please sign in to comment.