Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events refactor #30

Merged
merged 4 commits into from
Aug 26, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ RUN go mod download
RUN make build-linux

# Build BPF
FROM public.ecr.aws/eks-distro-build-tooling/eks-distro-base:latest-al23 as bpfbuilder
FROM public.ecr.aws/amazonlinux/amazonlinux:2023 as bpfbuilder
jayanthvn marked this conversation as resolved.
Show resolved Hide resolved
WORKDIR /bpfbuilder
RUN yum update -y && \
yum install -y iproute procps-ng && \
Expand Down
252 changes: 92 additions & 160 deletions pkg/ebpf/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
"fmt"
"os"
"strconv"
"sync"
"time"

"github.com/aws/aws-network-policy-agent/pkg/aws"
"github.com/aws/aws-network-policy-agent/pkg/aws/services"
"github.com/aws/aws-network-policy-agent/pkg/utils"

goebpfevents "github.com/aws/aws-ebpf-sdk-go/pkg/events"

Check failure on line 15 in pkg/ebpf/events/events.go

View workflow job for this annotation

GitHub Actions / Unit test

missing go.sum entry for module providing package github.com/aws/aws-ebpf-sdk-go/pkg/events (imported by github.com/aws/aws-network-policy-agent/pkg/ebpf/events); to add:
awssdk "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
Expand All @@ -32,7 +31,7 @@
NON_EKS_CW_PATH = "/aws/"
)

type Event_t struct {
type ringBufferDataV4_t struct {
SourceIP uint32
SourcePort uint32
DestIP uint32
Expand All @@ -41,7 +40,7 @@
Verdict uint32
}

type EventV6_t struct {
type ringBufferDataV6_t struct {
SourceIP [16]byte
SourcePort uint32
DestIP [16]byte
Expand All @@ -50,10 +49,6 @@
Verdict uint32
}

type EvProgram struct {
wg sync.WaitGroup
}

func ConfigurePolicyEventsLogging(logger logr.Logger, enableCloudWatchLogs bool, mapFD int, enableIPv6 bool) error {
// Enable logging and setup ring buffer
if mapFD <= 0 {
Expand All @@ -69,8 +64,7 @@
return err
} else {
logger.Info("Configure Event loop ... ")
p := EvProgram{wg: sync.WaitGroup{}}
p.capturePolicyEvents(eventChanList[mapFD], logger, enableCloudWatchLogs, enableIPv6)
capturePolicyEvents(eventChanList[mapFD], logger, enableCloudWatchLogs, enableIPv6)
if enableCloudWatchLogs {
logger.Info("Cloudwatch log support is enabled")
err = setupCW(logger)
Expand Down Expand Up @@ -112,182 +106,120 @@
return nil
}

func (p *EvProgram) capturePolicyV6Events(events <-chan []byte, log logr.Logger, enableCloudWatchLogs bool) {
nodeName := os.Getenv("MY_NODE_NAME")
go func(events <-chan []byte) {
defer p.wg.Done()

for {
if b, ok := <-events; ok {
var logQueue []*cloudwatchlogs.InputLogEvent
func getProtocol(protocolNum int) string {
protocolStr := "UNKNOWN"
if protocolNum == utils.TCP_PROTOCOL_NUMBER {
protocolStr = "TCP"
} else if protocolNum == utils.UDP_PROTOCOL_NUMBER {
protocolStr = "UDP"
} else if protocolNum == utils.SCTP_PROTOCOL_NUMBER {
protocolStr = "SCTP"
} else if protocolNum == utils.ICMP_PROTOCOL_NUMBER {
protocolStr = "ICMP"
}
return protocolStr
}

var ev EventV6_t
buf := bytes.NewBuffer(b)
if err := binary.Read(buf, binary.LittleEndian, &ev); err != nil {
log.Info("Read Ring buf", "Failed ", err)
continue
}
func getVerdict(verdict int) string {
verdictStr := "DENY"
if verdict == 1 {
jayanthvn marked this conversation as resolved.
Show resolved Hide resolved
verdictStr = "ACCEPT"
} else if verdict == 2 {
verdictStr = "EXPIRED/DELETED"
}
return verdictStr
}

protocol := "UNKNOWN"
if int(ev.Protocol) == utils.TCP_PROTOCOL_NUMBER {
protocol = "TCP"
} else if int(ev.Protocol) == utils.UDP_PROTOCOL_NUMBER {
protocol = "UDP"
} else if int(ev.Protocol) == utils.SCTP_PROTOCOL_NUMBER {
protocol = "SCTP"
} else if int(ev.Protocol) == utils.ICMP_PROTOCOL_NUMBER {
protocol = "ICMP"
}
func publishData(logQueue []*cloudwatchlogs.InputLogEvent, message string, log logr.Logger) bool {
jayanthvn marked this conversation as resolved.
Show resolved Hide resolved
logQueue = append(logQueue, &cloudwatchlogs.InputLogEvent{
Message: &message,
Timestamp: awssdk.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
})
if len(logQueue) > 0 {
log.Info("Sending CW")
jayanthvn marked this conversation as resolved.
Show resolved Hide resolved
input := cloudwatchlogs.PutLogEventsInput{
LogEvents: logQueue,
LogGroupName: &logGroupName,
}

verdict := "DENY"
if ev.Verdict == 1 {
verdict = "ACCEPT"
} else if ev.Verdict == 2 {
verdict = "EXPIRED/DELETED"
}
if sequenceToken == "" {
err := createLogStream()
if err != nil {
log.Info("Failed to create log stream")
panic(err)
}
} else {
input = *input.SetSequenceToken(sequenceToken)
}

log.Info("Flow Info: ", "Src IP", utils.ConvByteToIPv6(ev.SourceIP).String(), "Src Port", ev.SourcePort,
"Dest IP", utils.ConvByteToIPv6(ev.DestIP).String(), "Dest Port", ev.DestPort,
"Proto", protocol, "Verdict", verdict)
input = *input.SetLogStreamName(logStreamName)

message := "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteToIPv6(ev.SourceIP).String() + ";" + "SPORT: " + strconv.Itoa(int(ev.SourcePort)) + ";" + "DIP: " + utils.ConvByteToIPv6(ev.DestIP).String() + ";" + "DPORT: " + strconv.Itoa(int(ev.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
resp, err := cwl.PutLogEvents(&input)
if err != nil {
log.Info("Push log events", "Failed ", err)
}

if enableCloudWatchLogs {
logQueue = append(logQueue, &cloudwatchlogs.InputLogEvent{
Message: &message,
Timestamp: awssdk.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
})
if len(logQueue) > 0 {
log.Info("Sending CW")
input := cloudwatchlogs.PutLogEventsInput{
LogEvents: logQueue,
LogGroupName: &logGroupName,
}

if sequenceToken == "" {
err := createLogStream()
if err != nil {
log.Info("Failed to create log stream")
panic(err)
}
} else {
input = *input.SetSequenceToken(sequenceToken)
}

input = *input.SetLogStreamName(logStreamName)

resp, err := cwl.PutLogEvents(&input)
if err != nil {
log.Info("Kprobe", "Failed ", err)
}

if resp != nil {
sequenceToken = *resp.NextSequenceToken
}

logQueue = []*cloudwatchlogs.InputLogEvent{}
} else {
break
}
}
}
if resp != nil {
sequenceToken = *resp.NextSequenceToken
}
}(events)

logQueue = []*cloudwatchlogs.InputLogEvent{}
return false
}
return true
}

func (p *EvProgram) capturePolicyV4Events(events <-chan []byte, log logr.Logger, enableCloudWatchLogs bool) {
func capturePolicyEvents(ringbufferdata <-chan []byte, log logr.Logger, enableCloudWatchLogs bool, enableIPv6 bool) {
nodeName := os.Getenv("MY_NODE_NAME")
go func(events <-chan []byte) {
defer p.wg.Done()

// Read from ringbuffer channel, perf buffer support is not there and 5.10 kernel is needed.
go func(ringbufferdata <-chan []byte) {
done := false
for {
if b, ok := <-events; ok {
if record, ok := <-ringbufferdata; ok {
var logQueue []*cloudwatchlogs.InputLogEvent
var message string
if enableIPv6 {
var rb ringBufferDataV6_t
buf := bytes.NewBuffer(record)
if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
log.Info("Failed to read from Ring buf", err)
continue
}

var ev Event_t
buf := bytes.NewBuffer(b)
if err := binary.Read(buf, binary.LittleEndian, &ev); err != nil {
log.Info("Read Ring buf", "Failed ", err)
continue
}
protocol := getProtocol(int(rb.Protocol))
verdict := getVerdict(int(rb.Verdict))

protocol := "UNKNOWN"
if int(ev.Protocol) == utils.TCP_PROTOCOL_NUMBER {
protocol = "TCP"
} else if int(ev.Protocol) == utils.UDP_PROTOCOL_NUMBER {
protocol = "UDP"
} else if int(ev.Protocol) == utils.SCTP_PROTOCOL_NUMBER {
protocol = "SCTP"
} else if int(ev.Protocol) == utils.ICMP_PROTOCOL_NUMBER {
protocol = "ICMP"
}
log.Info("Flow Info: ", "Src IP", utils.ConvByteToIPv6(rb.SourceIP).String(), "Src Port", rb.SourcePort,
"Dest IP", utils.ConvByteToIPv6(rb.DestIP).String(), "Dest Port", rb.DestPort,
"Proto", protocol, "Verdict", verdict)

verdict := "DENY"
if ev.Verdict == 1 {
verdict = "ACCEPT"
} else if ev.Verdict == 2 {
verdict = "EXPIRED/DELETED"
}
message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteToIPv6(rb.SourceIP).String() + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteToIPv6(rb.DestIP).String() + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
} else {
var rb ringBufferDataV4_t
buf := bytes.NewBuffer(record)
if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
log.Info("Failed to read from Ring buf", err)
continue
}
protocol := getProtocol(int(rb.Protocol))
verdict := getVerdict(int(rb.Verdict))

log.Info("Flow Info: ", "Src IP", utils.ConvByteArrayToIP(ev.SourceIP), "Src Port", ev.SourcePort,
"Dest IP", utils.ConvByteArrayToIP(ev.DestIP), "Dest Port", ev.DestPort,
"Proto", protocol, "Verdict", verdict)
log.Info("Flow Info: ", "Src IP", utils.ConvByteArrayToIP(rb.SourceIP), "Src Port", rb.SourcePort,
"Dest IP", utils.ConvByteArrayToIP(rb.DestIP), "Dest Port", rb.DestPort,
"Proto", protocol, "Verdict", verdict)

message := "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteArrayToIP(ev.SourceIP) + ";" + "SPORT: " + strconv.Itoa(int(ev.SourcePort)) + ";" + "DIP: " + utils.ConvByteArrayToIP(ev.DestIP) + ";" + "DPORT: " + strconv.Itoa(int(ev.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteArrayToIP(rb.SourceIP) + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteArrayToIP(rb.DestIP) + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
}

if enableCloudWatchLogs {
logQueue = append(logQueue, &cloudwatchlogs.InputLogEvent{
Message: &message,
Timestamp: awssdk.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
})
if len(logQueue) > 0 {
log.Info("Sending CW")
input := cloudwatchlogs.PutLogEventsInput{
LogEvents: logQueue,
LogGroupName: &logGroupName,
}

if sequenceToken == "" {
err := createLogStream()
if err != nil {
log.Info("Failed to create log stream")
panic(err)
}
} else {
input = *input.SetSequenceToken(sequenceToken)
}

input = *input.SetLogStreamName(logStreamName)

resp, err := cwl.PutLogEvents(&input)
if err != nil {
log.Info("Kprobe", "Failed ", err)
}

if resp != nil {
sequenceToken = *resp.NextSequenceToken
}

logQueue = []*cloudwatchlogs.InputLogEvent{}
} else {
done = publishData(logQueue, message, log)
if done {
jayanthvn marked this conversation as resolved.
Show resolved Hide resolved
break
}
}
}
}
}(events)
}

func (p *EvProgram) capturePolicyEvents(events <-chan []byte, log logr.Logger, enableCloudWatchLogs bool,
enableIPv6 bool) {
p.wg.Add(1)

if enableIPv6 {
p.capturePolicyV6Events(events, log, enableCloudWatchLogs)
} else {
p.capturePolicyV4Events(events, log, enableCloudWatchLogs)
}
}(ringbufferdata)
}

func ensureLogGroupExists(name string) error {
Expand Down
Loading