From 25227722b00a4da2129e886493e8fe9544f0daa8 Mon Sep 17 00:00:00 2001 From: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Thu, 5 Sep 2024 15:18:04 +0800 Subject: [PATCH 1/2] refactor: performance optimization of pull messages. (#690) * fix: Bug fix for clearing unread messages. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: Bug fix for pull messages. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: performance optimization of pull messages. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: performance optimization of pull messages. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: performance optimization of pull messages. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: performance optimization of pull messages. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> --------- Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> --- internal/conversation_msg/message_check.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/conversation_msg/message_check.go b/internal/conversation_msg/message_check.go index 37afe2ddb..43b2bf796 100644 --- a/internal/conversation_msg/message_check.go +++ b/internal/conversation_msg/message_check.go @@ -184,6 +184,7 @@ func (c *Conversation) pullMessageAndReGetHistoryMessages(ctx context.Context, c var conversationSeqs msg.ConversationSeqs conversationSeqs.ConversationID = conversationID conversationSeqs.Seqs = newSeqList + getSeqMessageReq.Conversations = append(getSeqMessageReq.Conversations, &conversationSeqs) log.ZDebug(ctx, "conversation pull message, ", "req", getSeqMessageReq) if notStartTime && !c.LongConnMgr.IsConnected() { return From c01611eae5e69c1f6467ede0590efbc1886ddf58 Mon Sep 17 00:00:00 2001 From: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Mon, 9 Sep 2024 17:30:20 +0800 Subject: [PATCH 2/2] refactor: performance optimization of pull messages and reduce redundant data synchronization. (#694) * fix: Bug fix for clearing unread messages. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: Bug fix for pull messages. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: performance optimization of pull messages. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: performance optimization of pull messages. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: performance optimization of pull messages. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: performance optimization of pull messages. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: performance optimization of pull messages and reduce redundant data synchronization. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> --------- Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> --- internal/conversation_msg/conversation_msg.go | 3 +- internal/interaction/long_conn_mgr.go | 58 +----- internal/interaction/msg_sync.go | 171 +++++++++++------- internal/util/post.go | 4 +- msgtest/module/msg_sender.go | 2 +- open_im_sdk/userRelated.go | 7 +- pkg/common/trigger_channel.go | 23 +-- pkg/constant/constant.go | 24 +-- 8 files changed, 137 insertions(+), 155 deletions(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index c5e0210cf..3b455e7ff 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "math" "sync" @@ -470,7 +471,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) { } } - log.ZDebug(ctx, "insert msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) + log.ZDebug(ctx, "insert msg", "duration", fmt.Sprintf("%dms", time.Since(b)), "len", len(allMsg)) } func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { diff --git a/internal/interaction/long_conn_mgr.go b/internal/interaction/long_conn_mgr.go index 302b5df6f..4f82f3a9c 100644 --- a/internal/interaction/long_conn_mgr.go +++ b/internal/interaction/long_conn_mgr.go @@ -27,16 +27,15 @@ import ( "sync" "time" + "github.com/golang/protobuf/proto" + "github.com/gorilla/websocket" + "github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback" "github.com/openimsdk/openim-sdk-core/v3/pkg/ccontext" "github.com/openimsdk/openim-sdk-core/v3/pkg/common" "github.com/openimsdk/openim-sdk-core/v3/pkg/constant" "github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs" "github.com/openimsdk/openim-sdk-core/v3/pkg/utils" - "github.com/openimsdk/openim-sdk-core/v3/sdk_struct" - - "github.com/golang/protobuf/proto" - "github.com/gorilla/websocket" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" @@ -88,7 +87,6 @@ type LongConnMgr struct { pushMsgAndMaxSeqCh chan common.Cmd2Value conversationCh chan common.Cmd2Value loginMgrCh chan common.Cmd2Value - heartbeatCh chan common.Cmd2Value closedErr error ctx context.Context IsCompression bool @@ -110,7 +108,7 @@ type Message struct { Resp chan *GeneralWsResp } -func NewLongConnMgr(ctx context.Context, listener open_im_sdk_callback.OnConnListener, userOnline func(map[string][]int32), heartbeatCmdCh, pushMsgAndMaxSeqCh, loginMgrCh chan common.Cmd2Value) *LongConnMgr { +func NewLongConnMgr(ctx context.Context, listener open_im_sdk_callback.OnConnListener, userOnline func(map[string][]int32), pushMsgAndMaxSeqCh, loginMgrCh chan common.Cmd2Value) *LongConnMgr { l := &LongConnMgr{ listener: listener, userOnline: userOnline, @@ -127,7 +125,6 @@ func NewLongConnMgr(ctx context.Context, listener open_im_sdk_callback.OnConnLis l.conn = NewWebSocket(WebSocket) l.connWrite = new(sync.Mutex) l.ctx = ctx - l.heartbeatCh = heartbeatCmdCh return l } func (c *LongConnMgr) Run(ctx context.Context) { @@ -318,8 +315,6 @@ func (c *LongConnMgr) heartbeat(ctx context.Context) { case <-ctx.Done(): log.ZInfo(ctx, "heartbeat done sdk logout.....") return - case <-c.heartbeatCh: - c.retrieveMaxSeq(ctx) case <-ticker.C: log.ZInfo(ctx, "sendPingMessage", "goroutine ID:", getGoroutineID()) c.sendPingMessage(ctx) @@ -356,51 +351,6 @@ func getGoroutineID() int64 { return id } -func (c *LongConnMgr) retrieveMaxSeq(ctx context.Context) { - if c.conn == nil { - return - } - var m sdkws.GetMaxSeqReq - m.UserID = ccontext.Info(ctx).UserID() - opID := utils.OperationIDGenerator() - sCtx := ccontext.WithOperationID(c.ctx, opID) - log.ZInfo(sCtx, "retrieveMaxSeq start", "goroutine ID:", getGoroutineID()) - data, err := proto.Marshal(&m) - if err != nil { - log.ZError(sCtx, "proto.Marshal", err) - return - } - req := &GeneralWsReq{ - ReqIdentifier: constant.GetNewestSeq, - SendID: m.UserID, - OperationID: opID, - Data: data, - } - resp, err := c.sendAndWaitResp(req) - if err != nil { - log.ZError(sCtx, "sendAndWaitResp", err) - _ = c.close() - time.Sleep(time.Second * 1) - return - } else { - if resp.ErrCode != 0 { - log.ZError(sCtx, "retrieveMaxSeq failed", nil, "errCode:", resp.ErrCode, "errMsg:", resp.ErrMsg) - } - var wsSeqResp sdkws.GetMaxSeqResp - err = proto.Unmarshal(resp.Data, &wsSeqResp) - if err != nil { - log.ZError(sCtx, "proto.Unmarshal", err) - } - var cmd sdk_struct.CmdMaxSeqToMsgSync - cmd.ConversationMaxSeqOnSvr = wsSeqResp.MaxSeqs - - err := common.TriggerCmdMaxSeq(sCtx, &cmd, c.pushMsgAndMaxSeqCh) - if err != nil { - log.ZError(sCtx, "TriggerCmdMaxSeq failed", err) - } - } -} - func (c *LongConnMgr) sendAndWaitResp(msg *GeneralWsReq) (*GeneralWsResp, error) { tempChan, err := c.writeBinaryMsgAndRetry(msg) defer c.Syncer.DelCh(msg.MsgIncr) diff --git a/internal/interaction/msg_sync.go b/internal/interaction/msg_sync.go index ad9703f3f..a3c9f8214 100644 --- a/internal/interaction/msg_sync.go +++ b/internal/interaction/msg_sync.go @@ -20,6 +20,7 @@ import ( "runtime/debug" "strings" "sync" + "time" "golang.org/x/sync/errgroup" @@ -42,7 +43,8 @@ const ( pullMsgGoroutineLimit = 10 ) -// The callback synchronization starts. The reconnection ends +// MsgSyncer is a central hub for message relay, responsible for sequential message gap pulling, +// handling network events, and managing app foreground and background events. type MsgSyncer struct { loginUserID string // login user ID longConnMgr *LongConnMgr // long connection manager @@ -54,6 +56,8 @@ type MsgSyncer struct { syncTimes int // times of sync ctx context.Context // context reinstalled bool //true if the app was uninstalled and reinstalled + isSyncing bool // indicates whether data is being synced + isSyncingLock sync.Mutex // lock for syncing state } @@ -183,9 +187,19 @@ func (m *MsgSyncer) handlePushMsgAndEvent(cmd common.Cmd2Value) { switch cmd.Cmd { case constant.CmdConnSuccesss: log.ZInfo(cmd.Ctx, "recv long conn mgr connected", "cmd", cmd.Cmd, "value", cmd.Value) - m.doConnected(cmd.Ctx) - case constant.CmdMaxSeq: - log.ZInfo(cmd.Ctx, "recv max seqs from long conn mgr, start sync msgs", "cmd", cmd.Cmd, "value", cmd.Value) + if m.startSync() { + m.doConnected(cmd.Ctx) + } else { + log.ZWarn(cmd.Ctx, "syncing, ignore connected event", nil, "cmd", cmd.Cmd, "value", cmd.Value) + } + case constant.CmdWakeUpDataSync: + log.ZInfo(cmd.Ctx, "app wake up, start sync msgs", "cmd", cmd.Cmd, "value", cmd.Value) + if m.startSync() { + m.doWakeupDataSync(cmd.Ctx) + } else { + log.ZWarn(cmd.Ctx, "syncing, ignore wake up event", nil, "cmd", cmd.Cmd, "value", cmd.Value) + + } m.compareSeqsAndBatchSync(cmd.Ctx, cmd.Value.(*sdk_struct.CmdMaxSeqToMsgSync).ConversationMaxSeqOnSvr, defaultPullNums) case constant.CmdPushMsg: m.doPushMsg(cmd.Ctx, cmd.Value.(*sdkws.PushMessages)) @@ -200,7 +214,9 @@ func (m *MsgSyncer) compareSeqsAndBatchSync(ctx context.Context, maxSeqToSync ma messagesSeqMap := make(map[string]int64) for conversationID, seq := range maxSeqToSync { if IsNotification(conversationID) { - notificationsSeqMap[conversationID] = seq + if seq != 0 { // seq is 0, no need to sync + notificationsSeqMap[conversationID] = seq + } } else { messagesSeqMap[conversationID] = seq } @@ -243,13 +259,40 @@ func (m *MsgSyncer) compareSeqsAndBatchSync(ctx context.Context, maxSeqToSync ma needSyncSeqMap[conversationID] = [2]int64{syncedMaxSeq + 1, maxSeq} } } else { - needSyncSeqMap[conversationID] = [2]int64{0, maxSeq} + if maxSeq != 0 { // seq is 0, no need to sync + needSyncSeqMap[conversationID] = [2]int64{0, maxSeq} + } } } _ = m.syncAndTriggerMsgs(m.ctx, needSyncSeqMap, pullNums) } } +// startSync checks if the sync is already in progress. +// If syncing is in progress, it returns false. Otherwise, it starts syncing and returns true. +func (ms *MsgSyncer) startSync() bool { + ms.isSyncingLock.Lock() + defer ms.isSyncingLock.Unlock() + + if ms.isSyncing { + // If already syncing, return false + return false + } + + // Set syncing to true and start the sync + ms.isSyncing = true + + // Create a goroutine that waits for 5 seconds and then sets isSyncing to false + go func() { + time.Sleep(5 * time.Second) + ms.isSyncingLock.Lock() + ms.isSyncing = false + ms.isSyncingLock.Unlock() + }() + + return true +} + func (m *MsgSyncer) doPushMsg(ctx context.Context, push *sdkws.PushMessages) { log.ZDebug(ctx, "push msgs", "push", push, "syncedMaxSeqs", m.syncedMaxSeqs) m.pushTriggerAndSync(ctx, push.Msgs, m.triggerConversation) @@ -308,83 +351,83 @@ func (m *MsgSyncer) doConnected(ctx context.Context) { } } +func (m *MsgSyncer) doWakeupDataSync(ctx context.Context) { + common.TriggerCmdSyncData(ctx, m.conversationCh) + var resp sdkws.GetMaxSeqResp + if err := m.longConnMgr.SendReqWaitResp(m.ctx, &sdkws.GetMaxSeqReq{UserID: m.loginUserID}, constant.GetNewestSeq, &resp); err != nil { + log.ZError(m.ctx, "get max seq error", err) + return + } else { + log.ZDebug(m.ctx, "get max seq success", "resp", resp.MaxSeqs) + } + m.compareSeqsAndBatchSync(ctx, resp.MaxSeqs, defaultPullNums) +} + func IsNotification(conversationID string) bool { return strings.HasPrefix(conversationID, "n_") } -// Fragment synchronization message, seq refresh after successful trigger func (m *MsgSyncer) syncAndTriggerMsgs(ctx context.Context, seqMap map[string][2]int64, syncMsgNum int64) error { - if len(seqMap) > 0 { - log.ZDebug(ctx, "current sync seqMap", "seqMap", seqMap) - var ( - tempSeqMap = make(map[string][2]int64, 50) - msgNum = 0 - ) - for k, v := range seqMap { - oneConversationSyncNum := v[1] - v[0] + 1 - if (oneConversationSyncNum/SplitPullMsgNum) > 1 && IsNotification(k) { - nSeqMap := make(map[string][2]int64, 1) - count := int(oneConversationSyncNum / SplitPullMsgNum) - startSeq := v[0] - var end int64 - for i := 0; i <= count; i++ { - if i == count { - nSeqMap[k] = [2]int64{startSeq, v[1]} - } else { - end = startSeq + int64(SplitPullMsgNum) - if end > v[1] { - end = v[1] - i = count - } - nSeqMap[k] = [2]int64{startSeq, end} - } - resp, err := m.pullMsgBySeqRange(ctx, nSeqMap, syncMsgNum) - if err != nil { - log.ZError(ctx, "syncMsgFromSvr err", err, "nSeqMap", nSeqMap) - return err - } - _ = m.triggerConversation(ctx, resp.Msgs) - _ = m.triggerNotification(ctx, resp.NotificationMsgs) - for conversationID, seqs := range nSeqMap { - m.syncedMaxSeqs[conversationID] = seqs[1] - } - startSeq = end + 1 - } - continue + if len(seqMap) == 0 { + log.ZDebug(ctx, "nothing to sync", "syncMsgNum", syncMsgNum) + return nil + } + + log.ZDebug(ctx, "current sync seqMap", "seqMap", seqMap) + var ( + tempSeqMap = make(map[string][2]int64, 50) + msgNum = 0 + ) + + for k, v := range seqMap { + oneConversationSyncNum := v[1] - v[0] + 1 + tempSeqMap[k] = v + // For notification conversations, use oneConversationSyncNum directly + if IsNotification(k) { + msgNum += int(oneConversationSyncNum) + } else { + // For regular conversations, ensure msgNum is the minimum of oneConversationSyncNum and syncMsgNum + currentSyncMsgNum := int64(0) + if oneConversationSyncNum > syncMsgNum { + currentSyncMsgNum = syncMsgNum + } else { + currentSyncMsgNum = oneConversationSyncNum } - tempSeqMap[k] = v - if oneConversationSyncNum > 0 { - msgNum += int(oneConversationSyncNum) + msgNum += int(currentSyncMsgNum) + } + + // If accumulated msgNum reaches SplitPullMsgNum, trigger a batch pull + if msgNum >= SplitPullMsgNum { + resp, err := m.pullMsgBySeqRange(ctx, tempSeqMap, syncMsgNum) + if err != nil { + log.ZError(ctx, "syncMsgFromSvr error", err, "tempSeqMap", tempSeqMap) + return err } - if msgNum >= SplitPullMsgNum { - resp, err := m.pullMsgBySeqRange(ctx, tempSeqMap, syncMsgNum) - if err != nil { - log.ZError(ctx, "syncMsgFromSvr err", err, "tempSeqMap", tempSeqMap) - return err - } - _ = m.triggerConversation(ctx, resp.Msgs) - _ = m.triggerNotification(ctx, resp.NotificationMsgs) - for conversationID, seqs := range tempSeqMap { - m.syncedMaxSeqs[conversationID] = seqs[1] - } - tempSeqMap = make(map[string][2]int64, 50) - msgNum = 0 + _ = m.triggerConversation(ctx, resp.Msgs) + _ = m.triggerNotification(ctx, resp.NotificationMsgs) + for conversationID, seqs := range tempSeqMap { + m.syncedMaxSeqs[conversationID] = seqs[1] } + // Reset tempSeqMap and msgNum to handle the next batch + tempSeqMap = make(map[string][2]int64, 50) + msgNum = 0 } + } + // Handle remaining messages to ensure all are synced + if len(tempSeqMap) > 0 { resp, err := m.pullMsgBySeqRange(ctx, tempSeqMap, syncMsgNum) if err != nil { - log.ZError(ctx, "syncMsgFromSvr err", err, "seqMap", seqMap) + log.ZError(ctx, "syncMsgFromSvr error", err, "tempSeqMap", tempSeqMap) return err } _ = m.triggerConversation(ctx, resp.Msgs) _ = m.triggerNotification(ctx, resp.NotificationMsgs) - for conversationID, seqs := range seqMap { + for conversationID, seqs := range tempSeqMap { m.syncedMaxSeqs[conversationID] = seqs[1] } - } else { - log.ZDebug(ctx, "noting conversation to sync", "syncMsgNum", syncMsgNum) } + return nil } diff --git a/internal/util/post.go b/internal/util/post.go index dbfe2e168..151a5ef30 100644 --- a/internal/util/post.go +++ b/internal/util/post.go @@ -66,9 +66,9 @@ func ApiPost(ctx context.Context, api string, req, resp any) (err error) { defer func(start time.Time) { elapsed := time.Since(start).Milliseconds() if err == nil { - log.ZDebug(ctx, "CallApi", "api", api, "state", "success", "cost time", fmt.Sprintf("%dms", elapsed)) + log.ZDebug(ctx, "CallApi", "duration", fmt.Sprintf("%dms", elapsed), "api", api, "state", "success") } else { - log.ZError(ctx, "CallApi", err, "api", api, "state", "failed", "cost time", fmt.Sprintf("%dms", elapsed)) + log.ZError(ctx, "CallApi", err, "duration", fmt.Sprintf("%dms", elapsed), "api", api, "state", "failed") } }(time.Now()) diff --git a/msgtest/module/msg_sender.go b/msgtest/module/msg_sender.go index fcb9c87c0..b74098529 100644 --- a/msgtest/module/msg_sender.go +++ b/msgtest/module/msg_sender.go @@ -129,7 +129,7 @@ func newUserCtx(userID, token string, imConfig sdk_struct.IMConfig) context.Cont func NewUser(userID, token string, timeOffset int64, p *PressureTester, imConfig sdk_struct.IMConfig, opts ...func(core *SendMsgUser)) *SendMsgUser { pushMsgAndMaxSeqCh := make(chan common.Cmd2Value, 1000) ctx := newUserCtx(userID, token, imConfig) - longConnMgr := interaction.NewLongConnMgr(ctx, &ConnListener{}, func(m map[string][]int32) {}, nil, pushMsgAndMaxSeqCh, nil) + longConnMgr := interaction.NewLongConnMgr(ctx, &ConnListener{}, func(m map[string][]int32) {}, pushMsgAndMaxSeqCh, nil) core := &SendMsgUser{ pushMsgAndMaxSeqCh: pushMsgAndMaxSeqCh, longConnMgr: longConnMgr, diff --git a/open_im_sdk/userRelated.go b/open_im_sdk/userRelated.go index 6f21edf42..99e84ebb0 100644 --- a/open_im_sdk/userRelated.go +++ b/open_im_sdk/userRelated.go @@ -118,7 +118,6 @@ type LoginMgr struct { conversationCh chan common.Cmd2Value cmdWsCh chan common.Cmd2Value - heartbeatCmdCh chan common.Cmd2Value pushMsgAndMaxSeqCh chan common.Cmd2Value loginMgrCh chan common.Cmd2Value @@ -439,10 +438,9 @@ func (u *LoginMgr) initResources() { convChanLen = 1000 } u.conversationCh = make(chan common.Cmd2Value, convChanLen) - u.heartbeatCmdCh = make(chan common.Cmd2Value, 10) u.pushMsgAndMaxSeqCh = make(chan common.Cmd2Value, 1000) u.loginMgrCh = make(chan common.Cmd2Value, 1) - u.longConnMgr = interaction.NewLongConnMgr(u.ctx, u.connListener, u.userOnlineStatusChange, u.heartbeatCmdCh, u.pushMsgAndMaxSeqCh, u.loginMgrCh) + u.longConnMgr = interaction.NewLongConnMgr(u.ctx, u.connListener, u.userOnlineStatusChange, u.pushMsgAndMaxSeqCh, u.loginMgrCh) u.ctx = ccontext.WithApiErrCode(u.ctx, &apiErrCallback{loginMgrCh: u.loginMgrCh, listener: u.connListener}) u.setLoginStatus(LogoutStatus) } @@ -499,8 +497,7 @@ func (u *LoginMgr) setAppBackgroundStatus(ctx context.Context, isBackground bool } else { u.longConnMgr.SetBackground(isBackground) if !isBackground { - _ = common.TriggerCmdWakeUp(u.heartbeatCmdCh) - _ = common.TriggerCmdSyncData(ctx, u.conversationCh) + _ = common.TriggerCmdWakeUpDataSync(u.pushMsgAndMaxSeqCh) } return nil diff --git a/pkg/common/trigger_channel.go b/pkg/common/trigger_channel.go index daaedeb5e..01d2d73e8 100644 --- a/pkg/common/trigger_channel.go +++ b/pkg/common/trigger_channel.go @@ -70,20 +70,20 @@ func TriggerCmdSyncFlag(ctx context.Context, syncFlag int, conversationCh chan C } } -func TriggerCmdWakeUp(ch chan Cmd2Value) error { +func TriggerCmdWakeUpDataSync(ch chan Cmd2Value) error { if ch == nil { return errs.Wrap(ErrChanNil) } - c2v := Cmd2Value{Cmd: constant.CmdWakeUp, Value: nil} + c2v := Cmd2Value{Cmd: constant.CmdWakeUpDataSync, Value: nil} return sendCmd(ch, c2v, timeOut) } -func TriggerCmdSyncData(ctx context.Context, ch chan Cmd2Value) error { - if ch == nil { - return errs.Wrap(ErrChanNil) - } +func TriggerCmdSyncData(ctx context.Context, ch chan Cmd2Value) { c2v := Cmd2Value{Cmd: constant.CmdSyncData, Value: nil, Ctx: ctx} - return sendCmd(ch, c2v, timeOut) + err := sendCmd(ch, c2v, timeOut) + if err != nil { + log.ZWarn(ctx, "TriggerCmdSyncData error", err) + } } func TriggerCmdSyncReactionExtensions(node SyncReactionExtensionsNode, conversationCh chan Cmd2Value) error { @@ -128,15 +128,6 @@ func TriggerCmdPushMsg(ctx context.Context, msg *sdkws.PushMessages, ch chan Cmd return sendCmd(ch, c2v, timeOut) } -// seq trigger -func TriggerCmdMaxSeq(ctx context.Context, seq *sdk_struct.CmdMaxSeqToMsgSync, ch chan Cmd2Value) error { - if ch == nil { - return errs.Wrap(ErrChanNil) - } - c2v := Cmd2Value{Cmd: constant.CmdMaxSeq, Value: seq, Ctx: ctx} - return sendCmd(ch, c2v, timeOut) -} - func TriggerCmdLogOut(ctx context.Context, ch chan Cmd2Value) error { if ch == nil { return errs.Wrap(ErrChanNil) diff --git a/pkg/constant/constant.go b/pkg/constant/constant.go index d916241a8..3c705c2c2 100644 --- a/pkg/constant/constant.go +++ b/pkg/constant/constant.go @@ -15,13 +15,13 @@ package constant const ( - CmdSyncData = "001" - CmdSyncFlag = "002" - CmdNotification = "003" - CmdMsgSyncInReinstall = "004" - CmdNewMsgCome = "005" + CmdSyncData = "syncData" + CmdSyncFlag = "syncFlag" + CmdNotification = "notification" + CmdMsgSyncInReinstall = "msgSyncInReinstall" + CmdNewMsgCome = "newMsgCome" CmdSuperGroupMsgCome = "006" - CmdUpdateConversation = "007" + CmdUpdateConversation = "updateConversation" CmSyncReactionExtensions = "008" CmdFroceSyncBlackList = "009" CmdForceSyncFriendApplication = "010" @@ -34,16 +34,16 @@ const ( CmdAddFriend = "017" CmdJoinedSuperGroup = "018" - CmdUpdateMessage = "019" + CmdUpdateMessage = "updateMessage" CmdReconnect = "020" CmdInit = "021" - CmdMaxSeq = "maxSeq" - CmdPushMsg = "pushMsg" - CmdConnSuccesss = "connSuccess" - CmdWakeUp = "wakeUp" - CmdLogOut = "loginOut" + CmdMaxSeq = "maxSeq" + CmdPushMsg = "pushMsg" + CmdConnSuccesss = "connSuccess" + CmdWakeUpDataSync = "wakeUpDataSync" + CmdLogOut = "loginOut" ) const (