Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(net):send message P2pDisconnectMessage before closing channel #63

Merged
merged 13 commits into from
Sep 12, 2023
7 changes: 7 additions & 0 deletions src/main/java/org/tron/p2p/P2pService.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,23 @@
public class P2pService {

private StatsManager statsManager = new StatsManager();
private volatile boolean isShutdown = false;

public void start(P2pConfig p2pConfig) {
Parameter.p2pConfig = p2pConfig;
NodeManager.init();
ChannelManager.init();
DnsManager.init();
log.info("P2p service started");

Runtime.getRuntime().addShutdownHook(new Thread(this::close));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be closed manually.
image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

libp2p can be an independent process, so it uses shutdown hook. Any thread pool in libp2p will not be closed twice as it will read the condition. We may test it more completely after merging it into java-tron.

}

public void close() {
317787106 marked this conversation as resolved.
Show resolved Hide resolved
if (isShutdown) {
return;
}
isShutdown = true;
DnsManager.close();
NodeManager.close();
ChannelManager.close();
Expand Down
26 changes: 13 additions & 13 deletions src/main/java/org/tron/p2p/connection/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ public void processException(Throwable throwable) {
}
SocketAddress address = ctx.channel().remoteAddress();
if (throwable instanceof ReadTimeoutException
|| throwable instanceof IOException
|| throwable instanceof CorruptedFrameException) {
|| throwable instanceof IOException
|| throwable instanceof CorruptedFrameException) {
log.warn("Close peer {}, reason: {}", address, throwable.getMessage());
} else if (baseThrowable instanceof P2pException) {
log.warn("Close peer {}, type: ({}), info: {}",
address, ((P2pException) baseThrowable).getType(), baseThrowable.getMessage());
address, ((P2pException) baseThrowable).getType(), baseThrowable.getMessage());
} else {
log.error("Close peer {}, exception caught", address, throwable);
}
Expand Down Expand Up @@ -134,12 +134,12 @@ public void close() {
close(Parameter.DEFAULT_BAN_TIME);
}

// public void send(byte[] data) {
// send(data, data[0]);
// }

public void send(Message message) {
log.debug("Send message to {}, {}", inetSocketAddress, message);
if (message.needToLog()) {
log.info("Send message to channel {}, {}", inetSocketAddress, message);
} else {
log.debug("Send message to channel {}, {}", inetSocketAddress, message);
}
send(message.getSendData());
}

Expand All @@ -148,7 +148,7 @@ public void send(byte[] data) {
byte type = data[0];
if (isDisconnect) {
log.warn("Send to {} failed as channel has closed, message-type:{} ",
ctx.channel().remoteAddress(), type);
ctx.channel().remoteAddress(), type);
return;
}

Expand All @@ -160,14 +160,14 @@ public void send(byte[] data) {
ctx.writeAndFlush(byteBuf).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess() && !isDisconnect) {
log.warn("Send to {} failed, message-type:{}, cause:{}",
ctx.channel().remoteAddress(), ByteArray.byte2int(type),
future.cause().getMessage());
ctx.channel().remoteAddress(), ByteArray.byte2int(type),
future.cause().getMessage());
}
});
setLastSendTime(System.currentTimeMillis());
} catch (Exception e) {
log.warn("Send message to {} failed, {}", inetSocketAddress, e.getMessage());
ctx.channel();
ctx.channel().close();
}
}

Expand Down Expand Up @@ -197,7 +197,7 @@ public int hashCode() {
@Override
public String toString() {
return String.format("%s | %s", inetSocketAddress,
StringUtils.isEmpty(nodeId) ? "<null>" : nodeId);
StringUtils.isEmpty(nodeId) ? "<null>" : nodeId);
}

}
54 changes: 50 additions & 4 deletions src/main/java/org/tron/p2p/connection/ChannelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.tron.p2p.connection.business.keepalive.KeepAliveService;
import org.tron.p2p.connection.business.pool.ConnPoolService;
import org.tron.p2p.connection.message.Message;
import org.tron.p2p.connection.message.base.P2pDisconnectMessage;
import org.tron.p2p.connection.socket.PeerClient;
import org.tron.p2p.connection.socket.PeerServer;
import org.tron.p2p.discover.Node;
import org.tron.p2p.exception.P2pException;
import org.tron.p2p.exception.P2pException.TypeEnum;
import org.tron.p2p.protos.Connect.DisconnectReason;
import org.tron.p2p.utils.ByteArray;
import org.tron.p2p.utils.NetUtil;

Expand Down Expand Up @@ -147,6 +149,35 @@ public static synchronized DisconnectCode processPeer(Channel channel) {
return DisconnectCode.NORMAL;
}

public static DisconnectReason getDisconnectReason(DisconnectCode code) {
DisconnectReason disconnectReason;
switch (code) {
case DIFFERENT_VERSION:
disconnectReason = DisconnectReason.DIFFERENT_VERSION;
break;
case TIME_BANNED:
disconnectReason = DisconnectReason.RECENT_DISCONNECT;
break;
case DUPLICATE_PEER:
disconnectReason = DisconnectReason.DUPLICATE_PEER;
break;
case TOO_MANY_PEERS:
disconnectReason = DisconnectReason.TOO_MANY_PEERS;
break;
case MAX_CONNECTION_WITH_SAME_IP:
disconnectReason = DisconnectReason.TOO_MANY_PEERS_WITH_SAME_IP;
break;
default: {
disconnectReason = DisconnectReason.UNKNOWN;
}
}
return disconnectReason;
}

public static void logDisconnectReason(Channel channel, DisconnectReason reason) {
log.info("Try to close channel: {}, reason: {}", channel.getInetSocketAddress(), reason.name());
}

public static void banNode(InetAddress inetAddress, Long banTime) {
long now = System.currentTimeMillis();
if (bannedNodes.getIfPresent(inetAddress) == null
Expand All @@ -167,6 +198,7 @@ public static void close() {
nodeDetectService.close();
}


public static void processMessage(Channel channel, byte[] data) throws P2pException {
if (data == null || data.length == 0) {
throw new P2pException(TypeEnum.EMPTY_MESSAGE, "");
Expand All @@ -178,7 +210,11 @@ public static void processMessage(Channel channel, byte[] data) throws P2pExcept

Message message = Message.parse(data);

log.debug("Receive message from {}, {}", channel.getInetSocketAddress(), message);
if (message.needToLog()) {
log.info("Receive message from channel: {}, {}", channel.getInetSocketAddress(), message);
} else {
log.debug("Receive message from channel {}, {}", channel.getInetSocketAddress(), message);
}

switch (message.getType()) {
case KEEP_ALIVE_PING:
Expand All @@ -191,6 +227,9 @@ public static void processMessage(Channel channel, byte[] data) throws P2pExcept
case STATUS:
nodeDetectService.processMessage(channel, message);
break;
case DISCONNECT:
channel.close();
break;
default:
throw new P2pException(P2pException.TypeEnum.NO_SUCH_MESSAGE, "type:" + data[0]);
}
Expand All @@ -202,13 +241,17 @@ private static void handMessage(Channel channel, byte[] data) throws P2pExceptio
throw new P2pException(P2pException.TypeEnum.NO_SUCH_MESSAGE, "type:" + data[0]);
}
if (channel.isDiscoveryMode()) {
channel.send(new P2pDisconnectMessage(DisconnectReason.DISCOVER_MODE));
channel.getCtx().close();
return;
}

if (!channel.isFinishHandshake()) {
channel.setFinishHandshake(true);
if (!DisconnectCode.NORMAL.equals(processPeer(channel))) {
DisconnectCode code = processPeer(channel);
if (!DisconnectCode.NORMAL.equals(code)) {
DisconnectReason disconnectReason = getDisconnectReason(code);
channel.send(new P2pDisconnectMessage(disconnectReason));
channel.getCtx().close();
return;
}
Expand All @@ -222,6 +265,7 @@ public static synchronized void updateNodeId(Channel channel, String nodeId) {
channel.setNodeId(nodeId);
if (nodeId.equals(Hex.toHexString(Parameter.p2pConfig.getNodeID()))) {
log.warn("Channel {} is myself", channel.getInetSocketAddress());
channel.send(new P2pDisconnectMessage(DisconnectReason.DUPLICATE_PEER));
channel.close();
return;
}
Expand All @@ -238,10 +282,12 @@ public static synchronized void updateNodeId(Channel channel, String nodeId) {
Channel c1 = list.get(0);
Channel c2 = list.get(1);
if (c1.getStartTime() > c2.getStartTime()) {
log.info("close channel {}, other channel {} is earlier", c1, c2);
log.info("Close channel {}, other channel {} is earlier", c1, c2);
c1.send(new P2pDisconnectMessage(DisconnectReason.DUPLICATE_PEER));
c1.close();
} else {
log.info("close channel {}, other channel {} is earlier", c2, c1);
log.info("Close channel {}, other channel {} is earlier", c2, c1);
c2.send(new P2pDisconnectMessage(DisconnectReason.DUPLICATE_PEER));
c2.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ public enum DisconnectCode {
DIFFERENT_VERSION(2),
TIME_BANNED(3),
DUPLICATE_PEER(4),
MAX_CONNECTION_WITH_SAME_IP(5);
MAX_CONNECTION_WITH_SAME_IP(5),
UNKNOWN(256);

private final Integer value;

Expand All @@ -18,4 +19,12 @@ public Integer getValue() {
return value;
}

public static DisconnectCode forNumber(int code) {
for (DisconnectCode disconnectCode : values()) {
if (disconnectCode.value == code) {
return disconnectCode;
}
}
return UNKNOWN;
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package org.tron.p2p.connection.business.handshake;

import static org.tron.p2p.connection.ChannelManager.getDisconnectReason;
import static org.tron.p2p.connection.ChannelManager.logDisconnectReason;

import lombok.extern.slf4j.Slf4j;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.connection.Channel;
import org.tron.p2p.connection.ChannelManager;
import org.tron.p2p.connection.business.MessageProcess;
import org.tron.p2p.connection.message.Message;
import org.tron.p2p.connection.message.base.P2pDisconnectMessage;
import org.tron.p2p.connection.message.handshake.HelloMessage;
import org.tron.p2p.protos.Connect.DisconnectReason;

@Slf4j(topic = "net")
public class HandshakeService implements MessageProcess {
Expand All @@ -20,10 +25,10 @@ public void startHandshake(Channel channel) {
@Override
public void processMessage(Channel channel, Message message) {
HelloMessage msg = (HelloMessage) message;
log.debug("Receive message from {}, {}", channel.getInetSocketAddress(), message);

if (channel.isFinishHandshake()) {
log.warn("Close channel {}, handshake is finished", channel.getInetSocketAddress());
channel.send(new P2pDisconnectMessage(DisconnectReason.DUP_HANDSHAKE));
channel.close();
return;
}
Expand All @@ -33,6 +38,7 @@ public void processMessage(Channel channel, Message message) {
DisconnectCode code = ChannelManager.processPeer(channel);
if (code != DisconnectCode.NORMAL) {
sendHelloMsg(channel, code);
logDisconnectReason(channel, getDisconnectReason(code));
channel.close();
return;
}
Expand All @@ -45,21 +51,25 @@ public void processMessage(Channel channel, Message message) {
if (channel.isActive()) {
if (msg.getCode() != DisconnectCode.NORMAL.getValue()
|| (msg.getNetworkId() != networkId && msg.getVersion() != networkId)) {
DisconnectCode disconnectCode = DisconnectCode.forNumber(msg.getCode());
317787106 marked this conversation as resolved.
Show resolved Hide resolved
//v0.1 have version, v0.2 both have version and networkId
log.info("Handshake failed {}, code: {}, networkId:{}, version: {}",
log.info("Handshake failed {}, code: {}, reason: {}, networkId: {}, version: {}",
channel.getInetSocketAddress(),
msg.getCode(),
disconnectCode.name(),
msg.getNetworkId(),
msg.getVersion());
logDisconnectReason(channel, getDisconnectReason(disconnectCode));
channel.close();
return;
}
} else {

if (msg.getNetworkId() != networkId) {
log.info("Peer {} different network id, peer->{}, me->{}",
channel.getInetSocketAddress(), msg.getNetworkId(), networkId);
channel.getInetSocketAddress(), msg.getNetworkId(), networkId);
sendHelloMsg(channel, DisconnectCode.DIFFERENT_VERSION);
logDisconnectReason(channel, DisconnectReason.DIFFERENT_VERSION);
channel.close();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import org.tron.p2p.connection.ChannelManager;
import org.tron.p2p.connection.business.MessageProcess;
import org.tron.p2p.connection.message.Message;
import org.tron.p2p.connection.message.base.P2pDisconnectMessage;
import org.tron.p2p.connection.message.keepalive.PingMessage;
import org.tron.p2p.connection.message.keepalive.PongMessage;
import org.tron.p2p.protos.Connect.DisconnectReason;

@Slf4j(topic = "net")
public class KeepAliveService implements MessageProcess {
Expand All @@ -30,6 +32,7 @@ public void init() {
.forEach(p -> {
if (p.waitForPong) {
if (now - p.pingSent > KEEP_ALIVE_TIMEOUT) {
p.send(new P2pDisconnectMessage(DisconnectReason.PING_TIMEOUT));
p.close();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import org.tron.p2p.base.Parameter;
import org.tron.p2p.connection.Channel;
import org.tron.p2p.connection.ChannelManager;
import org.tron.p2p.connection.message.base.P2pDisconnectMessage;
import org.tron.p2p.connection.socket.PeerClient;
import org.tron.p2p.discover.Node;
import org.tron.p2p.discover.NodeManager;
import org.tron.p2p.dns.DnsManager;
import org.tron.p2p.dns.DnsNode;
import org.tron.p2p.exception.P2pException;
import org.tron.p2p.protos.Connect.DisconnectReason;
import org.tron.p2p.utils.CollectionUtils;
import org.tron.p2p.utils.NetUtil;

Expand Down Expand Up @@ -250,6 +252,7 @@ private void check() {
List<Channel> list = new ArrayList<>(peers);
Channel peer = list.get(new Random().nextInt(peers.size()));
log.info("Disconnect with peer randomly: {}", peer);
peer.send(new P2pDisconnectMessage(DisconnectReason.RANDOM_ELIMINATION));
peer.close();
}
}
Expand Down Expand Up @@ -315,6 +318,7 @@ public void close() {
try {
channels.forEach(p -> {
if (!p.isDisconnect()) {
p.send(new P2pDisconnectMessage(DisconnectReason.PEER_QUITING));
p.close();
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package org.tron.p2p.connection.business.upgrade;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import org.tron.p2p.base.Parameter;
import org.tron.p2p.exception.P2pException;
import org.tron.p2p.protos.Connect;
import org.tron.p2p.exception.P2pException.TypeEnum;
import org.tron.p2p.protos.Connect.CompressMessage;

import org.tron.p2p.utils.ProtoUtil;

public class UpgradeController {
Expand All @@ -15,12 +18,17 @@ public static byte[] codeSendData(int version, byte[] data) throws IOException {
return ProtoUtil.compressMessage(data).toByteArray();
}

public static byte[] decodeReceiveData(int version, byte[] data)
throws IOException, P2pException {
public static byte[] decodeReceiveData(int version, byte[] data) throws P2pException, IOException {
if (!supportCompress(version)) {
return data;
}
return ProtoUtil.uncompressMessage(Connect.CompressMessage.parseFrom(data));
CompressMessage compressMessage;
try {
compressMessage = CompressMessage.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new P2pException(TypeEnum.PARSE_MESSAGE_FAILED, e);
}
return ProtoUtil.uncompressMessage(compressMessage);
}

private static boolean supportCompress(int version) {
Expand Down
Loading