Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add finalityConfirmation to substrate processor #315

Merged
merged 4 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/substrate-data-raw",
"comment": "allow to use finalityConfirmation to determine finalized blocks",
"type": "minor"
}
],
"packageName": "@subsquid/substrate-data-raw"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/substrate-data",
"comment": "accept finalityConfirmation in `RpcDataSourceOptions`",
"type": "minor"
}
],
"packageName": "@subsquid/substrate-data"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/substrate-processor",
"comment": "allow to use finalityConfirmation if `chain_getFinalizedHead` is not an option",
"type": "minor"
}
],
"packageName": "@subsquid/substrate-processor"
}
89 changes: 63 additions & 26 deletions substrate/substrate-data-raw/src/datasource.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import {Logger, LogLevel} from '@subsquid/logger'
import {RpcClient} from '@subsquid/rpc-client'
import {RpcClient, SubscriptionHandle} from '@subsquid/rpc-client'
import {AsyncQueue, ensureError, maybeLast, partitionBy, Throttler, wait} from '@subsquid/util-internal'
import {
assertIsValid,
Batch,
BlockConsistencyError,
BlockRef,
ChainHeads,
DataConsistencyError,
HashAndHeight,
HotProcessor,
HotState,
Expand All @@ -34,6 +33,7 @@ export interface RpcDataSourceOptions {
rpc: RpcClient
headPollInterval?: number
newHeadTimeout?: number
finalityConfirmation?: number
log?: Logger
}

Expand All @@ -42,20 +42,50 @@ export class RpcDataSource {
public readonly rpc: Rpc
private headPollInterval: number
private newHeadTimeout: number
private finalityConfirmation?: number
private log?: Logger

constructor(options: RpcDataSourceOptions) {
this.rpc = new Rpc(options.rpc)
this.headPollInterval = options.headPollInterval ?? 5000
this.newHeadTimeout = options.newHeadTimeout ?? 0
this.finalityConfirmation = options.finalityConfirmation
this.log = options.log
}

async getFinalizedHeight(): Promise<number> {
let head = await this.rpc.getFinalizedHead()
let header = await this.rpc.getBlockHeader(head)
assert(header, 'finalized blocks must be always available')
return qty2Int(header.number)
if (this.finalityConfirmation == null) {
let head = await this.rpc.getFinalizedHead()
let header = await this.rpc.getBlockHeader(head)
assert(header, 'finalized blocks must be always available')
return qty2Int(header.number)
} else {
let retries = 0
while (retries < 5) {
let head = await this.rpc.getHead()
let header = await this.rpc.getBlockHeader(head)
belopash marked this conversation as resolved.
Show resolved Hide resolved
if (header == null) {
this.log?.debug(`"${head}" block has no corresponding header. will retry`)
retries += 1
continue
}
return Math.max(0, qty2Int(header.number) - this.finalityConfirmation)
}
throw new Error('Cannot determine head of the chain')
}
}

async getFinalizedHead(best: string): Promise<string> {
belopash marked this conversation as resolved.
Show resolved Hide resolved
if (this.finalityConfirmation == null) {
return this.rpc.getFinalizedHead()
} else {
let header = await this.rpc.getBlockHeader(best)
assert(header)
let height = qty2Int(header.number) - this.finalityConfirmation
let hash = await this.rpc.getBlockHash(height)
assert(hash)
belopash marked this conversation as resolved.
Show resolved Hide resolved
return hash
}
}

async *getFinalizedBlocks(requests: RangeRequestList<DataRequest>, stopOnHead?: boolean): AsyncIterable<Batch<BlockData>> {
Expand Down Expand Up @@ -129,7 +159,7 @@ export class RpcDataSource {
for (let split of splitRangeByRequest(requests, {from, to: top})) {
for (let range of splitRange(10, split.range)) {
let blocks = await fetch.getHotSplit(
from,
range.from,
range.to === headBlock?.height ? headBlock : range.to,
split.request || {}
)
Expand Down Expand Up @@ -172,7 +202,7 @@ export class RpcDataSource {
while (!isEnd()) {
let head = await headSrc.call()
if (head === prev) continue
let finalizedHead = await this.rpc.getFinalizedHead()
let finalizedHead = await this.getFinalizedHead(head)
await this.handleNewHeads({
best: {hash: head},
finalized: {hash: finalizedHead}
Expand All @@ -181,27 +211,31 @@ export class RpcDataSource {
}

private async subscription(cb: (heads: ChainHeads) => Promise<void>, isEnd: () => boolean): Promise<void> {
let finalityConfirmation = this.finalityConfirmation
let queue = new AsyncQueue<number | Error>(1)
let finalizedHeight = 0
let prevHeight = 0

let finalizedHeadsHandle = this.rpc.client.subscribe<BlockHeader>({
method: 'chain_subscribeFinalizedHeads',
unsubscribe: 'chain_unsubscribeFinalizedHeads',
notification: 'chain_finalizedHead',
onMessage(head: BlockHeader) {
try {
let height = qty2Int(head.number)
finalizedHeight = Math.max(finalizedHeight, height)
} catch(err: any) {
close(err)
}
},
onError(err: Error) {
close(ensureError(err))
},
resubscribeOnConnectionLoss: true
})
let finalizedHeadsHandle: SubscriptionHandle | undefined
if (finalityConfirmation == null) {
finalizedHeadsHandle = this.rpc.client.subscribe<BlockHeader>({
method: 'chain_subscribeFinalizedHeads',
unsubscribe: 'chain_unsubscribeFinalizedHeads',
notification: 'chain_finalizedHead',
onMessage(head: BlockHeader) {
try {
let height = qty2Int(head.number)
finalizedHeight = Math.max(finalizedHeight, height)
} catch(err: any) {
close(err)
}
},
onError(err: Error) {
close(ensureError(err))
},
resubscribeOnConnectionLoss: true
})
}

let newHeadsHandle = this.rpc.client.subscribe<BlockHeader>({
method: 'chain_subscribeNewHeads',
Expand All @@ -212,6 +246,9 @@ export class RpcDataSource {
let height = qty2Int(head.number)
if (height >= prevHeight) {
prevHeight = height
if (finalityConfirmation != null) {
finalizedHeight = Math.max(0, height - finalityConfirmation)
}
queue.forcePut(height)
}
} catch(err: any) {
Expand All @@ -226,7 +263,7 @@ export class RpcDataSource {

function close(err?: Error) {
newHeadsHandle.close()
finalizedHeadsHandle.close()
finalizedHeadsHandle?.close()
if (err) {
queue.forcePut(err)
}
Expand Down
4 changes: 3 additions & 1 deletion substrate/substrate-data/src/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface RpcDataSourceOptions {
headPollInterval?: number
newHeadTimeout?: number
typesBundle?: OldTypesBundle | OldSpecsBundle
finalityConfirmation?: number
}


Expand All @@ -23,7 +24,8 @@ export class RpcDataSource {
this.rawDataSource = new raw.RpcDataSource({
rpc: options.rpc,
headPollInterval: options.headPollInterval,
newHeadTimeout: options.newHeadTimeout
newHeadTimeout: options.newHeadTimeout,
finalityConfirmation: options.finalityConfirmation
})
this.typesBundle = options.typesBundle
}
Expand Down
1 change: 1 addition & 0 deletions substrate/substrate-processor/src/ds-rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export interface RpcDataSourceOptions {
headPollInterval?: number
newHeadTimeout?: number
typesBundle?: OldTypesBundle | OldSpecsBundle
finalityConfirmation?: number
}


Expand Down
17 changes: 16 additions & 1 deletion substrate/substrate-processor/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ export class SubstrateBatchProcessor<F extends FieldSelection = {}> {
private requests: RangeRequest<DataRequest>[] = []
private fields?: FieldSelection
private blockRange?: Range
private finalityConfirmation?: number
private archive?: GatewaySettings
private rpcEndpoint?: RpcEndpointSettings
private rpcIngestSettings?: RpcDataIngestionSettings
Expand Down Expand Up @@ -285,6 +286,18 @@ export class SubstrateBatchProcessor<F extends FieldSelection = {}> {
return this
}

/**
* Distance from the head block behind which all blocks are considered to be finalized.
*
* By default, the processor will track finalized blocks via `chain_getFinalizedHead`.
* Configure it only if `chain_getFinalizedHead` doesn’t return the expected info.
*/
setFinalityConfirmation(nBlocks: number): this {
this.assertNotRunning()
this.finalityConfirmation = nBlocks
return this
}

/**
* Configure a set of fetched fields
*/
Expand Down Expand Up @@ -456,7 +469,8 @@ export class SubstrateBatchProcessor<F extends FieldSelection = {}> {
rpc: this.getChainRpcClient(),
headPollInterval: this.rpcIngestSettings?.headPollInterval,
newHeadTimeout: this.rpcIngestSettings?.newHeadTimeout,
typesBundle: this.typesBundle
typesBundle: this.typesBundle,
finalityConfirmation: this.finalityConfirmation
})
}

Expand Down Expand Up @@ -586,6 +600,7 @@ export class SubstrateBatchProcessor<F extends FieldSelection = {}> {
requests: this.getBatchRequests(),
archive: this.archive == null ? undefined : this.getArchiveDataSource(),
hotDataSource: this.rpcIngestSettings?.disabled ? undefined : this.getRpcDataSource(),
allBlocksAreFinal: this.finalityConfirmation === 0,
process: (s, b) => this.processBatch(s, b as any, handler),
prometheus: this.prometheus,
log
Expand Down
Loading