Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve IPFIX collector #338

Merged
merged 1 commit into from
Dec 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 74 additions & 3 deletions cmd/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

Expand All @@ -42,6 +46,8 @@ var (
IPFIXAddr string
IPFIXPort uint16
IPFIXTransport string
flowRecords []string
mutex sync.Mutex
)

func initLoggingToFile(fs *pflag.FlagSet) {
Expand All @@ -60,7 +66,7 @@ func addIPFIXFlags(fs *pflag.FlagSet) {
fs.StringVar(&IPFIXTransport, "ipfix.transport", "tcp", "IPFIX collector transport layer")
}

func printIPFIXMessage(msg *entities.Message) {
func addIPFIXMessage(msg *entities.Message) {
var buf bytes.Buffer
fmt.Fprint(&buf, "\nIPFIX-HDR:\n")
fmt.Fprintf(&buf, " version: %v, Message Length: %v\n", msg.GetVersion(), msg.GetMessageLen())
Expand Down Expand Up @@ -126,7 +132,9 @@ func printIPFIXMessage(msg *entities.Message) {
}
}
}
klog.Infof(buf.String())
mutex.Lock()
defer mutex.Unlock()
flowRecords = append(flowRecords, buf.String())
}

func signalHandler(stopCh chan struct{}, messageReceived chan *entities.Message) {
Expand All @@ -136,7 +144,7 @@ func signalHandler(stopCh chan struct{}, messageReceived chan *entities.Message)
for {
select {
case msg := <-messageReceived:
printIPFIXMessage(msg)
addIPFIXMessage(msg)
case <-signalCh:
close(stopCh)
return
Expand Down Expand Up @@ -173,13 +181,41 @@ func run() error {
}
}()

var server *http.Server
// Start the HTTP server in a separate goroutine
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/records", flowRecordHandler)
mux.HandleFunc("/reset", resetRecordHandler)

server = &http.Server{
yuntanghsu marked this conversation as resolved.
Show resolved Hide resolved
Addr: ":8080",
Handler: mux,
ReadHeaderTimeout: 5 * time.Second,
}

klog.InfoS("Server listening on :8080")
if err := server.ListenAndServe(); err != http.ErrServerClosed {
klog.Fatalf("ListenAndServe error: %v", err)
}
}()

stopCh := make(chan struct{})
go signalHandler(stopCh, messageReceived)

<-stopCh
// Stop the collector process
cp.Stop()
klog.Info("Stopping IPFIX collector")
// Create a context with a 5-second timeout
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Attempt to gracefully shut down the server
if err := server.Shutdown(shutdownCtx); err != nil {
klog.ErrorS(err, "Error during server shutdown")
} else {
klog.InfoS("Server gracefully stopped")
}
return nil
}

Expand All @@ -202,6 +238,41 @@ func newCollectorCommand() *cobra.Command {
return cmd
}

func flowRecordHandler(w http.ResponseWriter, r *http.Request) {
yuntanghsu marked this conversation as resolved.
Show resolved Hide resolved
if r.Method == "GET" {
mutex.Lock()
defer mutex.Unlock()
// Retrieve data
klog.InfoS("Return flow records", "length", len(flowRecords))
// Convert data to JSON
responseData := map[string]interface{}{"flowRecords": flowRecords}
jsonData, err := json.Marshal(responseData)
if err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
// Set response headers
w.Header().Set("Content-Type", "application/json")
// Write JSON response
w.Write(jsonData)
} else {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
}
}

func resetRecordHandler(w http.ResponseWriter, r *http.Request) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above, but you should request the HTTP method to be POST in this handler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should this method be POST?
Do you think we still need this Handler. After disabling printing the log, the retrieve time is decreased from ~4s to ~80ms.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@antoninbas
I think this concern hasn't been solved?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I didn't see the question before
It should be POST because it does a mutation the server. That's an HTTP convention. You can lookup which HTTP verbs are most appropriate for different type of APIs.

Do you think we still need this Handler. After disabling printing the log, the retrieve time is decreased from ~4s to ~80ms.

I thought you wrote 0.8s in the PR description, which would be 800ms?

Regardless, let's add the reset method, it's easy to do and if we don't end up using it in Antrea tests, so be it.
It's easier than opening a new PR in this repo later and publishing a new collector image tag...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0.8s to 4s will be the range when we use "kubectl logs" to get the logs.
Under the current changes, this time will be approximately 80ms regardless of the number of records inside the IPFIX collector.

if r.Method == "POST" {
mutex.Lock()
defer mutex.Unlock()
klog.InfoS("Reset flow records")
flowRecords = []string{}
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, "Flow records successfully reset")
} else {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
}
}

func main() {
logs.InitLogs()
defer logs.FlushLogs()
Expand Down
Loading