From b5e99dccbfdf222da9b97e4c607850699f1ea961 Mon Sep 17 00:00:00 2001 From: Brabem <69128477+luhaoling@users.noreply.github.com> Date: Tue, 5 Mar 2024 14:04:33 +0800 Subject: [PATCH] fix: add graceful exit logic (#422) * fix: fix the component check logic * fix: add graceful_exit in admin-api * fix: fix the graceful exit logic * fix: add rpc graceful logic * fix: del the http port * fix: add graceful logic in chat-api * fix: update the func location * fix: fix the error * fix: fix the error * fix: test the func loc * fix: fix the tools version --- cmd/api/admin-api/main.go | 40 ++++++++++++++-- cmd/api/chat-api/main.go | 39 ++++++++++++++-- cmd/rpc/admin-rpc/main.go | 4 +- cmd/rpc/chat-rpc/main.go | 4 +- go.mod | 2 +- go.sum | 4 +- pkg/common/chatrpcstart/start.go | 80 ++++++++++++++++++++++++++------ pkg/common/config/parse.go | 2 +- pkg/util/genutil.go | 52 +++++++++++++++++++++ 9 files changed, 200 insertions(+), 27 deletions(-) create mode 100644 pkg/util/genutil.go diff --git a/cmd/api/admin-api/main.go b/cmd/api/admin-api/main.go index 13a4a2d04e..a0193c926e 100644 --- a/cmd/api/admin-api/main.go +++ b/cmd/api/admin-api/main.go @@ -15,11 +15,16 @@ package main import ( + "context" "fmt" + "github.com/OpenIMSDK/chat/pkg/util" "math/rand" "net" + "net/http" "os" + "os/signal" "strconv" + "syscall" "time" "github.com/OpenIMSDK/tools/errs" @@ -95,8 +100,37 @@ func main() { api.NewAdminRoute(engine, zk) address := net.JoinHostPort(config.Config.AdminApi.ListenIP, strconv.Itoa(ginPort)) - if err := engine.Run(address); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + + server := http.Server{Addr: address, Handler: engine} + + var ( + netDone = make(chan struct{}, 1) + netErr error + ) + + go func() { + err = server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + netErr = errs.Wrap(err, fmt.Sprintf("api start err: %s", server.Addr)) + netDone <- struct{}{} + } + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM) + + select { + case <-sigs: + util.SIGTERMExit() + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + err = server.Shutdown(ctx) + if err != nil { + util.ExitWithError(errs.Wrap(err, "shutdown err")) + } + case <-netDone: + close(netDone) + util.ExitWithError(netErr) } + } diff --git a/cmd/api/chat-api/main.go b/cmd/api/chat-api/main.go index 7311b3d512..39d6d4ddc2 100644 --- a/cmd/api/chat-api/main.go +++ b/cmd/api/chat-api/main.go @@ -15,12 +15,17 @@ package main import ( + "context" "flag" "fmt" + "github.com/OpenIMSDK/chat/pkg/util" "math/rand" "net" + "net/http" "os" + "os/signal" "strconv" + "syscall" "time" "github.com/OpenIMSDK/tools/errs" @@ -105,8 +110,36 @@ func main() { api.NewChatRoute(engine, zk) address := net.JoinHostPort(config.Config.ChatApi.ListenIP, strconv.Itoa(ginPort)) - if err := engine.Run(address); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + + server := http.Server{Addr: address, Handler: engine} + + var ( + netDone = make(chan struct{}, 1) + netErr error + ) + + go func() { + err = server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + netErr = errs.Wrap(err, fmt.Sprintf("api start err: %s", server.Addr)) + netDone <- struct{}{} + } + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM) + + select { + case <-sigs: + util.SIGTERMExit() + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + err = server.Shutdown(ctx) + if err != nil { + util.ExitWithError(errs.Wrap(err, "shutdown err")) + } + case <-netDone: + close(netDone) + util.ExitWithError(netErr) } } diff --git a/cmd/rpc/admin-rpc/main.go b/cmd/rpc/admin-rpc/main.go index bc1f3252b4..2705e2efd8 100644 --- a/cmd/rpc/admin-rpc/main.go +++ b/cmd/rpc/admin-rpc/main.go @@ -17,6 +17,7 @@ package main import ( "flag" "fmt" + "github.com/OpenIMSDK/chat/pkg/util" "math/rand" "os" "time" @@ -70,7 +71,6 @@ func main() { } err = chatrpcstart.Start(rpcPort, config.Config.RpcRegisterName.OpenImAdminName, 0, admin.Start) if err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } diff --git a/cmd/rpc/chat-rpc/main.go b/cmd/rpc/chat-rpc/main.go index 472cc46217..abc786ee8e 100644 --- a/cmd/rpc/chat-rpc/main.go +++ b/cmd/rpc/chat-rpc/main.go @@ -16,6 +16,7 @@ package main import ( "fmt" + "github.com/OpenIMSDK/chat/pkg/util" "math/rand" "os" "time" @@ -66,7 +67,6 @@ func main() { } err = chatrpcstart.Start(rpcPort, config.Config.RpcRegisterName.OpenImChatName, 0, chat.Start) if err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } diff --git a/go.mod b/go.mod index 2a4a2a1728..905cef699f 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( require ( github.com/OpenIMSDK/protocol v0.0.21 - github.com/OpenIMSDK/tools v0.0.36 + github.com/OpenIMSDK/tools v0.0.37 github.com/livekit/protocol v1.5.0 github.com/redis/go-redis/v9 v9.2.1 github.com/xuri/excelize/v2 v2.8.0 diff --git a/go.sum b/go.sum index 83a0efd434..f080412792 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ= github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/OpenIMSDK/protocol v0.0.21 h1:5H6H+hJ9d/VgRqttvxD/zfK9Asd+4M8Eknk5swSbUVY= github.com/OpenIMSDK/protocol v0.0.21/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= -github.com/OpenIMSDK/tools v0.0.36 h1:BT0q64l4f3QJDW16Rc0uJYt1gQFkiPoUQYQ33vo0EcE= -github.com/OpenIMSDK/tools v0.0.36/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE= +github.com/OpenIMSDK/tools v0.0.37 h1:qvDqmA4RbEJtPjZouWCkVuf/pjm6Y8nUrG5iH2gcnOg= +github.com/OpenIMSDK/tools v0.0.37/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE= github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.4 h1:iC9YFYKDGEy3n/FtqJnOkZsene9olVspKmkX5A2YBEo= github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.4/go.mod h1:sCavSAvdzOjul4cEqeVtvlSaSScfNsTQ+46HwlTL1hc= github.com/alibabacloud-go/darabonba-openapi v0.1.18/go.mod h1:PB4HffMhJVmAgNKNq3wYbTUlFvPgxJpTzd1F5pTuUsc= diff --git a/pkg/common/chatrpcstart/start.go b/pkg/common/chatrpcstart/start.go index 5b1fc42589..b5dffbe68c 100644 --- a/pkg/common/chatrpcstart/start.go +++ b/pkg/common/chatrpcstart/start.go @@ -15,9 +15,17 @@ package chatrpcstart import ( + "context" + "errors" "fmt" + "github.com/OpenIMSDK/chat/pkg/util" "net" + "os" + "os/signal" "strconv" + "sync" + "syscall" + "time" "github.com/OpenIMSDK/chat/pkg/common/config" chatMw "github.com/OpenIMSDK/chat/pkg/common/mw" @@ -26,7 +34,6 @@ import ( "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/mw" "github.com/OpenIMSDK/tools/network" - "github.com/OpenIMSDK/tools/utils" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -36,10 +43,7 @@ func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c var zkClient discoveryregistry.SvcDiscoveryRegistry zkClient, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery) - /* - zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, - openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, - config.Config.Zookeeper.Password), openKeeper.WithRoundRobin(), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger()))*/if err != nil { + if err != nil { return errs.Wrap(err, fmt.Sprintf(";the addr is:%v", &config.Config.Zookeeper.ZkAddr)) } // defer zkClient.CloseZK() @@ -47,22 +51,72 @@ func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c zkClient.AddOption(chatMw.AddUserType(), mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) registerIP, err := network.GetRpcRegisterIP(config.Config.Rpc.RegisterIP) if err != nil { - return utils.Wrap1(err) + return errs.Wrap(err) + } + + rpcTcpAddr := net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort)) + listener, err := net.Listen("tcp", rpcTcpAddr) + if err != nil { + return errs.Wrap(err) } + defer listener.Close() + srv := grpc.NewServer(append(options, mw.GrpcServer())...) - defer srv.GracefulStop() + once := sync.Once{} + defer func() { + once.Do(srv.GracefulStop) + }() + err = rpcFn(zkClient, srv) if err != nil { return err } + err = zkClient.Register(rpcRegisterName, registerIP, rpcPort, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - return utils.Wrap1(err) + return errs.Wrap(err) } - listener, err := net.Listen("tcp", net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort))) - if err != nil { - return utils.Wrap1(err) + + var ( + netDone = make(chan struct{}, 1) + netErr error + ) + + go func() { + err := srv.Serve(listener) + if err != nil { + netErr = errs.Wrap(err, "rpc start err: ", rpcTcpAddr) + netDone <- struct{}{} + } + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM) + select { + case <-sigs: + util.SIGTERMExit() + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := gracefulStopWithCtx(ctx, srv.GracefulStop); err != nil { + return err + } + case <-netDone: + close(netDone) + return netErr + } + return nil +} + +func gracefulStopWithCtx(ctx context.Context, f func()) error { + done := make(chan struct{}, 1) + go func() { + f() + close(done) + }() + select { + case <-ctx.Done(): + return errs.Wrap(errors.New("timeout, ctx graceful stop")) + case <-done: + return nil } - defer listener.Close() - return utils.Wrap1(srv.Serve(listener)) } diff --git a/pkg/common/config/parse.go b/pkg/common/config/parse.go index e80d7f2b5f..d286005226 100644 --- a/pkg/common/config/parse.go +++ b/pkg/common/config/parse.go @@ -220,7 +220,7 @@ func FlagParse() (string, int, bool, error) { var hide bool flag.BoolVar(&hide, "hide", false, "hide the ComponentCheck result") - // Version flag + // Version flagqq var showVersion bool flag.BoolVar(&showVersion, "version", false, "show version and exit") diff --git a/pkg/util/genutil.go b/pkg/util/genutil.go new file mode 100644 index 0000000000..0bb0078202 --- /dev/null +++ b/pkg/util/genutil.go @@ -0,0 +1,52 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "fmt" + "os" + "path/filepath" +) + +// OutDir creates the absolute path name from path and checks path exists. +// Returns absolute path including trailing '/' or error if path does not exist. +func OutDir(path string) (string, error) { + outDir, err := filepath.Abs(path) + if err != nil { + return "", err + } + + stat, err := os.Stat(outDir) + if err != nil { + return "", err + } + + if !stat.IsDir() { + return "", fmt.Errorf("output directory %s is not a directory", outDir) + } + outDir += "/" + return outDir, nil +} + +func ExitWithError(err error) { + progName := filepath.Base(os.Args[0]) + fmt.Fprintf(os.Stderr, "%s exit -1: %+v\n", progName, err) + os.Exit(-1) +} + +func SIGTERMExit() { + progName := filepath.Base(os.Args[0]) + fmt.Fprintf(os.Stderr, "Warning %s receive process terminal SIGTERM exit 0\n", progName) +}