Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove the conversation trigger to prevent waiting in a circular… #663

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 74 additions & 4 deletions internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/tools/utils/stringutil"
"math"
"sync"

"github.com/openimsdk/tools/utils/stringutil"

"github.com/openimsdk/openim-sdk-core/v3/internal/business"
"github.com/openimsdk/openim-sdk-core/v3/internal/cache"
"github.com/openimsdk/openim-sdk-core/v3/internal/file"
Expand Down Expand Up @@ -380,12 +381,12 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
"changedConversations", string(stringutil.StructToJsonBytes(conversationChangedSet)))

//seq sync message update
if err := c.messageController.BatchUpdateMessageList(ctx, updateMsg); err != nil {
if err := c.batchUpdateMessageList(ctx, updateMsg); err != nil {
log.ZError(ctx, "sync seq normal message err :", err)
}

//Normal message storage
_ = c.messageController.BatchInsertMessageList(ctx, insertMsg)
_ = c.batchInsertMessageList(ctx, insertMsg)

hList, _ := c.db.GetHiddenConversationList(ctx)
for _, v := range hList {
Expand Down Expand Up @@ -531,7 +532,7 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
}

// message storage
_ = c.messageController.BatchInsertMessageList(ctx, insertMsg)
_ = c.batchInsertMessageList(ctx, insertMsg)

// conversation storage
if err := c.db.BatchUpdateConversationList(ctx, conversationList); err != nil {
Expand Down Expand Up @@ -625,6 +626,75 @@ func (c *Conversation) tempCacheChatLog(ctx context.Context, messageList []*sdk_
}
}

func (c *Conversation) batchUpdateMessageList(ctx context.Context, updateMsg map[string][]*model_struct.LocalChatLog) error {
if updateMsg == nil {
return nil
}
for conversationID, messages := range updateMsg {
conversation, err := c.db.GetConversation(ctx, conversationID)
if err != nil {
log.ZError(ctx, "GetConversation err", err, "conversationID", conversationID)
continue
}
latestMsg := &sdk_struct.MsgStruct{}
if err := json.Unmarshal([]byte(conversation.LatestMsg), latestMsg); err != nil {
log.ZError(ctx, "Unmarshal err", err, "conversationID",
conversationID, "latestMsg", conversation.LatestMsg, "messages", messages)
continue
}
for _, v := range messages {
v1 := new(model_struct.LocalChatLog)
v1.ClientMsgID = v.ClientMsgID
v1.Seq = v.Seq
v1.Status = v.Status
v1.RecvID = v.RecvID
v1.SessionType = v.SessionType
v1.ServerMsgID = v.ServerMsgID
v1.SendTime = v.SendTime
err := c.db.UpdateMessage(ctx, conversationID, v1)
if err != nil {
return utils.Wrap(err, "BatchUpdateMessageList failed")
}
if latestMsg.ClientMsgID == v.ClientMsgID {
latestMsg.ServerMsgID = v.ServerMsgID
latestMsg.Seq = v.Seq
latestMsg.SendTime = v.SendTime
latestMsg.Status = v.Status
conversation.LatestMsg = utils.StructToJsonString(latestMsg)

c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{ConID: conversation.ConversationID,
Action: constant.AddConOrUpLatMsg, Args: *conversation}})

}
}

}
return nil
}

func (c *Conversation) batchInsertMessageList(ctx context.Context, insertMsg map[string][]*model_struct.LocalChatLog) error {
if insertMsg == nil {
return nil
}
for conversationID, messages := range insertMsg {
if len(messages) == 0 {
continue
}
err := c.db.BatchInsertMessageList(ctx, conversationID, messages)
if err != nil {
log.ZError(ctx, "insert GetMessage detail err:", err, "conversationID", conversationID, "messages", messages)
for _, v := range messages {
e := c.db.InsertMessage(ctx, conversationID, v)
if e != nil {
log.ZError(ctx, "InsertMessage err", err, "conversationID", conversationID, "message", v)
}
}
}

}
return nil
}

func (c *Conversation) DoMsgReaction(msgReactionList []*sdk_struct.MsgStruct) {

//for _, v := range msgReactionList {
Expand Down
4 changes: 2 additions & 2 deletions internal/conversation_msg/message_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,12 @@ func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map
updateMsg[conversationID] = updateMessage

//update message
if err6 := c.messageController.BatchUpdateMessageList(ctx, updateMsg); err6 != nil {
if err6 := c.batchUpdateMessageList(ctx, updateMsg); err6 != nil {
log.ZError(ctx, "sync seq normal message err :", err6)
}
b3 := utils.GetCurrentTimestampByMill()
//Normal message storage
_ = c.messageController.BatchInsertMessageList(ctx, insertMsg)
_ = c.batchInsertMessageList(ctx, insertMsg)
b4 := utils.GetCurrentTimestampByMill()
log.ZDebug(ctx, "BatchInsertMessageListController, ", "cost time", b4-b3)

Expand Down
73 changes: 1 addition & 72 deletions internal/conversation_msg/message_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,13 @@ package conversation_msg

import (
"context"
"encoding/json"

"github.com/openimsdk/openim-sdk-core/v3/internal/util"
"github.com/openimsdk/openim-sdk-core/v3/pkg/common"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/db_interface"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"

"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
)

type MessageController struct {
Expand All @@ -37,73 +33,6 @@ type MessageController struct {
func NewMessageController(db db_interface.DataBase, ch chan common.Cmd2Value) *MessageController {
return &MessageController{db: db, ch: ch}
}
func (m *MessageController) BatchUpdateMessageList(ctx context.Context, updateMsg map[string][]*model_struct.LocalChatLog) error {
if updateMsg == nil {
return nil
}
for conversationID, messages := range updateMsg {
conversation, err := m.db.GetConversation(ctx, conversationID)
if err != nil {
log.ZError(ctx, "GetConversation err", err, "conversationID", conversationID)
continue
}
latestMsg := &sdk_struct.MsgStruct{}
if err := json.Unmarshal([]byte(conversation.LatestMsg), latestMsg); err != nil {
log.ZError(ctx, "Unmarshal err", err, "conversationID",
conversationID, "latestMsg", conversation.LatestMsg, "messages", messages)
continue
}
for _, v := range messages {
v1 := new(model_struct.LocalChatLog)
v1.ClientMsgID = v.ClientMsgID
v1.Seq = v.Seq
v1.Status = v.Status
v1.RecvID = v.RecvID
v1.SessionType = v.SessionType
v1.ServerMsgID = v.ServerMsgID
v1.SendTime = v.SendTime
err := m.db.UpdateMessage(ctx, conversationID, v1)
if err != nil {
return utils.Wrap(err, "BatchUpdateMessageList failed")
}
if latestMsg.ClientMsgID == v.ClientMsgID {
latestMsg.ServerMsgID = v.ServerMsgID
latestMsg.Seq = v.Seq
latestMsg.SendTime = v.SendTime
latestMsg.Status = v.Status
conversation.LatestMsg = utils.StructToJsonString(latestMsg)
_ = common.TriggerCmdUpdateConversation(ctx, common.UpdateConNode{ConID: conversation.ConversationID,
Action: constant.AddConOrUpLatMsg, Args: *conversation}, m.ch)

}
}

}
return nil
}

func (m *MessageController) BatchInsertMessageList(ctx context.Context, insertMsg map[string][]*model_struct.LocalChatLog) error {
if insertMsg == nil {
return nil
}
for conversationID, messages := range insertMsg {
if len(messages) == 0 {
continue
}
err := m.db.BatchInsertMessageList(ctx, conversationID, messages)
if err != nil {
log.ZError(ctx, "insert GetMessage detail err:", err, "conversationID", conversationID, "messages", messages)
for _, v := range messages {
e := m.db.InsertMessage(ctx, conversationID, v)
if e != nil {
log.ZError(ctx, "InsertMessage err", err, "conversationID", conversationID, "message", v)
}
}
}

}
return nil
}

func (c *Conversation) PullMessageBySeqs(ctx context.Context, seqs []*sdkws.SeqRange) (*sdkws.PullMessageBySeqsResp, error) {
return util.CallApi[sdkws.PullMessageBySeqsResp](ctx, constant.PullUserMsgBySeqRouter, sdkws.PullMessageBySeqsReq{UserID: c.loginUserID, SeqRanges: seqs})
Expand Down
Loading