Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: lower the level of code nesting #1396

Merged
merged 1 commit into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 64 additions & 54 deletions internal/msggateway/hub_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,19 @@ package msggateway
import (
"context"

"github.com/OpenIMSDK/tools/mcontext"

"github.com/openimsdk/open-im-server/v3/pkg/authverify"

"github.com/OpenIMSDK/tools/errs"
"google.golang.org/grpc"

"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"

"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msggateway"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"

"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
)

Expand All @@ -41,6 +38,7 @@ func (s *Server) InitServer(disCov discoveryregistry.SvcDiscoveryRegistry, serve
if err != nil {
return err
}

msgModel := cache.NewMsgCacheModel(rdb)
s.LongConnServer.SetDiscoveryRegistry(disCov)
s.LongConnServer.SetCacheHandler(msgModel)
Expand Down Expand Up @@ -97,22 +95,25 @@ func (s *Server) GetUsersOnlineStatus(
if !ok {
continue
}
temp := new(msggateway.GetUsersOnlineStatusResp_SuccessResult)
temp.UserID = userID

uresp := new(msggateway.GetUsersOnlineStatusResp_SuccessResult)
uresp.UserID = userID
for _, client := range clients {
if client != nil {
ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail)
ps.Platform = constant.PlatformIDToName(client.PlatformID)
ps.Status = constant.OnlineStatus
ps.ConnID = client.ctx.GetConnID()
ps.Token = client.token
ps.IsBackground = client.IsBackground
temp.Status = constant.OnlineStatus
temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps)
if client == nil {
continue
}

ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail)
ps.Platform = constant.PlatformIDToName(client.PlatformID)
ps.Status = constant.OnlineStatus
ps.ConnID = client.ctx.GetConnID()
ps.Token = client.token
ps.IsBackground = client.IsBackground
uresp.Status = constant.OnlineStatus
uresp.DetailPlatformStatus = append(uresp.DetailPlatformStatus, ps)
}
if temp.Status == constant.OnlineStatus {
resp.SuccessResult = append(resp.SuccessResult, temp)
if uresp.Status == constant.OnlineStatus {
resp.SuccessResult = append(resp.SuccessResult, uresp)
}
}
return &resp, nil
Expand All @@ -129,50 +130,55 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(
ctx context.Context,
req *msggateway.OnlineBatchPushOneMsgReq,
) (*msggateway.OnlineBatchPushOneMsgResp, error) {
var singleUserResult []*msggateway.SingleMsgToUserResults

var singleUserResults []*msggateway.SingleMsgToUserResults

for _, v := range req.PushToUserIDs {
var resp []*msggateway.SingleMsgToUserPlatform
tempT := &msggateway.SingleMsgToUserResults{
results := &msggateway.SingleMsgToUserResults{
UserID: v,
}
clients, ok := s.LongConnServer.GetUserAllCons(v)
if !ok {
log.ZDebug(ctx, "push user not online", "userID", v)
tempT.Resp = resp
singleUserResult = append(singleUserResult, tempT)
results.Resp = resp
singleUserResults = append(singleUserResults, results)
continue
}

log.ZDebug(ctx, "push user online", "clients", clients, "userID", v)
for _, client := range clients {
if client != nil {
temp := &msggateway.SingleMsgToUserPlatform{
RecvID: v,
RecvPlatFormID: int32(client.PlatformID),
}
if !client.IsBackground ||
(client.IsBackground == true && client.PlatformID != constant.IOSPlatformID) {
err := client.PushMessage(ctx, req.MsgData)
if err != nil {
temp.ResultCode = -2
resp = append(resp, temp)
} else {
if utils.IsContainInt(client.PlatformID, s.pushTerminal) {
tempT.OnlinePush = true
resp = append(resp, temp)
}
}
if client == nil {
continue
}

userPlatform := &msggateway.SingleMsgToUserPlatform{
RecvID: v,
RecvPlatFormID: int32(client.PlatformID),
}
if !client.IsBackground ||
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
err := client.PushMessage(ctx, req.MsgData)
if err != nil {
userPlatform.ResultCode = -2
resp = append(resp, userPlatform)
} else {
temp.ResultCode = -3
resp = append(resp, temp)
if utils.IsContainInt(client.PlatformID, s.pushTerminal) {
results.OnlinePush = true
resp = append(resp, userPlatform)
}
}
} else {
userPlatform.ResultCode = -3
resp = append(resp, userPlatform)
}
}
tempT.Resp = resp
singleUserResult = append(singleUserResult, tempT)
results.Resp = resp
singleUserResults = append(singleUserResults, results)
}

return &msggateway.OnlineBatchPushOneMsgResp{
SinglePushResult: singleUserResult,
SinglePushResult: singleUserResults,
}, nil
}

Expand All @@ -181,17 +187,21 @@ func (s *Server) KickUserOffline(
req *msggateway.KickUserOfflineReq,
) (*msggateway.KickUserOfflineResp, error) {
for _, v := range req.KickUserIDList {
if clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)); ok {
for _, client := range clients {
log.ZDebug(ctx, "kick user offline", "userID", v, "platformID", req.PlatformID, "client", client)
if err := client.longConnServer.KickUserConn(client); err != nil {
log.ZWarn(ctx, "kick user offline failed", err, "userID", v, "platformID", req.PlatformID)
}
}
} else {
clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID))
if !ok {
log.ZInfo(ctx, "conn not exist", "userID", v, "platformID", req.PlatformID)
continue
}

for _, client := range clients {
log.ZDebug(ctx, "kick user offline", "userID", v, "platformID", req.PlatformID, "client", client)
if err := client.longConnServer.KickUserConn(client); err != nil {
log.ZWarn(ctx, "kick user offline failed", err, "userID", v, "platformID", req.PlatformID)
}
}
continue
}

return &msggateway.KickUserOfflineResp{}, nil
}

Expand Down
8 changes: 1 addition & 7 deletions internal/msggateway/n_ws_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,6 @@ type LongConnServer interface {
MessageHandler
}

var bufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1024)
},
}

type WsServer struct {
port int
wsMaxConnNum int64
Expand Down Expand Up @@ -374,7 +368,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
}
ws.onlineUserConnNum.Add(-1)
ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum, "online user conn Num",
log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum.Load(), "online user conn Num",
ws.onlineUserConnNum.Load(),
)
}
Expand Down
Loading