
Merge pull request #6428 from HenrikJannsen/limit-get-data-response-size
Limit getDataResponse size
sqrrm committed Nov 25, 2022
2 parents 6052221 + ffd2cc1 commit ef1d78f
Showing 8 changed files with 122 additions and 60 deletions.
MailboxMessageService.java
@@ -47,6 +47,7 @@
import bisq.common.proto.ProtobufferException;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.persistable.PersistedDataHost;
import bisq.common.util.Tuple2;
import bisq.common.util.Utilities;

import javax.inject.Inject;
@@ -79,6 +80,7 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
@@ -160,50 +162,69 @@ public MailboxMessageService(NetworkNode networkNode,
@Override
public void readPersisted(Runnable completeHandler) {
persistenceManager.readPersisted(persisted -> {
log.trace("## readPersisted persisted {}", persisted.size());
Map<String, Long> numItemsPerDay = new HashMap<>();
// We sort by creation date and limit to max 3000 entries, so oldest items get skipped even if TTL
// is not reached to cap the memory footprint. 3000 items is about 10 MB.
Map<String, Tuple2<AtomicLong, List<Integer>>> numItemsPerDay = new HashMap<>();
AtomicLong totalSize = new AtomicLong();
// We sort by creation date and limit to max 3000 entries, so the oldest items get skipped even if their TTL
// is not reached. 3000 items is about 60 MB at the max size of 20 kB supported for storage.
persisted.stream()
.sorted(Comparator.comparingLong(o -> ((MailboxItem) o).getProtectedMailboxStorageEntry().getCreationTimeStamp()).reversed())
.limit(3000)
.filter(e -> !e.isExpired(clock))
.filter(e -> !mailboxItemsByUid.containsKey(e.getUid()))
.limit(3000)
.forEach(mailboxItem -> {
ProtectedMailboxStorageEntry protectedMailboxStorageEntry = mailboxItem.getProtectedMailboxStorageEntry();
int serializedSize = protectedMailboxStorageEntry.toProtoMessage().getSerializedSize();
// The usual size is 3-4 kB. A few are about 15 kB and very few are larger, about 100 kB or
// more (probably attachments in disputes).
// We ignore such large data to reduce the memory footprint.
String date = new Date(protectedMailboxStorageEntry.getCreationTimeStamp()).toString();
String day = date.substring(4, 10);
numItemsPerDay.putIfAbsent(day, new Tuple2<>(new AtomicLong(0), new ArrayList<>()));
Tuple2<AtomicLong, List<Integer>> tuple = numItemsPerDay.get(day);
tuple.first.getAndIncrement();
tuple.second.add(serializedSize);

// We only keep small items to reduce the potential impact of missed remove messages.
// E.g. if a seed node with a longer restart period missed the remove messages, it would add those
// messages again when loading from persisted data and distribute them later in response to peer requests.
// Those outdated messages would then stay in the network until their TTL triggers removal.
// By not applying large messages we reduce the impact of such cases, at the cost of extra loading if the message is still alive.
if (serializedSize < 20000) {
String date = new Date(protectedMailboxStorageEntry.getCreationTimeStamp()).toString();
String day = date.substring(4, 10);
numItemsPerDay.putIfAbsent(day, 0L);
numItemsPerDay.put(day, numItemsPerDay.get(day) + 1);

String uid = mailboxItem.getUid();
mailboxItemsByUid.put(uid, mailboxItem);
mailboxItemsByUid.put(mailboxItem.getUid(), mailboxItem);
mailboxMessageList.add(mailboxItem);
totalSize.getAndAdd(serializedSize);

// We add it to our map so that it gets added to the excluded key set we send with
// the initial data requests. That helps to lower the load for mailbox messages at
// initial data requests.
//todo check if listeners are called too early
p2PDataStorage.addProtectedMailboxStorageEntryToMap(protectedMailboxStorageEntry);

log.trace("## readPersisted uid={}\nhash={}\nisMine={}\ndate={}\nsize={}",
uid,
P2PDataStorage.get32ByteHashAsByteArray(protectedMailboxStorageEntry.getProtectedStoragePayload()),
mailboxItem.isMine(),
date,
serializedSize);
} else {
log.info("We ignore this large persisted mailboxItem. If still valid we will reload it from seed nodes at getData requests.\n" +
"Size={}; date={}; sender={}", Utilities.readableFileSize(serializedSize), date,
mailboxItem.getProtectedMailboxStorageEntry().getMailboxStoragePayload().getPrefixedSealedAndSignedMessage().getSenderNodeAddress());
}
});

List<Map.Entry<String, Long>> perDay = numItemsPerDay.entrySet().stream()
List<String> perDay = numItemsPerDay.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(entry -> {
Tuple2<AtomicLong, List<Integer>> tuple = entry.getValue();
List<Integer> sizes = tuple.second;
long sum = sizes.stream().mapToLong(s -> s).sum();
List<String> largeItems = sizes.stream()
.filter(s -> s > 20000)
.map(Utilities::readableFileSize)
.collect(Collectors.toList());
String largeMsgInfo = largeItems.isEmpty() ? "" : "; Large messages: " + largeItems;
return entry.getKey() + ": Num messages: " + tuple.first + "; Total size: " +
Utilities.readableFileSize(sum) + largeMsgInfo;
})
.collect(Collectors.toList());
log.info("We loaded {} persisted mailbox messages.\nPer day distribution:\n{}", mailboxMessageList.size(), Joiner.on("\n").join(perDay));

log.info("We loaded {} persisted mailbox messages with {}.\nPer day distribution:\n{}",
mailboxMessageList.size(),
Utilities.readableFileSize(totalSize.get()),
Joiner.on("\n").join(perDay));

requestPersistence();
completeHandler.run();
},
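The readPersisted change above boils down to a small selection policy: sort the persisted entries newest first, cap the count at 3000, skip entries above roughly 20 kB, and keep a running total of the bytes actually loaded. The following is a minimal, self-contained sketch of that policy; CappedLoadSketch, PersistedItem, MAX_ITEMS, MAX_ITEM_SIZE and selectItemsToKeep are illustrative names for this example, not Bisq API.

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

// Sketch of the capped-loading policy: newest first, bounded count, bounded per-item size.
// All names here are illustrative stand-ins, not Bisq classes.
public class CappedLoadSketch {
    record PersistedItem(long creationTimeStamp, int serializedSize) {}

    private static final int MAX_ITEMS = 3000;        // cap on the number of entries kept
    private static final int MAX_ITEM_SIZE = 20_000;  // ~20 kB; larger items can be re-fetched from seeds if still alive

    static List<PersistedItem> selectItemsToKeep(List<PersistedItem> persisted) {
        AtomicLong totalSize = new AtomicLong();
        List<PersistedItem> kept = persisted.stream()
                .sorted(Comparator.comparingLong(PersistedItem::creationTimeStamp).reversed())
                .limit(MAX_ITEMS)                                       // oldest entries beyond the cap are dropped
                .filter(item -> item.serializedSize() < MAX_ITEM_SIZE)  // oversized entries are skipped entirely
                .peek(item -> totalSize.addAndGet(item.serializedSize()))
                .collect(Collectors.toList());
        System.out.printf("Keeping %d of %d persisted items, %d bytes in total%n",
                kept.size(), persisted.size(), totalSize.get());
        return kept;
    }
}
```

The trade-off is the one spelled out in the commit's comments: anything dropped here is only gone locally and, if still within its TTL, gets reloaded from seed nodes at the next getData request.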
4 changes: 4 additions & 0 deletions p2p/src/main/java/bisq/network/p2p/network/Connection.java
@@ -114,6 +114,10 @@ public static int getPermittedMessageSize() {
return PERMITTED_MESSAGE_SIZE;
}

public static int getMaxPermittedMessageSize() {
return MAX_PERMITTED_MESSAGE_SIZE;
}


///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
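Connection already exposed getPermittedMessageSize(); the new getMaxPermittedMessageSize() accessor makes the hard upper bound available outside the class, presumably so the code that assembles the GetDataResponse (part of the changed files not shown in this excerpt) can stop adding entries before the serialized envelope exceeds what a connection will accept. Below is a hedged sketch of that kind of size-capped assembly; the Entry interface, capBySize helper, the 10 MB stand-in constant and the 0.9 headroom factor are assumptions for illustration, not the actual Bisq implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: stop adding entries once the estimated serialized size
// approaches the connection's hard message-size limit, and record that the
// response was truncated so the requester can be told via the wasTruncated flag.
// Entry, capBySize and MAX_PERMITTED_MESSAGE_SIZE_ASSUMED are not Bisq API.
class ResponseSizeCapSketch {
    interface Entry {
        int serializedSize();
    }

    // Assumed stand-in for Connection.getMaxPermittedMessageSize().
    private static final int MAX_PERMITTED_MESSAGE_SIZE_ASSUMED = 10 * 1024 * 1024;

    static <T extends Entry> List<T> capBySize(List<T> candidates, AtomicBoolean wasTruncated) {
        int limit = (int) (MAX_PERMITTED_MESSAGE_SIZE_ASSUMED * 0.9); // leave headroom for envelope overhead
        int total = 0;
        List<T> kept = new ArrayList<>();
        for (T entry : candidates) {
            if (total + entry.serializedSize() > limit) {
                wasTruncated.set(true); // surfaced to the requester in the GetDataResponse
                break;
            }
            total += entry.serializedSize();
            kept.add(entry);
        }
        return kept;
    }
}
```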
18 changes: 11 additions & 7 deletions p2p/src/main/java/bisq/network/p2p/network/ProtoOutputStream.java
@@ -33,11 +33,11 @@
class ProtoOutputStream {
private static final Logger log = LoggerFactory.getLogger(ProtoOutputStream.class);

private final OutputStream delegate;
private final OutputStream outputStream;
private final Statistic statistic;

ProtoOutputStream(OutputStream delegate, Statistic statistic) {
this.delegate = delegate;
ProtoOutputStream(OutputStream outputStream, Statistic statistic) {
this.outputStream = outputStream;
this.statistic = statistic;
}

@@ -52,17 +52,21 @@ void writeEnvelope(NetworkEnvelope envelope) {

void onConnectionShutdown() {
try {
delegate.close();
outputStream.close();
} catch (Throwable t) {
log.error("Failed to close connection", t);
}
}

private void writeEnvelopeOrThrow(NetworkEnvelope envelope) throws IOException {
long ts = System.currentTimeMillis();
protobuf.NetworkEnvelope proto = envelope.toProtoNetworkEnvelope();
proto.writeDelimitedTo(delegate);
delegate.flush();

proto.writeDelimitedTo(outputStream);
outputStream.flush();
long duration = System.currentTimeMillis() - ts;
if (duration > 10000) {
log.info("Sending {} to peer took {} sec.", envelope.getClass().getSimpleName(), duration / 1000d);
}
statistic.addSentBytes(proto.getSerializedSize());
statistic.addSentMessage(envelope);

TorNetworkNode.java
@@ -22,7 +22,6 @@

import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.app.Log;
import bisq.common.proto.network.NetworkProtoResolver;
import bisq.common.util.Utilities;

@@ -210,7 +209,7 @@ private void createTorAndHiddenService(int localPort, int servicePort) {
torStartupFuture = executorService.submit(() -> {
try {
// temporarily switch tor to debug logging
String savedLogLevel = Log.pushCustomLogLevel("org.berndpruenster.netlayer", "DEBUG");
// String savedLogLevel = Log.pushCustomLogLevel("org.berndpruenster.netlayer", "DEBUG");
// get tor
Tor.setDefault(torMode.getTor());

@@ -226,7 +225,7 @@ private void createTorAndHiddenService(int localPort, int servicePort) {
"################################################################",
(new Date().getTime() - ts2), socket); //takes usually 30-40 sec
// tor has started, revert from debug to original log level
Log.pushCustomLogLevel("org.berndpruenster.netlayer", savedLogLevel);
// Log.pushCustomLogLevel("org.berndpruenster.netlayer", savedLogLevel);
new Thread() {
@Override
public void run() {
GetDataRequestHandler.java
@@ -98,15 +98,11 @@ public void handle(GetDataRequest getDataRequest, final Connection connection) {
connection.getCapabilities());

if (wasPersistableNetworkPayloadsTruncated.get()) {
log.warn("The getData request from peer with {} caused too much PersistableNetworkPayload " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
log.warn("The getDataResponse for peer {} got truncated.", connectionInfo);
}

if (wasProtectedStorageEntriesTruncated.get()) {
log.warn("The getData request from peer with {} caused too much ProtectedStorageEntry " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
log.warn("The getDataResponse for peer {} got truncated.", connectionInfo);
}

log.info("The getDataResponse to peer with {} contains {} ProtectedStorageEntries and {} PersistableNetworkPayloads",
GetDataResponse.java
@@ -125,7 +125,10 @@ public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
public static GetDataResponse fromProto(protobuf.GetDataResponse proto,
NetworkProtoResolver resolver,
int messageVersion) {
log.info("Received a GetDataResponse with {}", Utilities.readableFileSize(proto.getSerializedSize()));
boolean wasTruncated = proto.getWasTruncated();
log.info("Received a GetDataResponse with {} {}",
Utilities.readableFileSize(proto.getSerializedSize()),
wasTruncated ? " (was truncated)" : "");
Set<ProtectedStorageEntry> dataSet = proto.getDataSetList().stream()
.map(entry -> (ProtectedStorageEntry) resolver.fromProto(entry)).collect(Collectors.toSet());
Set<PersistableNetworkPayload> persistableNetworkPayloadSet = proto.getPersistableNetworkPayloadItemsList().stream()
@@ -134,7 +137,7 @@ public static GetDataResponse fromProto(protobuf.GetDataResponse proto,
persistableNetworkPayloadSet,
proto.getRequestNonce(),
proto.getIsGetUpdatedDataResponse(),
proto.getWasTruncated(),
wasTruncated,
Capabilities.fromIntList(proto.getSupportedCapabilitiesList()),
messageVersion);
}
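On the receiving side, fromProto now reads the wasTruncated flag and reflects it in the log line. How the requester reacts to a truncated response is not part of this excerpt; the sketch below only illustrates one plausible, hypothetical reaction, logging the condition and scheduling a follow-up request. TruncatedResponseHandlerSketch, RequestScheduler, scheduleRepeatedRequest and the 10-second delay are invented for the example and are not Bisq API.

```java
import java.time.Duration;

// Hypothetical consumer of the wasTruncated flag: if the sender could not fit
// everything into one response, ask again after a delay so the remaining data
// arrives in a later, smaller response. RequestScheduler is an invented interface;
// the real follow-up behaviour lives outside this diff.
class TruncatedResponseHandlerSketch {
    interface RequestScheduler {
        void scheduleRepeatedRequest(Duration delay);
    }

    static void onGetDataResponse(boolean wasTruncated, RequestScheduler scheduler) {
        if (wasTruncated) {
            // The response was capped by the sender; fetch the rest later.
            scheduler.scheduleRepeatedRequest(Duration.ofSeconds(10));
        }
    }
}
```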
(The diffs of the remaining two changed files are not loaded in this view.)
