Skip to content

Commit

Permalink
Improve IPFIX collector
Browse files Browse the repository at this point in the history
In this commit, we do:
1. We disable printing records whenever we receive it. Instead, we store records in a string array.
2. Add http listener and handler to receive request to return or reset records.
3. When the correlationRequired of the previous record is true and the correlationRequired of the next record is false, set readyToSend=true.

Signed-off-by: Yun-Tang Hsu <hsuy@vmware.com>
  • Loading branch information
yuntanghsu committed Dec 13, 2023
1 parent 1e31bbc commit ff5363a
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 3 deletions.
77 changes: 74 additions & 3 deletions cmd/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

Expand All @@ -42,6 +46,8 @@ var (
IPFIXAddr string
IPFIXPort uint16
IPFIXTransport string
flowRecords []string
mutex sync.Mutex
)

func initLoggingToFile(fs *pflag.FlagSet) {
Expand All @@ -60,7 +66,7 @@ func addIPFIXFlags(fs *pflag.FlagSet) {
fs.StringVar(&IPFIXTransport, "ipfix.transport", "tcp", "IPFIX collector transport layer")
}

func printIPFIXMessage(msg *entities.Message) {
func addIPFIXMessage(msg *entities.Message) {
var buf bytes.Buffer
fmt.Fprint(&buf, "\nIPFIX-HDR:\n")
fmt.Fprintf(&buf, " version: %v, Message Length: %v\n", msg.GetVersion(), msg.GetMessageLen())
Expand Down Expand Up @@ -126,7 +132,9 @@ func printIPFIXMessage(msg *entities.Message) {
}
}
}
klog.Infof(buf.String())
mutex.Lock()
defer mutex.Unlock()
flowRecords = append(flowRecords, buf.String())
}

func signalHandler(stopCh chan struct{}, messageReceived chan *entities.Message) {
Expand All @@ -136,7 +144,7 @@ func signalHandler(stopCh chan struct{}, messageReceived chan *entities.Message)
for {
select {
case msg := <-messageReceived:
printIPFIXMessage(msg)
addIPFIXMessage(msg)
case <-signalCh:
close(stopCh)
return
Expand Down Expand Up @@ -173,13 +181,41 @@ func run() error {
}
}()

var server *http.Server
// Start the HTTP server in a separate goroutine
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/records", flowRecordHandler)
mux.HandleFunc("/reset", resetRecordHandler)

server = &http.Server{
Addr: ":8080",
Handler: mux,
ReadHeaderTimeout: 5 * time.Second,
}

klog.InfoS("Server listening on :8080")
if err := server.ListenAndServe(); err != http.ErrServerClosed {
klog.Fatalf("ListenAndServe error: %v", err)
}
}()

stopCh := make(chan struct{})
go signalHandler(stopCh, messageReceived)

<-stopCh
// Stop the collector process
cp.Stop()
klog.Info("Stopping IPFIX collector")
// Create a context with a 5-second timeout
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Attempt to gracefully shut down the server
if err := server.Shutdown(shutdownCtx); err != nil {
klog.ErrorS(err, "Error during server shutdown")
} else {
klog.InfoS("Server gracefully stopped")
}
return nil
}

Expand All @@ -202,6 +238,41 @@ func newCollectorCommand() *cobra.Command {
return cmd
}

func flowRecordHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
mutex.Lock()
defer mutex.Unlock()
// Retrieve data
klog.InfoS("Return flow records", "length", len(flowRecords))
// Convert data to JSON
responseData := map[string]interface{}{"flowRecords": flowRecords}
jsonData, err := json.Marshal(responseData)
if err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
// Set response headers
w.Header().Set("Content-Type", "application/json")
// Write JSON response
w.Write(jsonData)
} else {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
}
}

func resetRecordHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
mutex.Lock()
defer mutex.Unlock()
klog.InfoS("Reset flow records")
flowRecords = []string{}
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, "Flow records successfully reset")
} else {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
}
}

func main() {
logs.InitLogs()
defer logs.FlushLogs()
Expand Down
10 changes: 10 additions & 0 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,16 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent
}
}
} else {
// For inter-node traffic with deny np or drop anp, we may receive the record from the conntrack table first.
// In this case, the record from the conntrack table will be considered to do correlation job and the ReadyToSend
// is false until we receive another record from PacketIn from another node. However, the record From PacketIN
// is not correlationRequired, so we won't send this kind of connection due to this conflict.
aggregationRecord.ReadyToSend = true
if flowType == registry.FlowTypeInterNode {
aggregationRecord.areCorrelatedFieldsFilled = false
} else {
aggregationRecord.areCorrelatedFieldsFilled = true

Check warning on line 377 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L373-L377

Added lines #L373 - L377 were not covered by tests
}
// For flows that do not need correlation, just do aggregation of the
// flow record with existing record by updating the stats and flow timestamps.
if err = a.aggregateRecords(record, aggregationRecord.Record, true, true); err != nil {
Expand Down

0 comments on commit ff5363a

Please sign in to comment.