Skip to content

Commit

Permalink
feat: incremental synchronization of session list (#2408)
Browse files Browse the repository at this point in the history
* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* update gomake version

* update gomake version

* conversation version incremental

* GetOwnerConversation

* fix: change incremental syncer router name.

* fix: GetMsgDocModelByIndex bug

* update go.mod

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
Co-authored-by: Gordon <46924906+FGadvancer@users.noreply.github.com>
  • Loading branch information
3 people committed Jul 15, 2024
1 parent ea7e505 commit 5f52fa1
Show file tree
Hide file tree
Showing 18 changed files with 314 additions and 45 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69-alpha.17
github.com/openimsdk/tools v0.0.49-alpha.45
github.com/openimsdk/protocol v0.0.69-alpha.30
github.com/openimsdk/tools v0.0.49-alpha.49
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0
Expand Down
77 changes: 73 additions & 4 deletions go.sum

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions internal/api/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,15 @@ func (o *ConversationApi) SetConversations(c *gin.Context) {
func (o *ConversationApi) GetConversationOfflinePushUserIDs(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetConversationOfflinePushUserIDs, o.Client, c)
}

func (o *ConversationApi) GetFullOwnerConversationIDs(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetFullOwnerConversationIDs, o.Client, c)
}

func (o *ConversationApi) GetIncrementalConversation(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetIncrementalConversation, o.Client, c)
}

func (o *ConversationApi) GetOwnerConversation(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetOwnerConversation, o.Client, c)
}
3 changes: 3 additions & 0 deletions internal/api/friend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package api

import (
"github.com/gin-gonic/gin"

"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/relation"
"github.com/openimsdk/tools/a2r"
Expand Down Expand Up @@ -100,6 +101,8 @@ func (o *FriendApi) GetIncrementalFriends(c *gin.Context) {
a2r.Call(relation.FriendClient.GetIncrementalFriends, o.Client, c)
}

// GetIncrementalBlacks is temporarily unused.
// Deprecated: This function is currently unused and may be removed in future versions.
func (o *FriendApi) GetIncrementalBlacks(c *gin.Context) {
a2r.Call(relation.FriendClient.GetIncrementalBlacks, o.Client, c)
}
Expand Down
14 changes: 9 additions & 5 deletions internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import (
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/go-playground/validator/v10"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.Engine {
Expand Down Expand Up @@ -118,9 +119,9 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
groupRouterGroup.POST("/get_group_abstract_info", g.GetGroupAbstractInfo)
groupRouterGroup.POST("/get_groups", g.GetGroups)
groupRouterGroup.POST("/get_group_member_user_id", g.GetGroupMemberUserIDs)
groupRouterGroup.POST("/get_incremental_join_group", g.GetIncrementalJoinGroup)
groupRouterGroup.POST("/get_incremental_group_member", g.GetIncrementalGroupMember)
groupRouterGroup.POST("/get_incremental_group_member_batch", g.GetIncrementalGroupMemberBatch)
groupRouterGroup.POST("/get_incremental_join_groups", g.GetIncrementalJoinGroup)
groupRouterGroup.POST("/get_incremental_group_members", g.GetIncrementalGroupMember)
groupRouterGroup.POST("/get_incremental_group_members_batch", g.GetIncrementalGroupMemberBatch)
groupRouterGroup.POST("/get_full_group_member_user_ids", g.GetFullGroupMemberUserIDs)
groupRouterGroup.POST("/get_full_join_group_ids", g.GetFullJoinGroupIDs)
}
Expand Down Expand Up @@ -192,6 +193,9 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
conversationGroup.POST("/get_conversations", c.GetConversations)
conversationGroup.POST("/set_conversations", c.SetConversations)
conversationGroup.POST("/get_conversation_offline_push_user_ids", c.GetConversationOfflinePushUserIDs)
conversationGroup.POST("/get_full_conversation_ids", c.GetFullOwnerConversationIDs)
conversationGroup.POST("/get_incremental_conversations", c.GetIncrementalConversation)
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
}

statisticsGroup := r.Group("/statistics")
Expand Down
25 changes: 23 additions & 2 deletions internal/rpc/conversation/conversaion.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,23 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbcon
}

func (c *conversationServer) GetConversations(ctx context.Context, req *pbconversation.GetConversationsReq) (*pbconversation.GetConversationsResp, error) {
conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs)
conversations, err := c.getConversations(ctx, req.OwnerUserID, req.ConversationIDs)
if err != nil {
return nil, err
}
return &pbconversation.GetConversationsResp{
Conversations: conversations,
}, nil
}

func (c *conversationServer) getConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) {
conversations, err := c.conversationDatabase.FindConversations(ctx, ownerUserID, conversationIDs)
if err != nil {
return nil, err
}
resp := &pbconversation.GetConversationsResp{Conversations: []*pbconversation.Conversation{}}
resp.Conversations = convert.ConversationsDB2Pb(conversations)
return resp, nil
return convert.ConversationsDB2Pb(conversations), nil
}

func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) {
Expand Down Expand Up @@ -581,3 +591,14 @@ func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconv
}
return &pbconversation.UpdateConversationResp{}, nil
}

func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbconversation.GetOwnerConversationReq) (*pbconversation.GetOwnerConversationResp, error) {
total, conversations, err := c.conversationDatabase.GetOwnerConversation(ctx, req.UserID, req.Pagination)
if err != nil {
return nil, err
}
return &pbconversation.GetOwnerConversationResp{
Total: total,
Conversations: convert.ConversationsDB2Pb(conversations),
}, nil
}
56 changes: 56 additions & 0 deletions internal/rpc/conversation/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package conversation

import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
"github.com/openimsdk/protocol/conversation"
)

func (c *conversationServer) GetFullOwnerConversationIDs(ctx context.Context, req *conversation.GetFullOwnerConversationIDsReq) (*conversation.GetFullOwnerConversationIDsResp, error) {
vl, err := c.conversationDatabase.FindMaxConversationUserVersionCache(ctx, req.UserID)
if err != nil {
return nil, err
}
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
idHash := hashutil.IdHash(conversationIDs)
if req.IdHash == idHash {
conversationIDs = nil
}
return &conversation.GetFullOwnerConversationIDsResp{
Version: idHash,
VersionID: vl.ID.Hex(),
Equal: req.IdHash == idHash,
ConversationIDs: conversationIDs,
}, nil
}

func (c *conversationServer) GetIncrementalConversation(ctx context.Context, req *conversation.GetIncrementalConversationReq) (*conversation.GetIncrementalConversationResp, error) {
opt := incrversion.Option[*conversation.Conversation, conversation.GetIncrementalConversationResp]{
Ctx: ctx,
VersionKey: req.UserID,
VersionID: req.VersionID,
VersionNumber: req.Version,
Version: c.conversationDatabase.FindConversationUserVersion,
CacheMaxVersion: c.conversationDatabase.FindMaxConversationUserVersionCache,
Find: func(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) {
return c.getConversations(ctx, req.UserID, conversationIDs)
},
ID: func(elem *conversation.Conversation) string { return elem.GroupID },
Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*conversation.Conversation, full bool) *conversation.GetIncrementalConversationResp {
return &conversation.GetIncrementalConversationResp{
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
Delete: delIDs,
Insert: insertList,
Update: updateList,
}
},
}
return opt.Build()
}
5 changes: 5 additions & 0 deletions internal/rpc/third/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,8 @@ type FormDataMate struct {
Group string `json:"group"`
Key string `json:"key"`
}

func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
//TODO implement me
panic("implement me")
}
1 change: 1 addition & 0 deletions internal/rpc/third/third.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type thirdServer struct {
defaultExpire time.Duration
config *Config
}

type Config struct {
RpcConfig config.Third
RedisConfig config.Redis
Expand Down
5 changes: 5 additions & 0 deletions internal/rpc/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,3 +736,8 @@ func (s *userServer) SortQuery(ctx context.Context, req *pbuser.SortQueryReq) (*
}
return &pbuser.SortQueryResp{Users: convert.UsersDB2Pb(users)}, nil
}

func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) {
//TODO implement me
panic("implement me")
}
5 changes: 5 additions & 0 deletions pkg/common/storage/cache/cachekey/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
SuperGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
SuperGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
ConversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:"
ConversationUserMaxKey = "CONVERSATION_USER_MAX:"
)

func GetConversationKey(ownerUserID, conversationID string) string {
Expand Down Expand Up @@ -56,3 +57,7 @@ func GetConversationNotReceiveMessageUserIDsKey(conversationID string) string {
func GetUserConversationIDsHashKey(ownerUserID string) string {
return ConversationIDsHashKey + ownerUserID
}

func GetConversationUserMaxVersionKey(userID string) string {
return ConversationUserMaxKey + userID
}
4 changes: 4 additions & 0 deletions pkg/common/storage/cache/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,8 @@ type ConversationCache interface {

GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache

DelConversationVersionUserIDs(userIDs ...string) ConversationCache

FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)
}
17 changes: 17 additions & 0 deletions pkg/common/storage/cache/redis/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID strin
return cachekey.GetUserConversationIDsHashKey(ownerUserID)
}

func (c *ConversationRedisCache) getConversationUserMaxVersionKey(ownerUserID string) string {
return cachekey.GetConversationUserMaxVersionKey(ownerUserID)
}

func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) {
return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
Expand Down Expand Up @@ -233,6 +237,19 @@ func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(convers
for _, conversationID := range conversationIDs {
cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID))
}
return cache
}

func (c *ConversationRedisCache) DelConversationVersionUserIDs(userIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache()
for _, userID := range userIDs {
cache.AddKeys(c.getConversationUserMaxVersionKey(userID))
}
return cache
}

func (c *ConversationRedisCache) FindMaxConversationUserVersion(ctx context.Context, userID string) (*model.VersionLog, error) {
return getCache(ctx, c.rcClient, c.getConversationUserMaxVersionKey(userID), c.expireTime, func(ctx context.Context) (*model.VersionLog, error) {
return c.conversationDB.FindConversationUserVersion(ctx, userID, 0, 0)
})
}
35 changes: 33 additions & 2 deletions pkg/common/storage/controller/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type ConversationDatabase interface {
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
// GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
// FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error)
FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error)
GetOwnerConversation(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*relationtb.Conversation, error)
}

func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
Expand Down Expand Up @@ -106,6 +109,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
if _, ok := fieldMap["recv_msg_opt"]; ok {
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
}
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
}
NotUserIDs := stringutil.DifferenceString(haveUserIDs, userIDs)
log.ZDebug(ctx, "SetUsersConversationFieldTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs)
Expand Down Expand Up @@ -137,7 +141,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
return err
}
cache := c.cache.CloneConversationCache()
cache = cache.DelUsersConversation(conversationID, userIDs...)
cache = cache.DelUsersConversation(conversationID, userIDs...).DelConversationVersionUserIDs(userIDs...)
if _, ok := args["recv_msg_opt"]; ok {
cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID)
}
Expand All @@ -155,13 +159,14 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
userIDs = append(userIDs, conversation.OwnerUserID)
}
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ChainExecDel(ctx)
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...).ChainExecDel(ctx)
}

func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error {
return c.tx.Transaction(ctx, func(ctx context.Context) error {
cache := c.cache.CloneConversationCache()
for _, conversation := range conversations {
cache = cache.DelConversationVersionUserIDs(conversation.OwnerUserID)
for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} {
ownerUserID := v[0]
userID := v[1]
Expand Down Expand Up @@ -207,6 +212,7 @@ func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, owner
func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error {
return c.tx.Transaction(ctx, func(ctx context.Context) error {
cache := c.cache.CloneConversationCache()
cache = cache.DelConversationVersionUserIDs(ownerUserID)
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
return e.GroupID, e.GroupID != ""
}))
Expand Down Expand Up @@ -322,3 +328,28 @@ func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Contex
func (c *conversationDatabase) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
return c.cache.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)
}

func (c *conversationDatabase) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error) {
return c.conversationDB.FindConversationUserVersion(ctx, userID, version, limit)
}

func (c *conversationDatabase) FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error) {
return c.cache.FindMaxConversationUserVersion(ctx, userID)
}

func (c *conversationDatabase) GetOwnerConversation(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*relationtb.Conversation, error) {
conversationIDs, err := c.cache.GetUserConversationIDs(ctx, ownerUserID)
if err != nil {
return 0, nil, err
}
findConversationIDs := datautil.Paginate(conversationIDs, int(pagination.GetPageNumber()), int(pagination.GetShowNumber()))
conversations := make([]*relationtb.Conversation, 0, len(findConversationIDs))
for _, conversationID := range findConversationIDs {
conversation, err := c.cache.GetConversation(ctx, ownerUserID, conversationID)
if err != nil {
return 0, nil, err
}
conversations = append(conversations, conversation)
}
return int64(len(conversationIDs)), conversations, nil
}
2 changes: 1 addition & 1 deletion pkg/common/storage/database/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

type Conversation interface {
Create(ctx context.Context, conversations []*model.Conversation) (err error)
Delete(ctx context.Context, groupIDs []string) (err error)
UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error)
Update(ctx context.Context, conversation *model.Conversation) (err error)
Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error)
Expand All @@ -39,4 +38,5 @@ type Conversation interface {
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error)
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
}
Loading

0 comments on commit 5f52fa1

Please sign in to comment.