Skip to content

Commit

Permalink
Merge pull request #3 from mo3et/version-sync
Browse files Browse the repository at this point in the history
fix uncorrect field name
  • Loading branch information
FGadvancer committed Jun 18, 2024
2 parents e314060 + e4d424d commit f5564ad
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 28 deletions.
6 changes: 6 additions & 0 deletions internal/friend/sync2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package friend

import (
"context"

"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
)

const (
Expand Down Expand Up @@ -47,3 +49,7 @@ func (f *Friend) IncrSyncFriends(ctx context.Context) error {
//return opt.Sync()
return nil
}

func (f *Friend) friendListTableName() string {
return model_struct.LocalFriend{}.TableName()
}
27 changes: 14 additions & 13 deletions internal/group/sync2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package group

import (
"context"
"sync"

"github.com/openimsdk/openim-sdk-core/v3/internal/incrversion"
"github.com/openimsdk/openim-sdk-core/v3/internal/util"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
Expand All @@ -12,7 +14,6 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"sync"
)

type BatchIncrementalReq struct {
Expand Down Expand Up @@ -109,10 +110,10 @@ func (g *Group) IncrSyncGroupMember(ctx context.Context, groupIDs ...string) err

func (g *Group) syncGroupMember(ctx context.Context, groupID string, resp *group.GetIncrementalGroupMemberResp) error {
groupMemberSyncer := incrversion.VersionSynchronizer[*model_struct.LocalGroupMember, *group.GetIncrementalGroupMemberResp]{
Ctx: ctx,
DB: g.db,
TabName: g.groupMemberTableName(),
EntityID: groupID,
Ctx: ctx,
DB: g.db,
TableName: g.groupMemberTableName(),
EntityID: groupID,
Key: func(localGroupMember *model_struct.LocalGroupMember) string {
return localGroupMember.UserID
},
Expand Down Expand Up @@ -158,10 +159,10 @@ func (g *Group) syncGroupMember(ctx context.Context, groupID string, resp *group

func (g *Group) onlineSyncGroupMember(ctx context.Context, groupID string, delete, update, insert []*sdkws.GroupMemberFullInfo, version uint64) error {
groupMemberSyncer := incrversion.VersionSynchronizer[*model_struct.LocalGroupMember, *group.GetIncrementalGroupMemberResp]{
Ctx: ctx,
DB: g.db,
TabName: g.groupMemberTableName(),
EntityID: groupID,
Ctx: ctx,
DB: g.db,
TableName: g.groupMemberTableName(),
EntityID: groupID,
Key: func(localGroupMember *model_struct.LocalGroupMember) string {
return localGroupMember.UserID
},
Expand Down Expand Up @@ -235,10 +236,10 @@ func (g *Group) onlineSyncGroupMember(ctx context.Context, groupID string, delet

func (g *Group) IncrSyncJoinGroup(ctx context.Context) error {
opt := incrversion.VersionSynchronizer[*model_struct.LocalGroup, *group.GetIncrementalJoinGroupResp]{
Ctx: ctx,
DB: g.db,
TabName: g.groupTableName(),
EntityID: g.loginUserID,
Ctx: ctx,
DB: g.db,
TableName: g.groupTableName(),
EntityID: g.loginUserID,
Key: func(LocalGroup *model_struct.LocalGroup) string {
return LocalGroup.GroupID
},
Expand Down
12 changes: 6 additions & 6 deletions internal/incrversion/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
type VersionSynchronizer[V, R any] struct {
Ctx context.Context
DB db_interface.VersionSyncModel
TabName string
TableName string
EntityID string
Key func(V) string
Local func() ([]V, error)
Expand All @@ -31,7 +31,7 @@ type VersionSynchronizer[V, R any] struct {
}

func (o *VersionSynchronizer[V, R]) getVersionInfo() (*model_struct.LocalVersionSync, error) {
versionInfo, err := o.DB.GetVersionSync(o.Ctx, o.TabName, o.EntityID)
versionInfo, err := o.DB.GetVersionSync(o.Ctx, o.TableName, o.EntityID)
if err != nil && errs.Unwrap(err) != gorm.ErrRecordNotFound {
log.ZWarn(o.Ctx, "get version info", err)
return nil, err
Expand All @@ -41,7 +41,7 @@ func (o *VersionSynchronizer[V, R]) getVersionInfo() (*model_struct.LocalVersion
}

func (o *VersionSynchronizer[V, R]) updateVersionInfo(lvs *model_struct.LocalVersionSync, resp R) error {
lvs.Table = o.TabName
lvs.Table = o.TableName
lvs.EntityID = o.EntityID
lvs.VersionID, lvs.Version = o.Version(resp)
return o.DB.SetVersionSync(o.Ctx, lvs)
Expand Down Expand Up @@ -73,7 +73,7 @@ func (o *VersionSynchronizer[V, R]) Sync() error {
insert := o.Insert(resp)

if len(delIDs) == 0 && len(changes) == 0 && len(insert) == 0 && !o.Full(resp) {
log.ZDebug(o.Ctx, "no data to sync", "table", o.TabName, "entityID", o.EntityID)
log.ZDebug(o.Ctx, "no data to sync", "table", o.TableName, "entityID", o.EntityID)
return nil
}

Expand Down Expand Up @@ -128,7 +128,7 @@ func (o *VersionSynchronizer[V, R]) CheckVersionSync() error {
insert := o.Insert(resp)
_, version := o.Version(resp)
if len(delIDs) == 0 && len(changes) == 0 && len(insert) == 0 && !o.Full(resp) {
log.ZWarn(o.Ctx, "exception no data to sync", errs.New("notification no data"), "table", o.TabName, "entityID", o.EntityID)
log.ZWarn(o.Ctx, "exception no data to sync", errs.New("notification no data"), "table", o.TableName, "entityID", o.EntityID)
return nil
}
if lvs.Version+1 == version {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (o *VersionSynchronizer[V, R]) CheckVersionSync() error {
return o.updateVersionInfo(lvs, resp)
} else if version <= lvs.Version {
log.ZWarn(o.Ctx, "version less than local version", errs.New("version less than local version"),
"table", o.TabName, "entityID", o.EntityID, "version", version, "localVersion", lvs.Version)
"table", o.TableName, "entityID", o.EntityID, "version", version, "localVersion", lvs.Version)
return nil
} else {
// Re-fetch the version number from the server, compare it with the local version number, and fetch the difference once.
Expand Down
3 changes: 2 additions & 1 deletion msgtest/module/friend_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package module

import (
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/protocol/relation"
)

type TestFriendManager struct {
*MetaManager
}

func (t *TestFriendManager) ImportFriends(ownerUserID string, friendUserIDs []string) error {
req := &friend.ImportFriendReq{
req := &relation.ImportFriendReq{
OwnerUserID: ownerUserID,
FriendUserIDs: friendUserIDs,
}
Expand Down
7 changes: 4 additions & 3 deletions msgtest/pressure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"flag"
"fmt"
"github.com/openimsdk/openim-sdk-core/v3/msgtest/module"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
"sync"
"testing"
"time"

"github.com/openimsdk/openim-sdk-core/v3/msgtest/module"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"

"github.com/openimsdk/tools/log"
)

Expand Down Expand Up @@ -56,7 +57,7 @@ func init() {
InitWithFlag()

if err := log.InitFromConfig("sdk.log", "sdk", 4,
true, false, "./chat_log", 2, 24); err != nil {
true, false, "./chat_log", 2, 24, "v1.0.0"); err != nil {
panic(err)
}
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/datafetcher/datafetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package datafetcher

import (
"context"

"github.com/openimsdk/openim-sdk-core/v3/pkg/db/db_interface"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
Expand All @@ -10,7 +11,7 @@ import (
// DataFetcher is a struct that handles data synchronization
type DataFetcher[T any] struct {
db db_interface.VersionSyncModel
TabName string
TableName string
EntityID string
Key func(T) string
batchInsert func(ctx context.Context, servers []T) error
Expand All @@ -25,11 +26,11 @@ type FetchDataFunc[T any] func(ctx context.Context, uids []string) ([]T, error)
type FetchFromServerFunc[T any] func(ctx context.Context, uids []string) ([]T, error)

// NewDataFetcher creates a new NewDataFetcher
func NewDataFetcher[T any](db db_interface.VersionSyncModel, tabName string, entityID string, key func(T) string,
func NewDataFetcher[T any](db db_interface.VersionSyncModel, tableName string, entityID string, key func(T) string,
batchInsert func(ctx context.Context, servers []T) error, fetchFromLocal FetchDataFunc[T], fetchFromServer FetchFromServerFunc[T]) *DataFetcher[T] {
return &DataFetcher[T]{
db: db,
TabName: tabName,
TableName: tableName,
EntityID: entityID,
Key: key,
batchInsert: batchInsert,
Expand All @@ -40,7 +41,7 @@ func NewDataFetcher[T any](db db_interface.VersionSyncModel, tabName string, ent

// FetchWithPagination fetches data with pagination and fills missing data from server
func (ds *DataFetcher[T]) FetchWithPagination(ctx context.Context, offset, limit int) ([]T, error) {
versionInfo, err := ds.db.GetVersionSync(ctx, ds.TabName, ds.EntityID)
versionInfo, err := ds.db.GetVersionSync(ctx, ds.TableName, ds.EntityID)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/db/db_interface/databse.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package db_interface

import (
"context"

"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
)
Expand Down Expand Up @@ -57,6 +58,7 @@ type GroupModel interface {
GetGroupAdminID(ctx context.Context, groupID string) ([]string, error)
GetGroupMemberListByGroupID(ctx context.Context, groupID string) ([]*model_struct.LocalGroupMember, error)
GetGroupMemberListSplit(ctx context.Context, groupID string, filter int32, offset, count int) ([]*model_struct.LocalGroupMember, error)
GetGroupMemberListByUserIDs(ctx context.Context, groupID string, filter int32, userIDs []string) ([]*model_struct.LocalGroupMember, error)
GetGroupMemberOwnerAndAdminDB(ctx context.Context, groupID string) ([]*model_struct.LocalGroupMember, error)
GetGroupMemberOwner(ctx context.Context, groupID string) (*model_struct.LocalGroupMember, error)
GetGroupMemberListSplitByJoinTimeFilter(ctx context.Context, groupID string, offset, count int, joinTimeBegin, joinTimeEnd int64, userIDList []string) ([]*model_struct.LocalGroupMember, error)
Expand Down
35 changes: 34 additions & 1 deletion pkg/db/group_member_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"context"
"errors"
"fmt"

"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/tools/errs"
)

func (d *DataBase) GetGroupMemberInfoByGroupIDUserID(ctx context.Context, groupID, userID string) (*model_struct.LocalGroupMember, error) {
Expand Down Expand Up @@ -86,6 +88,37 @@ func (d *DataBase) GetGroupMemberListByGroupID(ctx context.Context, groupID stri
}
return transfer, utils.Wrap(err, "GetGroupMemberListByGroupID failed ")
}

func (d *DataBase) GetGroupMemberListByUserIDs(ctx context.Context, groupID string, filter int32, userIDs []string) ([]*model_struct.LocalGroupMember, error) {
d.groupMtx.Lock()
defer d.groupMtx.Unlock()
var groupMemberList []model_struct.LocalGroupMember
var err error
switch filter {
case constant.GroupFilterAll:
err = d.conn.WithContext(ctx).Where("group_id = ? AND user_id IN ?", groupID, userIDs).Order("role_level DESC, join_time ASC").Find(&groupMemberList).Error
case constant.GroupFilterOwner:
err = d.conn.WithContext(ctx).Where("group_id = ? AND role_level = ? AND user_id IN ?", groupID, constant.GroupOwner, userIDs).Find(&groupMemberList).Error
case constant.GroupFilterAdmin:
err = d.conn.WithContext(ctx).Where("group_id = ? AND role_level = ? AND user_id IN ?", groupID, constant.GroupAdmin, userIDs).Order("join_time ASC").Find(&groupMemberList).Error
case constant.GroupFilterOrdinaryUsers:
err = d.conn.WithContext(ctx).Where("group_id = ? AND role_level = ? AND user_id IN ?", groupID, constant.GroupOrdinaryUsers, userIDs).Order("join_time ASC").Find(&groupMemberList).Error
case constant.GroupFilterAdminAndOrdinaryUsers:
err = d.conn.WithContext(ctx).Where("group_id = ? AND (role_level = ? OR role_level = ?) AND user_id IN ?", groupID, constant.GroupAdmin, constant.GroupOrdinaryUsers, userIDs).Order("role_level DESC, join_time ASC").Find(&groupMemberList).Error
case constant.GroupFilterOwnerAndAdmin:
err = d.conn.WithContext(ctx).Where("group_id = ? AND (role_level = ? OR role_level = ?) AND user_id IN ?", groupID, constant.GroupOwner, constant.GroupAdmin, userIDs).Order("role_level DESC, join_time ASC").Find(&groupMemberList).Error
default:
return nil, errs.New("filter args failed.", "filter", filter).Wrap()
}
var transfer []*model_struct.LocalGroupMember
for _, member := range groupMemberList {
memberCopy := member
transfer = append(transfer, &memberCopy)
}
return transfer, errs.Wrap(err)

}

func (d *DataBase) GetGroupMemberListSplit(ctx context.Context, groupID string, filter int32, offset, count int) ([]*model_struct.LocalGroupMember, error) {
d.groupMtx.Lock()
defer d.groupMtx.Unlock()
Expand All @@ -105,7 +138,7 @@ func (d *DataBase) GetGroupMemberListSplit(ctx context.Context, groupID string,
case constant.GroupFilterOwnerAndAdmin:
err = d.conn.WithContext(ctx).Where("group_id = ? And (role_level = ? or role_level = ?)", groupID, constant.GroupOwner, constant.GroupAdmin).Order("role_level DESC,join_time ASC").Offset(offset).Limit(count).Find(&groupMemberList).Error
default:
return nil, fmt.Errorf("filter args failed %d", filter)
return nil, errs.New("filter args failed", "filter", filter).Wrap()
}
var transfer []*model_struct.LocalGroupMember
for _, v := range groupMemberList {
Expand Down
4 changes: 4 additions & 0 deletions pkg/db/model_struct/data_model_struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type LocalFriend struct {
IsPinned bool `gorm:"column:is_pinned;" json:"isPinned"`
}

func (LocalFriend) TableName() string {
return "local_friends"
}

// message FriendRequest{
// string FromUserID = 1;
// string ToUserID = 2;
Expand Down

0 comments on commit f5564ad

Please sign in to comment.