diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java index c5ce3b404a..3df2c4dde2 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java @@ -18,7 +18,6 @@ package bisq.network.p2p.services.peergroup.exchange; import bisq.common.timer.Scheduler; -import bisq.common.util.CompletableFutureUtils; import bisq.common.util.StringUtils; import bisq.network.p2p.message.NetworkMessage; import bisq.network.p2p.node.Address; @@ -30,13 +29,13 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static bisq.network.NetworkService.NETWORK_IO_POOL; @@ -87,37 +86,52 @@ private CompletableFuture doPeerExchange(Set
candidates) { .map(Address::toString) .collect(Collectors.toList()) .toString())); - List> allFutures = candidates.stream() + + CompletableFuture resultFuture = new CompletableFuture<>(); + AtomicInteger numSuccess = new AtomicInteger(); + AtomicInteger numFailures = new AtomicInteger(); + candidates.stream() .map(this::doPeerExchangeAsync) - .collect(Collectors.toList()); - - // When all futures complete successfully, - // then consider peer exchange complete and decide whether it should be re-done, in case of too few peers - CompletableFutureUtils.allOf(allFutures) - .thenApply(resultList -> { - int numSuccess = (int) resultList.stream().filter(e -> e).count(); - log.info("Node {} completed peer exchange to {} candidates. {} requests successfully completed.", - node, candidates.size(), numSuccess); - if (peerExchangeStrategy.redoInitialPeerExchange(numSuccess, candidates.size())) { - log.info("Node {} repeats the initial peer exchange after {} sec as it has not reached sufficient connections " + - "or received sufficient peers", node, doInitialPeerExchangeDelaySec); - scheduler.ifPresent(Scheduler::stop); - scheduler = Optional.of(Scheduler.run(this::doInitialPeerExchange) - .after(doInitialPeerExchangeDelaySec, TimeUnit.SECONDS) - .name("PeerExchangeService.scheduler-" + node)); - doInitialPeerExchangeDelaySec = Math.min(60, doInitialPeerExchangeDelaySec * 2); - } else { - scheduler.ifPresent(Scheduler::stop); - } - return null; + .forEach(future -> { + future.whenComplete((result, throwable) -> { + if (throwable == null) { + if (result) { + numSuccess.incrementAndGet(); + if (!resultFuture.isDone()) { + log.info("We got at least one peerExchange future completed."); + resultFuture.complete(null); + } + } else { + numFailures.incrementAndGet(); + } + } else { + numFailures.incrementAndGet(); + } + + if (numFailures.get() + numSuccess.get() == candidates.size()) { + if (!resultFuture.isDone()) { + log.info("We got all peerExchange futures completed but none was successful. This is expected when the first node bootstraps"); + resultFuture.complete(null); + } + + log.info("Node {} completed peer exchange to {} candidates. {} requests successfully completed.", + node, candidates.size(), numSuccess); + if (peerExchangeStrategy.shouldRedoInitialPeerExchange(numSuccess.get(), candidates.size())) { + log.info("Node {} repeats the initial peer exchange after {} sec as it has not reached sufficient connections " + + "or received sufficient peers", node, doInitialPeerExchangeDelaySec); + scheduler.ifPresent(Scheduler::stop); + scheduler = Optional.of(Scheduler.run(this::doInitialPeerExchange) + .after(doInitialPeerExchangeDelaySec, TimeUnit.SECONDS) + .name("PeerExchangeService.scheduler-" + node)); + doInitialPeerExchangeDelaySec = Math.min(60, doInitialPeerExchangeDelaySec * 2); + } else { + scheduler.ifPresent(Scheduler::stop); + } + } + }); }); - // Complete when any peer exchange succeeds, or when all fail. - return CompletableFutureUtils.anyOf(allFutures) - .thenApply(result -> { - log.info("Node {} completed peer exchange to at least one candidate", node); - return null; - }); + return resultFuture; } private CompletableFuture doPeerExchangeAsync(Address peerAddress) { @@ -140,6 +154,7 @@ private boolean doPeerExchange(Address peerAddress) { Set myPeers = peerExchangeStrategy.getPeers(peerAddress); Set peers = handler.request(myPeers).join(); + log.info("Node {} completed peer exchange with {} and received {} peers.", node, peerAddress, peers.size()); peerExchangeStrategy.addReportedPeers(peers, peerAddress); requestHandlerMap.remove(key); return true; @@ -147,7 +162,7 @@ private boolean doPeerExchange(Address peerAddress) { if (key != null) { requestHandlerMap.remove(key); } - // Expect ConnectException if peer is not available + log.info("Node {} failed to do a peer exchange with {} because of: {}", node, peerAddress, throwable.getMessage()); return false; } } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java index d513daff70..7f4ab9414b 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java @@ -120,7 +120,7 @@ void addReportedPeers(Set peers, Address peerAddress) { peerGroup.addReportedPeers(filtered); } - boolean redoInitialPeerExchange(long numSuccess, int numRequests) { + boolean shouldRedoInitialPeerExchange(int numSuccess, int numRequests) { boolean moreThenHalfFailed = numRequests - numSuccess > numRequests / 2; return moreThenHalfFailed || !sufficientConnections() ||