Skip to content

Commit

Permalink
fix: add graceful exit logic (openimsdk#422)
Browse files Browse the repository at this point in the history
* fix: fix the component check logic

* fix: add graceful_exit in admin-api

* fix: fix the graceful exit logic

* fix: add rpc graceful logic

* fix: del the http port

* fix: add graceful logic in chat-api

* fix: update the func location

* fix: fix the error

* fix: fix the error

* fix: test the func loc

* fix: fix the tools version
  • Loading branch information
luhaoling committed Mar 5, 2024
1 parent c6be969 commit b5e99dc
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 27 deletions.
40 changes: 37 additions & 3 deletions cmd/api/admin-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
package main

import (
"context"
"fmt"
"github.com/OpenIMSDK/chat/pkg/util"
"math/rand"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"github.com/OpenIMSDK/tools/errs"
Expand Down Expand Up @@ -95,8 +100,37 @@ func main() {
api.NewAdminRoute(engine, zk)

address := net.JoinHostPort(config.Config.AdminApi.ListenIP, strconv.Itoa(ginPort))
if err := engine.Run(address); err != nil {
fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
os.Exit(-1)

server := http.Server{Addr: address, Handler: engine}

var (
netDone = make(chan struct{}, 1)
netErr error
)

go func() {
err = server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
netErr = errs.Wrap(err, fmt.Sprintf("api start err: %s", server.Addr))
netDone <- struct{}{}
}
}()

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)

select {
case <-sigs:
util.SIGTERMExit()
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
err = server.Shutdown(ctx)
if err != nil {
util.ExitWithError(errs.Wrap(err, "shutdown err"))
}
case <-netDone:
close(netDone)
util.ExitWithError(netErr)
}

}
39 changes: 36 additions & 3 deletions cmd/api/chat-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
package main

import (
"context"
"flag"
"fmt"
"github.com/OpenIMSDK/chat/pkg/util"
"math/rand"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"github.com/OpenIMSDK/tools/errs"
Expand Down Expand Up @@ -105,8 +110,36 @@ func main() {
api.NewChatRoute(engine, zk)

address := net.JoinHostPort(config.Config.ChatApi.ListenIP, strconv.Itoa(ginPort))
if err := engine.Run(address); err != nil {
fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
os.Exit(-1)

server := http.Server{Addr: address, Handler: engine}

var (
netDone = make(chan struct{}, 1)
netErr error
)

go func() {
err = server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
netErr = errs.Wrap(err, fmt.Sprintf("api start err: %s", server.Addr))
netDone <- struct{}{}
}
}()

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)

select {
case <-sigs:
util.SIGTERMExit()
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
err = server.Shutdown(ctx)
if err != nil {
util.ExitWithError(errs.Wrap(err, "shutdown err"))
}
case <-netDone:
close(netDone)
util.ExitWithError(netErr)
}
}
4 changes: 2 additions & 2 deletions cmd/rpc/admin-rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"flag"
"fmt"
"github.com/OpenIMSDK/chat/pkg/util"
"math/rand"
"os"
"time"
Expand Down Expand Up @@ -70,7 +71,6 @@ func main() {
}
err = chatrpcstart.Start(rpcPort, config.Config.RpcRegisterName.OpenImAdminName, 0, admin.Start)
if err != nil {
fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
os.Exit(-1)
util.ExitWithError(err)
}
}
4 changes: 2 additions & 2 deletions cmd/rpc/chat-rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"fmt"
"github.com/OpenIMSDK/chat/pkg/util"
"math/rand"
"os"
"time"
Expand Down Expand Up @@ -66,7 +67,6 @@ func main() {
}
err = chatrpcstart.Start(rpcPort, config.Config.RpcRegisterName.OpenImChatName, 0, chat.Start)
if err != nil {
fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
os.Exit(-1)
util.ExitWithError(err)
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (

require (
github.com/OpenIMSDK/protocol v0.0.21
github.com/OpenIMSDK/tools v0.0.36
github.com/OpenIMSDK/tools v0.0.37
github.com/livekit/protocol v1.5.0
github.com/redis/go-redis/v9 v9.2.1
github.com/xuri/excelize/v2 v2.8.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ=
github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/OpenIMSDK/protocol v0.0.21 h1:5H6H+hJ9d/VgRqttvxD/zfK9Asd+4M8Eknk5swSbUVY=
github.com/OpenIMSDK/protocol v0.0.21/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.36 h1:BT0q64l4f3QJDW16Rc0uJYt1gQFkiPoUQYQ33vo0EcE=
github.com/OpenIMSDK/tools v0.0.36/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE=
github.com/OpenIMSDK/tools v0.0.37 h1:qvDqmA4RbEJtPjZouWCkVuf/pjm6Y8nUrG5iH2gcnOg=
github.com/OpenIMSDK/tools v0.0.37/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE=
github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.4 h1:iC9YFYKDGEy3n/FtqJnOkZsene9olVspKmkX5A2YBEo=
github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.4/go.mod h1:sCavSAvdzOjul4cEqeVtvlSaSScfNsTQ+46HwlTL1hc=
github.com/alibabacloud-go/darabonba-openapi v0.1.18/go.mod h1:PB4HffMhJVmAgNKNq3wYbTUlFvPgxJpTzd1F5pTuUsc=
Expand Down
80 changes: 67 additions & 13 deletions pkg/common/chatrpcstart/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,17 @@
package chatrpcstart

import (
"context"
"errors"
"fmt"
"github.com/OpenIMSDK/chat/pkg/util"
"net"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

"github.com/OpenIMSDK/chat/pkg/common/config"
chatMw "github.com/OpenIMSDK/chat/pkg/common/mw"
Expand All @@ -26,7 +34,6 @@ import (
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/mw"
"github.com/OpenIMSDK/tools/network"
"github.com/OpenIMSDK/tools/utils"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
Expand All @@ -36,33 +43,80 @@ func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c

var zkClient discoveryregistry.SvcDiscoveryRegistry
zkClient, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery)
/*
zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password), openKeeper.WithRoundRobin(), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger()))*/if err != nil {
if err != nil {
return errs.Wrap(err, fmt.Sprintf(";the addr is:%v", &config.Config.Zookeeper.ZkAddr))
}
// defer zkClient.CloseZK()
defer zkClient.Close()
zkClient.AddOption(chatMw.AddUserType(), mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
registerIP, err := network.GetRpcRegisterIP(config.Config.Rpc.RegisterIP)
if err != nil {
return utils.Wrap1(err)
return errs.Wrap(err)
}

rpcTcpAddr := net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort))
listener, err := net.Listen("tcp", rpcTcpAddr)
if err != nil {
return errs.Wrap(err)
}
defer listener.Close()

srv := grpc.NewServer(append(options, mw.GrpcServer())...)
defer srv.GracefulStop()
once := sync.Once{}
defer func() {
once.Do(srv.GracefulStop)
}()

err = rpcFn(zkClient, srv)
if err != nil {
return err
}

err = zkClient.Register(rpcRegisterName, registerIP, rpcPort, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return utils.Wrap1(err)
return errs.Wrap(err)
}
listener, err := net.Listen("tcp", net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort)))
if err != nil {
return utils.Wrap1(err)

var (
netDone = make(chan struct{}, 1)
netErr error
)

go func() {
err := srv.Serve(listener)
if err != nil {
netErr = errs.Wrap(err, "rpc start err: ", rpcTcpAddr)
netDone <- struct{}{}
}
}()

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
select {
case <-sigs:
util.SIGTERMExit()
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := gracefulStopWithCtx(ctx, srv.GracefulStop); err != nil {
return err
}
case <-netDone:
close(netDone)
return netErr
}
return nil
}

func gracefulStopWithCtx(ctx context.Context, f func()) error {
done := make(chan struct{}, 1)
go func() {
f()
close(done)
}()
select {
case <-ctx.Done():
return errs.Wrap(errors.New("timeout, ctx graceful stop"))
case <-done:
return nil
}
defer listener.Close()
return utils.Wrap1(srv.Serve(listener))
}
2 changes: 1 addition & 1 deletion pkg/common/config/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func FlagParse() (string, int, bool, error) {
var hide bool
flag.BoolVar(&hide, "hide", false, "hide the ComponentCheck result")

// Version flag
// Version flagqq
var showVersion bool
flag.BoolVar(&showVersion, "version", false, "show version and exit")

Expand Down
52 changes: 52 additions & 0 deletions pkg/util/genutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"fmt"
"os"
"path/filepath"
)

// OutDir creates the absolute path name from path and checks path exists.
// Returns absolute path including trailing '/' or error if path does not exist.
func OutDir(path string) (string, error) {
outDir, err := filepath.Abs(path)
if err != nil {
return "", err
}

stat, err := os.Stat(outDir)
if err != nil {
return "", err
}

if !stat.IsDir() {
return "", fmt.Errorf("output directory %s is not a directory", outDir)
}
outDir += "/"
return outDir, nil
}

func ExitWithError(err error) {
progName := filepath.Base(os.Args[0])
fmt.Fprintf(os.Stderr, "%s exit -1: %+v\n", progName, err)
os.Exit(-1)
}

func SIGTERMExit() {
progName := filepath.Base(os.Args[0])
fmt.Fprintf(os.Stderr, "Warning %s receive process terminal SIGTERM exit 0\n", progName)
}

0 comments on commit b5e99dc

Please sign in to comment.