Skip to content

Commit

Permalink
Merge pull request #3 from kazeburo/log-tcp-dump
Browse files Browse the repository at this point in the history
Log tcp dump
  • Loading branch information
kazeburo committed Feb 5, 2019
2 parents ed99439 + b1ec1c4 commit f072df4
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 15 deletions.
9 changes: 9 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 14 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,18 @@ db, err := sql.Open("mysql", "yyyy:xxx@websocket(https://example.com/proxy/mysql

```
Usage of ./wsgate-server:
-dial_timeout duration
Dial timeout. (default 10s)
-handshake_timeout duration
Handshake timeout. (default 10s)
-listen string
Address to listen to. (default "127.0.0.1:8086")
-map string
path and proxy host mapping file
-public-key string
public key for signing auth header
-version
show version
-write_timeout duration
Write timeout. (default 10s)
-dial_timeout duration
Dial timeout. (default 10s)
-handshake_timeout duration
Handshake timeout. (default 10s)
-listen string
Address to listen to. (default "127.0.0.1:8086")
-map string
path and proxy host mapping file
-public-key string
public key for signing auth header
-version
show version
-write_timeout duration
Write timeout. (default 10s)
```
62 changes: 62 additions & 0 deletions dumper/dumper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package dumper

import (
"bytes"
"encoding/hex"
"strings"
"sync"

"go.uber.org/zap"
)

// Dumper dumper struct
type Dumper struct {
direction uint
logger *zap.Logger
buf *bytes.Buffer
mu *sync.RWMutex
}

// New new handler
func New(direction uint, logger *zap.Logger) *Dumper {
d := &Dumper{
direction: direction,
logger: logger,
buf: new(bytes.Buffer),
mu: new(sync.RWMutex),
}
return d
}

// Write to dump
func (d *Dumper) Write(p []byte) (n int, err error) {
d.mu.Lock()
defer d.mu.Unlock()
d.buf.Write(p)
return len(p), nil
}

// Flush flush buffer
func (d *Dumper) Flush() {
d.mu.Lock()
defer d.mu.Unlock()
if d.buf.Len() == 0 {
return
}
hexdump := strings.Split(hex.Dump(d.buf.Bytes()), "\n")
d.buf.Truncate(0)
byteString := []string{}
ascii := []string{}
for _, hd := range hexdump {
if hd == "" {
continue
}
byteString = append(byteString, strings.TrimRight(strings.Replace(hd[10:58], " ", " ", 1), " "))
ascii = append(ascii, hd[61:len(hd)-1])
}
d.logger.Info("dump",
zap.Uint("direction", d.direction),
zap.String("hex", strings.Join(byteString, " ")),
zap.String("ascii", strings.Join(ascii, "")),
)
}
41 changes: 40 additions & 1 deletion handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ import (

"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/kazeburo/wsgate-server/dumper"
"github.com/kazeburo/wsgate-server/mapping"
"github.com/kazeburo/wsgate-server/publickey"
"github.com/kazeburo/wsgate-server/seq"
"go.uber.org/zap"
)

var (
websocketUpstream uint = 1
upstreamWebsocket uint = 2
flushDumperInterval time.Duration = 300
)

// Handler handlers
type Handler struct {
logger *zap.Logger
Expand All @@ -22,6 +30,8 @@ type Handler struct {
writeTimeout time.Duration
mp *mapping.Mapping
pk *publickey.Publickey
dumpTCP uint
sq *seq.Seq
}

// New new handler
Expand All @@ -31,6 +41,7 @@ func New(
writeTimeout time.Duration,
mp *mapping.Mapping,
pk *publickey.Publickey,
dumpTCP uint,
logger *zap.Logger) (*Handler, error) {

upgrader := websocket.Upgrader{
Expand All @@ -49,6 +60,8 @@ func New(
writeTimeout: writeTimeout,
mp: mp,
pk: pk,
dumpTCP: dumpTCP,
sq: seq.New(),
}, nil
}

Expand All @@ -71,6 +84,7 @@ func (h *Handler) Proxy() func(w http.ResponseWriter, r *http.Request) {
disconnectAt := ""

logger := h.logger.With(
zap.Uint64("seq", h.sq.Next()),
zap.String("user-email", r.Header.Get("X-Goog-Authenticated-User-Email")),
zap.String("x-forwarded-for", r.Header.Get("X-Forwarded-For")),
zap.String("remote-addr", r.RemoteAddr),
Expand Down Expand Up @@ -114,8 +128,12 @@ func (h *Handler) Proxy() func(w http.ResponseWriter, r *http.Request) {
}

logger.Info("log", zap.String("status", "Connected"))
dr := dumper.New(websocketUpstream, logger)
ds := dumper.New(upstreamWebsocket, logger)

defer func() {
dr.Flush()
ds.Flush()
status := "Suceeded"
if hasError {
status = "Failed"
Expand All @@ -128,6 +146,22 @@ func (h *Handler) Proxy() func(w http.ResponseWriter, r *http.Request) {
)
}()

ticker := time.NewTicker(flushDumperInterval * time.Millisecond)
defer ticker.Stop()
go func() {
for {
select {
case <-r.Context().Done():
dr.Flush()
ds.Flush()
return
case _ = <-ticker.C:
dr.Flush()
ds.Flush()
}
}
}()

doneCh := make(chan bool)
goClose := false

Expand Down Expand Up @@ -157,6 +191,9 @@ func (h *Handler) Proxy() func(w http.ResponseWriter, r *http.Request) {
hasError = true
return
}
if h.dumpTCP > 0 {
r = io.TeeReader(r, dr)
}
n, err := io.Copy(s, r)
if err != nil {
if !goClose {
Expand Down Expand Up @@ -190,7 +227,9 @@ func (h *Handler) Proxy() func(w http.ResponseWriter, r *http.Request) {
}

b = b[:n]

if h.dumpTCP > 1 {
ds.Write(b)
}
if err := conn.WriteMessage(websocket.BinaryMessage, b); err != nil {
if !goClose {
logger.Warn("WriteMessage", zap.Error(err))
Expand Down
25 changes: 25 additions & 0 deletions seq/seq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package seq

import "sync"

// Seq struct
type Seq struct {
i uint64
mu *sync.RWMutex
}

// New create sequencer
func New() *Seq {
return &Seq{
i: 0,
mu: new(sync.RWMutex),
}
}

// Next fetch new sequence
func (s *Seq) Next() uint64 {
s.mu.Lock()
defer s.mu.Unlock()
s.i = s.i + 1
return s.i
}
2 changes: 2 additions & 0 deletions wsgate-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
writeTimeout = flag.Duration("write_timeout", 10*time.Second, "Write timeout.")
mapFile = flag.String("map", "", "path and proxy host mapping file")
publicKeyFile = flag.String("public-key", "", "public key for signing auth header")
dumpTCP = flag.Uint("dump-tcp", 0, "Dump TCP. 0 = disable, 1 = src to dest, 2 = both")
)

func printVersion() {
Expand Down Expand Up @@ -62,6 +63,7 @@ func main() {
*writeTimeout,
mp,
pk,
*dumpTCP,
logger,
)
if err != nil {
Expand Down

0 comments on commit f072df4

Please sign in to comment.