Skip to content

Commit

Permalink
fix: sending ping message for websocket (#1422) (#1420)
Browse files Browse the repository at this point in the history
* fix: sending ping message for websocket

- Stream logs receiver sends ping message periodically to keep the
connection not idle. The reason why we need this is sender peer may
stuck on reading kubernetes stream logs and could not have a chance
to send ping message periodically.

- Add somes log

* fix: make coordinator gracefully shuwdown config simpler
  • Loading branch information
zhujian7 committed May 27, 2020
1 parent d74266b commit 67463a9
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 54 deletions.
5 changes: 4 additions & 1 deletion build/toolbox/resolver-runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ set -o pipefail

function finish {
echo "sleep 1 second to collect logs"
sleep 1
sleep 0.5
# kill child process fstream
pkill -P $$
sleep 0.5
}
trap finish EXIT

Expand Down
11 changes: 9 additions & 2 deletions cmd/workflow/coordinator/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"fmt"
"os"
Expand All @@ -9,6 +10,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/caicloud/cyclone/pkg/common"
"github.com/caicloud/cyclone/pkg/common/signals"
utilk8s "github.com/caicloud/cyclone/pkg/util/k8s"
"github.com/caicloud/cyclone/pkg/workflow/coordinator"
)
Expand All @@ -29,9 +31,14 @@ func main() {

var err error
var message string
ctx, cancel := context.WithCancel(context.Background())
signals.GracefulShutdown(cancel)
defer func() {
// graceful showdown, need delay time to collect logs of other containers
time.Sleep(exitDelayTime)
cancel()
// sleep 1 second to let the background processes do exit
time.Sleep(time.Second)
if err != nil {
log.Error(message)
os.Exit(1)
Expand All @@ -49,9 +56,9 @@ func main() {
}

// New workflow stage coordinator.
c, err := coordinator.NewCoordinator(client)
c, err := coordinator.NewCoordinator(ctx, client)
if err != nil {
log.Errorf("New coornidator failed: %v", err)
log.Errorf("New coordinator failed: %v", err)
return
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/common/signals/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ func GracefulShutdown(cancel context.CancelFunc) {
signal.Notify(c, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
s := <-c
log.WithField("signal", s).Debug("System signal caught, cancel context.")
log.WithField("signal", s).Info("System signal caught, cancel context.")
cancel()
s = <-c
log.WithField("signal", s).Debug("Another system signal caught, exit directly.")
log.WithField("signal", s).Info("Another system signal caught, exit directly.")
os.Exit(1)
}()
}
29 changes: 25 additions & 4 deletions pkg/server/handler/v1alpha1/workflowrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,16 +324,19 @@ func receiveContainerLogStream(tenant, project, workflow, workflowrun, stage, co
}
defer file.Close()

// Send ping message periodically to keep the connection not idle.
go ping(ws)

var message []byte
for {
_, message, err = ws.ReadMessage()
if err != nil {
if !websocket.IsUnexpectedCloseError(err, websocket.CloseAbnormalClosure) {
return nil
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
log.Errorf("read message for %s/%s/%s/%s error: %v", tenant, workflowrun, stage, container, err)
return err
}

log.Infoln(err)
return err
return nil
}
_, err = file.Write(message)
if err != nil {
Expand All @@ -342,6 +345,23 @@ func receiveContainerLogStream(tenant, project, workflow, workflowrun, stage, co
}
}

func ping(ws *websocket.Conn) {
pingTicker := time.NewTicker(websocketutil.PingPeriod)
defer func() {
pingTicker.Stop()
}()

for range pingTicker.C {
if err := ws.SetWriteDeadline(time.Now().Add(websocketutil.WriteWait)); err != nil {
return
}

if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
return
}
}
}

// GetContainerLogStream gets real-time logs of a certain stage.
func GetContainerLogStream(ctx context.Context, project, workflow, workflowrun, tenant, stage string) error {
request := contextutil.GetHTTPRequest(ctx)
Expand Down Expand Up @@ -377,6 +397,7 @@ func getContainerLogStream(tenant, project, workflow, workflowrun, stage string,
defer folderReader.Close()

go watchStageTermination(common.TenantNamespace(tenant), workflowrun, stage, cancel)

err = websocketutil.Write(ws, folderReader, ctx.Done())
if err != nil {
log.Error("websocket writer error:", err)
Expand Down
78 changes: 51 additions & 27 deletions pkg/util/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package websocket
import (
"bufio"
"io"
"net"
"net/http"
"net/url"
"strings"
Expand Down Expand Up @@ -93,69 +94,92 @@ type ReadBytes interface {

// Write writes message from reader to websocket
func Write(ws *websocket.Conn, reader ReadBytes, stop <-chan struct{}) error {
go func() {
// Handle ping message send by peer, since the ping Handler function will be
// called from the NextReader, ReadMessage and message reader Read methods.
for {
_, _, err := ws.ReadMessage()
if err != nil {
return
}
}
}()

ws.SetPongHandler(func(string) error {
log.Info("Handle pong, Connection OK")
return nil
})

ws.SetPingHandler(func(message string) error {
log.Info("Handle ping ...")
err := ws.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(WriteWait))
if err == websocket.ErrCloseSent {
return nil
} else if e, ok := err.(net.Error); ok && e.Temporary() {
return nil
}
return err
})

pingTicker := time.NewTicker(PingPeriod)
sendTicker := time.NewTicker(10 * time.Millisecond)
exit := make(chan struct{})
defer func() {
log.Info("close ticker and websocket")
log.Info("close pingTicker and sendTicker")
pingTicker.Stop()
sendTicker.Stop()
ws.Close()
close(exit)
}()

for {
select {
case <-pingTicker.C:
err := ws.SetWriteDeadline(time.Now().Add(WriteWait))
if err != nil {
if err := ws.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
log.Warning("set write deadline error:", err)
}
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
log.Warning("write ping message error:", err)
if !websocket.IsUnexpectedCloseError(err, websocket.CloseAbnormalClosure) {
return nil
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
return err
}
return err
return nil
}
case <-sendTicker.C:
// With buf.ReadBytes, when err is not nil (often io.EOF), line is not guaranteed to be empty,
// it holds data before the error occurs.
line, err := reader.ReadBytes('\n')
if err != nil && err != io.EOF {
log.Warning("folder reader read bytes error:", err)
err = ws.SetWriteDeadline(time.Now().Add(WriteWait))
if err != nil {
line, readErr := reader.ReadBytes('\n')
if readErr != nil && readErr != io.EOF {
log.Warning("reader read bytes error:", readErr)
if err := ws.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
log.Warning("set write deadline error:", err)
}
err = ws.WriteMessage(websocket.CloseMessage, []byte("Interval error happens, TERMINATE"))
if err != nil {

closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Interval error happens, TERMINATE")
if err := ws.WriteMessage(websocket.CloseMessage, closeMsg); err != nil {
log.Warning("write close message error:", err)
}
break
}

if len(line) > 0 {
err = ws.SetWriteDeadline(time.Now().Add(WriteWait))
if err != nil {
if err := ws.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
log.Warning("set write deadline error:", err)
}
err = ws.WriteMessage(websocket.TextMessage, line)
if err != nil {

if err := ws.WriteMessage(websocket.TextMessage, line); err != nil {
log.Warning("write text message error:", err)
if !websocket.IsUnexpectedCloseError(err, websocket.CloseAbnormalClosure) {
return nil
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
return err
}
return err
return nil
}
}
case <-stop:
err := ws.SetWriteDeadline(time.Now().Add(WriteWait))
if err != nil {
log.Info("receive stop signal")
if err := ws.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
log.Warning("set write deadline error:", err)
}
err = ws.WriteMessage(websocket.CloseMessage, []byte("Message sending complete, TERMINATE"))
if err != nil {

closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Stop sending message, TERMINATE")
if err := ws.WriteMessage(websocket.CloseMessage, closeMsg); err != nil {
log.Error("write close message error: ", err)
return err
}
Expand Down
17 changes: 10 additions & 7 deletions pkg/workflow/coordinator/coordinator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coordinator

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -30,6 +31,7 @@ type Coordinator struct {
Wfr *v1alpha1.WorkflowRun
// OutputResources represents output resources the related stage configured.
OutputResources []*v1alpha1.Resource
ctx context.Context
}

// RuntimeExecutor is an interface defined some methods
Expand All @@ -38,9 +40,9 @@ type RuntimeExecutor interface {
// WaitContainers waits selected containers to state.
WaitContainers(state common.ContainerState, selectors ...common.ContainerSelector) error
// CollectLog collects container logs to cyclone server.
CollectLog(container, wrorkflowrun, stage string) error
CollectLog(container, wrorkflowrun, stage string, close <-chan struct{}) error
// MarkLogEOF marks end of stage logs.
MarkLogEOF(workflowrun, stage string) error
MarkLogEOF(workflowrun, stage string, close <-chan struct{}) error
// CopyFromContainer copy a file or directory from container:path to dst.
CopyFromContainer(container, path, dst string) error
// GetPod get the stage related pod.
Expand All @@ -51,7 +53,7 @@ type RuntimeExecutor interface {
}

// NewCoordinator create a coordinator instance.
func NewCoordinator(client clientset.Interface) (*Coordinator, error) {
func NewCoordinator(ctx context.Context, client clientset.Interface) (*Coordinator, error) {
// Get stage from Env
var stage *v1alpha1.Stage
stageInfo := os.Getenv(common.EnvStageInfo)
Expand Down Expand Up @@ -94,6 +96,7 @@ func NewCoordinator(client clientset.Interface) (*Coordinator, error) {
Stage: stage,
Wfr: wfr,
OutputResources: rscs,
ctx: ctx,
}, nil
}

Expand All @@ -105,20 +108,20 @@ func (co *Coordinator) CollectLogs() error {
}

for _, c := range cs {
go func(container, workflowrun, stage string) {
err := co.runtimeExec.CollectLog(container, workflowrun, stage)
go func(container, workflowrun, stage string, done <-chan struct{}) {
err := co.runtimeExec.CollectLog(container, workflowrun, stage, done)
if err != nil {
log.Errorf("Collect %s log failed:%v", container, err)
}
}(c, co.Wfr.Name, co.Stage.Name)
}(c, co.Wfr.Name, co.Stage.Name, co.ctx.Done())
}

return nil
}

// MarkLogEOF marks end of stage logs.
func (co *Coordinator) MarkLogEOF() error {
return co.runtimeExec.MarkLogEOF(co.Wfr.Name, co.Stage.Name)
return co.runtimeExec.MarkLogEOF(co.Wfr.Name, co.Stage.Name, co.ctx.Done())
}

// WaitRunning waits all containers to start run.
Expand Down
9 changes: 3 additions & 6 deletions pkg/workflow/coordinator/cycloneserver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"net/url"
"strings"

log "github.com/sirupsen/logrus"

websocketutil "github.com/caicloud/cyclone/pkg/util/websocket"
)

Expand All @@ -20,7 +18,7 @@ const (

// Client ...
type Client interface {
PushLogStream(ns, workflowrun, stage, container string, reader io.Reader) error
PushLogStream(ns, workflowrun, stage, container string, reader io.Reader, close <-chan struct{}) error
}

type client struct {
Expand All @@ -42,7 +40,7 @@ func NewClient(cycloneServer string) Client {
}

// PushLogStream ...
func (c *client) PushLogStream(ns, workflowrun, stage, container string, reader io.Reader) error {
func (c *client) PushLogStream(ns, workflowrun, stage, container string, reader io.Reader, close <-chan struct{}) error {
path := fmt.Sprintf(apiPathForLogStream, workflowrun)
host := strings.TrimPrefix(c.baseURL, "http://")
host = strings.TrimPrefix(host, "https://")
Expand All @@ -53,6 +51,5 @@ func (c *client) PushLogStream(ns, workflowrun, stage, container string, reader
Scheme: "ws",
}

log.Infof("Path: %s", requestURL.String())
return websocketutil.SendStream(requestURL.String(), reader, make(chan struct{}))
return websocketutil.SendStream(requestURL.String(), reader, close)
}
10 changes: 5 additions & 5 deletions pkg/workflow/coordinator/k8sapi/k8sapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (k *Executor) GetPod() (*core_v1.Pod, error) {
}

// CollectLog collects container logs.
func (k *Executor) CollectLog(container, workflowrun, stage string) error {
func (k *Executor) CollectLog(container, workflowrun, stage string, close <-chan struct{}) error {
log.Infof("Start to collect %s log", container)
stream, err := k.client.CoreV1().Pods(k.namespace).GetLogs(k.podName, &core_v1.PodLogOptions{
Container: container,
Expand All @@ -112,16 +112,16 @@ func (k *Executor) CollectLog(container, workflowrun, stage string) error {
stream.Close()
}()

err = k.cycloneClient.PushLogStream(k.metaNamespace, workflowrun, stage, container, stream)
err = k.cycloneClient.PushLogStream(k.metaNamespace, workflowrun, stage, container, stream, close)
if err != nil {
return err
}
return nil
}

// MarkLogEOF marks the end of stage logs
func (k *Executor) MarkLogEOF(workflowrun, stage string) error {
err := k.cycloneClient.PushLogStream(k.metaNamespace, workflowrun, stage, cyclone_common.FolderEOFFile, strings.NewReader(""))
func (k *Executor) MarkLogEOF(workflowrun, stage string, close <-chan struct{}) error {
err := k.cycloneClient.PushLogStream(k.metaNamespace, workflowrun, stage, cyclone_common.FolderEOFFile, strings.NewReader(""), close)
if err != nil {
return err
}
Expand All @@ -135,7 +135,7 @@ func (k *Executor) CopyFromContainer(container, path, dst string) error {
cmd := exec.Command("docker", args...)
log.WithField("args", args).Info()
ret, err := cmd.CombinedOutput()
log.WithField("message", string(ret)).WithField("error", err).Info("copy file result")
log.WithField("message", string(ret)).WithField("error", err).WithField("container", container).Info("copy file result")
if err != nil {
return fmt.Errorf("%s, error: %v", string(ret), err)
}
Expand Down

0 comments on commit 67463a9

Please sign in to comment.