diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 8ce6162a50..fd270b4724 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -66,8 +66,11 @@ export class BaseProtocol implements IBaseProtocolCore { public async connectedPeers(): Promise { const peers = await this.allPeers(); return peers.filter((peer) => { - return ( - this.components.connectionManager.getConnections(peer.id).length > 0 + const connections = this.components.connectionManager.getConnections( + peer.id + ); + return connections.some((c) => + c.streams.some((s) => s.protocol === this.multicodec) ); }); } diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 5300718b2b..fdc19698a2 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -37,6 +37,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { wakuMessage: WakuMessage, peerIdStr: string ) => Promise, + private handleError: (error: Error) => Promise, public readonly pubsubTopics: PubsubTopic[], libp2p: Libp2p ) { @@ -301,8 +302,18 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { () => { log.info("Receiving pipe closed."); }, - (e) => { - log.error("Error with receiving pipe", e); + async (e) => { + log.error( + "Error with receiving pipe", + e, + " -- ", + "on peer ", + connection.remotePeer.toString(), + " -- ", + "stream ", + stream + ); + await this.handleError(e); } ); } catch (e) { diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 5b5e9ea919..7a3324c444 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -25,7 +25,7 @@ export type IBaseProtocolCore = { export type IBaseProtocolSDK = { readonly connectedPeers: Peer[]; - renewPeer: (peerToDisconnect: PeerId) => Promise; + renewPeer: (peerToDisconnect: PeerId) => Promise; readonly numPeersToUse: number; }; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 52009aa05c..5636d2617f 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -56,18 +56,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * @param peerToDisconnect The peer to disconnect from. * @returns The new peer that was found and connected to. */ - public async renewPeer(peerToDisconnect: PeerId): Promise { + public async renewPeer(peerToDisconnect: PeerId): Promise { this.log.info(`Renewing peer ${peerToDisconnect}`); await this.connectionManager.dropConnection(peerToDisconnect); - const peer = (await this.findAndAddPeers(1))[0]; - if (!peer) { - this.log.error( - "Failed to find a new peer to replace the disconnected one." - ); - } - const updatedPeers = this.peers.filter( (peer) => !peer.id.equals(peerToDisconnect) ); @@ -77,9 +70,17 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { `Peer ${peerToDisconnect} disconnected and removed from the peer list` ); + const newPeer = await this.findAndAddPeers(1); + if (newPeer.length === 0) { + this.log.error( + "Failed to find a new peer to replace the disconnected one." + ); + return undefined; + } + this.renewPeersLocker.lock(peerToDisconnect); - return peer; + return newPeer[0]; } /** @@ -174,6 +175,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } this.maintainPeersLock = true; + await this.confirmPeers(); this.log.info(`Maintaining peers, current count: ${this.peers.length}`); try { const numPeersToAdd = this.numPeersToUse - this.peers.length; @@ -190,6 +192,25 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { return true; } + private async confirmPeers(): Promise { + const connectedPeers = await this.core.connectedPeers(); + const currentPeers = this.peers; + const peersToAdd = connectedPeers.filter( + (p) => !currentPeers.some((cp) => cp.id.equals(p.id)) + ); + const peersToRemove = currentPeers.filter( + (p) => !connectedPeers.some((cp) => cp.id.equals(p.id)) + ); + + peersToAdd.forEach((p) => this.peers.push(p)); + peersToRemove.forEach((p) => { + const index = this.peers.findIndex((cp) => cp.id.equals(p.id)); + if (index !== -1) this.peers.splice(index, 1); + }); + + this.updatePeers(this.peers); + } + /** * Finds and adds new peers to the peers list. * @param numPeers The number of peers to find and add. diff --git a/packages/sdk/src/protocols/filter/constants.ts b/packages/sdk/src/protocols/filter/constants.ts index 3889e7638b..01cea6859f 100644 --- a/packages/sdk/src/protocols/filter/constants.ts +++ b/packages/sdk/src/protocols/filter/constants.ts @@ -1,4 +1,4 @@ -export const DEFAULT_KEEP_ALIVE = 30 * 1000; +export const DEFAULT_KEEP_ALIVE = 10_000; export const DEFAULT_SUBSCRIBE_OPTIONS = { keepAlive: DEFAULT_KEEP_ALIVE diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index c8840ea0e4..c9db3cd80e 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -54,6 +54,9 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { await subscription.processIncomingMessage(wakuMessage, peerIdStr); }, + async (error: Error) => { + log.error("Error with receiving pipe", error); + }, connectionManager.configuredPubsubTopics, libp2p ), diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 51a9e16e8d..4300246624 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -41,7 +41,9 @@ export class SubscriptionManager implements ISubscriptionSDK { private readonly protocol: FilterCore, private readonly connectionManager: ConnectionManager, private readonly getPeers: () => Peer[], - private readonly renewPeer: (peerToDisconnect: PeerId) => Promise + private readonly renewPeer: ( + peerToDisconnect: PeerId + ) => Promise ) { this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); @@ -251,11 +253,13 @@ export class SubscriptionManager implements ISubscriptionSDK { } private startSubscriptionsMaintenance(interval: number): void { + log.info("Starting subscriptions maintenance"); this.startKeepAlivePings(interval); this.startConnectionListener(); } private stopSubscriptionsMaintenance(): void { + log.info("Stopping subscriptions maintenance"); this.stopKeepAlivePings(); this.stopConnectionListener(); } @@ -299,9 +303,10 @@ export class SubscriptionManager implements ISubscriptionSDK { } this.keepAliveTimer = setInterval(() => { - void this.ping().catch((error) => { - log.error("Error in keep-alive ping cycle:", error); - }); + log.info("Sending keep-alive ping"); + void this.ping() + .then(() => log.info("Keep-alive ping successful")) + .catch((error) => log.error("Error in keep-alive ping cycle:", error)); }, interval) as unknown as number; } diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts index 1e420dd921..b0d8dc1c3a 100644 --- a/packages/sdk/src/reliability_monitor/index.ts +++ b/packages/sdk/src/reliability_monitor/index.ts @@ -18,7 +18,7 @@ export class ReliabilityMonitorManager { public static createReceiverMonitor( pubsubTopic: PubsubTopic, getPeers: () => Peer[], - renewPeer: (peerId: PeerId) => Promise, + renewPeer: (peerId: PeerId) => Promise, getContentTopics: () => ContentTopic[], protocolSubscribe: ( pubsubTopic: PubsubTopic, @@ -42,7 +42,7 @@ export class ReliabilityMonitorManager { } public static createSenderMonitor( - renewPeer: (peerId: PeerId) => Promise + renewPeer: (peerId: PeerId) => Promise ): SenderReliabilityMonitor { if (!ReliabilityMonitorManager.senderMonitor) { ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor( diff --git a/packages/sdk/src/reliability_monitor/receiver.ts b/packages/sdk/src/reliability_monitor/receiver.ts index 440e35829c..985c52a59d 100644 --- a/packages/sdk/src/reliability_monitor/receiver.ts +++ b/packages/sdk/src/reliability_monitor/receiver.ts @@ -32,7 +32,7 @@ export class ReceiverReliabilityMonitor { public constructor( private readonly pubsubTopic: PubsubTopic, private getPeers: () => Peer[], - private renewPeer: (peerId: PeerId) => Promise, + private renewPeer: (peerId: PeerId) => Promise, private getContentTopics: () => ContentTopic[], private protocolSubscribe: ( pubsubTopic: PubsubTopic, @@ -163,15 +163,21 @@ export class ReceiverReliabilityMonitor { private async renewAndSubscribePeer( peerId: PeerId ): Promise { + const peerIdStr = peerId.toString(); try { - if (this.peerRenewalLocks.has(peerId.toString())) { - log.info(`Peer ${peerId.toString()} is already being renewed.`); + if (this.peerRenewalLocks.has(peerIdStr)) { + log.info(`Peer ${peerIdStr} is already being renewed.`); return; } - this.peerRenewalLocks.add(peerId.toString()); + this.peerRenewalLocks.add(peerIdStr); const newPeer = await this.renewPeer(peerId); + if (!newPeer) { + log.warn(`Failed to renew peer ${peerIdStr}: No new peer found.`); + return; + } + await this.protocolSubscribe( this.pubsubTopic, newPeer, @@ -181,16 +187,16 @@ export class ReceiverReliabilityMonitor { this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); this.missedMessagesByPeer.set(newPeer.id.toString(), 0); - this.peerFailures.delete(peerId.toString()); - this.missedMessagesByPeer.delete(peerId.toString()); - delete this.receivedMessagesHashes.nodes[peerId.toString()]; + this.peerFailures.delete(peerIdStr); + this.missedMessagesByPeer.delete(peerIdStr); + delete this.receivedMessagesHashes.nodes[peerIdStr]; return newPeer; } catch (error) { - log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); + log.error(`Failed to renew peer ${peerIdStr}: ${error}.`); return; } finally { - this.peerRenewalLocks.delete(peerId.toString()); + this.peerRenewalLocks.delete(peerIdStr); } } diff --git a/packages/sdk/src/reliability_monitor/sender.ts b/packages/sdk/src/reliability_monitor/sender.ts index 0ffe9a1659..914c321da8 100644 --- a/packages/sdk/src/reliability_monitor/sender.ts +++ b/packages/sdk/src/reliability_monitor/sender.ts @@ -11,7 +11,9 @@ export class SenderReliabilityMonitor { private readonly maxAttemptsBeforeRenewal = DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL; - public constructor(private renewPeer: (peerId: PeerId) => Promise) {} + public constructor( + private renewPeer: (peerId: PeerId) => Promise + ) {} public async attemptRetriesOrRenew( peerId: PeerId, @@ -42,13 +44,19 @@ export class SenderReliabilityMonitor { } else { try { const newPeer = await this.renewPeer(peerId); - log.info( - `Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` - ); + if (newPeer) { + log.info( + `Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` + ); - this.attempts.delete(peerIdStr); - this.attempts.set(newPeer.id.toString(), 0); - await protocolSend(); + this.attempts.delete(peerIdStr); + this.attempts.set(newPeer.id.toString(), 0); + await protocolSend(); + } else { + log.error( + `Failed to renew peer ${peerId.toString()}: New peer is undefined` + ); + } } catch (error) { log.error(`Failed to renew peer ${peerId.toString()}: ${error}`); }