Skip to content

Commit

Permalink
[grid] release the upstream websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
joerg1985 committed Aug 15, 2024
1 parent ed3edee commit 5bac479
Showing 1 changed file with 28 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,15 @@ private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof CloseWebSocketFrame) {
CloseWebSocketFrame close = (CloseWebSocketFrame) frame.retain();
handshaker.close(ctx.channel(), close);
// Pass on to the rest of the channel
ctx.fireChannelRead(close);
try {
CloseWebSocketFrame close = (CloseWebSocketFrame) frame.retain();
handshaker.close(ctx.channel(), close);
// Pass on to the rest of the channel
ctx.fireChannelRead(close);
} finally {
// set null to ensure we do not send another close
ctx.channel().attr(key).set(null);
}
} else if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
} else if (frame instanceof PongWebSocketFrame) {
Expand All @@ -187,7 +192,7 @@ private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame fram
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
try {
Consumer<Message> consumer = ctx.channel().attr(key).get();
Consumer<Message> consumer = ctx.channel().attr(key).getAndSet(null);

if (consumer != null) {
byte[] reason = Objects.toString(cause).getBytes(UTF_8);
Expand All @@ -201,12 +206,29 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
try {
consumer.accept(new CloseMessage(1011, new String(reason, UTF_8)));
} catch (Exception ex) {
LOG.log(Level.FINE, "failed to send the close message", ex);
LOG.log(Level.FINE, "failed to send the close message, code: 1011", ex);
}
}
} finally {
LOG.log(Level.FINE, "exception caught, close the context", cause);
ctx.close();
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try {
super.channelInactive(ctx);
} finally {
Consumer<Message> consumer = ctx.channel().attr(key).getAndSet(null);

if (consumer != null) {
try {
consumer.accept(new CloseMessage(1001, "channel got inactive"));
} catch (Exception ex) {
LOG.log(Level.FINE, "failed to send the close message, code: 1001", ex);
}
}
}
}
}

0 comments on commit 5bac479

Please sign in to comment.