Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cps-2.8]:fix: sending ping message for websocket #1420

Merged
merged 1 commit into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build/toolbox/resolver-runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ set -o pipefail

function finish {
echo "sleep 1 second to collect logs"
sleep 1
sleep 0.5
# kill child process fstream
pkill -P $$
sleep 0.5
}
trap finish EXIT

Expand Down
11 changes: 9 additions & 2 deletions cmd/workflow/coordinator/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"fmt"
"os"
Expand All @@ -9,6 +10,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/caicloud/cyclone/pkg/common"
"github.com/caicloud/cyclone/pkg/common/signals"
utilk8s "github.com/caicloud/cyclone/pkg/util/k8s"
"github.com/caicloud/cyclone/pkg/workflow/coordinator"
)
Expand All @@ -29,9 +31,14 @@ func main() {

var err error
var message string
ctx, cancel := context.WithCancel(context.Background())
signals.GracefulShutdown(cancel)
defer func() {
// graceful shutdown, need delay time to collect logs of other containers
time.Sleep(exitDelayTime)
cancel()
// sleep 1 second to let the background processes exit
time.Sleep(time.Second)
if err != nil {
log.Error(message)
os.Exit(1)
Expand All @@ -49,9 +56,9 @@ func main() {
}

// New workflow stage coordinator.
c, err := coordinator.NewCoordinator(client)
c, err := coordinator.NewCoordinator(ctx, client)
if err != nil {
log.Errorf("New coornidator failed: %v", err)
log.Errorf("New coordinator failed: %v", err)
return
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/common/signals/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ func GracefulShutdown(cancel context.CancelFunc) {
signal.Notify(c, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
s := <-c
log.WithField("signal", s).Debug("System signal caught, cancel context.")
log.WithField("signal", s).Info("System signal caught, cancel context.")
cancel()
s = <-c
log.WithField("signal", s).Debug("Another system signal caught, exit directly.")
log.WithField("signal", s).Info("Another system signal caught, exit directly.")
os.Exit(1)
}()
}
29 changes: 25 additions & 4 deletions pkg/server/handler/v1alpha1/workflowrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,16 +324,19 @@ func receiveContainerLogStream(tenant, project, workflow, workflowrun, stage, co
}
defer file.Close()

// Send ping messages periodically to keep the connection from going idle.
go ping(ws)

var message []byte
for {
_, message, err = ws.ReadMessage()
if err != nil {
if !websocket.IsUnexpectedCloseError(err, websocket.CloseAbnormalClosure) {
return nil
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
log.Errorf("read message for %s/%s/%s/%s error: %v", tenant, workflowrun, stage, container, err)
return err
}

log.Infoln(err)
return err
return nil
}
_, err = file.Write(message)
if err != nil {
Expand All @@ -342,6 +345,23 @@ func receiveContainerLogStream(tenant, project, workflow, workflowrun, stage, co
}
}

// ping writes an empty websocket ping message to ws once every
// websocketutil.PingPeriod. Before each ping it sets a write deadline of
// websocketutil.WriteWait from now. It returns silently as soon as either
// setting the deadline or writing the ping fails, stopping its ticker on
// the way out. It blocks until then, so callers run it in its own goroutine.
func ping(ws *websocket.Conn) {
	ticker := time.NewTicker(websocketutil.PingPeriod)
	defer ticker.Stop()

	for {
		// The ticker channel is never closed, so this simply waits for the next tick.
		<-ticker.C

		deadline := time.Now().Add(websocketutil.WriteWait)
		if ws.SetWriteDeadline(deadline) != nil {
			return
		}

		if ws.WriteMessage(websocket.PingMessage, []byte{}) != nil {
			return
		}
	}
}

// GetContainerLogStream gets real-time logs of a certain stage.
func GetContainerLogStream(ctx context.Context, project, workflow, workflowrun, tenant, stage string) error {
request := contextutil.GetHTTPRequest(ctx)
Expand Down Expand Up @@ -377,6 +397,7 @@ func getContainerLogStream(tenant, project, workflow, workflowrun, stage string,
defer folderReader.Close()

go watchStageTermination(common.TenantNamespace(tenant), workflowrun, stage, cancel)

err = websocketutil.Write(ws, folderReader, ctx.Done())
if err != nil {
log.Error("websocket writer error:", err)
Expand Down
78 changes: 51 additions & 27 deletions pkg/util/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package websocket
import (
"bufio"
"io"
"net"
"net/http"
"net/url"
"strings"
Expand Down Expand Up @@ -93,69 +94,92 @@ type ReadBytes interface {

// Write writes message from reader to websocket
func Write(ws *websocket.Conn, reader ReadBytes, stop <-chan struct{}) error {
go func() {
// Handle ping messages sent by the peer, since the ping handler function will be
// called from the NextReader, ReadMessage and message reader Read methods.
for {
_, _, err := ws.ReadMessage()
if err != nil {
return
}
}
}()

ws.SetPongHandler(func(string) error {
log.Info("Handle pong, Connection OK")
return nil
})

ws.SetPingHandler(func(message string) error {
log.Info("Handle ping ...")
err := ws.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(WriteWait))
if err == websocket.ErrCloseSent {
return nil
} else if e, ok := err.(net.Error); ok && e.Temporary() {
return nil
}
return err
})

pingTicker := time.NewTicker(PingPeriod)
sendTicker := time.NewTicker(10 * time.Millisecond)
exit := make(chan struct{})
defer func() {
log.Info("close ticker and websocket")
log.Info("close pingTicker and sendTicker")
pingTicker.Stop()
sendTicker.Stop()
ws.Close()
close(exit)
}()

for {
select {
case <-pingTicker.C:
err := ws.SetWriteDeadline(time.Now().Add(WriteWait))
if err != nil {
if err := ws.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
log.Warning("set write deadline error:", err)
}
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
log.Warning("write ping message error:", err)
if !websocket.IsUnexpectedCloseError(err, websocket.CloseAbnormalClosure) {
return nil
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
return err
}
return err
return nil
}
case <-sendTicker.C:
// With buf.ReadBytes, when err is not nil (often io.EOF), line is not guaranteed to be empty,
// it holds data before the error occurs.
line, err := reader.ReadBytes('\n')
if err != nil && err != io.EOF {
log.Warning("folder reader read bytes error:", err)
err = ws.SetWriteDeadline(time.Now().Add(WriteWait))
if err != nil {
line, readErr := reader.ReadBytes('\n')
if readErr != nil && readErr != io.EOF {
log.Warning("reader read bytes error:", readErr)
if err := ws.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
log.Warning("set write deadline error:", err)
}
err = ws.WriteMessage(websocket.CloseMessage, []byte("Interval error happens, TERMINATE"))
if err != nil {

closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Interval error happens, TERMINATE")
if err := ws.WriteMessage(websocket.CloseMessage, closeMsg); err != nil {
log.Warning("write close message error:", err)
}
break
}

if len(line) > 0 {
err = ws.SetWriteDeadline(time.Now().Add(WriteWait))
if err != nil {
if err := ws.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
log.Warning("set write deadline error:", err)
}
err = ws.WriteMessage(websocket.TextMessage, line)
if err != nil {

if err := ws.WriteMessage(websocket.TextMessage, line); err != nil {
log.Warning("write text message error:", err)
if !websocket.IsUnexpectedCloseError(err, websocket.CloseAbnormalClosure) {
return nil
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
return err
}
return err
return nil
}
}
case <-stop:
err := ws.SetWriteDeadline(time.Now().Add(WriteWait))
if err != nil {
log.Info("receive stop signal")
if err := ws.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
log.Warning("set write deadline error:", err)
}
err = ws.WriteMessage(websocket.CloseMessage, []byte("Message sending complete, TERMINATE"))
if err != nil {

closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Stop sending message, TERMINATE")
if err := ws.WriteMessage(websocket.CloseMessage, closeMsg); err != nil {
log.Error("write close message error: ", err)
return err
}
Expand Down
17 changes: 10 additions & 7 deletions pkg/workflow/coordinator/coordinator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coordinator

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -30,6 +31,7 @@ type Coordinator struct {
Wfr *v1alpha1.WorkflowRun
// OutputResources represents output resources the related stage configured.
OutputResources []*v1alpha1.Resource
ctx context.Context
}

// RuntimeExecutor is an interface defined some methods
Expand All @@ -38,9 +40,9 @@ type RuntimeExecutor interface {
// WaitContainers waits selected containers to state.
WaitContainers(state common.ContainerState, selectors ...common.ContainerSelector) error
// CollectLog collects container logs to cyclone server.
CollectLog(container, wrorkflowrun, stage string) error
CollectLog(container, wrorkflowrun, stage string, close <-chan struct{}) error
// MarkLogEOF marks end of stage logs.
MarkLogEOF(workflowrun, stage string) error
MarkLogEOF(workflowrun, stage string, close <-chan struct{}) error
// CopyFromContainer copy a file or directory from container:path to dst.
CopyFromContainer(container, path, dst string) error
// GetPod get the stage related pod.
Expand All @@ -51,7 +53,7 @@ type RuntimeExecutor interface {
}

// NewCoordinator create a coordinator instance.
func NewCoordinator(client clientset.Interface) (*Coordinator, error) {
func NewCoordinator(ctx context.Context, client clientset.Interface) (*Coordinator, error) {
// Get stage from Env
var stage *v1alpha1.Stage
stageInfo := os.Getenv(common.EnvStageInfo)
Expand Down Expand Up @@ -94,6 +96,7 @@ func NewCoordinator(client clientset.Interface) (*Coordinator, error) {
Stage: stage,
Wfr: wfr,
OutputResources: rscs,
ctx: ctx,
}, nil
}

Expand All @@ -105,20 +108,20 @@ func (co *Coordinator) CollectLogs() error {
}

for _, c := range cs {
go func(container, workflowrun, stage string) {
err := co.runtimeExec.CollectLog(container, workflowrun, stage)
go func(container, workflowrun, stage string, done <-chan struct{}) {
err := co.runtimeExec.CollectLog(container, workflowrun, stage, done)
if err != nil {
log.Errorf("Collect %s log failed:%v", container, err)
}
}(c, co.Wfr.Name, co.Stage.Name)
}(c, co.Wfr.Name, co.Stage.Name, co.ctx.Done())
}

return nil
}

// MarkLogEOF marks end of stage logs.
func (co *Coordinator) MarkLogEOF() error {
return co.runtimeExec.MarkLogEOF(co.Wfr.Name, co.Stage.Name)
return co.runtimeExec.MarkLogEOF(co.Wfr.Name, co.Stage.Name, co.ctx.Done())
}

// WaitRunning waits all containers to start run.
Expand Down
9 changes: 3 additions & 6 deletions pkg/workflow/coordinator/cycloneserver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"net/url"
"strings"

log "github.com/sirupsen/logrus"

websocketutil "github.com/caicloud/cyclone/pkg/util/websocket"
)

Expand All @@ -20,7 +18,7 @@ const (

// Client ...
type Client interface {
PushLogStream(ns, workflowrun, stage, container string, reader io.Reader) error
PushLogStream(ns, workflowrun, stage, container string, reader io.Reader, close <-chan struct{}) error
}

type client struct {
Expand All @@ -42,7 +40,7 @@ func NewClient(cycloneServer string) Client {
}

// PushLogStream ...
func (c *client) PushLogStream(ns, workflowrun, stage, container string, reader io.Reader) error {
func (c *client) PushLogStream(ns, workflowrun, stage, container string, reader io.Reader, close <-chan struct{}) error {
path := fmt.Sprintf(apiPathForLogStream, workflowrun)
host := strings.TrimPrefix(c.baseURL, "http://")
host = strings.TrimPrefix(host, "https://")
Expand All @@ -53,6 +51,5 @@ func (c *client) PushLogStream(ns, workflowrun, stage, container string, reader
Scheme: "ws",
}

log.Infof("Path: %s", requestURL.String())
return websocketutil.SendStream(requestURL.String(), reader, make(chan struct{}))
return websocketutil.SendStream(requestURL.String(), reader, close)
}
10 changes: 5 additions & 5 deletions pkg/workflow/coordinator/k8sapi/k8sapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (k *Executor) GetPod() (*core_v1.Pod, error) {
}

// CollectLog collects container logs.
func (k *Executor) CollectLog(container, workflowrun, stage string) error {
func (k *Executor) CollectLog(container, workflowrun, stage string, close <-chan struct{}) error {
log.Infof("Start to collect %s log", container)
stream, err := k.client.CoreV1().Pods(k.namespace).GetLogs(k.podName, &core_v1.PodLogOptions{
Container: container,
Expand All @@ -112,16 +112,16 @@ func (k *Executor) CollectLog(container, workflowrun, stage string) error {
stream.Close()
}()

err = k.cycloneClient.PushLogStream(k.metaNamespace, workflowrun, stage, container, stream)
err = k.cycloneClient.PushLogStream(k.metaNamespace, workflowrun, stage, container, stream, close)
if err != nil {
return err
}
return nil
}

// MarkLogEOF marks the end of stage logs
func (k *Executor) MarkLogEOF(workflowrun, stage string) error {
err := k.cycloneClient.PushLogStream(k.metaNamespace, workflowrun, stage, cyclone_common.FolderEOFFile, strings.NewReader(""))
func (k *Executor) MarkLogEOF(workflowrun, stage string, close <-chan struct{}) error {
err := k.cycloneClient.PushLogStream(k.metaNamespace, workflowrun, stage, cyclone_common.FolderEOFFile, strings.NewReader(""), close)
if err != nil {
return err
}
Expand All @@ -135,7 +135,7 @@ func (k *Executor) CopyFromContainer(container, path, dst string) error {
cmd := exec.Command("docker", args...)
log.WithField("args", args).Info()
ret, err := cmd.CombinedOutput()
log.WithField("message", string(ret)).WithField("error", err).Info("copy file result")
log.WithField("message", string(ret)).WithField("error", err).WithField("container", container).Info("copy file result")
if err != nil {
return fmt.Errorf("%s, error: %v", string(ret), err)
}
Expand Down