diff --git a/cmd/collector/collector.go b/cmd/collector/collector.go index 55f7b890..4bd9fb93 100644 --- a/cmd/collector/collector.go +++ b/cmd/collector/collector.go @@ -16,11 +16,15 @@ package main import ( "bytes" + "context" + "encoding/json" "flag" "fmt" + "net/http" "os" "os/signal" "strconv" + "sync" "syscall" "time" @@ -42,6 +46,8 @@ var ( IPFIXAddr string IPFIXPort uint16 IPFIXTransport string + flowRecords []string + mutex sync.Mutex ) func initLoggingToFile(fs *pflag.FlagSet) { @@ -60,7 +66,7 @@ func addIPFIXFlags(fs *pflag.FlagSet) { fs.StringVar(&IPFIXTransport, "ipfix.transport", "tcp", "IPFIX collector transport layer") } -func printIPFIXMessage(msg *entities.Message) { +func addIPFIXMessage(msg *entities.Message) { var buf bytes.Buffer fmt.Fprint(&buf, "\nIPFIX-HDR:\n") fmt.Fprintf(&buf, " version: %v, Message Length: %v\n", msg.GetVersion(), msg.GetMessageLen()) @@ -126,7 +132,9 @@ func printIPFIXMessage(msg *entities.Message) { } } } - klog.Infof(buf.String()) + mutex.Lock() + defer mutex.Unlock() + flowRecords = append(flowRecords, buf.String()) } func signalHandler(stopCh chan struct{}, messageReceived chan *entities.Message) { @@ -136,7 +144,7 @@ func signalHandler(stopCh chan struct{}, messageReceived chan *entities.Message) for { select { case msg := <-messageReceived: - printIPFIXMessage(msg) + addIPFIXMessage(msg) case <-signalCh: close(stopCh) return @@ -173,6 +181,25 @@ func run() error { } }() + var server *http.Server + // Start the HTTP server in a separate goroutine + go func() { + mux := http.NewServeMux() + mux.HandleFunc("/records", flowRecordHandler) + mux.HandleFunc("/reset", resetRecordHandler) + + server = &http.Server{ + Addr: ":8080", + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + + klog.InfoS("Server listening on :8080") + if err := server.ListenAndServe(); err != http.ErrServerClosed { + klog.Fatalf("ListenAndServe error: %v", err) + } + }() + stopCh := make(chan struct{}) go signalHandler(stopCh, messageReceived) @@ -180,6 +207,15 @@ func run() error { // Stop the collector process cp.Stop() klog.Info("Stopping IPFIX collector") + // Create a context with a 5-second timeout + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + // Attempt to gracefully shut down the server + if err := server.Shutdown(shutdownCtx); err != nil { + klog.ErrorS(err, "Error during server shutdown") + } else { + klog.InfoS("Server gracefully stopped") + } return nil } @@ -202,6 +238,41 @@ func newCollectorCommand() *cobra.Command { return cmd } +func flowRecordHandler(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + mutex.Lock() + defer mutex.Unlock() + // Retrieve data + klog.InfoS("Return flow records", "length", len(flowRecords)) + // Convert data to JSON + responseData := map[string]interface{}{"flowRecords": flowRecords} + jsonData, err := json.Marshal(responseData) + if err != nil { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + // Set response headers + w.Header().Set("Content-Type", "application/json") + // Write JSON response + w.Write(jsonData) + } else { + http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) + } +} + +func resetRecordHandler(w http.ResponseWriter, r *http.Request) { + if r.Method == "POST" { + mutex.Lock() + defer mutex.Unlock() + klog.InfoS("Reset flow records") + flowRecords = []string{} + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, "Flow records successfully reset") + } else { + http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) + } +} + func main() { logs.InitLogs() defer logs.FlushLogs()