Skip to content

Commit

Permalink
Merge pull request #10 from mo3et/msg-sync
Browse files Browse the repository at this point in the history
fix: optimize large sessions blocking in login
  • Loading branch information
FGadvancer committed Jun 28, 2024
2 parents 4715ac6 + 119ae40 commit ac6263b
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 25 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ else
endif

# Copy githook scripts when execute makefile
# NEED Remove. DON'T REJECT!!
# TODO! GIT_FILE_SIZE_LIMIT=42000000 git commit -m "This commit is allowed file sizes up to 42MB"
COPY_GITHOOK:=$(shell cp -f scripts/githooks/* .git/hooks/; chmod +x .git/hooks/*)
# COPY_GITHOOK:=$(shell cp -f scripts/githooks/* .git/hooks/; chmod +x .git/hooks/*)

# Linux command settings
FIND := find . ! -path './image/*' ! -path './vendor/*' ! -path './bin/*'
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions internal/conversation_msg/message_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (m *MessageController) BatchUpdateMessageList(ctx context.Context, updateMs
latestMsg.ServerMsgID = v.ServerMsgID
latestMsg.Seq = v.Seq
latestMsg.SendTime = v.SendTime
latestMsg.Status = v.Status
conversation.LatestMsg = utils.StructToJsonString(latestMsg)
_ = common.TriggerCmdUpdateConversation(ctx, common.UpdateConNode{ConID: conversation.ConversationID,
Action: constant.AddConOrUpLatMsg, Args: *conversation}, m.ch)
Expand Down
21 changes: 20 additions & 1 deletion internal/conversation_msg/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,26 @@ func (c *Conversation) SearchLocalMessages(ctx context.Context, searchParam *sdk

}
func (c *Conversation) SetMessageLocalEx(ctx context.Context, conversationID string, clientMsgID string, localEx string) error {
return c.db.UpdateColumnsMessage(ctx, conversationID, clientMsgID, map[string]interface{}{"local_ex": localEx})
err := c.db.UpdateColumnsMessage(ctx, conversationID, clientMsgID, map[string]interface{}{"local_ex": localEx})
if err != nil {
return err
}
conversation, err := c.db.GetConversation(ctx, conversationID)
if err != nil {
return err
}
var latestMsg sdk_struct.MsgStruct
utils.JsonStringToStruct(conversation.LatestMsg, &latestMsg)
if latestMsg.ClientMsgID == clientMsgID {
log.ZDebug(ctx, "latestMsg local ex changed", "seq", latestMsg.Seq, "clientMsgID", latestMsg.ClientMsgID)
latestMsg.LocalEx = localEx
latestMsgStr := utils.StructToJsonString(latestMsg)
if err = c.db.UpdateColumnsConversation(ctx, conversationID, map[string]interface{}{"latest_msg": latestMsgStr, "latest_msg_send_time": latestMsg.SendTime}); err != nil {
return err
}
c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{Action: constant.ConChange, Args: []string{conversationID}}})
}
return nil
}

func (c *Conversation) initBasicInfo(ctx context.Context, message *sdk_struct.MsgStruct, msgFrom, contentType int32) error {
Expand Down
53 changes: 46 additions & 7 deletions internal/interaction/msg_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ package interaction

import (
"context"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"strings"
"sync"
"time"

"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"

"github.com/openimsdk/openim-sdk-core/v3/pkg/common"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
Expand Down Expand Up @@ -78,15 +81,51 @@ func (m *MsgSyncer) loadSeq(ctx context.Context) error {
if len(conversationIDList) == 0 {
m.reinstalled = true
}
//TODO With a large number of sessions, this could potentially cause blocking and needs optimization.

// TODO With a large number of sessions, this could potentially cause blocking and needs optimization.
type SyncedSeq struct {
ConversationID string
MaxSyncedSeq int64
Err error
}
concurrency := 10
t2 := time.Now()
SyncedSeqs := make(chan SyncedSeq, len(conversationIDList))
sem := make(chan struct{}, concurrency)

var wg sync.WaitGroup
for _, v := range conversationIDList {
maxSyncedSeq, err := m.db.GetConversationNormalMsgSeq(ctx, v)
if err != nil {
log.ZError(ctx, "get group normal seq failed", err, "conversationID", v)
wg.Add(1)
sem <- struct{}{} // Acquire a token
go func(conversationID string) {
defer wg.Done()
defer func() { <-sem }() // Release the token

maxSyncedSeq, err := m.db.GetConversationNormalMsgSeq(ctx, conversationID)
SyncedSeqs <- SyncedSeq{
ConversationID: conversationID,
MaxSyncedSeq: maxSyncedSeq,
Err: err,
}
}(v)
log.ZDebug(ctx, "goroutine done.", "goroutine cost time", time.Since(t2))
}

// Close the results channel once all goroutines have finished
go func() {
wg.Wait()
close(SyncedSeqs)
}()

// Collect the results
for res := range SyncedSeqs {
if res.Err != nil {
log.ZError(ctx, "get group normal seq failed", res.Err, "conversationID", res.ConversationID)
} else {
m.syncedMaxSeqs[v] = maxSyncedSeq
m.syncedMaxSeqs[res.ConversationID] = res.MaxSyncedSeq
}
}

notificationSeqs, err := m.db.GetNotificationAllSeqs(ctx)
if err != nil {
log.ZError(ctx, "get notification seq failed", err)
Expand Down Expand Up @@ -246,7 +285,7 @@ func (m *MsgSyncer) doConnected(ctx context.Context) {
common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.MsgSyncFailed}, m.conversationCh)
return
} else {
log.ZDebug(m.ctx, "get max seq success", "resp", resp)
log.ZDebug(m.ctx, "get max seq success", "resp", resp.MaxSeqs)
}
m.compareSeqsAndBatchSync(ctx, resp.MaxSeqs, connectPullNums)
if reinstalled {
Expand Down
22 changes: 6 additions & 16 deletions testv2/conversation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,22 +216,12 @@ func Test_SearchLocalMessages(t *testing.T) {
}
}

// // delete
// funcation Test_DeleteMessageFromLocalStorage(t *testing.T) {
// err := open_im_sdk.UserForSDK.Conversation().DeleteMessageFromLocalStorage(ctx, &sdk_struct.MsgStruct{SessionType: 1, ContentType: 1203,
// ClientMsgID: "ef02943b05b02d02f92b0e92516099a3", Seq: 31, SendID: "kernaltestuid8", RecvID: "kernaltestuid9"})
// if err != nil {
// t.Fatal(err)
// }
// }
//
// funcation Test_DeleteMessage(t *testing.T) {
// err := open_im_sdk.UserForSDK.Conversation().DeleteMessage(ctx, &sdk_struct.MsgStruct{SessionType: 1, ContentType: 1203,
// ClientMsgID: "ef02943b05b02d02f92b0e92516099a3", Seq: 31, SendID: "kernaltestuid8", RecvID: "kernaltestuid9"})
// if err != nil {
// t.Fatal(err)
// }
// }
func Test_SetMessageLocalEx(t *testing.T) {
err := open_im_sdk.UserForSDK.Conversation().SetMessageLocalEx(ctx, "si_2975755104_6386894923", "53ca4b3be29f7ea231a5e82e7af8a43f", "{key,value}")
if err != nil {
t.Fatal(err)
}
}

func Test_DeleteAllMsgFromLocalAndSvr(t *testing.T) {
err := open_im_sdk.UserForSDK.Conversation().DeleteAllMsgFromLocalAndSvr(ctx)
Expand Down

0 comments on commit ac6263b

Please sign in to comment.