Skip to content

Commit

Permalink
Improve IPFIX collector (#338)
Browse files Browse the repository at this point in the history
1. Disable printing records whenever we receive them. Instead, we store records in a string array.
2. Add http listener and handler to receive request to return or reset records.

Signed-off-by: Yun-Tang Hsu <hsuy@vmware.com>
  • Loading branch information
yuntanghsu committed Dec 15, 2023
1 parent 1e31bbc commit 837050b
Showing 1 changed file with 74 additions and 3 deletions.
77 changes: 74 additions & 3 deletions cmd/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

Expand All @@ -42,6 +46,8 @@ var (
IPFIXAddr string
IPFIXPort uint16
IPFIXTransport string
flowRecords []string
mutex sync.Mutex
)

func initLoggingToFile(fs *pflag.FlagSet) {
Expand All @@ -60,7 +66,7 @@ func addIPFIXFlags(fs *pflag.FlagSet) {
fs.StringVar(&IPFIXTransport, "ipfix.transport", "tcp", "IPFIX collector transport layer")
}

func printIPFIXMessage(msg *entities.Message) {
func addIPFIXMessage(msg *entities.Message) {
var buf bytes.Buffer
fmt.Fprint(&buf, "\nIPFIX-HDR:\n")
fmt.Fprintf(&buf, " version: %v, Message Length: %v\n", msg.GetVersion(), msg.GetMessageLen())
Expand Down Expand Up @@ -126,7 +132,9 @@ func printIPFIXMessage(msg *entities.Message) {
}
}
}
klog.Infof(buf.String())
mutex.Lock()
defer mutex.Unlock()
flowRecords = append(flowRecords, buf.String())
}

func signalHandler(stopCh chan struct{}, messageReceived chan *entities.Message) {
Expand All @@ -136,7 +144,7 @@ func signalHandler(stopCh chan struct{}, messageReceived chan *entities.Message)
for {
select {
case msg := <-messageReceived:
printIPFIXMessage(msg)
addIPFIXMessage(msg)
case <-signalCh:
close(stopCh)
return
Expand Down Expand Up @@ -173,13 +181,41 @@ func run() error {
}
}()

var server *http.Server
// Start the HTTP server in a separate goroutine
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/records", flowRecordHandler)
mux.HandleFunc("/reset", resetRecordHandler)

server = &http.Server{
Addr: ":8080",
Handler: mux,
ReadHeaderTimeout: 5 * time.Second,
}

klog.InfoS("Server listening on :8080")
if err := server.ListenAndServe(); err != http.ErrServerClosed {
klog.Fatalf("ListenAndServe error: %v", err)
}
}()

stopCh := make(chan struct{})
go signalHandler(stopCh, messageReceived)

<-stopCh
// Stop the collector process
cp.Stop()
klog.Info("Stopping IPFIX collector")
// Create a context with a 5-second timeout
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Attempt to gracefully shut down the server
if err := server.Shutdown(shutdownCtx); err != nil {
klog.ErrorS(err, "Error during server shutdown")
} else {
klog.InfoS("Server gracefully stopped")
}
return nil
}

Expand All @@ -202,6 +238,41 @@ func newCollectorCommand() *cobra.Command {
return cmd
}

func flowRecordHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
mutex.Lock()
defer mutex.Unlock()
// Retrieve data
klog.InfoS("Return flow records", "length", len(flowRecords))
// Convert data to JSON
responseData := map[string]interface{}{"flowRecords": flowRecords}
jsonData, err := json.Marshal(responseData)
if err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
// Set response headers
w.Header().Set("Content-Type", "application/json")
// Write JSON response
w.Write(jsonData)
} else {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
}
}

func resetRecordHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {
mutex.Lock()
defer mutex.Unlock()
klog.InfoS("Reset flow records")
flowRecords = []string{}
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, "Flow records successfully reset")
} else {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
}
}

func main() {
logs.InitLogs()
defer logs.FlushLogs()
Expand Down

0 comments on commit 837050b

Please sign in to comment.