Skip to content

Commit

Permalink
Merge pull request #1 from kazeburo/use-zap
Browse files Browse the repository at this point in the history
use zap
  • Loading branch information
kazeburo committed Feb 4, 2019
2 parents b69bc8c + 37692d4 commit 2abc29b
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 45 deletions.
81 changes: 67 additions & 14 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 52 additions & 31 deletions wsgate-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
Expand All @@ -20,6 +19,7 @@ import (
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
ss "github.com/lestrrat/go-server-starter-listener"
"go.uber.org/zap"
)

var (
Expand All @@ -42,7 +42,7 @@ func handleHello(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK\n"))
}

func handleProxy(w http.ResponseWriter, r *http.Request) {
func handleProxy(w http.ResponseWriter, r *http.Request, logger *zap.Logger) {
vars := mux.Vars(r)
proxyDest := vars["dest"]
upstream := ""
Expand All @@ -51,6 +51,13 @@ func handleProxy(w http.ResponseWriter, r *http.Request) {
hasError := false
disconnectAt := ""

logger = logger.With(
zap.String("user-email", r.Header.Get("X-Goog-Authenticated-User-Email")),
zap.String("x-forwarded-for", r.Header.Get("X-Forwarded-For")),
zap.String("remote-addr", r.RemoteAddr),
zap.String("destination", proxyDest),
)

if *publicKeyFile != "" {
tokenString := r.Header.Get("Authorization")
if tokenString == "" {
Expand Down Expand Up @@ -79,16 +86,18 @@ func handleProxy(w http.ResponseWriter, r *http.Request) {
upstream, ok := mapping[proxyDest]
if !ok {
hasError = true
log.Printf("No map for '%s' found", proxyDest)
logger.Warn("No map found")
http.Error(w, fmt.Sprintf("Not found: %s", proxyDest), 404)
return
}

logger = logger.With(zap.String("upstream", upstream))

s, err := net.DialTimeout("tcp", upstream, *dialTimeout)

if err != nil {
hasError = true
log.Printf("DialTimeout: %v", err)
logger.Warn("DialTimeout", zap.Error(err))
http.Error(w, fmt.Sprintf("Could not connect upstream: %v", err), 500)
return
}
Expand All @@ -97,21 +106,23 @@ func handleProxy(w http.ResponseWriter, r *http.Request) {
if err != nil {
hasError = true
s.Close()
log.Printf("Failed to Upgrade: %v", err)
logger.Warn("Failed to Upgrade", zap.Error(err))
return
}
log.Printf("status:Connected dest:%s upstream:%s x-forwarded-for:%s remote_addr:%s",
proxyDest, upstream, r.Header.Get("X-Forwarded-For"),
r.RemoteAddr)

logger.Info("log", zap.String("status", "Connected"))

defer func() {
status := "Suceeded"
if hasError {
status = "Failed"
}
log.Printf("status:%s dest:%s upstream:%s x-forwarded-for:%s remote_addr:%s read:%d write:%d disconnect_at:%s",
status, proxyDest, upstream, r.Header.Get("X-Forwarded-For"),
r.RemoteAddr, readLen, writeLen, disconnectAt)
logger.Info("log",
zap.String("status", status),
zap.Int64("read", readLen),
zap.Int64("write", writeLen),
zap.String("disconnect_at", disconnectAt),
)
}()

doneCh := make(chan bool)
Expand All @@ -130,7 +141,7 @@ func handleProxy(w http.ResponseWriter, r *http.Request) {
}
if err != nil {
if !goClose {
log.Printf("NextReader: %v", err)
logger.Warn("NextReader", zap.Error(err))
hasError = true
}
if disconnectAt == "" {
Expand All @@ -139,14 +150,14 @@ func handleProxy(w http.ResponseWriter, r *http.Request) {
return
}
if mt != websocket.BinaryMessage {
log.Printf("BinaryMessage required: %d", mt)
logger.Warn("BinaryMessage required", zap.Int("messageType", mt))
hasError = true
return
}
n, err := io.Copy(s, r)
if err != nil {
if !goClose {
log.Printf("Reading from websocket: %v", err)
logger.Warn("Reading from websocket", zap.Error(err))
hasError = true
}
if disconnectAt == "" {
Expand All @@ -166,7 +177,7 @@ func handleProxy(w http.ResponseWriter, r *http.Request) {
n, err := s.Read(b)
if err != nil {
if !goClose && err != io.EOF {
log.Printf("Reading from dest: %v", err)
logger.Warn("Reading from dest", zap.Error(err))
hasError = true
}
if disconnectAt == "" {
Expand All @@ -179,7 +190,7 @@ func handleProxy(w http.ResponseWriter, r *http.Request) {

if err := conn.WriteMessage(websocket.BinaryMessage, b); err != nil {
if !goClose {
log.Printf("WriteMessage: %v", err)
logger.Warn("WriteMessage", zap.Error(err))
hasError = true
}
if disconnectAt == "" {
Expand All @@ -199,26 +210,30 @@ func handleProxy(w http.ResponseWriter, r *http.Request) {

}

func printVersion() {
fmt.Printf(`wsgate-server %s
Compiler: %s %s
`,
Version,
runtime.Compiler,
runtime.Version())
}
func main() {
flag.Parse()

if *showVersion {
fmt.Printf(`wsgate-server %s
Compiler: %s %s
`,
Version,
runtime.Compiler,
runtime.Version())
printVersion()
return

}

logger, _ := zap.NewProduction()

r := regexp.MustCompile(`^ *#`)
mapping = make(map[string]string)
if *mapFile != "" {
f, err := os.Open(*mapFile)
if err != nil {
log.Fatal(err)
logger.Fatal("Failed to open mapFile", zap.Error(err))
}
s := bufio.NewScanner(f)
for s.Scan() {
Expand All @@ -227,21 +242,25 @@ Compiler: %s %s
}
l := strings.SplitN(s.Text(), ",", 2)
if len(l) != 2 {
log.Fatalf("Invalid line in %s: %s", *mapFile, s.Text())
logger.Fatal("Invalid line",
zap.String("mapFile", *mapFile),
zap.String("line", s.Text()))
}
log.Printf("Create map: %s => %s", l[0], l[1])
logger.Info("Created map",
zap.String("from", l[0]),
zap.String("to", l[1]))
mapping[l[0]] = l[1]
}
}

if *publicKeyFile != "" {
verifyBytes, err := ioutil.ReadFile(*publicKeyFile)
if err != nil {
log.Fatalf("Failed read pubkey: %v", err)
logger.Fatal("Failed read pubkey", zap.Error(err))
}
verifyKey, err = jwt.ParseRSAPublicKeyFromPEM(verifyBytes)
if err != nil {
log.Fatalf("Failed read pubkey: %v", err)
logger.Fatal("Failed read pubkey", zap.Error(err))
}
}

Expand All @@ -257,14 +276,16 @@ Compiler: %s %s
m := mux.NewRouter()
m.HandleFunc("/", handleHello)
m.HandleFunc("/live", handleHello)
m.HandleFunc("/proxy/{dest}", handleProxy)
m.HandleFunc("/proxy/{dest}", func(w http.ResponseWriter, r *http.Request) {
handleProxy(w, r, logger)
})

l, err := ss.NewListener()
if l == nil || err != nil {
// Fallback if not running under Server::Starter
l, err = net.Listen("tcp", *listen)
if err != nil {
panic(fmt.Sprintf("Failed to listen to port %s", *listen))
logger.Fatal("Failed to listen to port", zap.String("listen", *listen))
}
}

Expand All @@ -274,5 +295,5 @@ Compiler: %s %s
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
log.Fatal(s.Serve(l))
s.Serve(l)
}

0 comments on commit 2abc29b

Please sign in to comment.