Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add inventory monitor module #4680

Merged
merged 26 commits into from
Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5943845
Map shutdown call from sig int handler to userThread
chimp1984 Oct 23, 2020
c0f981a
Fix incorrect shutdown behaviour
chimp1984 Oct 23, 2020
87bf30b
Remove torSetup.cleanupTorFiles() call.
chimp1984 Oct 23, 2020
3292498
Extract method for filling seed nodes from property file
chimp1984 Oct 14, 2020
55b693e
Make config nullable and extract methods where config is used and app…
chimp1984 Oct 14, 2020
3521619
Add inventory module
chimp1984 Oct 14, 2020
9dab186
Move inventory package to core as we want to include other domain dat…
chimp1984 Oct 15, 2020
ca7fe94
Add signature to request to limit the feature to requests from truste…
chimp1984 Oct 15, 2020
088f539
Add readableFileSize
chimp1984 Oct 15, 2020
d9ce8ce
Add readSeedNodePropertyFile method
chimp1984 Oct 15, 2020
821fa39
Add sparkjava dependency
chimp1984 Oct 15, 2020
21bd85c
Remove signature from GetInventoryRequest (as monitor is public it do…
chimp1984 Oct 15, 2020
aa812ba
Add shadow plugin
chimp1984 Oct 16, 2020
b846979
Make RequestInfo public
chimp1984 Oct 18, 2020
752208b
Add InventoryItem enum
chimp1984 Oct 18, 2020
1b83083
Add nodes from mike
chimp1984 Oct 18, 2020
7768e58
Shut down connection after response received
chimp1984 Oct 19, 2020
ec4e85d
Add network node shutdown
chimp1984 Oct 19, 2020
1c77930
Add more warn/alert checks
chimp1984 Oct 21, 2020
d08b24e
Add triggers for maxConnections and numConnections
chimp1984 Oct 21, 2020
5fb48fe
Fix error handling
chimp1984 Oct 21, 2020
8c156eb
Add peakNumConnections
chimp1984 Oct 21, 2020
5bdc5ab
Add numAllConnectionsLostEvents
chimp1984 Oct 21, 2020
0c4eb14
Only log and increase numAllConnectionsLostEvents if we have not call…
chimp1984 Oct 22, 2020
f1fdf3c
Improve deviation model
chimp1984 Oct 23, 2020
f7b44f1
Improve logs
chimp1984 Oct 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ configure([project(':cli'),
project(':seednode'),
project(':statsnode'),
project(':pricenode'),
project(':inventory'),
project(':apitest')]) {

apply plugin: 'application'
Expand Down Expand Up @@ -594,6 +595,21 @@ configure(project(':daemon')) {
}
}

configure(project(':inventory')) {
apply plugin: 'com.github.johnrengelman.shadow'

mainClassName = 'bisq.inventory.InventoryMonitorMain'

dependencies {
compile project(':core')
compile "com.google.guava:guava:$guavaVersion"
compile "com.sparkjava:spark-core:$sparkVersion"

compileOnly "org.projectlombok:lombok:$lombokVersion"
annotationProcessor "org.projectlombok:lombok:$lombokVersion"
}
}

configure(project(':apitest')) {
mainClassName = 'bisq.apitest.ApiTestMain'

Expand Down
8 changes: 4 additions & 4 deletions common/src/main/java/bisq/common/setup/CommonSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ protected static void setSystemProperties() {

protected static void setupSigIntHandlers(GracefulShutDownHandler gracefulShutDownHandler) {
Signal.handle(new Signal("INT"), signal -> {
gracefulShutDownHandler.gracefulShutDown(() -> {
});
UserThread.execute(() -> gracefulShutDownHandler.gracefulShutDown(() -> {
}));
});

Signal.handle(new Signal("TERM"), signal -> {
gracefulShutDownHandler.gracefulShutDown(() -> {
});
UserThread.execute(() -> gracefulShutDownHandler.gracefulShutDown(() -> {
}));
});
}

Expand Down
8 changes: 6 additions & 2 deletions common/src/main/java/bisq/common/util/Profiler.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ public static void printSystemLoad() {
}

public static long getUsedMemoryInMB() {
return getUsedMemoryInBytes() / 1024 / 1024;
}

public static long getUsedMemoryInBytes() {
Runtime runtime = Runtime.getRuntime();
long free = runtime.freeMemory() / 1024 / 1024;
long total = runtime.totalMemory() / 1024 / 1024;
long free = runtime.freeMemory();
long total = runtime.totalMemory();
return total - free;
}

Expand Down
9 changes: 9 additions & 0 deletions common/src/main/java/bisq/common/util/Utilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import javafx.scene.input.KeyCombination;
import javafx.scene.input.KeyEvent;

import java.text.DecimalFormat;

import java.net.URI;
import java.net.URISyntaxException;

Expand Down Expand Up @@ -523,4 +525,11 @@ public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtr
return t -> map.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
}

public static String readableFileSize(long size) {
if (size <= 0) return "0";
chimp1984 marked this conversation as resolved.
Show resolved Hide resolved
String[] units = new String[]{"B", "kB", "MB", "GB", "TB"};
int digitGroups = (int) (Math.log10(size) / Math.log10(1024));
return new DecimalFormat("#,##0.###").format(size / Math.pow(1024, digitGroups)) + " " + units[digitGroups];
}

}
1 change: 0 additions & 1 deletion core/src/main/java/bisq/core/app/BisqSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ public void start() {
}

private void step2() {
torSetup.cleanupTorFiles();
readMapsFromResources(this::step3);
checkForCorrectOSArchitecture();
checkOSXVersion();
Expand Down
10 changes: 1 addition & 9 deletions core/src/main/java/bisq/core/app/TorSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,7 @@ public TorSetup(@Named(Config.TOR_DIR) File torDir) {
this.torDir = checkDir(torDir);
}

public void cleanupTorFiles() {
cleanupTorFiles(null, null);
}

// We get sometimes Tor startup problems which is related to some tor files in the tor directory. It happens
// more often if the application got killed (not graceful shutdown).
// Creating all tor files newly takes about 3-4 sec. longer and it does not benefit from cache files.
// TODO: We should fix those startup problems in the netlayer library, once fixed there we can remove that call at the
// Bisq startup again.
// Should only be called if needed. Slows down Tor startup from about 5 sec. to 30 sec. if it gets deleted.
public void cleanupTorFiles(@Nullable Runnable resultHandler, @Nullable ErrorMessageHandler errorMessageHandler) {
File hiddenservice = new File(Paths.get(torDir.getAbsolutePath(), "hiddenservice").toString());
try {
Expand Down
1 change: 0 additions & 1 deletion core/src/main/java/bisq/core/app/misc/AppSetupWithP2P.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public AppSetupWithP2P(P2PService p2PService,

@Override
public void initPersistedDataHosts() {
torSetup.cleanupTorFiles();
persistedDataHosts.add(p2PService);

// we apply at startup the reading of persisted data but don't want to get it triggered in the constructor
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/bisq/core/filter/FilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,13 @@ public void setFilterWarningHandler(Consumer<String> filterWarningHandler) {
addListener(filter -> {
if (filter != null && filterWarningHandler != null) {
if (filter.getSeedNodes() != null && !filter.getSeedNodes().isEmpty()) {
log.info(Res.get("popup.warning.nodeBanned", Res.get("popup.warning.seed")));
log.info("One of the seed nodes got banned. {}", filter.getSeedNodes());
// Let's keep that more silent. Might be used in case a node is unstable and we don't want to confuse users.
// filterWarningHandler.accept(Res.get("popup.warning.nodeBanned", Res.get("popup.warning.seed")));
}

if (filter.getPriceRelayNodes() != null && !filter.getPriceRelayNodes().isEmpty()) {
log.info(Res.get("popup.warning.nodeBanned", Res.get("popup.warning.priceRelay")));
log.info("One of the price relay nodes got banned. {}", filter.getPriceRelayNodes());
// Let's keep that more silent. Might be used in case a node is unstable and we don't want to confuse users.
// filterWarningHandler.accept(Res.get("popup.warning.nodeBanned", Res.get("popup.warning.priceRelay")));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.core.network.p2p.inventory;

import bisq.core.dao.monitoring.BlindVoteStateMonitoringService;
import bisq.core.dao.monitoring.DaoStateMonitoringService;
import bisq.core.dao.monitoring.ProposalStateMonitoringService;
import bisq.core.dao.monitoring.model.BlindVoteStateBlock;
import bisq.core.dao.monitoring.model.DaoStateBlock;
import bisq.core.dao.monitoring.model.ProposalStateBlock;
import bisq.core.dao.state.DaoStateService;
import bisq.core.network.p2p.inventory.messages.GetInventoryRequest;
import bisq.core.network.p2p.inventory.messages.GetInventoryResponse;
import bisq.core.network.p2p.inventory.model.InventoryItem;

import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.MessageListener;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.network.Statistic;
import bisq.network.p2p.peers.PeerManager;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;

import bisq.common.app.Version;
import bisq.common.config.Config;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.util.Profiler;
import bisq.common.util.Utilities;

import javax.inject.Inject;
import javax.inject.Named;

import com.google.common.base.Enums;
import com.google.common.base.Optional;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;

import java.lang.management.ManagementFactory;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class GetInventoryRequestHandler implements MessageListener {
private final NetworkNode networkNode;
private final PeerManager peerManager;
private final P2PDataStorage p2PDataStorage;
private final DaoStateService daoStateService;
private final DaoStateMonitoringService daoStateMonitoringService;
private final ProposalStateMonitoringService proposalStateMonitoringService;
private final BlindVoteStateMonitoringService blindVoteStateMonitoringService;
private final int maxConnections;

@Inject
public GetInventoryRequestHandler(NetworkNode networkNode,
PeerManager peerManager,
P2PDataStorage p2PDataStorage,
DaoStateService daoStateService,
DaoStateMonitoringService daoStateMonitoringService,
ProposalStateMonitoringService proposalStateMonitoringService,
BlindVoteStateMonitoringService blindVoteStateMonitoringService,
@Named(Config.MAX_CONNECTIONS) int maxConnections) {
this.networkNode = networkNode;
this.peerManager = peerManager;
this.p2PDataStorage = p2PDataStorage;
this.daoStateService = daoStateService;
this.daoStateMonitoringService = daoStateMonitoringService;
this.proposalStateMonitoringService = proposalStateMonitoringService;
this.blindVoteStateMonitoringService = blindVoteStateMonitoringService;
this.maxConnections = maxConnections;

this.networkNode.addMessageListener(this);
}

@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof GetInventoryRequest) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As always, prefer to return early instead of huge blocks.

// Data
GetInventoryRequest getInventoryRequest = (GetInventoryRequest) networkEnvelope;
Map<InventoryItem, Integer> dataObjects = new HashMap<>();
p2PDataStorage.getMapForDataResponse(getInventoryRequest.getVersion()).values().stream()
.map(e -> e.getClass().getSimpleName())
.forEach(className -> {
Optional<InventoryItem> optionalEnum = Enums.getIfPresent(InventoryItem.class, className);
if (optionalEnum.isPresent()) {
InventoryItem key = optionalEnum.get();
dataObjects.putIfAbsent(key, 0);
int prev = dataObjects.get(key);
dataObjects.put(key, prev + 1);
}
});
p2PDataStorage.getMap().values().stream()
.map(ProtectedStorageEntry::getProtectedStoragePayload)
.filter(Objects::nonNull)
.map(e -> e.getClass().getSimpleName())
.forEach(className -> {
Optional<InventoryItem> optionalEnum = Enums.getIfPresent(InventoryItem.class, className);
if (optionalEnum.isPresent()) {
InventoryItem key = optionalEnum.get();
dataObjects.putIfAbsent(key, 0);
int prev = dataObjects.get(key);
dataObjects.put(key, prev + 1);
}
});
Comment on lines +98 to +121
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
p2PDataStorage.getMapForDataResponse(getInventoryRequest.getVersion()).values().stream()
.map(e -> e.getClass().getSimpleName())
.forEach(className -> {
Optional<InventoryItem> optionalEnum = Enums.getIfPresent(InventoryItem.class, className);
if (optionalEnum.isPresent()) {
InventoryItem key = optionalEnum.get();
dataObjects.putIfAbsent(key, 0);
int prev = dataObjects.get(key);
dataObjects.put(key, prev + 1);
}
});
p2PDataStorage.getMap().values().stream()
.map(ProtectedStorageEntry::getProtectedStoragePayload)
.filter(Objects::nonNull)
.map(e -> e.getClass().getSimpleName())
.forEach(className -> {
Optional<InventoryItem> optionalEnum = Enums.getIfPresent(InventoryItem.class, className);
if (optionalEnum.isPresent()) {
InventoryItem key = optionalEnum.get();
dataObjects.putIfAbsent(key, 0);
int prev = dataObjects.get(key);
dataObjects.put(key, prev + 1);
}
});
p2PDataStorage.getMapForDataResponse(getInventoryRequest.getVersion()).values().stream()
.map(e -> e.getClass().getSimpleName())
.forEach(className -> addItem(dataObjects, className));
p2PDataStorage.getMap().values().stream()
.map(ProtectedStorageEntry::getProtectedStoragePayload)
.filter(Objects::nonNull)
.map(e -> e.getClass().getSimpleName())
.forEach(className -> addItem(dataObjects, className));

With addItem()

    private void addItem(Map<InventoryItem, Integer> dataObjects, String className) {
        Optional<InventoryItem> optionalEnum = Enums.getIfPresent(InventoryItem.class, className);
        if (optionalEnum.isPresent()) {
            InventoryItem key = optionalEnum.get();
            dataObjects.putIfAbsent(key, 0);
            int prev = dataObjects.get(key);
            dataObjects.put(key, prev + 1);
        }
    }

Map<InventoryItem, String> inventory = new HashMap<>();
dataObjects.forEach((key, value) -> inventory.put(key, String.valueOf(value)));

// DAO
int numBsqBlocks = daoStateService.getBlocks().size();
inventory.put(InventoryItem.numBsqBlocks, String.valueOf(numBsqBlocks));

int daoStateChainHeight = daoStateService.getChainHeight();
inventory.put(InventoryItem.daoStateChainHeight, String.valueOf(daoStateChainHeight));

LinkedList<DaoStateBlock> daoStateBlockChain = daoStateMonitoringService.getDaoStateBlockChain();
if (!daoStateBlockChain.isEmpty()) {
String daoStateHash = Utilities.bytesAsHexString(daoStateBlockChain.getLast().getMyStateHash().getHash());
inventory.put(InventoryItem.daoStateHash, daoStateHash);
}

LinkedList<ProposalStateBlock> proposalStateBlockChain = proposalStateMonitoringService.getProposalStateBlockChain();
if (!proposalStateBlockChain.isEmpty()) {
String proposalHash = Utilities.bytesAsHexString(proposalStateBlockChain.getLast().getMyStateHash().getHash());
inventory.put(InventoryItem.proposalHash, proposalHash);
}

LinkedList<BlindVoteStateBlock> blindVoteStateBlockChain = blindVoteStateMonitoringService.getBlindVoteStateBlockChain();
if (!blindVoteStateBlockChain.isEmpty()) {
String blindVoteHash = Utilities.bytesAsHexString(blindVoteStateBlockChain.getLast().getMyStateHash().getHash());
inventory.put(InventoryItem.blindVoteHash, blindVoteHash);
}

// network
inventory.put(InventoryItem.maxConnections, String.valueOf(maxConnections));
inventory.put(InventoryItem.numConnections, String.valueOf(networkNode.getAllConnections().size()));
inventory.put(InventoryItem.peakNumConnections, String.valueOf(peerManager.getPeakNumConnections()));
inventory.put(InventoryItem.numAllConnectionsLostEvents, String.valueOf(peerManager.getNumAllConnectionsLostEvents()));
inventory.put(InventoryItem.sentBytes, String.valueOf(Statistic.totalSentBytesProperty().get()));
inventory.put(InventoryItem.sentBytesPerSec, String.valueOf(Statistic.totalSentBytesPerSecProperty().get()));
inventory.put(InventoryItem.receivedBytes, String.valueOf(Statistic.totalReceivedBytesProperty().get()));
inventory.put(InventoryItem.receivedBytesPerSec, String.valueOf(Statistic.totalReceivedBytesPerSecProperty().get()));
inventory.put(InventoryItem.receivedMessagesPerSec, String.valueOf(Statistic.numTotalReceivedMessagesPerSecProperty().get()));
inventory.put(InventoryItem.sentMessagesPerSec, String.valueOf(Statistic.numTotalSentMessagesPerSecProperty().get()));

// node
inventory.put(InventoryItem.version, Version.VERSION);
inventory.put(InventoryItem.usedMemory, String.valueOf(Profiler.getUsedMemoryInBytes()));
inventory.put(InventoryItem.jvmStartTime, String.valueOf(ManagementFactory.getRuntimeMXBean().getStartTime()));

log.info("Send inventory {} to {}", inventory, connection.getPeersNodeAddressOptional());
GetInventoryResponse getInventoryResponse = new GetInventoryResponse(inventory);
networkNode.sendMessage(connection, getInventoryResponse);
}
}

public void shutDown() {
networkNode.removeMessageListener(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.network.p2p.inventory;
package bisq.core.network.p2p.inventory;

import bisq.core.network.p2p.inventory.model.InventoryItem;

import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.NetworkNode;
Expand All @@ -41,11 +43,12 @@ public GetInventoryRequestManager(NetworkNode networkNode) {
}

public void request(NodeAddress nodeAddress,
Consumer<Map<String, Integer>> resultHandler,
Consumer<Map<InventoryItem, String>> resultHandler,
ErrorMessageHandler errorMessageHandler) {
if (requesterMap.containsKey(nodeAddress)) {
log.warn("There is still an open request pending for {}", nodeAddress.getFullAddress());
return;
log.warn("There was still a pending request for {}. We shut it down and make a new request",
nodeAddress.getFullAddress());
requesterMap.get(nodeAddress).shutDown();
}

GetInventoryRequester getInventoryRequester = new GetInventoryRequester(networkNode,
Expand Down
Loading