Skip to content

Commit

Permalink
refactor subscribe (#640)
Browse files Browse the repository at this point in the history
  • Loading branch information
shunjizhan committed Jan 30, 2023
1 parent c3ea6a8 commit affa614
Showing 1 changed file with 46 additions and 36 deletions.
82 changes: 46 additions & 36 deletions eth-providers/src/base-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,52 +325,64 @@ export abstract class BaseProvider extends AbstractProvider {
}
}

startSubscription = async (): Promise<void> => {
startSubscriptions = async (): Promise<void> => {
await Promise.all([
this._subscribeEventListeners(),
this._subscribeFinalizedBlock(),
this._subscribeRuntimeVersion(),
]);
};

_subscribeEventListeners = () => {
const subscriptionMethod = this.safeMode
? this.api.rpc.chain.subscribeFinalizedHeads.bind(this)
: this.api.rpc.chain.subscribeNewHeads.bind(this);

const sub1 = subscriptionMethod(async (header: Header) => {
// block cache
return subscriptionMethod(async (header: Header) => {
// update block cache
const blockNumber = header.number.toNumber();
const blockHash = header.hash.toHex();
const txHashes = await this._getTxHashesAtBlock(blockHash);

this.blockCache.addTxsAtBlock(blockNumber, txHashes);

// eth_subscribe
const headSubscribers = this.eventListeners[SubscriptionType.NewHeads];
const logSubscribers = this.eventListeners[SubscriptionType.Logs];
await this._notifySubscribers(blockNumber);
});
}

if (headSubscribers.length > 0 || logSubscribers.length > 0) {
const block = await this.getBlockData(blockNumber, false);
_notifySubscribers = async (blockNumber: number) => {
const headSubscribers = this.eventListeners[SubscriptionType.NewHeads];
const logSubscribers = this.eventListeners[SubscriptionType.Logs];

const response = hexlifyRpcResult(block);
headSubscribers.forEach((l) => l.cb(response));
if (headSubscribers.length > 0 || logSubscribers.length > 0) {
const block = await this.getBlockData(blockNumber, false);

if (logSubscribers.length > 0) {
const receipts = await Promise.all(
block.transactions.map((tx) => this.getTransactionReceiptAtBlock(tx as string, blockNumber))
);
const logs = receipts.map((r) => r.logs).flat();
const response = hexlifyRpcResult(block);
headSubscribers.forEach((l) => l.cb(response));

logSubscribers.forEach(({ cb, filter }) => {
const filteredLogs = logs.filter(log => filterLog(log, filter));
hexlifyRpcResult(filteredLogs).forEach(cb);
});
}
}
});
if (logSubscribers.length > 0) {
const receipts = await Promise.all(
block.transactions.map((tx) => this.getTransactionReceiptAtBlock(tx as string, blockNumber))
);
const logs = receipts.map((r) => r.logs).flat();

const sub2 = this.api.rpc.chain.subscribeFinalizedHeads(async (header: Header) => {
const blockNumber = header.number.toNumber();
const blockHash = (await this.api.rpc.chain.getBlockHash(blockNumber)).toHex();
logSubscribers.forEach(({ cb, filter }) => {
const filteredLogs = logs.filter(log => filterLog(log, filter));
hexlifyRpcResult(filteredLogs).forEach(cb);
});
}
}
}

this.latestFinalizedBlockNumber = blockNumber;
this.latestFinalizedBlockHash = blockHash;
});
_subscribeFinalizedBlock = () => (
this.api.rpc.chain.subscribeFinalizedHeads(async (header: Header) => {
this.latestFinalizedBlockNumber = header.number.toNumber();
this.latestFinalizedBlockHash = header.hash.toHex();
})
);

const sub3 = this.api.rpc.state.subscribeRuntimeVersion((runtime: RuntimeVersion) => {
_subscribeRuntimeVersion = () => (
this.api.rpc.state.subscribeRuntimeVersion((runtime: RuntimeVersion) => {
const version = runtime.specVersion.toNumber();
this.verbose && logger.info(`runtime version: ${version}`);

Expand All @@ -380,10 +392,8 @@ export abstract class BaseProvider extends AbstractProvider {
logger.warn(`runtime version changed: ${this.runtimeVersion} => ${version}, shutting down myself... good bye 👋`);
process.exit(1);
}
});

await Promise.all([sub1, sub2, sub3]);
};
})
);

setApi = (api: ApiPromise): void => {
defineReadOnly(this, '_api', api);
Expand Down Expand Up @@ -469,7 +479,7 @@ export abstract class BaseProvider extends AbstractProvider {

if (!this.subscriptionStarted) {
this.subscriptionStarted = true;
await this.startSubscription();
await this.startSubscriptions();
}
} catch (e) {
await this.api.disconnect();
Expand Down Expand Up @@ -1744,7 +1754,7 @@ export abstract class BaseProvider extends AbstractProvider {
}

_getTxReceiptFromCache = async (txHash: string): Promise<TransactionReceipt | null> => {
const targetBlockNumber = this.blockCache?.getBlockNumber(txHash);
const targetBlockNumber = this.blockCache.getBlockNumber(txHash);
if (!targetBlockNumber) return null;

let targetBlockHash;
Expand Down Expand Up @@ -1939,7 +1949,7 @@ export abstract class BaseProvider extends AbstractProvider {
return this.subql?.getIndexerMetadata();
};

getCachInfo = (): CacheInspect | undefined => this.blockCache?._inspect();
getCachInfo = (): CacheInspect | undefined => this.blockCache._inspect();

_timeEthCalls = async (): Promise<{
gasPriceTime: number;
Expand Down

0 comments on commit affa614

Please sign in to comment.