diff --git a/build/toolbox/resolver-runner.sh b/build/toolbox/resolver-runner.sh index 7cc484edb..74406aa37 100755 --- a/build/toolbox/resolver-runner.sh +++ b/build/toolbox/resolver-runner.sh @@ -6,7 +6,10 @@ set -o pipefail function finish { echo "sleep 1 second to collect logs" - sleep 1 + sleep 0.5 + # kill child process fstream + pkill -P $$ + sleep 0.5 } trap finish EXIT diff --git a/cmd/workflow/coordinator/main.go b/cmd/workflow/coordinator/main.go index f905dbfff..c66a86b89 100644 --- a/cmd/workflow/coordinator/main.go +++ b/cmd/workflow/coordinator/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "os" @@ -9,6 +10,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/caicloud/cyclone/pkg/common" + "github.com/caicloud/cyclone/pkg/common/signals" utilk8s "github.com/caicloud/cyclone/pkg/util/k8s" "github.com/caicloud/cyclone/pkg/workflow/coordinator" ) @@ -29,9 +31,14 @@ func main() { var err error var message string + ctx, cancel := context.WithCancel(context.Background()) + signals.GracefulShutdown(cancel) defer func() { // graceful showdown, need delay time to collect logs of other containers time.Sleep(exitDelayTime) + cancel() + // sleep 1 second to let the background processes do exit + time.Sleep(time.Second) if err != nil { log.Error(message) os.Exit(1) @@ -49,9 +56,9 @@ func main() { } // New workflow stage coordinator. - c, err := coordinator.NewCoordinator(client) + c, err := coordinator.NewCoordinator(ctx, client) if err != nil { - log.Errorf("New coornidator failed: %v", err) + log.Errorf("New coordinator failed: %v", err) return } diff --git a/pkg/common/signals/signal.go b/pkg/common/signals/signal.go index 24ebd1673..852e13b74 100644 --- a/pkg/common/signals/signal.go +++ b/pkg/common/signals/signal.go @@ -23,10 +23,10 @@ func GracefulShutdown(cancel context.CancelFunc) { signal.Notify(c, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) go func() { s := <-c - log.WithField("signal", s).Debug("System signal caught, cancel context.") + log.WithField("signal", s).Info("System signal caught, cancel context.") cancel() s = <-c - log.WithField("signal", s).Debug("Another system signal caught, exit directly.") + log.WithField("signal", s).Info("Another system signal caught, exit directly.") os.Exit(1) }() } diff --git a/pkg/server/handler/v1alpha1/workflowrun.go b/pkg/server/handler/v1alpha1/workflowrun.go index 4ef4b05a4..4807a1c2d 100644 --- a/pkg/server/handler/v1alpha1/workflowrun.go +++ b/pkg/server/handler/v1alpha1/workflowrun.go @@ -324,16 +324,19 @@ func receiveContainerLogStream(tenant, project, workflow, workflowrun, stage, co } defer file.Close() + // Send ping message periodically to keep the connection not idle. + go ping(ws) + var message []byte for { _, message, err = ws.ReadMessage() if err != nil { - if !websocket.IsUnexpectedCloseError(err, websocket.CloseAbnormalClosure) { - return nil + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { + log.Errorf("read message for %s/%s/%s/%s error: %v", tenant, workflowrun, stage, container, err) + return err } - log.Infoln(err) - return err + return nil } _, err = file.Write(message) if err != nil { @@ -342,6 +345,23 @@ func receiveContainerLogStream(tenant, project, workflow, workflowrun, stage, co } } +func ping(ws *websocket.Conn) { + pingTicker := time.NewTicker(websocketutil.PingPeriod) + defer func() { + pingTicker.Stop() + }() + + for range pingTicker.C { + if err := ws.SetWriteDeadline(time.Now().Add(websocketutil.WriteWait)); err != nil { + return + } + + if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + return + } + } +} + // GetContainerLogStream gets real-time logs of a certain stage. func GetContainerLogStream(ctx context.Context, project, workflow, workflowrun, tenant, stage string) error { request := contextutil.GetHTTPRequest(ctx) @@ -377,6 +397,7 @@ func getContainerLogStream(tenant, project, workflow, workflowrun, stage string, defer folderReader.Close() go watchStageTermination(common.TenantNamespace(tenant), workflowrun, stage, cancel) + err = websocketutil.Write(ws, folderReader, ctx.Done()) if err != nil { log.Error("websocket writer error:", err) diff --git a/pkg/util/websocket/websocket.go b/pkg/util/websocket/websocket.go index f4b72e5fb..0017c8105 100644 --- a/pkg/util/websocket/websocket.go +++ b/pkg/util/websocket/websocket.go @@ -3,6 +3,7 @@ package websocket import ( "bufio" "io" + "net" "net/http" "net/url" "strings" @@ -93,69 +94,92 @@ type ReadBytes interface { // Write writes message from reader to websocket func Write(ws *websocket.Conn, reader ReadBytes, stop <-chan struct{}) error { + go func() { + // Handle ping message send by peer, since the ping Handler function will be + // called from the NextReader, ReadMessage and message reader Read methods. + for { + _, _, err := ws.ReadMessage() + if err != nil { + return + } + } + }() + + ws.SetPongHandler(func(string) error { + log.Info("Handle pong, Connection OK") + return nil + }) + + ws.SetPingHandler(func(message string) error { + log.Info("Handle ping ...") + err := ws.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(WriteWait)) + if err == websocket.ErrCloseSent { + return nil + } else if e, ok := err.(net.Error); ok && e.Temporary() { + return nil + } + return err + }) + pingTicker := time.NewTicker(PingPeriod) sendTicker := time.NewTicker(10 * time.Millisecond) - exit := make(chan struct{}) defer func() { - log.Info("close ticker and websocket") + log.Info("close pingTicker and sendTicker") pingTicker.Stop() sendTicker.Stop() - ws.Close() - close(exit) }() for { select { case <-pingTicker.C: - err := ws.SetWriteDeadline(time.Now().Add(WriteWait)) - if err != nil { + if err := ws.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil { log.Warning("set write deadline error:", err) } if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { log.Warning("write ping message error:", err) - if !websocket.IsUnexpectedCloseError(err, websocket.CloseAbnormalClosure) { - return nil + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { + return err } - return err + return nil } case <-sendTicker.C: // With buf.ReadBytes, when err is not nil (often io.EOF), line is not guaranteed to be empty, // it holds data before the error occurs. - line, err := reader.ReadBytes('\n') - if err != nil && err != io.EOF { - log.Warning("folder reader read bytes error:", err) - err = ws.SetWriteDeadline(time.Now().Add(WriteWait)) - if err != nil { + line, readErr := reader.ReadBytes('\n') + if readErr != nil && readErr != io.EOF { + log.Warning("reader read bytes error:", readErr) + if err := ws.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil { log.Warning("set write deadline error:", err) } - err = ws.WriteMessage(websocket.CloseMessage, []byte("Interval error happens, TERMINATE")) - if err != nil { + + closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Interval error happens, TERMINATE") + if err := ws.WriteMessage(websocket.CloseMessage, closeMsg); err != nil { log.Warning("write close message error:", err) } break } if len(line) > 0 { - err = ws.SetWriteDeadline(time.Now().Add(WriteWait)) - if err != nil { + if err := ws.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil { log.Warning("set write deadline error:", err) } - err = ws.WriteMessage(websocket.TextMessage, line) - if err != nil { + + if err := ws.WriteMessage(websocket.TextMessage, line); err != nil { log.Warning("write text message error:", err) - if !websocket.IsUnexpectedCloseError(err, websocket.CloseAbnormalClosure) { - return nil + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { + return err } - return err + return nil } } case <-stop: - err := ws.SetWriteDeadline(time.Now().Add(WriteWait)) - if err != nil { + log.Info("receive stop signal") + if err := ws.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil { log.Warning("set write deadline error:", err) } - err = ws.WriteMessage(websocket.CloseMessage, []byte("Message sending complete, TERMINATE")) - if err != nil { + + closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Stop sending message, TERMINATE") + if err := ws.WriteMessage(websocket.CloseMessage, closeMsg); err != nil { log.Error("write close message error: ", err) return err } diff --git a/pkg/workflow/coordinator/coordinator.go b/pkg/workflow/coordinator/coordinator.go index a6e1b5be2..a71672d94 100644 --- a/pkg/workflow/coordinator/coordinator.go +++ b/pkg/workflow/coordinator/coordinator.go @@ -1,6 +1,7 @@ package coordinator import ( + "context" "encoding/json" "fmt" "io/ioutil" @@ -30,6 +31,7 @@ type Coordinator struct { Wfr *v1alpha1.WorkflowRun // OutputResources represents output resources the related stage configured. OutputResources []*v1alpha1.Resource + ctx context.Context } // RuntimeExecutor is an interface defined some methods @@ -38,9 +40,9 @@ type RuntimeExecutor interface { // WaitContainers waits selected containers to state. WaitContainers(state common.ContainerState, selectors ...common.ContainerSelector) error // CollectLog collects container logs to cyclone server. - CollectLog(container, wrorkflowrun, stage string) error + CollectLog(container, wrorkflowrun, stage string, close <-chan struct{}) error // MarkLogEOF marks end of stage logs. - MarkLogEOF(workflowrun, stage string) error + MarkLogEOF(workflowrun, stage string, close <-chan struct{}) error // CopyFromContainer copy a file or directory from container:path to dst. CopyFromContainer(container, path, dst string) error // GetPod get the stage related pod. @@ -51,7 +53,7 @@ type RuntimeExecutor interface { } // NewCoordinator create a coordinator instance. -func NewCoordinator(client clientset.Interface) (*Coordinator, error) { +func NewCoordinator(ctx context.Context, client clientset.Interface) (*Coordinator, error) { // Get stage from Env var stage *v1alpha1.Stage stageInfo := os.Getenv(common.EnvStageInfo) @@ -94,6 +96,7 @@ func NewCoordinator(client clientset.Interface) (*Coordinator, error) { Stage: stage, Wfr: wfr, OutputResources: rscs, + ctx: ctx, }, nil } @@ -105,12 +108,12 @@ func (co *Coordinator) CollectLogs() error { } for _, c := range cs { - go func(container, workflowrun, stage string) { - err := co.runtimeExec.CollectLog(container, workflowrun, stage) + go func(container, workflowrun, stage string, done <-chan struct{}) { + err := co.runtimeExec.CollectLog(container, workflowrun, stage, done) if err != nil { log.Errorf("Collect %s log failed:%v", container, err) } - }(c, co.Wfr.Name, co.Stage.Name) + }(c, co.Wfr.Name, co.Stage.Name, co.ctx.Done()) } return nil @@ -118,7 +121,7 @@ func (co *Coordinator) CollectLogs() error { // MarkLogEOF marks end of stage logs. func (co *Coordinator) MarkLogEOF() error { - return co.runtimeExec.MarkLogEOF(co.Wfr.Name, co.Stage.Name) + return co.runtimeExec.MarkLogEOF(co.Wfr.Name, co.Stage.Name, co.ctx.Done()) } // WaitRunning waits all containers to start run. diff --git a/pkg/workflow/coordinator/cycloneserver/client.go b/pkg/workflow/coordinator/cycloneserver/client.go index 509be52a9..e3d7d338d 100644 --- a/pkg/workflow/coordinator/cycloneserver/client.go +++ b/pkg/workflow/coordinator/cycloneserver/client.go @@ -7,8 +7,6 @@ import ( "net/url" "strings" - log "github.com/sirupsen/logrus" - websocketutil "github.com/caicloud/cyclone/pkg/util/websocket" ) @@ -20,7 +18,7 @@ const ( // Client ... type Client interface { - PushLogStream(ns, workflowrun, stage, container string, reader io.Reader) error + PushLogStream(ns, workflowrun, stage, container string, reader io.Reader, close <-chan struct{}) error } type client struct { @@ -42,7 +40,7 @@ func NewClient(cycloneServer string) Client { } // PushLogStream ... -func (c *client) PushLogStream(ns, workflowrun, stage, container string, reader io.Reader) error { +func (c *client) PushLogStream(ns, workflowrun, stage, container string, reader io.Reader, close <-chan struct{}) error { path := fmt.Sprintf(apiPathForLogStream, workflowrun) host := strings.TrimPrefix(c.baseURL, "http://") host = strings.TrimPrefix(host, "https://") @@ -53,6 +51,5 @@ func (c *client) PushLogStream(ns, workflowrun, stage, container string, reader Scheme: "ws", } - log.Infof("Path: %s", requestURL.String()) - return websocketutil.SendStream(requestURL.String(), reader, make(chan struct{})) + return websocketutil.SendStream(requestURL.String(), reader, close) } diff --git a/pkg/workflow/coordinator/k8sapi/k8sapi.go b/pkg/workflow/coordinator/k8sapi/k8sapi.go index e629109fc..f6c524f18 100644 --- a/pkg/workflow/coordinator/k8sapi/k8sapi.go +++ b/pkg/workflow/coordinator/k8sapi/k8sapi.go @@ -98,7 +98,7 @@ func (k *Executor) GetPod() (*core_v1.Pod, error) { } // CollectLog collects container logs. -func (k *Executor) CollectLog(container, workflowrun, stage string) error { +func (k *Executor) CollectLog(container, workflowrun, stage string, close <-chan struct{}) error { log.Infof("Start to collect %s log", container) stream, err := k.client.CoreV1().Pods(k.namespace).GetLogs(k.podName, &core_v1.PodLogOptions{ Container: container, @@ -112,7 +112,7 @@ func (k *Executor) CollectLog(container, workflowrun, stage string) error { stream.Close() }() - err = k.cycloneClient.PushLogStream(k.metaNamespace, workflowrun, stage, container, stream) + err = k.cycloneClient.PushLogStream(k.metaNamespace, workflowrun, stage, container, stream, close) if err != nil { return err } @@ -120,8 +120,8 @@ func (k *Executor) CollectLog(container, workflowrun, stage string) error { } // MarkLogEOF marks the end of stage logs -func (k *Executor) MarkLogEOF(workflowrun, stage string) error { - err := k.cycloneClient.PushLogStream(k.metaNamespace, workflowrun, stage, cyclone_common.FolderEOFFile, strings.NewReader("")) +func (k *Executor) MarkLogEOF(workflowrun, stage string, close <-chan struct{}) error { + err := k.cycloneClient.PushLogStream(k.metaNamespace, workflowrun, stage, cyclone_common.FolderEOFFile, strings.NewReader(""), close) if err != nil { return err } @@ -135,7 +135,7 @@ func (k *Executor) CopyFromContainer(container, path, dst string) error { cmd := exec.Command("docker", args...) log.WithField("args", args).Info() ret, err := cmd.CombinedOutput() - log.WithField("message", string(ret)).WithField("error", err).Info("copy file result") + log.WithField("message", string(ret)).WithField("error", err).WithField("container", container).Info("copy file result") if err != nil { return fmt.Errorf("%s, error: %v", string(ret), err) }