From 557a5a3b46796679cfde14d5f28a918c2d1b821d Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 9 Jul 2024 18:26:17 +0800 Subject: [PATCH 1/9] feat: implement send ping msg when platform is web in gateway. --- internal/msggateway/client.go | 36 +++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 0581a025b4..a901ebdfe8 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -20,6 +20,7 @@ import ( "runtime/debug" "sync" "sync/atomic" + "time" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" @@ -112,6 +113,25 @@ func (c *Client) readMessage() { _ = c.conn.SetReadDeadline(pongWait) c.conn.SetPingHandler(c.pingHandler) + if c.PlatformID == 5 { + go func() { + ticker := time.NewTicker(20) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := c.writePingMsg(); err != nil { + log.ZError(c.ctx, "send Ping Message error.", err) + return + } + case <-c.ctx.Done(): + return + } + } + }() + } + for { log.ZDebug(c.ctx, "readMessage") messageType, message, returnErr := c.conn.ReadMessage() @@ -321,6 +341,22 @@ func (c *Client) writeBinaryMsg(resp Resp) error { return c.conn.WriteMessage(MessageBinary, encodedBuf) } +func (c *Client) writePingMsg() error { + if c.closed.Load() { + return nil + } + + c.w.Lock() + defer c.w.Unlock() + + err := c.conn.SetWriteDeadline(writeWait) + if err != nil { + return err + } + + return c.conn.WriteMessage(PingMessage, nil) +} + func (c *Client) writePongMsg() error { if c.closed.Load() { return nil From 1b1bc0b7563a1d8a99cbb2552b01a8b70947bad1 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 9 Jul 2024 18:36:10 +0800 Subject: [PATCH 2/9] add context life cycle control. --- internal/msggateway/client.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index a901ebdfe8..9d594c1fa7 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -138,6 +138,7 @@ func (c *Client) readMessage() { if returnErr != nil { log.ZWarn(c.ctx, "readMessage", returnErr, "messageType", messageType) c.closedErr = returnErr + <-c.ctx.Done() return } @@ -145,6 +146,7 @@ func (c *Client) readMessage() { if c.closed.Load() { // The scenario where the connection has just been closed, but the coroutine has not exited c.closedErr = ErrConnClosed + <-c.ctx.Done() return } @@ -154,10 +156,12 @@ func (c *Client) readMessage() { parseDataErr := c.handleMessage(message) if parseDataErr != nil { c.closedErr = parseDataErr + <-c.ctx.Done() return } case MessageText: c.closedErr = ErrNotSupportMessageProtocol + <-c.ctx.Done() return case PingMessage: @@ -166,7 +170,9 @@ func (c *Client) readMessage() { case CloseMessage: c.closedErr = ErrClientClosed + <-c.ctx.Done() return + default: } } From fa1c9b4329cae66d45d484c4d04cdb9acf2e76f2 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 10 Jul 2024 10:31:28 +0800 Subject: [PATCH 3/9] feat: implement heartbeat logic in msggateway. --- internal/msggateway/client.go | 53 ++++++++++++-------------------- internal/msggateway/constant.go | 3 ++ internal/msggateway/long_conn.go | 3 +- 3 files changed, 24 insertions(+), 35 deletions(-) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 9d594c1fa7..ca72bfb846 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -73,6 +73,7 @@ type Client struct { closed atomic.Bool closedErr error token string + hbCtx context.Context } // ResetClient updates the client's state with new connection and context information. @@ -89,6 +90,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer c.closed.Store(false) c.closedErr = nil c.token = ctx.GetToken() + c.hbCtx, _ = context.WithTimeout(c.ctx, pongWait*2) } func (c *Client) pingHandler(_ string) error { @@ -113,23 +115,8 @@ func (c *Client) readMessage() { _ = c.conn.SetReadDeadline(pongWait) c.conn.SetPingHandler(c.pingHandler) - if c.PlatformID == 5 { - go func() { - ticker := time.NewTicker(20) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if err := c.writePingMsg(); err != nil { - log.ZError(c.ctx, "send Ping Message error.", err) - return - } - case <-c.ctx.Done(): - return - } - } - }() + if c.PlatformID == constant.WebPlatformID { + go c.heartbeat(c.hbCtx) } for { @@ -138,7 +125,6 @@ func (c *Client) readMessage() { if returnErr != nil { log.ZWarn(c.ctx, "readMessage", returnErr, "messageType", messageType) c.closedErr = returnErr - <-c.ctx.Done() return } @@ -146,7 +132,6 @@ func (c *Client) readMessage() { if c.closed.Load() { // The scenario where the connection has just been closed, but the coroutine has not exited c.closedErr = ErrConnClosed - <-c.ctx.Done() return } @@ -156,12 +141,10 @@ func (c *Client) readMessage() { parseDataErr := c.handleMessage(message) if parseDataErr != nil { c.closedErr = parseDataErr - <-c.ctx.Done() return } case MessageText: c.closedErr = ErrNotSupportMessageProtocol - <-c.ctx.Done() return case PingMessage: @@ -170,7 +153,6 @@ func (c *Client) readMessage() { case CloseMessage: c.closedErr = ErrClientClosed - <-c.ctx.Done() return default: @@ -261,6 +243,7 @@ func (c *Client) close() { c.closed.Store(true) c.conn.Close() + <-c.hbCtx.Done() // Close initiated heartbeat in server send. c.longConnServer.UnRegister(c) } @@ -347,20 +330,22 @@ func (c *Client) writeBinaryMsg(resp Resp) error { return c.conn.WriteMessage(MessageBinary, encodedBuf) } -func (c *Client) writePingMsg() error { - if c.closed.Load() { - return nil - } - - c.w.Lock() - defer c.w.Unlock() +func (c *Client) heartbeat(ctx context.Context) { + log.ZDebug(ctx, "server initiative send heartbeat start.") + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() - err := c.conn.SetWriteDeadline(writeWait) - if err != nil { - return err + for { + select { + case <-ticker.C: + if err := c.conn.WriteMessage(PingMessage, nil); err != nil { + log.ZError(c.ctx, "send Ping Message error.", err) + return + } + case <-c.hbCtx.Done(): + return + } } - - return c.conn.WriteMessage(PingMessage, nil) } func (c *Client) writePongMsg() error { diff --git a/internal/msggateway/constant.go b/internal/msggateway/constant.go index 64664ac0ab..125be16358 100644 --- a/internal/msggateway/constant.go +++ b/internal/msggateway/constant.go @@ -53,6 +53,9 @@ const ( // Time allowed to read the next pong message from the peer. pongWait = 30 * time.Second + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + // Maximum message size allowed from peer. maxMessageSize = 51200 ) diff --git a/internal/msggateway/long_conn.go b/internal/msggateway/long_conn.go index 7d5bef4c3a..c1b3e27c93 100644 --- a/internal/msggateway/long_conn.go +++ b/internal/msggateway/long_conn.go @@ -16,10 +16,11 @@ package msggateway import ( "encoding/json" - "github.com/openimsdk/tools/apiresp" "net/http" "time" + "github.com/openimsdk/tools/apiresp" + "github.com/gorilla/websocket" "github.com/openimsdk/tools/errs" ) From a91a8b9c8b8fd5b8b8926bab15ebd8733ad98fe6 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 10 Jul 2024 10:44:06 +0800 Subject: [PATCH 4/9] update heartbeat logic. --- internal/msggateway/client.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index ca72bfb846..7a43034106 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -90,7 +90,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer c.closed.Store(false) c.closedErr = nil c.token = ctx.GetToken() - c.hbCtx, _ = context.WithTimeout(c.ctx, pongWait*2) + c.hbCtx, _ = context.WithCancel(c.ctx) } func (c *Client) pingHandler(_ string) error { @@ -114,10 +114,7 @@ func (c *Client) readMessage() { c.conn.SetReadLimit(maxMessageSize) _ = c.conn.SetReadDeadline(pongWait) c.conn.SetPingHandler(c.pingHandler) - - if c.PlatformID == constant.WebPlatformID { - go c.heartbeat(c.hbCtx) - } + go c.heartbeat(c.hbCtx) for { log.ZDebug(c.ctx, "readMessage") @@ -331,19 +328,21 @@ func (c *Client) writeBinaryMsg(resp Resp) error { } func (c *Client) heartbeat(ctx context.Context) { - log.ZDebug(ctx, "server initiative send heartbeat start.") - ticker := time.NewTicker(pingPeriod) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if err := c.conn.WriteMessage(PingMessage, nil); err != nil { - log.ZError(c.ctx, "send Ping Message error.", err) + if c.PlatformID == constant.WebPlatformID { + log.ZDebug(ctx, "server initiative send heartbeat start.") + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := c.conn.WriteMessage(PingMessage, nil); err != nil { + log.ZError(c.ctx, "send Ping Message error.", err) + return + } + case <-c.hbCtx.Done(): return } - case <-c.hbCtx.Done(): - return } } } From 58cd8ee0a76c0dac9715a869c89d8b744378bb25 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 10 Jul 2024 10:53:51 +0800 Subject: [PATCH 5/9] update to correct method name and comment. --- internal/msggateway/client.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 7a43034106..e7e11f79c2 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -114,7 +114,7 @@ func (c *Client) readMessage() { c.conn.SetReadLimit(maxMessageSize) _ = c.conn.SetReadDeadline(pongWait) c.conn.SetPingHandler(c.pingHandler) - go c.heartbeat(c.hbCtx) + go c.activeHeartbeat(c.hbCtx) for { log.ZDebug(c.ctx, "readMessage") @@ -240,7 +240,7 @@ func (c *Client) close() { c.closed.Store(true) c.conn.Close() - <-c.hbCtx.Done() // Close initiated heartbeat in server send. + <-c.hbCtx.Done() // Close server-initiated heartbeat. c.longConnServer.UnRegister(c) } @@ -327,7 +327,8 @@ func (c *Client) writeBinaryMsg(resp Resp) error { return c.conn.WriteMessage(MessageBinary, encodedBuf) } -func (c *Client) heartbeat(ctx context.Context) { +// Actively initiate Heartbeat when platform in Web. +func (c *Client) activeHeartbeat(ctx context.Context) { if c.PlatformID == constant.WebPlatformID { log.ZDebug(ctx, "server initiative send heartbeat start.") ticker := time.NewTicker(pingPeriod) From 557e8099c69c0964e3eca7efc2f6f75a59c3a073 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 10 Jul 2024 11:29:35 +0800 Subject: [PATCH 6/9] update initiate heartbeat logic. --- internal/msggateway/client.go | 41 ++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index e7e11f79c2..96c5dc8968 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -74,6 +74,7 @@ type Client struct { closedErr error token string hbCtx context.Context + hbCancel context.CancelFunc } // ResetClient updates the client's state with new connection and context information. @@ -90,7 +91,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer c.closed.Store(false) c.closedErr = nil c.token = ctx.GetToken() - c.hbCtx, _ = context.WithCancel(c.ctx) + c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) } func (c *Client) pingHandler(_ string) error { @@ -101,6 +102,13 @@ func (c *Client) pingHandler(_ string) error { return c.writePongMsg() } +func (c *Client) pongHandler(_ string) error { + if err := c.conn.SetReadDeadline(pongWait); err != nil { + return err + } + return nil +} + // readMessage continuously reads messages from the connection. func (c *Client) readMessage() { defer func() { @@ -113,8 +121,9 @@ func (c *Client) readMessage() { c.conn.SetReadLimit(maxMessageSize) _ = c.conn.SetReadDeadline(pongWait) + c.conn.SetPongHandler(c.pongHandler) c.conn.SetPingHandler(c.pingHandler) - go c.activeHeartbeat(c.hbCtx) + c.activeHeartbeat(c.hbCtx) for { log.ZDebug(c.ctx, "readMessage") @@ -240,7 +249,7 @@ func (c *Client) close() { c.closed.Store(true) c.conn.Close() - <-c.hbCtx.Done() // Close server-initiated heartbeat. + c.hbCancel() // Close server-initiated heartbeat. c.longConnServer.UnRegister(c) } @@ -330,21 +339,23 @@ func (c *Client) writeBinaryMsg(resp Resp) error { // Actively initiate Heartbeat when platform in Web. func (c *Client) activeHeartbeat(ctx context.Context) { if c.PlatformID == constant.WebPlatformID { - log.ZDebug(ctx, "server initiative send heartbeat start.") - ticker := time.NewTicker(pingPeriod) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if err := c.conn.WriteMessage(PingMessage, nil); err != nil { - log.ZError(c.ctx, "send Ping Message error.", err) + go func() { + log.ZDebug(ctx, "server initiative send heartbeat start.") + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := c.conn.WriteMessage(PingMessage, nil); err != nil { + log.ZError(c.ctx, "send Ping Message error.", err) + return + } + case <-c.hbCtx.Done(): return } - case <-c.hbCtx.Done(): - return } - } + }() } } From 32457ab8b5308f7552257cf7fa2a69647060487e Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 10 Jul 2024 11:29:47 +0800 Subject: [PATCH 7/9] rename ws_server --- internal/msggateway/{n_ws_server.go => ws_server.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename internal/msggateway/{n_ws_server.go => ws_server.go} (100%) diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/ws_server.go similarity index 100% rename from internal/msggateway/n_ws_server.go rename to internal/msggateway/ws_server.go From b454aacd362927ee90e373fcf512b62e54230057 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 10 Jul 2024 11:40:46 +0800 Subject: [PATCH 8/9] update writePingMsg logic --- internal/msggateway/client.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 96c5dc8968..bb64ef5679 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -347,7 +347,7 @@ func (c *Client) activeHeartbeat(ctx context.Context) { for { select { case <-ticker.C: - if err := c.conn.WriteMessage(PingMessage, nil); err != nil { + if err := c.writePingMsg(); err != nil { log.ZError(c.ctx, "send Ping Message error.", err) return } @@ -358,6 +358,21 @@ func (c *Client) activeHeartbeat(ctx context.Context) { }() } } +func (c *Client) writePingMsg() error { + if c.closed.Load() { + return nil + } + + c.w.Lock() + defer c.w.Unlock() + + err := c.conn.SetWriteDeadline(writeWait) + if err != nil { + return err + } + + return c.conn.WriteMessage(PingMessage, nil) +} func (c *Client) writePongMsg() error { if c.closed.Load() { From 1d98c7eefda5362e8a42f63df0351664754342da Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 10 Jul 2024 11:42:09 +0800 Subject: [PATCH 9/9] update log level to warn. --- internal/msggateway/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index bb64ef5679..1270eb9781 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -348,7 +348,7 @@ func (c *Client) activeHeartbeat(ctx context.Context) { select { case <-ticker.C: if err := c.writePingMsg(); err != nil { - log.ZError(c.ctx, "send Ping Message error.", err) + log.ZWarn(c.ctx, "send Ping Message error.", err) return } case <-c.hbCtx.Done():