Skip to content

Commit

Permalink
refactor: performance optimization of pull messages and reduce redund…
Browse files Browse the repository at this point in the history
…ant data synchronization. (openimsdk#694)

* fix: Bug fix for clearing unread messages.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: Bug fix for pull messages.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: performance optimization of pull messages.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: performance optimization of pull messages.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: performance optimization of pull messages.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: performance optimization of pull messages.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: performance optimization of pull messages and reduce redundant data synchronization.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

---------

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>
  • Loading branch information
FGadvancer committed Sep 9, 2024
1 parent 2522772 commit c01611e
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 155 deletions.
3 changes: 2 additions & 1 deletion internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"sync"

Expand Down Expand Up @@ -470,7 +471,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
}
}

log.ZDebug(ctx, "insert msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg))
log.ZDebug(ctx, "insert msg", "duration", fmt.Sprintf("%dms", time.Since(b)), "len", len(allMsg))
}

func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
Expand Down
58 changes: 4 additions & 54 deletions internal/interaction/long_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@ import (
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"

"github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback"
"github.com/openimsdk/openim-sdk-core/v3/pkg/ccontext"
"github.com/openimsdk/openim-sdk-core/v3/pkg/common"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"

"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"

"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
Expand Down Expand Up @@ -88,7 +87,6 @@ type LongConnMgr struct {
pushMsgAndMaxSeqCh chan common.Cmd2Value
conversationCh chan common.Cmd2Value
loginMgrCh chan common.Cmd2Value
heartbeatCh chan common.Cmd2Value
closedErr error
ctx context.Context
IsCompression bool
Expand All @@ -110,7 +108,7 @@ type Message struct {
Resp chan *GeneralWsResp
}

func NewLongConnMgr(ctx context.Context, listener open_im_sdk_callback.OnConnListener, userOnline func(map[string][]int32), heartbeatCmdCh, pushMsgAndMaxSeqCh, loginMgrCh chan common.Cmd2Value) *LongConnMgr {
func NewLongConnMgr(ctx context.Context, listener open_im_sdk_callback.OnConnListener, userOnline func(map[string][]int32), pushMsgAndMaxSeqCh, loginMgrCh chan common.Cmd2Value) *LongConnMgr {
l := &LongConnMgr{
listener: listener,
userOnline: userOnline,
Expand All @@ -127,7 +125,6 @@ func NewLongConnMgr(ctx context.Context, listener open_im_sdk_callback.OnConnLis
l.conn = NewWebSocket(WebSocket)
l.connWrite = new(sync.Mutex)
l.ctx = ctx
l.heartbeatCh = heartbeatCmdCh
return l
}
func (c *LongConnMgr) Run(ctx context.Context) {
Expand Down Expand Up @@ -318,8 +315,6 @@ func (c *LongConnMgr) heartbeat(ctx context.Context) {
case <-ctx.Done():
log.ZInfo(ctx, "heartbeat done sdk logout.....")
return
case <-c.heartbeatCh:
c.retrieveMaxSeq(ctx)
case <-ticker.C:
log.ZInfo(ctx, "sendPingMessage", "goroutine ID:", getGoroutineID())
c.sendPingMessage(ctx)
Expand Down Expand Up @@ -356,51 +351,6 @@ func getGoroutineID() int64 {
return id
}

func (c *LongConnMgr) retrieveMaxSeq(ctx context.Context) {
if c.conn == nil {
return
}
var m sdkws.GetMaxSeqReq
m.UserID = ccontext.Info(ctx).UserID()
opID := utils.OperationIDGenerator()
sCtx := ccontext.WithOperationID(c.ctx, opID)
log.ZInfo(sCtx, "retrieveMaxSeq start", "goroutine ID:", getGoroutineID())
data, err := proto.Marshal(&m)
if err != nil {
log.ZError(sCtx, "proto.Marshal", err)
return
}
req := &GeneralWsReq{
ReqIdentifier: constant.GetNewestSeq,
SendID: m.UserID,
OperationID: opID,
Data: data,
}
resp, err := c.sendAndWaitResp(req)
if err != nil {
log.ZError(sCtx, "sendAndWaitResp", err)
_ = c.close()
time.Sleep(time.Second * 1)
return
} else {
if resp.ErrCode != 0 {
log.ZError(sCtx, "retrieveMaxSeq failed", nil, "errCode:", resp.ErrCode, "errMsg:", resp.ErrMsg)
}
var wsSeqResp sdkws.GetMaxSeqResp
err = proto.Unmarshal(resp.Data, &wsSeqResp)
if err != nil {
log.ZError(sCtx, "proto.Unmarshal", err)
}
var cmd sdk_struct.CmdMaxSeqToMsgSync
cmd.ConversationMaxSeqOnSvr = wsSeqResp.MaxSeqs

err := common.TriggerCmdMaxSeq(sCtx, &cmd, c.pushMsgAndMaxSeqCh)
if err != nil {
log.ZError(sCtx, "TriggerCmdMaxSeq failed", err)
}
}
}

func (c *LongConnMgr) sendAndWaitResp(msg *GeneralWsReq) (*GeneralWsResp, error) {
tempChan, err := c.writeBinaryMsgAndRetry(msg)
defer c.Syncer.DelCh(msg.MsgIncr)
Expand Down
171 changes: 107 additions & 64 deletions internal/interaction/msg_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"runtime/debug"
"strings"
"sync"
"time"

"golang.org/x/sync/errgroup"

Expand All @@ -42,7 +43,8 @@ const (
pullMsgGoroutineLimit = 10
)

// The callback synchronization starts. The reconnection ends
// MsgSyncer is a central hub for message relay, responsible for sequential message gap pulling,
// handling network events, and managing app foreground and background events.
type MsgSyncer struct {
loginUserID string // login user ID
longConnMgr *LongConnMgr // long connection manager
Expand All @@ -54,6 +56,8 @@ type MsgSyncer struct {
syncTimes int // times of sync
ctx context.Context // context
reinstalled bool //true if the app was uninstalled and reinstalled
isSyncing bool // indicates whether data is being synced
isSyncingLock sync.Mutex // lock for syncing state

}

Expand Down Expand Up @@ -183,9 +187,19 @@ func (m *MsgSyncer) handlePushMsgAndEvent(cmd common.Cmd2Value) {
switch cmd.Cmd {
case constant.CmdConnSuccesss:
log.ZInfo(cmd.Ctx, "recv long conn mgr connected", "cmd", cmd.Cmd, "value", cmd.Value)
m.doConnected(cmd.Ctx)
case constant.CmdMaxSeq:
log.ZInfo(cmd.Ctx, "recv max seqs from long conn mgr, start sync msgs", "cmd", cmd.Cmd, "value", cmd.Value)
if m.startSync() {
m.doConnected(cmd.Ctx)
} else {
log.ZWarn(cmd.Ctx, "syncing, ignore connected event", nil, "cmd", cmd.Cmd, "value", cmd.Value)
}
case constant.CmdWakeUpDataSync:
log.ZInfo(cmd.Ctx, "app wake up, start sync msgs", "cmd", cmd.Cmd, "value", cmd.Value)
if m.startSync() {
m.doWakeupDataSync(cmd.Ctx)
} else {
log.ZWarn(cmd.Ctx, "syncing, ignore wake up event", nil, "cmd", cmd.Cmd, "value", cmd.Value)

}
m.compareSeqsAndBatchSync(cmd.Ctx, cmd.Value.(*sdk_struct.CmdMaxSeqToMsgSync).ConversationMaxSeqOnSvr, defaultPullNums)
case constant.CmdPushMsg:
m.doPushMsg(cmd.Ctx, cmd.Value.(*sdkws.PushMessages))
Expand All @@ -200,7 +214,9 @@ func (m *MsgSyncer) compareSeqsAndBatchSync(ctx context.Context, maxSeqToSync ma
messagesSeqMap := make(map[string]int64)
for conversationID, seq := range maxSeqToSync {
if IsNotification(conversationID) {
notificationsSeqMap[conversationID] = seq
if seq != 0 { // seq is 0, no need to sync
notificationsSeqMap[conversationID] = seq
}
} else {
messagesSeqMap[conversationID] = seq
}
Expand Down Expand Up @@ -243,13 +259,40 @@ func (m *MsgSyncer) compareSeqsAndBatchSync(ctx context.Context, maxSeqToSync ma
needSyncSeqMap[conversationID] = [2]int64{syncedMaxSeq + 1, maxSeq}
}
} else {
needSyncSeqMap[conversationID] = [2]int64{0, maxSeq}
if maxSeq != 0 { // seq is 0, no need to sync
needSyncSeqMap[conversationID] = [2]int64{0, maxSeq}
}
}
}
_ = m.syncAndTriggerMsgs(m.ctx, needSyncSeqMap, pullNums)
}
}

// startSync checks if the sync is already in progress.
// If syncing is in progress, it returns false. Otherwise, it starts syncing and returns true.
func (ms *MsgSyncer) startSync() bool {
ms.isSyncingLock.Lock()
defer ms.isSyncingLock.Unlock()

if ms.isSyncing {
// If already syncing, return false
return false
}

// Set syncing to true and start the sync
ms.isSyncing = true

// Create a goroutine that waits for 5 seconds and then sets isSyncing to false
go func() {
time.Sleep(5 * time.Second)
ms.isSyncingLock.Lock()
ms.isSyncing = false
ms.isSyncingLock.Unlock()
}()

return true
}

func (m *MsgSyncer) doPushMsg(ctx context.Context, push *sdkws.PushMessages) {
log.ZDebug(ctx, "push msgs", "push", push, "syncedMaxSeqs", m.syncedMaxSeqs)
m.pushTriggerAndSync(ctx, push.Msgs, m.triggerConversation)
Expand Down Expand Up @@ -308,83 +351,83 @@ func (m *MsgSyncer) doConnected(ctx context.Context) {
}
}

func (m *MsgSyncer) doWakeupDataSync(ctx context.Context) {
common.TriggerCmdSyncData(ctx, m.conversationCh)
var resp sdkws.GetMaxSeqResp
if err := m.longConnMgr.SendReqWaitResp(m.ctx, &sdkws.GetMaxSeqReq{UserID: m.loginUserID}, constant.GetNewestSeq, &resp); err != nil {
log.ZError(m.ctx, "get max seq error", err)
return
} else {
log.ZDebug(m.ctx, "get max seq success", "resp", resp.MaxSeqs)
}
m.compareSeqsAndBatchSync(ctx, resp.MaxSeqs, defaultPullNums)
}

func IsNotification(conversationID string) bool {
return strings.HasPrefix(conversationID, "n_")
}

// Fragment synchronization message, seq refresh after successful trigger
func (m *MsgSyncer) syncAndTriggerMsgs(ctx context.Context, seqMap map[string][2]int64, syncMsgNum int64) error {
if len(seqMap) > 0 {
log.ZDebug(ctx, "current sync seqMap", "seqMap", seqMap)
var (
tempSeqMap = make(map[string][2]int64, 50)
msgNum = 0
)
for k, v := range seqMap {
oneConversationSyncNum := v[1] - v[0] + 1
if (oneConversationSyncNum/SplitPullMsgNum) > 1 && IsNotification(k) {
nSeqMap := make(map[string][2]int64, 1)
count := int(oneConversationSyncNum / SplitPullMsgNum)
startSeq := v[0]
var end int64
for i := 0; i <= count; i++ {
if i == count {
nSeqMap[k] = [2]int64{startSeq, v[1]}
} else {
end = startSeq + int64(SplitPullMsgNum)
if end > v[1] {
end = v[1]
i = count
}
nSeqMap[k] = [2]int64{startSeq, end}
}
resp, err := m.pullMsgBySeqRange(ctx, nSeqMap, syncMsgNum)
if err != nil {
log.ZError(ctx, "syncMsgFromSvr err", err, "nSeqMap", nSeqMap)
return err
}
_ = m.triggerConversation(ctx, resp.Msgs)
_ = m.triggerNotification(ctx, resp.NotificationMsgs)
for conversationID, seqs := range nSeqMap {
m.syncedMaxSeqs[conversationID] = seqs[1]
}
startSeq = end + 1
}
continue
if len(seqMap) == 0 {
log.ZDebug(ctx, "nothing to sync", "syncMsgNum", syncMsgNum)
return nil
}

log.ZDebug(ctx, "current sync seqMap", "seqMap", seqMap)
var (
tempSeqMap = make(map[string][2]int64, 50)
msgNum = 0
)

for k, v := range seqMap {
oneConversationSyncNum := v[1] - v[0] + 1
tempSeqMap[k] = v
// For notification conversations, use oneConversationSyncNum directly
if IsNotification(k) {
msgNum += int(oneConversationSyncNum)
} else {
// For regular conversations, ensure msgNum is the minimum of oneConversationSyncNum and syncMsgNum
currentSyncMsgNum := int64(0)
if oneConversationSyncNum > syncMsgNum {
currentSyncMsgNum = syncMsgNum
} else {
currentSyncMsgNum = oneConversationSyncNum
}
tempSeqMap[k] = v
if oneConversationSyncNum > 0 {
msgNum += int(oneConversationSyncNum)
msgNum += int(currentSyncMsgNum)
}

// If accumulated msgNum reaches SplitPullMsgNum, trigger a batch pull
if msgNum >= SplitPullMsgNum {
resp, err := m.pullMsgBySeqRange(ctx, tempSeqMap, syncMsgNum)
if err != nil {
log.ZError(ctx, "syncMsgFromSvr error", err, "tempSeqMap", tempSeqMap)
return err
}
if msgNum >= SplitPullMsgNum {
resp, err := m.pullMsgBySeqRange(ctx, tempSeqMap, syncMsgNum)
if err != nil {
log.ZError(ctx, "syncMsgFromSvr err", err, "tempSeqMap", tempSeqMap)
return err
}
_ = m.triggerConversation(ctx, resp.Msgs)
_ = m.triggerNotification(ctx, resp.NotificationMsgs)
for conversationID, seqs := range tempSeqMap {
m.syncedMaxSeqs[conversationID] = seqs[1]
}
tempSeqMap = make(map[string][2]int64, 50)
msgNum = 0
_ = m.triggerConversation(ctx, resp.Msgs)
_ = m.triggerNotification(ctx, resp.NotificationMsgs)
for conversationID, seqs := range tempSeqMap {
m.syncedMaxSeqs[conversationID] = seqs[1]
}
// Reset tempSeqMap and msgNum to handle the next batch
tempSeqMap = make(map[string][2]int64, 50)
msgNum = 0
}
}

// Handle remaining messages to ensure all are synced
if len(tempSeqMap) > 0 {
resp, err := m.pullMsgBySeqRange(ctx, tempSeqMap, syncMsgNum)
if err != nil {
log.ZError(ctx, "syncMsgFromSvr err", err, "seqMap", seqMap)
log.ZError(ctx, "syncMsgFromSvr error", err, "tempSeqMap", tempSeqMap)
return err
}
_ = m.triggerConversation(ctx, resp.Msgs)
_ = m.triggerNotification(ctx, resp.NotificationMsgs)
for conversationID, seqs := range seqMap {
for conversationID, seqs := range tempSeqMap {
m.syncedMaxSeqs[conversationID] = seqs[1]
}
} else {
log.ZDebug(ctx, "noting conversation to sync", "syncMsgNum", syncMsgNum)
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/util/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ func ApiPost(ctx context.Context, api string, req, resp any) (err error) {
defer func(start time.Time) {
elapsed := time.Since(start).Milliseconds()
if err == nil {
log.ZDebug(ctx, "CallApi", "api", api, "state", "success", "cost time", fmt.Sprintf("%dms", elapsed))
log.ZDebug(ctx, "CallApi", "duration", fmt.Sprintf("%dms", elapsed), "api", api, "state", "success")
} else {
log.ZError(ctx, "CallApi", err, "api", api, "state", "failed", "cost time", fmt.Sprintf("%dms", elapsed))
log.ZError(ctx, "CallApi", err, "duration", fmt.Sprintf("%dms", elapsed), "api", api, "state", "failed")
}
}(time.Now())

Expand Down
2 changes: 1 addition & 1 deletion msgtest/module/msg_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func newUserCtx(userID, token string, imConfig sdk_struct.IMConfig) context.Cont
func NewUser(userID, token string, timeOffset int64, p *PressureTester, imConfig sdk_struct.IMConfig, opts ...func(core *SendMsgUser)) *SendMsgUser {
pushMsgAndMaxSeqCh := make(chan common.Cmd2Value, 1000)
ctx := newUserCtx(userID, token, imConfig)
longConnMgr := interaction.NewLongConnMgr(ctx, &ConnListener{}, func(m map[string][]int32) {}, nil, pushMsgAndMaxSeqCh, nil)
longConnMgr := interaction.NewLongConnMgr(ctx, &ConnListener{}, func(m map[string][]int32) {}, pushMsgAndMaxSeqCh, nil)
core := &SendMsgUser{
pushMsgAndMaxSeqCh: pushMsgAndMaxSeqCh,
longConnMgr: longConnMgr,
Expand Down
Loading

0 comments on commit c01611e

Please sign in to comment.