diff --git a/eth-providers/src/base-provider.ts b/eth-providers/src/base-provider.ts index 013c45469..c152a7163 100644 --- a/eth-providers/src/base-provider.ts +++ b/eth-providers/src/base-provider.ts @@ -325,52 +325,64 @@ export abstract class BaseProvider extends AbstractProvider { } } - startSubscription = async (): Promise => { + startSubscriptions = async (): Promise => { + await Promise.all([ + this._subscribeEventListeners(), + this._subscribeFinalizedBlock(), + this._subscribeRuntimeVersion(), + ]); + }; + + _subscribeEventListeners = () => { const subscriptionMethod = this.safeMode ? this.api.rpc.chain.subscribeFinalizedHeads.bind(this) : this.api.rpc.chain.subscribeNewHeads.bind(this); - const sub1 = subscriptionMethod(async (header: Header) => { - // block cache + return subscriptionMethod(async (header: Header) => { + // update block cache const blockNumber = header.number.toNumber(); const blockHash = header.hash.toHex(); const txHashes = await this._getTxHashesAtBlock(blockHash); - this.blockCache.addTxsAtBlock(blockNumber, txHashes); // eth_subscribe - const headSubscribers = this.eventListeners[SubscriptionType.NewHeads]; - const logSubscribers = this.eventListeners[SubscriptionType.Logs]; + await this._notifySubscribers(blockNumber); + }); + } - if (headSubscribers.length > 0 || logSubscribers.length > 0) { - const block = await this.getBlockData(blockNumber, false); + _notifySubscribers = async (blockNumber: number) => { + const headSubscribers = this.eventListeners[SubscriptionType.NewHeads]; + const logSubscribers = this.eventListeners[SubscriptionType.Logs]; - const response = hexlifyRpcResult(block); - headSubscribers.forEach((l) => l.cb(response)); + if (headSubscribers.length > 0 || logSubscribers.length > 0) { + const block = await this.getBlockData(blockNumber, false); - if (logSubscribers.length > 0) { - const receipts = await Promise.all( - block.transactions.map((tx) => this.getTransactionReceiptAtBlock(tx as string, blockNumber)) - ); - const logs = receipts.map((r) => r.logs).flat(); + const response = hexlifyRpcResult(block); + headSubscribers.forEach((l) => l.cb(response)); - logSubscribers.forEach(({ cb, filter }) => { - const filteredLogs = logs.filter(log => filterLog(log, filter)); - hexlifyRpcResult(filteredLogs).forEach(cb); - }); - } - } - }); + if (logSubscribers.length > 0) { + const receipts = await Promise.all( + block.transactions.map((tx) => this.getTransactionReceiptAtBlock(tx as string, blockNumber)) + ); + const logs = receipts.map((r) => r.logs).flat(); - const sub2 = this.api.rpc.chain.subscribeFinalizedHeads(async (header: Header) => { - const blockNumber = header.number.toNumber(); - const blockHash = (await this.api.rpc.chain.getBlockHash(blockNumber)).toHex(); + logSubscribers.forEach(({ cb, filter }) => { + const filteredLogs = logs.filter(log => filterLog(log, filter)); + hexlifyRpcResult(filteredLogs).forEach(cb); + }); + } + } + } - this.latestFinalizedBlockNumber = blockNumber; - this.latestFinalizedBlockHash = blockHash; - }); + _subscribeFinalizedBlock = () => ( + this.api.rpc.chain.subscribeFinalizedHeads(async (header: Header) => { + this.latestFinalizedBlockNumber = header.number.toNumber(); + this.latestFinalizedBlockHash = header.hash.toHex(); + }) + ); - const sub3 = this.api.rpc.state.subscribeRuntimeVersion((runtime: RuntimeVersion) => { + _subscribeRuntimeVersion = () => ( + this.api.rpc.state.subscribeRuntimeVersion((runtime: RuntimeVersion) => { const version = runtime.specVersion.toNumber(); this.verbose && logger.info(`runtime version: ${version}`); @@ -380,10 +392,8 @@ export abstract class BaseProvider extends AbstractProvider { logger.warn(`runtime version changed: ${this.runtimeVersion} => ${version}, shutting down myself... good bye 👋`); process.exit(1); } - }); - - await Promise.all([sub1, sub2, sub3]); - }; + }) + ); setApi = (api: ApiPromise): void => { defineReadOnly(this, '_api', api); @@ -469,7 +479,7 @@ export abstract class BaseProvider extends AbstractProvider { if (!this.subscriptionStarted) { this.subscriptionStarted = true; - await this.startSubscription(); + await this.startSubscriptions(); } } catch (e) { await this.api.disconnect(); @@ -1744,7 +1754,7 @@ export abstract class BaseProvider extends AbstractProvider { } _getTxReceiptFromCache = async (txHash: string): Promise => { - const targetBlockNumber = this.blockCache?.getBlockNumber(txHash); + const targetBlockNumber = this.blockCache.getBlockNumber(txHash); if (!targetBlockNumber) return null; let targetBlockHash; @@ -1939,7 +1949,7 @@ export abstract class BaseProvider extends AbstractProvider { return this.subql?.getIndexerMetadata(); }; - getCachInfo = (): CacheInspect | undefined => this.blockCache?._inspect(); + getCachInfo = (): CacheInspect | undefined => this.blockCache._inspect(); _timeEthCalls = async (): Promise<{ gasPriceTime: number;