Skip to content

Commit

Permalink
refactor: lower the level of code nesting (#1370)
Browse files Browse the repository at this point in the history
* refactor: lower the level of code nesting

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

* refactor: lower the level of code nesting

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

---------

Signed-off-by: rfyiamcool <rfyiamcool@163.com>
  • Loading branch information
rfyiamcool committed Nov 10, 2023
1 parent a32e94b commit 686fa80
Showing 1 changed file with 64 additions and 43 deletions.
107 changes: 64 additions & 43 deletions internal/push/push_to_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,9 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy"

"github.com/OpenIMSDK/protocol/conversation"

"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"

"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/conversation"
"github.com/OpenIMSDK/protocol/msggateway"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/discoveryregistry"
Expand All @@ -34,13 +29,16 @@ import (
"github.com/OpenIMSDK/tools/utils"

"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/fcm"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/getui"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)

Expand All @@ -53,7 +51,6 @@ type Pusher struct {
msgRpcClient *rpcclient.MessageRpcClient
conversationRpcClient *rpcclient.ConversationRpcClient
groupRpcClient *rpcclient.GroupRpcClient
successCount int
}

var errNoOfflinePusher = errors.New("no offlinePusher is configured")
Expand Down Expand Up @@ -104,24 +101,29 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg
if err := callbackOnlinePush(ctx, userIDs, msg); err != nil {
return err
}

// push
wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, userIDs)
if err != nil {
return err
}

isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs)
p.successCount++
if isOfflinePush {
for _, v := range wsResults {
if msg.SendID != v.UserID && (!v.OnlinePush) {
if err := callbackOfflinePush(ctx, userIDs, msg, &[]string{}); err != nil {
return err
}
err = p.offlinePushMsg(ctx, msg.SendID, msg, []string{v.UserID})
if err != nil {
return err
}

if !isOfflinePush {
return nil
}

for _, v := range wsResults {
if msg.SendID != v.UserID && (!v.OnlinePush) {
if err = callbackOfflinePush(ctx, userIDs, msg, &[]string{}); err != nil {
return err
}

err = p.offlinePushMsg(ctx, msg.SendID, msg, []string{v.UserID})
if err != nil {
return err
}
}
}
Expand All @@ -140,22 +142,24 @@ func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t interface{}) error {
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string
if err := callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil {
if err = callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil {
return err
}

if len(pushToUserIDs) == 0 {
pushToUserIDs, err = p.groupLocalCache.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return err
}

switch msg.ContentType {
case constant.MemberQuitNotification:
var tips sdkws.MemberQuitTips
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
return err
}
defer func(groupID string, userIDs []string) {
if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
log.ZError(ctx, "MemberQuitNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs)
}
}(groupID, []string{tips.QuitUser.UserID})
Expand All @@ -167,7 +171,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
}
kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID })
defer func(groupID string, userIDs []string) {
if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs)
}
}(groupID, kickedUsers)
Expand All @@ -183,48 +187,61 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.UserID[0])
}
defer func(groupID string) {
if err := p.groupRpcClient.DismissGroup(ctx, groupID); err != nil {
if err = p.groupRpcClient.DismissGroup(ctx, groupID); err != nil {
log.ZError(ctx, "DismissGroup Notification clear members", err, "groupID", groupID)
}
}(groupID)
}
}
}

wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
if err != nil {
return err
}

log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg)
p.successCount++
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
if isOfflinePush {
var onlineSuccessUserIDs []string
var WebAndPcBackgroundUserIDs []string
onlineSuccessUserIDs = append(onlineSuccessUserIDs, msg.SendID)
var (
onlineSuccessUserIDs = []string{msg.SendID}
webAndPcBackgroundUserIDs []string
)

for _, v := range wsResults {
if v.OnlinePush && v.UserID != msg.SendID {
onlineSuccessUserIDs = append(onlineSuccessUserIDs, v.UserID)
}
if !v.OnlinePush {
if len(v.Resp) != 0 {
for _, singleResult := range v.Resp {
if singleResult.ResultCode == -2 {
if constant.PlatformIDToName(int(singleResult.RecvPlatFormID)) == constant.TerminalPC ||
singleResult.RecvPlatFormID == constant.WebPlatformID {
WebAndPcBackgroundUserIDs = append(WebAndPcBackgroundUserIDs, v.UserID)
}
}
}

if v.OnlinePush {
continue
}

if len(v.Resp) == 0 {
continue
}

for _, singleResult := range v.Resp {
if singleResult.ResultCode != -2 {
continue
}

isPC := constant.PlatformIDToName(int(singleResult.RecvPlatFormID)) == constant.TerminalPC
isWebID := singleResult.RecvPlatFormID == constant.WebPlatformID

if isPC || isWebID {
webAndPcBackgroundUserIDs = append(webAndPcBackgroundUserIDs, v.UserID)
}
}
}

needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs)
if msg.ContentType != constant.SignalingNotification {
notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID)
if err != nil {
// log.ZError(ctx, "GetRecvMsgNotNotifyUserIDs failed", err, "groupID", groupID)
return err
}

needOfflinePushUserIDs = utils.SliceSub(needOfflinePushUserIDs, notNotificationUserIDs)
}
// Use offline push messaging
Expand All @@ -234,6 +251,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
if err != nil {
return err
}

if len(offlinePushUserIDs) > 0 {
needOfflinePushUserIDs = offlinePushUserIDs
}
Expand All @@ -250,8 +268,8 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return err
}
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, WebAndPcBackgroundUserIDs)); err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs))
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs))
return err
}
}
Expand Down Expand Up @@ -319,15 +337,18 @@ func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData)
err = errNoOfflinePusher
return
}
type AtContent struct {

type atContent struct {
Text string `json:"text"`
AtUserList []string `json:"atUserList"`
IsAtSelf bool `json:"isAtSelf"`
}

opts, err = p.GetOfflinePushOpts(msg)
if err != nil {
return
}

if msg.OfflinePushInfo != nil {
title = msg.OfflinePushInfo.Title
content = msg.OfflinePushInfo.Desc
Expand All @@ -345,9 +366,9 @@ func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData)
case constant.File:
title = constant.ContentType2PushContent[int64(msg.ContentType)]
case constant.AtText:
a := AtContent{}
_ = utils.JsonStringToStruct(string(msg.Content), &a)
if utils.IsContain(conversationID, a.AtUserList) {
ac := atContent{}
_ = utils.JsonStringToStruct(string(msg.Content), &ac)
if utils.IsContain(conversationID, ac.AtUserList) {
title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
} else {
title = constant.ContentType2PushContent[constant.GroupMsg]
Expand Down

0 comments on commit 686fa80

Please sign in to comment.