From 9b92b7985567276fb9ff316f856f3967f177d440 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Fri, 13 Sep 2024 18:50:50 +0800 Subject: [PATCH 1/3] refactor: cache --- internal/conversation_msg/conversation_msg.go | 8 +- internal/group/api.go | 59 +--------- internal/group/cache.go | 66 +++++++++++ internal/group/group.go | 12 +- internal/user/user.go | 39 ++----- pkg/cache/cache.go | 8 +- pkg/cache/manager.go | 106 ++++++++++++++++++ 7 files changed, 194 insertions(+), 104 deletions(-) create mode 100644 internal/group/cache.go create mode 100644 pkg/cache/manager.go diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index a9568e5f5..02d414fe6 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -981,13 +981,7 @@ func (c *Conversation) batchGetUserNameAndFaceURL(ctx context.Context, userIDs . } m[localFriend.FriendUserID] = userInfo } - usersInfo, err := c.user.GetUsersInfoWithCache(ctx, notInFriend, func(ctx context.Context, missingKeys []string) ([]*model_struct.LocalUser, error) { - users, err := c.user.GetUserInfoFromServer(ctx, missingKeys) - if err != nil { - return nil, err - } - return users, nil - }) + usersInfo, err := c.user.GetUsersInfoWithCache(ctx, notInFriend) if err != nil { return nil, err } diff --git a/internal/group/api.go b/internal/group/api.go index 1b77119ff..b96b76bf7 100644 --- a/internal/group/api.go +++ b/internal/group/api.go @@ -18,9 +18,6 @@ import ( "context" "time" - "github.com/openimsdk/openim-sdk-core/v3/pkg/cache" - "github.com/openimsdk/openim-sdk-core/v3/sdk_struct" - "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/datautil" @@ -524,60 +521,8 @@ func (g *Group) HandlerGroupApplication(ctx context.Context, req *group.GroupApp return nil } -func (g *Group) GetGroupMemberNameAndFaceURL(ctx context.Context, groupID string, userIDs []string) (map[string]*sdk_struct.BasicInfo, error) { - memberInfo, err := g.db.GetGroupSomeMemberInfo(ctx, groupID, userIDs) - if err != nil { - return nil, err - } - res := datautil.SliceToMapAny(memberInfo, func(e *model_struct.LocalGroupMember) (string, *sdk_struct.BasicInfo) { - return e.UserID, &sdk_struct.BasicInfo{ - Nickname: e.Nickname, - FaceURL: e.FaceURL, - } - }) - unFind := datautil.SliceSubAny(userIDs, memberInfo, func(t *model_struct.LocalGroupMember) string { - return t.UserID - }) - - queryUserIDs := make([]string, 0) - - var ( - groupMap *cache.Cache[string, *sdk_struct.BasicInfo] - ok bool - ) - - if groupMap, ok = g.groupMemberCache.Load(groupID); ok { - for _, userID := range unFind { - if data, ok := groupMap.Load(userID); ok { - res[userID] = data - } else { - queryUserIDs = append(queryUserIDs, userID) - } - } - } else { - groupMap = cache.NewCache[string, *sdk_struct.BasicInfo]() - queryUserIDs = append(queryUserIDs, unFind...) - } - - if len(queryUserIDs) != 0 { - members, err := g.GetDesignatedGroupMembers(ctx, groupID, queryUserIDs) - if err != nil { - return nil, err - } - - for _, member := range members { - info := &sdk_struct.BasicInfo{ - Nickname: member.Nickname, - FaceURL: member.FaceURL, - } - - res[member.UserID] = info - groupMap.Store(member.UserID, info) - } - g.groupMemberCache.Store(groupID, groupMap) - } - - return res, nil +func (g *Group) GetGroupMemberNameAndFaceURL(ctx context.Context, groupID string, userIDs []string) (map[string]*model_struct.LocalGroupMember, error) { + return g.GetGroupMembersInfo(ctx, groupID, userIDs) } //func (g *Group) SearchGroupMembersV2(ctx context.Context, req *group.SearchGroupMemberReq) ([]*model_struct.LocalGroupMember, error) { diff --git a/internal/group/cache.go b/internal/group/cache.go new file mode 100644 index 000000000..6ef8a1e6f --- /dev/null +++ b/internal/group/cache.go @@ -0,0 +1,66 @@ +package group + +import ( + "context" + "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" + "github.com/openimsdk/tools/utils/datautil" +) + +func (g *Group) buildGroupMemberKey(groupID string, userID string) string { + return groupID + ":" + userID +} + +func (g *Group) GetGroupMembersInfoFunc(ctx context.Context, groupID string, userIDs []string, + fetchFunc func(ctx context.Context, missingKeys []string) ([]*model_struct.LocalGroupMember, error), +) (map[string]*model_struct.LocalGroupMember, error) { + var ( + res = make(map[string]*model_struct.LocalGroupMember) + missingKeys []string + ) + + for _, userID := range userIDs { + key := g.buildGroupMemberKey(groupID, userID) + if member, ok := g.groupMemberCache.Load(groupID); ok { + res[key] = member + } else { + missingKeys = append(missingKeys, userIDs...) + } + } + + fetchData, err := fetchFunc(ctx, missingKeys) + if err != nil { + return nil, err + } + + for i, data := range fetchData { + key := g.buildGroupMemberKey(groupID, data.UserID) + res[key] = fetchData[i] + g.groupMemberCache.Store(key, fetchData[i]) + } + + return res, nil +} + +func (g *Group) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) (map[string]*model_struct.LocalGroupMember, error) { + return g.GetGroupMembersInfoFunc(ctx, groupID, userIDs, func(ctx context.Context, dbKeys []string) ([]*model_struct.LocalGroupMember, error) { + if len(dbKeys) == 0 { + return nil, nil + } + dbData, err := g.db.GetGroupSomeMemberInfo(ctx, groupID, dbKeys) + if err != nil { + return nil, err + } + queryKeys := datautil.SliceSubAny(dbKeys, dbData, func(t *model_struct.LocalGroupMember) string { + return t.UserID + }) + if len(queryKeys) != 0 { + queryData, err := g.GetDesignatedGroupMembers(ctx, groupID, queryKeys) + if err != nil { + return nil, err + } + + dbData = append(dbData, datautil.Batch(ServerGroupMemberToLocalGroupMember, queryData)...) + } + return dbData, nil + }) +} diff --git a/internal/group/group.go b/internal/group/group.go index 1773148d1..19f141da2 100644 --- a/internal/group/group.go +++ b/internal/group/group.go @@ -18,18 +18,16 @@ import ( "context" "sync" + "github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback" "github.com/openimsdk/openim-sdk-core/v3/pkg/api" "github.com/openimsdk/openim-sdk-core/v3/pkg/cache" - "github.com/openimsdk/openim-sdk-core/v3/pkg/datafetcher" - "github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs" - "github.com/openimsdk/openim-sdk-core/v3/sdk_struct" - - "github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback" "github.com/openimsdk/openim-sdk-core/v3/pkg/common" "github.com/openimsdk/openim-sdk-core/v3/pkg/constant" + "github.com/openimsdk/openim-sdk-core/v3/pkg/datafetcher" "github.com/openimsdk/openim-sdk-core/v3/pkg/db/db_interface" "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" "github.com/openimsdk/openim-sdk-core/v3/pkg/page" + "github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs" "github.com/openimsdk/openim-sdk-core/v3/pkg/syncer" "github.com/openimsdk/openim-sdk-core/v3/pkg/utils" "github.com/openimsdk/protocol/group" @@ -51,7 +49,7 @@ func NewGroup(loginUserID string, db db_interface.DataBase, conversationCh: conversationCh, } g.initSyncer() - g.groupMemberCache = cache.NewCache[string, *cache.Cache[string, *sdk_struct.BasicInfo]]() + g.groupMemberCache = cache.NewCache[string, *cache.Cache[string, *model_struct.LocalGroupMember]]() return g } @@ -70,7 +68,7 @@ type Group struct { groupSyncMutex sync.Mutex listenerForService open_im_sdk_callback.OnListenerForService - groupMemberCache *cache.Cache[string, *cache.Cache[string, *sdk_struct.BasicInfo]] + groupMemberCache *cache.Cache[string, *model_struct.LocalGroupMember] } func (g *Group) initSyncer() { diff --git a/internal/user/user.go b/internal/user/user.go index 87d70edb2..fa20274dd 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -36,8 +36,12 @@ import ( func NewUser(dataBase db_interface.DataBase, loginUserID string, conversationCh chan common.Cmd2Value) *User { user := &User{DataBase: dataBase, loginUserID: loginUserID, conversationCh: conversationCh} user.initSyncer() - user.UserCache = cache.NewCache[string, *model_struct.LocalUser]() //user.OnlineStatusCache = cache.NewCache[string, *userPb.OnlineStatus]() + user.UserCache = cache.NewManager[string, *model_struct.LocalUser]( + func(value *model_struct.LocalUser) string { return value.UserID }, + nil, + user.GetUserInfoFromServer, + ) return user } @@ -49,7 +53,7 @@ type User struct { userSyncer *syncer.Syncer[*model_struct.LocalUser, syncer.NoResp, string] commandSyncer *syncer.Syncer[*model_struct.LocalUserCommand, syncer.NoResp, string] conversationCh chan common.Cmd2Value - UserCache *cache.Cache[string, *model_struct.LocalUser] + UserCache *cache.Manager[string, *model_struct.LocalUser] //OnlineStatusCache *cache.Cache[string, *userPb.OnlineStatus] } @@ -177,33 +181,6 @@ func (u *User) updateSelfUserInfo(ctx context.Context, userInfo *sdkws.UserInfoW return nil } -func (u *User) GetUsersInfoWithCache(ctx context.Context, cacheKeys []string, fetchFunc func(ctx context.Context, missingKeys []string) ([]*model_struct.LocalUser, error)) (map[string]*model_struct.LocalUser, error) { - result := make(map[string]*model_struct.LocalUser) - var missingKeys []string - - for _, key := range cacheKeys { - if userInfo, ok := u.UserCache.Load(key); ok { - result[key] = userInfo - } else { - missingKeys = append(missingKeys, key) - } - } - - if len(missingKeys) > 0 { - fetchedData, err := fetchFunc(ctx, missingKeys) - if err != nil { - return nil, err - } - - for i, key := range missingKeys { - result[key] = fetchedData[i] - u.UserCache.Store(key, fetchedData[i]) - } - } - - return result, nil -} - func (u *User) GetUserInfoWithCache(ctx context.Context, cacheKey string, fetchFunc func(ctx context.Context, key string) (*model_struct.LocalUser, error)) (*model_struct.LocalUser, error) { if userInfo, ok := u.UserCache.Load(cacheKey); ok { return userInfo, nil @@ -217,3 +194,7 @@ func (u *User) GetUserInfoWithCache(ctx context.Context, cacheKey string, fetchF u.UserCache.Store(cacheKey, fetchedData) return fetchedData, nil } + +func (u *User) GetUsersInfoWithCache(ctx context.Context, cacheKeys []string) (map[string]*model_struct.LocalUser, error) { + return u.UserCache.MultiFetchGet(ctx, cacheKeys) +} diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index ed719b1e7..56421078a 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -2,15 +2,15 @@ package cache import "sync" +func NewCache[K comparable, V any]() *Cache[K, V] { + return &Cache[K, V]{} +} + // Cache is a Generic sync.Map structure. type Cache[K comparable, V any] struct { m sync.Map } -func NewCache[K comparable, V any]() *Cache[K, V] { - return &Cache[K, V]{} -} - // Load returns the value stored in the map for a key, or nil if no value is present. func (c *Cache[K, V]) Load(key K) (value V, ok bool) { rawValue, ok := c.m.Load(key) diff --git a/pkg/cache/manager.go b/pkg/cache/manager.go new file mode 100644 index 000000000..5dad62fc3 --- /dev/null +++ b/pkg/cache/manager.go @@ -0,0 +1,106 @@ +package cache + +import ( + "context" + "github.com/openimsdk/tools/utils/datautil" +) + +func NewManager[K comparable, V any]( + getKeyFunc func(value V) K, + dbFunc func(ctx context.Context, keys []K) ([]V, error), + queryFunc func(ctx context.Context, keys []K) ([]V, error), +) *Manager[K, V] { + return &Manager[K, V]{ + Cache: Cache[K, V]{}, + getKeyFunc: getKeyFunc, + dbFunc: dbFunc, + queryFunc: queryFunc, + } +} + +type Manager[K comparable, V any] struct { + Cache[K, V] + getKeyFunc func(value V) K + dbFunc func(ctx context.Context, keys []K) ([]V, error) + queryFunc func(ctx context.Context, keys []K) ([]V, error) +} + +func (m *Manager[K, V]) MultiFetchGet(ctx context.Context, keys []K) (map[K]V, error) { + var ( + res = make(map[K]V) + queryKeys []K + ) + + for _, key := range keys { + if data, ok := m.Load(key); ok { + res[key] = data + } else { + queryKeys = append(queryKeys, keys...) + } + } + + writeData, err := m.Fetch(ctx, queryKeys) + if err != nil { + return nil, err + } + + for i, data := range writeData { + res[m.getKeyFunc(data)] = writeData[i] + m.Store(m.getKeyFunc(data), writeData[i]) + } + + return res, nil +} + +func (m *Manager[K, V]) FetchGet(ctx context.Context, key K) (V, error) { + var nilData V + + if data, ok := m.Load(key); ok { + return data, nil + } + + fetchedData, err := m.Fetch(ctx, []K{key}) + if err != nil { + return nilData, err + } + if len(fetchedData) > 0 { + m.Store(key, fetchedData[0]) + return fetchedData[0], nil + } + + // todo: return error or nilData? + return nilData, nil +} + +func (m *Manager[K, V]) Fetch(ctx context.Context, keys []K) ([]V, error) { + if len(keys) == 0 { + return nil, nil + } + var ( + queryKeys = keys + writeData []V + ) + + if m.dbFunc != nil { + dbData, err := m.dbFunc(ctx, queryKeys) + if err != nil { + return nil, err + } + writeData = dbData + queryKeys = datautil.SliceSubAny(queryKeys, dbData, m.getKeyFunc) + } + + if len(queryKeys) == 0 { + return writeData, nil + } + + if m.queryFunc != nil { + queryData, err := m.queryFunc(ctx, queryKeys) + if err != nil { + return nil, err + } + writeData = append(writeData, queryData...) + } + + return writeData, nil +} From 0b2bb79be547c51669be4343804f27a7dc25c6f2 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Fri, 13 Sep 2024 18:51:36 +0800 Subject: [PATCH 2/3] refactor: cache --- internal/group/group.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/group/group.go b/internal/group/group.go index 19f141da2..34e1fd77c 100644 --- a/internal/group/group.go +++ b/internal/group/group.go @@ -49,7 +49,7 @@ func NewGroup(loginUserID string, db db_interface.DataBase, conversationCh: conversationCh, } g.initSyncer() - g.groupMemberCache = cache.NewCache[string, *cache.Cache[string, *model_struct.LocalGroupMember]]() + g.groupMemberCache = cache.NewCache[string, *model_struct.LocalGroupMember]() return g } From 1b71a2c5b8ef5be2d6f44aebb51e52f893be9e8f Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Sat, 14 Sep 2024 09:54:41 +0800 Subject: [PATCH 3/3] refactor: cache --- internal/conversation_msg/api.go | 8 +------- internal/conversation_msg/conversation_msg.go | 12 +----------- internal/user/user.go | 6 +++++- 3 files changed, 7 insertions(+), 19 deletions(-) diff --git a/internal/conversation_msg/api.go b/internal/conversation_msg/api.go index 56a9320c6..1ff4a533a 100644 --- a/internal/conversation_msg/api.go +++ b/internal/conversation_msg/api.go @@ -985,13 +985,7 @@ func (c *Conversation) initBasicInfo(ctx context.Context, message *sdk_struct.Ms message.IsRead = false message.Status = constant.MsgStatusSending message.SendID = c.loginUserID - userInfo, err := c.user.GetUserInfoWithCache(ctx, c.loginUserID, func(ctx context.Context, key string) (*model_struct.LocalUser, error) { - info, err := c.db.GetLoginUser(ctx, key) - if err != nil { - return nil, err - } - return info, nil - }) + userInfo, err := c.user.GetUserInfoWithCacheFunc(ctx, c.loginUserID, c.db.GetLoginUser) if err != nil { return err } diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index 4a0edb237..cdb79c395 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -23,7 +23,6 @@ import ( "github.com/openimsdk/openim-sdk-core/v3/pkg/db/db_interface" "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" "github.com/openimsdk/openim-sdk-core/v3/pkg/page" - "github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs" "github.com/openimsdk/openim-sdk-core/v3/pkg/syncer" pbConversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/sdkws" @@ -1000,16 +999,7 @@ func (c *Conversation) getUserNameAndFaceURL(ctx context.Context, userID string) } return faceURL, name, nil } - userInfo, err := c.user.GetUserInfoWithCache(ctx, userID, func(ctx context.Context, key string) (*model_struct.LocalUser, error) { - users, err := c.user.GetUserInfoFromServer(ctx, []string{userID}) - if err != nil { - return nil, err - } - if len(users) > 0 { - return users[0], nil - } - return nil, sdkerrs.ErrUserIDNotFound.Wrap() - }) + userInfo, err := c.user.GetUserInfoWithCache(ctx, userID) if err != nil { return "", "", nil } diff --git a/internal/user/user.go b/internal/user/user.go index 0288cbf18..b883bcd35 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -178,7 +178,11 @@ func (u *User) updateSelfUserInfo(ctx context.Context, userInfo *sdkws.UserInfoW return nil } -func (u *User) GetUserInfoWithCache(ctx context.Context, cacheKey string, fetchFunc func(ctx context.Context, key string) (*model_struct.LocalUser, error)) (*model_struct.LocalUser, error) { +func (u *User) GetUserInfoWithCache(ctx context.Context, cacheKey string) (*model_struct.LocalUser, error) { + return u.UserCache.FetchGet(ctx, cacheKey) +} + +func (u *User) GetUserInfoWithCacheFunc(ctx context.Context, cacheKey string, fetchFunc func(ctx context.Context, key string) (*model_struct.LocalUser, error)) (*model_struct.LocalUser, error) { if userInfo, ok := u.UserCache.Load(cacheKey); ok { return userInfo, nil }