Skip to content

Commit

Permalink
feat: group member full sync and incr sync.
Browse files Browse the repository at this point in the history
  • Loading branch information
FGadvancer committed Jun 18, 2024
1 parent 46f0f2f commit c8cb470
Show file tree
Hide file tree
Showing 4 changed files with 374 additions and 3 deletions.
251 changes: 251 additions & 0 deletions internal/group/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"

"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
Expand Down Expand Up @@ -281,3 +283,252 @@ func (g *Group) doNotification(ctx context.Context, msg *sdkws.MsgData) error {
return fmt.Errorf("unknown tips type: %d", msg.ContentType)
}
}

func (g *Group) doNotification2(ctx context.Context, msg *sdkws.MsgData) error {
switch msg.ContentType {
case constant.GroupCreatedNotification: // 1501
var detail sdkws.GroupCreatedTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}

if err := g.IncrSyncJoinGroup(ctx); err != nil {
return err
}
return g.IncrSyncGroupMember(ctx, detail.Group.GroupID)

case constant.GroupInfoSetNotification: // 1502
var detail sdkws.GroupInfoSetTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
return g.IncrSyncJoinGroup(ctx)
case constant.JoinGroupApplicationNotification: // 1503
var detail sdkws.JoinGroupApplicationTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
if detail.Applicant.UserID == g.loginUserID {
return g.SyncSelfGroupApplications(ctx, detail.Group.GroupID)
} else {
return g.SyncAdminGroupApplications(ctx, detail.Group.GroupID)
}
case constant.GroupApplicationAcceptedNotification: // 1505
var detail sdkws.GroupApplicationAcceptedTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
switch detail.ReceiverAs {
case 0:
return g.SyncAllSelfGroupApplication(ctx)
case 1:
return g.SyncAdminGroupApplications(ctx, detail.Group.GroupID)
default:
return fmt.Errorf("GroupApplicationAcceptedNotification ReceiverAs unknown %d", detail.ReceiverAs)
}
case constant.GroupApplicationRejectedNotification: // 1506
var detail sdkws.GroupApplicationRejectedTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
switch detail.ReceiverAs {
case 0:
return g.SyncAllSelfGroupApplication(ctx)
case 1:
return g.SyncAdminGroupApplications(ctx, detail.Group.GroupID)
default:
return fmt.Errorf("GroupApplicationRejectedNotification ReceiverAs unknown %d", detail.ReceiverAs)
}
case constant.GroupOwnerTransferredNotification: // 1507
var detail sdkws.GroupOwnerTransferredTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
if err := g.IncrSyncJoinGroup(ctx); err != nil {
return err
}
if detail.Group == nil {
return errs.New(fmt.Sprintf("group is nil, groupID: %s", detail.Group.GroupID))
}
return g.SyncAllGroupMember(ctx, detail.Group.GroupID)
case constant.MemberKickedNotification: // 1508
var detail sdkws.MemberKickedTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
var self bool
for _, info := range detail.KickedUserList {
if info.UserID == g.loginUserID {
self = true
break
}
}
if self {
members, err := g.db.GetGroupMemberListSplit(ctx, detail.Group.GroupID, 0, 0, 999999)
if err != nil {
return err
}
if err := g.db.DeleteGroupAllMembers(ctx, detail.Group.GroupID); err != nil {
return err
}
for _, member := range members {
data, err := json.Marshal(member)
if err != nil {
return err
}
g.listener().OnGroupMemberDeleted(string(data))
}
group, err := g.db.GetGroupInfoByGroupID(ctx, detail.Group.GroupID)
if err != nil {
return err
}
group.MemberCount = 0
data, err := json.Marshal(group)
if err != nil {
return err
}
if err := g.db.DeleteGroup(ctx, detail.Group.GroupID); err != nil {
return err
}
g.listener().OnGroupInfoChanged(string(data))
g.listener().OnJoinedGroupDeleted(string(data))
return nil
} else {
var userIDs []string
for _, info := range detail.KickedUserList {
userIDs = append(userIDs, info.UserID)
}
return g.SyncGroupMembers(ctx, detail.Group.GroupID, userIDs...)
}
case constant.MemberQuitNotification: // 1504
var detail sdkws.MemberQuitTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
if detail.QuitUser.UserID == g.loginUserID {
members, err := g.db.GetGroupMemberListSplit(ctx, detail.Group.GroupID, 0, 0, 999999)
if err != nil {
return err
}
if err := g.db.DeleteGroupAllMembers(ctx, detail.Group.GroupID); err != nil {
return err
}
for _, member := range members {
data, err := json.Marshal(member)
if err != nil {
return err
}
g.listener().OnGroupMemberDeleted(string(data))
}
group, err := g.db.GetGroupInfoByGroupID(ctx, detail.Group.GroupID)
if err != nil {
return err
}
group.MemberCount = 0
data, err := json.Marshal(group)
if err != nil {
return err
}
if err := g.db.DeleteGroup(ctx, detail.Group.GroupID); err != nil {
return err
}
g.listener().OnGroupInfoChanged(string(data))
return nil
} else {
//return g.SyncGroupMembers(ctx, detail.Group.GroupID, detail.QuitUser.UserID)
return g.onlineSyncGroupMember(ctx, detail.Group.GroupID, []*sdkws.GroupMemberFullInfo{detail.QuitUser}, nil, nil, detail.GroupMemberVersion)
}
case constant.MemberInvitedNotification: // 1509
var detail sdkws.MemberInvitedTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
userIDMap := datautil.SliceSetAny(detail.InvitedUserList, func(e *sdkws.GroupMemberFullInfo) string {
return e.UserID
})
//自己也是被邀请的一员
if _, ok := userIDMap[g.loginUserID]; ok {
if err := g.IncrSyncJoinGroup(ctx); err != nil {
return err
}
return g.IncrSyncGroupMember(ctx, detail.Group.GroupID)
} else {
return g.onlineSyncGroupMember(ctx, detail.Group.GroupID, nil, nil, detail.InvitedUserList, detail.GroupMemberVersion)
}
case constant.MemberEnterNotification: // 1510
var detail sdkws.MemberEnterTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
if detail.EntrantUser.UserID == g.loginUserID {
if err := g.IncrSyncJoinGroup(ctx); err != nil {
return err
}
return g.IncrSyncGroupMember(ctx, detail.Group.GroupID)
} else {
return g.onlineSyncGroupMember(ctx, detail.Group.GroupID, nil, nil, []*sdkws.GroupMemberFullInfo{detail.EntrantUser}, detail.GroupMemberVersion)
}
case constant.GroupDismissedNotification: // 1511
var detail sdkws.GroupDismissedTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
g.listener().OnGroupDismissed(utils.StructToJsonString(detail.Group))
if err := g.db.DeleteGroupAllMembers(ctx, detail.Group.GroupID); err != nil {
return err
}
if err := g.db.DeleteGroup(ctx, detail.Group.GroupID); err != nil {
return err
}
return g.SyncAllGroupMember(ctx, detail.Group.GroupID)
case constant.GroupMemberMutedNotification: // 1512
var detail sdkws.GroupMemberMutedTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
return g.SyncGroupMembers(ctx, detail.Group.GroupID, detail.MutedUser.UserID)
case constant.GroupMemberCancelMutedNotification: // 1513
var detail sdkws.GroupMemberCancelMutedTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
return g.SyncGroupMembers(ctx, detail.Group.GroupID, detail.MutedUser.UserID)
case constant.GroupMutedNotification: // 1514
return g.SyncGroups(ctx, msg.GroupID)
case constant.GroupCancelMutedNotification: // 1515
return g.SyncGroups(ctx, msg.GroupID)
case constant.GroupMemberInfoSetNotification: // 1516
var detail sdkws.GroupMemberInfoSetTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}

return g.SyncGroupMembers(ctx, detail.Group.GroupID, detail.ChangedUser.UserID) //detail.ChangedUser.UserID
case constant.GroupMemberSetToAdminNotification: // 1517
var detail sdkws.GroupMemberInfoSetTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
return g.SyncGroupMembers(ctx, detail.Group.GroupID, detail.ChangedUser.UserID)
case constant.GroupMemberSetToOrdinaryUserNotification: // 1518
var detail sdkws.GroupMemberInfoSetTips
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
return g.SyncGroupMembers(ctx, detail.Group.GroupID, detail.ChangedUser.UserID)
case constant.GroupInfoSetAnnouncementNotification: // 1519
var detail sdkws.GroupInfoSetAnnouncementTips //
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
return g.SyncGroups(ctx, detail.Group.GroupID)
case constant.GroupInfoSetNameNotification: // 1520
var detail sdkws.GroupInfoSetNameTips //
if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil {
return err
}
return g.SyncGroups(ctx, detail.Group.GroupID)
default:
return fmt.Errorf("unknown tips type: %d", msg.ContentType)
}
}
26 changes: 25 additions & 1 deletion internal/group/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package group

import (
"context"
"github.com/openimsdk/openim-sdk-core/v3/pkg/datafetcher"
"github.com/openimsdk/tools/utils/datautil"
"time"

Expand Down Expand Up @@ -250,7 +251,30 @@ func (g *Group) SetGroupInfo(ctx context.Context, groupInfo *sdkws.GroupInfoForS
}

func (g *Group) GetGroupMemberList(ctx context.Context, groupID string, filter, offset, count int32) ([]*model_struct.LocalGroupMember, error) {
return g.db.GetGroupMemberListSplit(ctx, groupID, filter, int(offset), int(count))
dataFetcher := datafetcher.NewDataFetcher(
g.db,
g.groupMemberTableName(),
groupID,
func(localGroupMember *model_struct.LocalGroupMember) string {
return localGroupMember.UserID
},
func(ctx context.Context, values []*model_struct.LocalGroupMember) error {
return g.db.BatchInsertGroupMember(ctx, values)
},
func(ctx context.Context, uids []string) ([]*model_struct.LocalGroupMember, error) {
//todo GetGroupMemberListSplit change to
return g.db.GetGroupMemberListSplit(ctx, groupID, filter, int(offset), int(count))
},
func(ctx context.Context, userIDs []string) ([]*model_struct.LocalGroupMember, error) {
serverGroupMember, err := g.GetDesignatedGroupMembers(ctx, groupID, userIDs)
if err != nil {
return nil, err
}
return datautil.Batch(ServerGroupMemberToLocalGroupMember, serverGroupMember), nil
},
)
return dataFetcher.FetchWithPagination(ctx, int(offset), int(count))

}

func (g *Group) GetGroupMemberOwnerAndAdmin(ctx context.Context, groupID string) ([]*model_struct.LocalGroupMember, error) {
Expand Down
97 changes: 97 additions & 0 deletions pkg/datafetcher/datafetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package datafetcher

import (
"context"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/db_interface"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
)

// DataFetcher is a struct that handles data synchronization
type DataFetcher[T any] struct {
db db_interface.VersionSyncModel
TabName string
EntityID string
Key func(T) string
batchInsert func(ctx context.Context, servers []T) error
FetchFromLocal FetchDataFunc[T]
fetchFromServer FetchFromServerFunc[T]
}

// FetchDataFunc is a function type for fetching data
type FetchDataFunc[T any] func(ctx context.Context, uids []string) ([]T, error)

// FetchFromServerFunc is a function type for fetching data from server
type FetchFromServerFunc[T any] func(ctx context.Context, uids []string) ([]T, error)

// NewDataFetcher creates a new NewDataFetcher
func NewDataFetcher[T any](db db_interface.VersionSyncModel, tabName string, entityID string, key func(T) string,
batchInsert func(ctx context.Context, servers []T) error, fetchFromLocal FetchDataFunc[T], fetchFromServer FetchFromServerFunc[T]) *DataFetcher[T] {
return &DataFetcher[T]{
db: db,
TabName: tabName,
EntityID: entityID,
Key: key,
batchInsert: batchInsert,
FetchFromLocal: fetchFromLocal,
fetchFromServer: fetchFromServer,
}
}

// FetchWithPagination fetches data with pagination and fills missing data from server
func (ds *DataFetcher[T]) FetchWithPagination(ctx context.Context, offset, limit int) ([]T, error) {
versionInfo, err := ds.db.GetVersionSync(ctx, ds.TabName, ds.EntityID)
if err != nil {
return nil, err
}

if offset > len(versionInfo.UIDList) {
return nil, errs.New("offset exceeds the length of the UID list").Wrap()
}

end := offset + limit
if end > len(versionInfo.UIDList) {
end = len(versionInfo.UIDList)
}

paginatedUIDs := versionInfo.UIDList[offset:end]

localData, err := ds.FetchMissingAndFillLocal(ctx, paginatedUIDs)
if err != nil {
return nil, err
}

return localData, nil
}

// FetchMissingAndFillLocal fetches missing data from server and fills local database
func (ds *DataFetcher[T]) FetchMissingAndFillLocal(ctx context.Context, uids []string) ([]T, error) {
localData, err := ds.FetchFromLocal(ctx, uids)
if err != nil {
return nil, err
}

localUIDSet := datautil.SliceSetAny(localData, ds.Key)

var missingUIDs []string
for _, uid := range uids {
if _, found := localUIDSet[uid]; !found {
missingUIDs = append(missingUIDs, uid)
}
}

if len(missingUIDs) > 0 {
serverData, err := ds.fetchFromServer(ctx, missingUIDs)
if err != nil {
return nil, err
}

if err := ds.batchInsert(ctx, serverData); err != nil {
return nil, err
}

localData = append(localData, serverData...)
}

return localData, nil
}
Loading

0 comments on commit c8cb470

Please sign in to comment.