Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: fix #2137

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ export class BaseProtocol implements IBaseProtocolCore {
public async connectedPeers(): Promise<Peer[]> {
const peers = await this.allPeers();
return peers.filter((peer) => {
return (
this.components.connectionManager.getConnections(peer.id).length > 0
const connections = this.components.connectionManager.getConnections(
peer.id
);
return connections.some((c) =>
c.streams.some((s) => s.protocol === this.multicodec)
);
});
}
Expand Down
15 changes: 13 additions & 2 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
wakuMessage: WakuMessage,
peerIdStr: string
) => Promise<void>,
private handleError: (error: Error) => Promise<void>,
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
Expand Down Expand Up @@ -301,8 +302,18 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
() => {
log.info("Receiving pipe closed.");
},
(e) => {
log.error("Error with receiving pipe", e);
async (e) => {
log.error(
"Error with receiving pipe",
e,
" -- ",
"on peer ",
connection.remotePeer.toString(),
" -- ",
"stream ",
stream
);
await this.handleError(e);
}
);
} catch (e) {
Expand Down
2 changes: 1 addition & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export type IBaseProtocolCore = {

export type IBaseProtocolSDK = {
readonly connectedPeers: Peer[];
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer | undefined>;
readonly numPeersToUse: number;
};

Expand Down
39 changes: 30 additions & 9 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* @param peerToDisconnect The peer to disconnect from.
* @returns The new peer that was found and connected to.
*/
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer> {
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer | undefined> {
this.log.info(`Renewing peer ${peerToDisconnect}`);

await this.connectionManager.dropConnection(peerToDisconnect);

const peer = (await this.findAndAddPeers(1))[0];
if (!peer) {
this.log.error(
"Failed to find a new peer to replace the disconnected one."
);
}

const updatedPeers = this.peers.filter(
(peer) => !peer.id.equals(peerToDisconnect)
);
Expand All @@ -77,9 +70,17 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);

const newPeer = await this.findAndAddPeers(1);
if (newPeer.length === 0) {
this.log.error(
"Failed to find a new peer to replace the disconnected one."
);
return undefined;
}

this.renewPeersLocker.lock(peerToDisconnect);

return peer;
return newPeer[0];
}

/**
Expand Down Expand Up @@ -174,6 +175,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
}

this.maintainPeersLock = true;
await this.confirmPeers();
this.log.info(`Maintaining peers, current count: ${this.peers.length}`);
try {
const numPeersToAdd = this.numPeersToUse - this.peers.length;
Expand All @@ -190,6 +192,25 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
return true;
}

private async confirmPeers(): Promise<void> {
const connectedPeers = await this.core.connectedPeers();
const currentPeers = this.peers;
const peersToAdd = connectedPeers.filter(
(p) => !currentPeers.some((cp) => cp.id.equals(p.id))
);
const peersToRemove = currentPeers.filter(
(p) => !connectedPeers.some((cp) => cp.id.equals(p.id))
);

peersToAdd.forEach((p) => this.peers.push(p));
peersToRemove.forEach((p) => {
const index = this.peers.findIndex((cp) => cp.id.equals(p.id));
if (index !== -1) this.peers.splice(index, 1);
});

this.updatePeers(this.peers);
}

/**
* Finds and adds new peers to the peers list.
* @param numPeers The number of peers to find and add.
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/protocols/filter/constants.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export const DEFAULT_KEEP_ALIVE = 30 * 1000;
export const DEFAULT_KEEP_ALIVE = 10_000;

export const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE
Expand Down
3 changes: 3 additions & 0 deletions packages/sdk/src/protocols/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {

await subscription.processIncomingMessage(wakuMessage, peerIdStr);
},
async (error: Error) => {
log.error("Error with receiving pipe", error);
},
connectionManager.configuredPubsubTopics,
libp2p
),
Expand Down
13 changes: 9 additions & 4 deletions packages/sdk/src/protocols/filter/subscription_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ export class SubscriptionManager implements ISubscriptionSDK {
private readonly protocol: FilterCore,
private readonly connectionManager: ConnectionManager,
private readonly getPeers: () => Peer[],
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
private readonly renewPeer: (
peerToDisconnect: PeerId
) => Promise<Peer | undefined>
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
Expand Down Expand Up @@ -251,11 +253,13 @@ export class SubscriptionManager implements ISubscriptionSDK {
}

private startSubscriptionsMaintenance(interval: number): void {
log.info("Starting subscriptions maintenance");
this.startKeepAlivePings(interval);
this.startConnectionListener();
}

private stopSubscriptionsMaintenance(): void {
log.info("Stopping subscriptions maintenance");
this.stopKeepAlivePings();
this.stopConnectionListener();
}
Expand Down Expand Up @@ -299,9 +303,10 @@ export class SubscriptionManager implements ISubscriptionSDK {
}

this.keepAliveTimer = setInterval(() => {
void this.ping().catch((error) => {
log.error("Error in keep-alive ping cycle:", error);
});
log.info("Sending keep-alive ping");
void this.ping()
.then(() => log.info("Keep-alive ping successful"))
.catch((error) => log.error("Error in keep-alive ping cycle:", error));
}, interval) as unknown as number;
}

Expand Down
4 changes: 2 additions & 2 deletions packages/sdk/src/reliability_monitor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export class ReliabilityMonitorManager {
public static createReceiverMonitor(
pubsubTopic: PubsubTopic,
getPeers: () => Peer[],
renewPeer: (peerId: PeerId) => Promise<Peer>,
renewPeer: (peerId: PeerId) => Promise<Peer | undefined>,
getContentTopics: () => ContentTopic[],
protocolSubscribe: (
pubsubTopic: PubsubTopic,
Expand All @@ -42,7 +42,7 @@ export class ReliabilityMonitorManager {
}

public static createSenderMonitor(
renewPeer: (peerId: PeerId) => Promise<Peer>
renewPeer: (peerId: PeerId) => Promise<Peer | undefined>
): SenderReliabilityMonitor {
if (!ReliabilityMonitorManager.senderMonitor) {
ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor(
Expand Down
24 changes: 15 additions & 9 deletions packages/sdk/src/reliability_monitor/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class ReceiverReliabilityMonitor {
public constructor(
private readonly pubsubTopic: PubsubTopic,
private getPeers: () => Peer[],
private renewPeer: (peerId: PeerId) => Promise<Peer>,
private renewPeer: (peerId: PeerId) => Promise<Peer | undefined>,
private getContentTopics: () => ContentTopic[],
private protocolSubscribe: (
pubsubTopic: PubsubTopic,
Expand Down Expand Up @@ -163,15 +163,21 @@ export class ReceiverReliabilityMonitor {
private async renewAndSubscribePeer(
peerId: PeerId
): Promise<Peer | undefined> {
const peerIdStr = peerId.toString();
try {
if (this.peerRenewalLocks.has(peerId.toString())) {
log.info(`Peer ${peerId.toString()} is already being renewed.`);
if (this.peerRenewalLocks.has(peerIdStr)) {
log.info(`Peer ${peerIdStr} is already being renewed.`);
return;
}

this.peerRenewalLocks.add(peerId.toString());
this.peerRenewalLocks.add(peerIdStr);

const newPeer = await this.renewPeer(peerId);
if (!newPeer) {
log.warn(`Failed to renew peer ${peerIdStr}: No new peer found.`);
return;
}

await this.protocolSubscribe(
this.pubsubTopic,
newPeer,
Expand All @@ -181,16 +187,16 @@ export class ReceiverReliabilityMonitor {
this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set();
this.missedMessagesByPeer.set(newPeer.id.toString(), 0);

this.peerFailures.delete(peerId.toString());
this.missedMessagesByPeer.delete(peerId.toString());
delete this.receivedMessagesHashes.nodes[peerId.toString()];
this.peerFailures.delete(peerIdStr);
this.missedMessagesByPeer.delete(peerIdStr);
delete this.receivedMessagesHashes.nodes[peerIdStr];

return newPeer;
} catch (error) {
log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`);
log.error(`Failed to renew peer ${peerIdStr}: ${error}.`);
return;
} finally {
this.peerRenewalLocks.delete(peerId.toString());
this.peerRenewalLocks.delete(peerIdStr);
}
}

Expand Down
22 changes: 15 additions & 7 deletions packages/sdk/src/reliability_monitor/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ export class SenderReliabilityMonitor {
private readonly maxAttemptsBeforeRenewal =
DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL;

public constructor(private renewPeer: (peerId: PeerId) => Promise<Peer>) {}
public constructor(
private renewPeer: (peerId: PeerId) => Promise<Peer | undefined>
) {}

public async attemptRetriesOrRenew(
peerId: PeerId,
Expand Down Expand Up @@ -42,13 +44,19 @@ export class SenderReliabilityMonitor {
} else {
try {
const newPeer = await this.renewPeer(peerId);
log.info(
`Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}`
);
if (newPeer) {
log.info(
`Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}`
);

this.attempts.delete(peerIdStr);
this.attempts.set(newPeer.id.toString(), 0);
await protocolSend();
this.attempts.delete(peerIdStr);
this.attempts.set(newPeer.id.toString(), 0);
await protocolSend();
} else {
log.error(
`Failed to renew peer ${peerId.toString()}: New peer is undefined`
);
}
} catch (error) {
log.error(`Failed to renew peer ${peerId.toString()}: ${error}`);
}
Expand Down
Loading