Skip to content

Commit

Permalink
substrate: allow to configure finalityConfirmation in substrate proce…
Browse files Browse the repository at this point in the history
…ssor (#315)

* add finalityConfirmation to substrate processor

* simplify getFinalizedHeight if finalityConfirmation is specified

* retry in case of reorg

* retry both rpc calls
  • Loading branch information
tmcgroul committed Jul 23, 2024
1 parent 412c471 commit c91a92c
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/substrate-data-raw",
"comment": "allow to use finalityConfirmation to determine finalized blocks",
"type": "minor"
}
],
"packageName": "@subsquid/substrate-data-raw"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/substrate-data",
"comment": "accept finalityConfirmation in `RpcDataSourceOptions`",
"type": "minor"
}
],
"packageName": "@subsquid/substrate-data"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/substrate-processor",
"comment": "allow to use finalityConfirmation if `chain_getFinalizedHead` is not an option",
"type": "minor"
}
],
"packageName": "@subsquid/substrate-processor"
}
85 changes: 59 additions & 26 deletions substrate/substrate-data-raw/src/datasource.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import {Logger, LogLevel} from '@subsquid/logger'
import {RpcClient} from '@subsquid/rpc-client'
import {RpcClient, SubscriptionHandle} from '@subsquid/rpc-client'
import {AsyncQueue, ensureError, maybeLast, partitionBy, Throttler, wait} from '@subsquid/util-internal'
import {
assertIsValid,
Batch,
BlockConsistencyError,
BlockRef,
ChainHeads,
DataConsistencyError,
HashAndHeight,
HotProcessor,
HotState,
Expand All @@ -34,6 +33,7 @@ export interface RpcDataSourceOptions {
rpc: RpcClient
headPollInterval?: number
newHeadTimeout?: number
finalityConfirmation?: number
log?: Logger
}

Expand All @@ -42,20 +42,28 @@ export class RpcDataSource {
public readonly rpc: Rpc
private headPollInterval: number
private newHeadTimeout: number
private finalityConfirmation?: number
private log?: Logger

constructor(options: RpcDataSourceOptions) {
this.rpc = new Rpc(options.rpc)
this.headPollInterval = options.headPollInterval ?? 5000
this.newHeadTimeout = options.newHeadTimeout ?? 0
this.finalityConfirmation = options.finalityConfirmation
this.log = options.log
}

async getFinalizedHeight(): Promise<number> {
let head = await this.rpc.getFinalizedHead()
let header = await this.rpc.getBlockHeader(head)
assert(header, 'finalized blocks must be always available')
return qty2Int(header.number)
if (this.finalityConfirmation == null) {
let head = await this.rpc.getFinalizedHead()
let header = await this.rpc.getBlockHeader(head)
assert(header, 'finalized blocks must be always available')
return qty2Int(header.number)
} else {
let header = await this.rpc.getBlockHeader()
assert(header, 'the header of the latest block on the chain must be always available')
return Math.max(0, qty2Int(header.number) - this.finalityConfirmation)
}
}

async *getFinalizedBlocks(requests: RangeRequestList<DataRequest>, stopOnHead?: boolean): AsyncIterable<Batch<BlockData>> {
Expand Down Expand Up @@ -129,7 +137,7 @@ export class RpcDataSource {
for (let split of splitRangeByRequest(requests, {from, to: top})) {
for (let range of splitRange(10, split.range)) {
let blocks = await fetch.getHotSplit(
from,
range.from,
range.to === headBlock?.height ? headBlock : range.to,
split.request || {}
)
Expand Down Expand Up @@ -172,7 +180,25 @@ export class RpcDataSource {
while (!isEnd()) {
let head = await headSrc.call()
if (head === prev) continue
let finalizedHead = await this.rpc.getFinalizedHead()

let finalizedHead: string | null = null
if (this.finalityConfirmation == null) {
finalizedHead = await this.rpc.getFinalizedHead()
} else {
let attempt = 0
while (attempt < 5) {
let header = await this.rpc.getBlockHeader(head)
if (header != null) {
let finalizedHeight = qty2Int(header.number) - this.finalityConfirmation
finalizedHead = await this.rpc.getBlockHash(finalizedHeight)
if (finalizedHead != null) break
}

head = await this.rpc.getHead()
attempt += 1
}
assert(finalizedHead != null, `failed to get finalized head after ${attempt} attempts`)
}
await this.handleNewHeads({
best: {hash: head},
finalized: {hash: finalizedHead}
Expand All @@ -181,27 +207,31 @@ export class RpcDataSource {
}

private async subscription(cb: (heads: ChainHeads) => Promise<void>, isEnd: () => boolean): Promise<void> {
let finalityConfirmation = this.finalityConfirmation
let queue = new AsyncQueue<number | Error>(1)
let finalizedHeight = 0
let prevHeight = 0

let finalizedHeadsHandle = this.rpc.client.subscribe<BlockHeader>({
method: 'chain_subscribeFinalizedHeads',
unsubscribe: 'chain_unsubscribeFinalizedHeads',
notification: 'chain_finalizedHead',
onMessage(head: BlockHeader) {
try {
let height = qty2Int(head.number)
finalizedHeight = Math.max(finalizedHeight, height)
} catch(err: any) {
close(err)
}
},
onError(err: Error) {
close(ensureError(err))
},
resubscribeOnConnectionLoss: true
})
let finalizedHeadsHandle: SubscriptionHandle | undefined
if (finalityConfirmation == null) {
finalizedHeadsHandle = this.rpc.client.subscribe<BlockHeader>({
method: 'chain_subscribeFinalizedHeads',
unsubscribe: 'chain_unsubscribeFinalizedHeads',
notification: 'chain_finalizedHead',
onMessage(head: BlockHeader) {
try {
let height = qty2Int(head.number)
finalizedHeight = Math.max(finalizedHeight, height)
} catch(err: any) {
close(err)
}
},
onError(err: Error) {
close(ensureError(err))
},
resubscribeOnConnectionLoss: true
})
}

let newHeadsHandle = this.rpc.client.subscribe<BlockHeader>({
method: 'chain_subscribeNewHeads',
Expand All @@ -212,6 +242,9 @@ export class RpcDataSource {
let height = qty2Int(head.number)
if (height >= prevHeight) {
prevHeight = height
if (finalityConfirmation != null) {
finalizedHeight = Math.max(0, height - finalityConfirmation)
}
queue.forcePut(height)
}
} catch(err: any) {
Expand All @@ -226,7 +259,7 @@ export class RpcDataSource {

function close(err?: Error) {
newHeadsHandle.close()
finalizedHeadsHandle.close()
finalizedHeadsHandle?.close()
if (err) {
queue.forcePut(err)
}
Expand Down
2 changes: 1 addition & 1 deletion substrate/substrate-data-raw/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class Rpc {
return this.call('chain_getBlockHash', [toQty(height)])
}

getBlockHeader(blockHash: Hash): Promise<BlockHeader | null> {
getBlockHeader(blockHash?: Hash): Promise<BlockHeader | null> {
return this.call('chain_getHeader', [blockHash])
}

Expand Down
4 changes: 3 additions & 1 deletion substrate/substrate-data/src/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface RpcDataSourceOptions {
headPollInterval?: number
newHeadTimeout?: number
typesBundle?: OldTypesBundle | OldSpecsBundle
finalityConfirmation?: number
}


Expand All @@ -23,7 +24,8 @@ export class RpcDataSource {
this.rawDataSource = new raw.RpcDataSource({
rpc: options.rpc,
headPollInterval: options.headPollInterval,
newHeadTimeout: options.newHeadTimeout
newHeadTimeout: options.newHeadTimeout,
finalityConfirmation: options.finalityConfirmation
})
this.typesBundle = options.typesBundle
}
Expand Down
1 change: 1 addition & 0 deletions substrate/substrate-processor/src/ds-rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export interface RpcDataSourceOptions {
headPollInterval?: number
newHeadTimeout?: number
typesBundle?: OldTypesBundle | OldSpecsBundle
finalityConfirmation?: number
}


Expand Down
17 changes: 16 additions & 1 deletion substrate/substrate-processor/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ export class SubstrateBatchProcessor<F extends FieldSelection = {}> {
private requests: RangeRequest<DataRequest>[] = []
private fields?: FieldSelection
private blockRange?: Range
private finalityConfirmation?: number
private archive?: GatewaySettings
private rpcEndpoint?: RpcEndpointSettings
private rpcIngestSettings?: RpcDataIngestionSettings
Expand Down Expand Up @@ -285,6 +286,18 @@ export class SubstrateBatchProcessor<F extends FieldSelection = {}> {
return this
}

/**
* Distance from the head block behind which all blocks are considered to be finalized.
*
* By default, the processor will track finalized blocks via `chain_getFinalizedHead`.
* Configure it only if `chain_getFinalizedHead` doesn’t return the expected info.
*/
setFinalityConfirmation(nBlocks: number): this {
this.assertNotRunning()
this.finalityConfirmation = nBlocks
return this
}

/**
* Configure a set of fetched fields
*/
Expand Down Expand Up @@ -456,7 +469,8 @@ export class SubstrateBatchProcessor<F extends FieldSelection = {}> {
rpc: this.getChainRpcClient(),
headPollInterval: this.rpcIngestSettings?.headPollInterval,
newHeadTimeout: this.rpcIngestSettings?.newHeadTimeout,
typesBundle: this.typesBundle
typesBundle: this.typesBundle,
finalityConfirmation: this.finalityConfirmation
})
}

Expand Down Expand Up @@ -586,6 +600,7 @@ export class SubstrateBatchProcessor<F extends FieldSelection = {}> {
requests: this.getBatchRequests(),
archive: this.archive == null ? undefined : this.getArchiveDataSource(),
hotDataSource: this.rpcIngestSettings?.disabled ? undefined : this.getRpcDataSource(),
allBlocksAreFinal: this.finalityConfirmation === 0,
process: (s, b) => this.processBatch(s, b as any, handler),
prometheus: this.prometheus,
log
Expand Down

0 comments on commit c91a92c

Please sign in to comment.