Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement server-initiated heartbeat in msgGateway module #2404

Merged
merged 10 commits into from
Jul 10, 2024
53 changes: 53 additions & 0 deletions internal/msggateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"runtime/debug"
"sync"
"sync/atomic"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
Expand Down Expand Up @@ -72,6 +73,8 @@ type Client struct {
closed atomic.Bool
closedErr error
token string
hbCtx context.Context
hbCancel context.CancelFunc
}

// ResetClient updates the client's state with new connection and context information.
Expand All @@ -88,6 +91,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
c.closed.Store(false)
c.closedErr = nil
c.token = ctx.GetToken()
c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
}

func (c *Client) pingHandler(_ string) error {
Expand All @@ -98,6 +102,13 @@ func (c *Client) pingHandler(_ string) error {
return c.writePongMsg()
}

func (c *Client) pongHandler(_ string) error {
if err := c.conn.SetReadDeadline(pongWait); err != nil {
return err
}
return nil
}

// readMessage continuously reads messages from the connection.
func (c *Client) readMessage() {
defer func() {
Expand All @@ -110,7 +121,9 @@ func (c *Client) readMessage() {

c.conn.SetReadLimit(maxMessageSize)
_ = c.conn.SetReadDeadline(pongWait)
c.conn.SetPongHandler(c.pongHandler)
c.conn.SetPingHandler(c.pingHandler)
c.activeHeartbeat(c.hbCtx)

for {
log.ZDebug(c.ctx, "readMessage")
Expand Down Expand Up @@ -147,6 +160,7 @@ func (c *Client) readMessage() {
case CloseMessage:
c.closedErr = ErrClientClosed
return

default:
}
}
Expand Down Expand Up @@ -235,6 +249,7 @@ func (c *Client) close() {

c.closed.Store(true)
c.conn.Close()
c.hbCancel() // Close server-initiated heartbeat.
c.longConnServer.UnRegister(c)
}

Expand Down Expand Up @@ -321,6 +336,44 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
return c.conn.WriteMessage(MessageBinary, encodedBuf)
}

// Actively initiate Heartbeat when platform in Web.
func (c *Client) activeHeartbeat(ctx context.Context) {
if c.PlatformID == constant.WebPlatformID {
go func() {
log.ZDebug(ctx, "server initiative send heartbeat start.")
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := c.writePingMsg(); err != nil {
log.ZWarn(c.ctx, "send Ping Message error.", err)
return
}
case <-c.hbCtx.Done():
return
}
}
}()
}
}
func (c *Client) writePingMsg() error {
if c.closed.Load() {
return nil
}

c.w.Lock()
defer c.w.Unlock()

err := c.conn.SetWriteDeadline(writeWait)
if err != nil {
return err
}

return c.conn.WriteMessage(PingMessage, nil)
}

func (c *Client) writePongMsg() error {
if c.closed.Load() {
return nil
Expand Down
3 changes: 3 additions & 0 deletions internal/msggateway/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ const (
// Time allowed to read the next pong message from the peer.
pongWait = 30 * time.Second

// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10

// Maximum message size allowed from peer.
maxMessageSize = 51200
)
3 changes: 2 additions & 1 deletion internal/msggateway/long_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package msggateway

import (
"encoding/json"
"github.com/openimsdk/tools/apiresp"
"net/http"
"time"

"github.com/openimsdk/tools/apiresp"

"github.com/gorilla/websocket"
"github.com/openimsdk/tools/errs"
)
Expand Down
File renamed without changes.
Loading