Skip to content

Commit

Permalink
feat: lotus-shed tooling for chain indexer (#12474)
Browse files Browse the repository at this point in the history
* feat: add lotus-shed command for backfilling chain indexer

* feat: add lotus-shed command for inspecting the chain indexer

* feat: use single lotus-shed command to inspect and backfill

* fix: remove the unused queries

* small changes

* add change log
  • Loading branch information
akaladarshi committed Sep 18, 2024
1 parent 11712bf commit e7506cc
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Lotus changelog

# UNRELEASED

- https://github.com/filecoin-project/lotus/pull/12474: feat: lotus-shed tooling for chain indexer

## ☢️ Upgrade Warnings ☢️

## New features
Expand Down
177 changes: 177 additions & 0 deletions cmd/lotus-shed/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"math"
"path"
Expand Down Expand Up @@ -51,6 +52,7 @@ var indexesCmd = &cli.Command{
withCategory("txhash", backfillTxHashCmd),
withCategory("events", backfillEventsCmd),
withCategory("events", inspectEventsCmd),
withCategory("chainindex_validation", validateChainIndexCmd),
},
}

Expand Down Expand Up @@ -879,3 +881,178 @@ var backfillTxHashCmd = &cli.Command{
return nil
},
}

type IndexValidationJSON struct {
IndexedTipsetKey types.TipSetKey `json:"indexed_tipset_key"`
IndexedHeight uint64 `json:"indexed_height"`
IndexedMsgCount uint64 `json:"indexed_message_count"`
IndexedEventsCount uint64 `json:"indexed_events_count"`
}

var validateChainIndexCmd = &cli.Command{
Name: "validate-chainindex ",
Usage: "Validates the chainindex for a range of tipset epochs",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "from",
Usage: "The tipset height (epoch) to start backfilling from (0 is head of chain)",
},
&cli.IntFlag{
Name: "to",
Usage: "The tipset height (epoch) to end backfilling at",
Required: true,
},
&cli.BoolFlag{
Name: "backfill",
Usage: "Backfill missing index entries while validating the chain index. When enabled, the command will perform backfilling for any missing indexes (default: true)",
Value: true,
},
&cli.BoolFlag{
Name: "failfast",
Usage: "Failfast when enabled, the validation process will terminate immediately upon encountering the first error (default: true)",
Value: true,
},
&cli.BoolFlag{
Name: "output",
Usage: "Output the backfilling results in JSON format",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
srv, err := lcli.GetFullNodeServices(cctx)
if err != nil {
return fmt.Errorf("failed to get full node services: %w", err)
}
defer func() {
if closeErr := srv.Close(); closeErr != nil {
log.Errorf("Error closing services: %v", closeErr)
}
}()

api := srv.FullNodeAPI()
ctx := lcli.ReqContext(cctx)

fromEpoch := cctx.Int("from")
if fromEpoch == 0 {
curTs, err := api.ChainHead(ctx)
if err != nil {
return err
}
fromEpoch = int(curTs.Height()) - 1
} else {
fromEpoch = fromEpoch - 1
}

if fromEpoch <= 0 {
return fmt.Errorf("invalid from epoch: %d", fromEpoch)
}

toEpoch := cctx.Int("to")
if toEpoch > fromEpoch {
return fmt.Errorf("to epoch must be less than from epoch")
}

backfill := cctx.Bool("backfill")
failfast := cctx.Bool("failfast")
output := cctx.Bool("output")

// Results Tracking
var results []IndexValidationJSON
var backfilledEpochs []int

log.Infof("Starting %s from epoch: %d to epoch: %d",
func() string {
if backfill {
return "backfill and inspect chainindex"
}
return "inspect chainindex"
}(),
fromEpoch, toEpoch)

// starting from the FromEpoch-1 and going down to the ToEpoch
// this is because `ChainValidateIndex` fetches the tipset.height()+1, which might not be available in case of chain head
for epoch := fromEpoch; epoch >= toEpoch; epoch-- {
if ctx.Err() != nil {
return ctx.Err()
}

indexValidateResp, err := api.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), backfill)
if err != nil {
if failfast {
return fmt.Errorf("failed to validate index for epoch %d: %w", epoch, err)
}
log.Warnf("Error validating index for epoch %d: %v", epoch, err)
continue
}

if !indexValidateResp.TipSetKey.IsEmpty() || indexValidateResp.Height != uint64(epoch) && indexValidateResp.Backfilled == backfill {
errMsg := fmt.Sprintf("epoch %d: invalid index validation response: %+v", epoch, indexValidateResp)
if failfast {
return fmt.Errorf(errMsg)
}
log.Warn(errMsg)
continue
}

if backfill {
backfilledEpochs = append(backfilledEpochs, epoch)
}

if output {
results = append(results, IndexValidationJSON{
IndexedTipsetKey: indexValidateResp.TipSetKey,
IndexedHeight: indexValidateResp.Height,
IndexedMsgCount: indexValidateResp.IndexedMessagesCount,
IndexedEventsCount: indexValidateResp.IndexedEventsCount,
})
} else {
logEpochResult(epoch, indexValidateResp)
}
}

// Output JSON Results
if output {
if err := outputResults(results); err != nil {
return err
}
}

// Log Summary
if backfill {
log.Infof("Backfilled epochs: %v", backfilledEpochs)
} else {
log.Infof("Inspection of chain index from epoch %d to %d completed.", fromEpoch, toEpoch)
}

return nil
},
}

// outputResults marshals the results into JSON and outputs them.
func outputResults(results []IndexValidationJSON) error {
jsonData, err := json.MarshalIndent(results, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal results to JSON: %w", err)
}
fmt.Println(string(jsonData))
return nil
}

// logEpochResult logs the result of backfilling for a single epoch.
func logEpochResult(epoch int, indexValidate *types.IndexValidation) {
if indexValidate.Backfilled {
log.Infof("Epoch %d: Backfilled successfully. TipsetKey: %s, TotalMessages: %d, TotalEvents: %d",
epoch,
indexValidate.TipSetKey,
indexValidate.IndexedMessagesCount,
indexValidate.IndexedMessagesCount,
)
} else {
log.Info("Epoch %d: validated successfully. TipsetKey: %s, TotalMessages: %d, TotalEvents: %d",
epoch,
indexValidate.TipSetKey,
indexValidate.IndexedMessagesCount,
indexValidate.IndexedMessagesCount,
)
}
}

0 comments on commit e7506cc

Please sign in to comment.