Skip to content

Commit

Permalink
Use Traceflow API v1beta1 (#5689)
Browse files Browse the repository at this point in the history
Use Traceflow API v1beta1 for `antctl traceflow` and in Traceflow e2e tests.

Fixes #5656

Signed-off-by: Kumar Atish <atish.iaf@gmail.com>
  • Loading branch information
Atish-iaf committed Nov 15, 2023
1 parent 6acd0d9 commit aa1c6b0
Show file tree
Hide file tree
Showing 5 changed files with 730 additions and 727 deletions.
75 changes: 39 additions & 36 deletions pkg/antctl/raw/traceflow/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/antctl/raw"
"antrea.io/antrea/pkg/apis/crd/v1alpha1"
"antrea.io/antrea/pkg/apis/crd/v1beta1"
antrea "antrea.io/antrea/pkg/client/clientset/versioned"
)

Expand Down Expand Up @@ -64,23 +64,23 @@ var protocols = map[string]int32{
}

type CapturedPacket struct {
SrcIP string `json:"srcIP" yaml:"srcIP"`
DstIP string `json:"dstIP" yaml:"dstIP"`
Length uint16 `json:"length" yaml:"length"`
IPHeader *v1alpha1.IPHeader `json:"ipHeader,omitempty" yaml:"ipHeader,omitempty"`
IPv6Header *v1alpha1.IPv6Header `json:"ipv6Header,omitempty" yaml:"ipv6Header,omitempty"`
TransportHeader *v1alpha1.TransportHeader `json:"transportHeader,omitempty" yaml:"tranportHeader,omitempty"`
SrcIP string `json:"srcIP" yaml:"srcIP"`
DstIP string `json:"dstIP" yaml:"dstIP"`
Length int32 `json:"length" yaml:"length"`
IPHeader *v1beta1.IPHeader `json:"ipHeader,omitempty" yaml:"ipHeader,omitempty"`
IPv6Header *v1beta1.IPv6Header `json:"ipv6Header,omitempty" yaml:"ipv6Header,omitempty"`
TransportHeader *v1beta1.TransportHeader `json:"transportHeader,omitempty" yaml:"tranportHeader,omitempty"`
}

// Response is the response of antctl Traceflow.
type Response struct {
Name string `json:"name" yaml:"name"` // Traceflow name
Phase v1alpha1.TraceflowPhase `json:"phase,omitempty" yaml:"phase,omitempty"` // Traceflow phase
Reason string `json:"reason,omitempty" yaml:"reason,omitempty"` // Traceflow phase reason
Source string `json:"source,omitempty" yaml:"source,omitempty"` // Traceflow source, e.g. "default/pod0"
Destination string `json:"destination,omitempty" yaml:"destination,omitempty"` // Traceflow destination, e.g. "default/pod1"
NodeResults []v1alpha1.NodeResult `json:"results,omitempty" yaml:"results,omitempty"` // Traceflow node results
CapturedPacket *CapturedPacket `json:"capturedPacket,omitempty" yaml:"capturedPacket,omitempty"` // Captured packet in live-traffic Traceflow
Name string `json:"name" yaml:"name"` // Traceflow name
Phase v1beta1.TraceflowPhase `json:"phase,omitempty" yaml:"phase,omitempty"` // Traceflow phase
Reason string `json:"reason,omitempty" yaml:"reason,omitempty"` // Traceflow phase reason
Source string `json:"source,omitempty" yaml:"source,omitempty"` // Traceflow source, e.g. "default/pod0"
Destination string `json:"destination,omitempty" yaml:"destination,omitempty"` // Traceflow destination, e.g. "default/pod1"
NodeResults []v1beta1.NodeResult `json:"results,omitempty" yaml:"results,omitempty"` // Traceflow node results
CapturedPacket *CapturedPacket `json:"capturedPacket,omitempty" yaml:"capturedPacket,omitempty"` // Captured packet in live-traffic Traceflow
}

func init() {
Expand Down Expand Up @@ -168,12 +168,12 @@ func runE(cmd *cobra.Command, _ []string) error {

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err = client.CrdV1alpha1().Traceflows().Create(ctx, tf, metav1.CreateOptions{}); err != nil {
if _, err = client.CrdV1beta1().Traceflows().Create(ctx, tf, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("error when creating Traceflow, is Traceflow feature gate enabled? %w", err)
}
defer func() {
if !option.nowait {
if err = client.CrdV1alpha1().Traceflows().Delete(context.TODO(), tf.Name, metav1.DeleteOptions{}); err != nil {
if err = client.CrdV1beta1().Traceflows().Delete(context.TODO(), tf.Name, metav1.DeleteOptions{}); err != nil {
klog.Errorf("error when deleting Traceflow: %+v", err)
}
}
Expand All @@ -183,13 +183,13 @@ func runE(cmd *cobra.Command, _ []string) error {
return nil
}

var res *v1alpha1.Traceflow
var res *v1beta1.Traceflow
err = wait.Poll(1*time.Second, option.timeout, func() (bool, error) {
res, err = client.CrdV1alpha1().Traceflows().Get(context.TODO(), tf.Name, metav1.GetOptions{})
res, err = client.CrdV1beta1().Traceflows().Get(context.TODO(), tf.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if res.Status.Phase != v1alpha1.Succeeded && res.Status.Phase != v1alpha1.Failed {
if res.Status.Phase != v1beta1.Succeeded && res.Status.Phase != v1beta1.Failed {
return false, nil
}
return true, nil
Expand All @@ -210,9 +210,9 @@ func runE(cmd *cobra.Command, _ []string) error {
return err
}

func newTraceflow(client kubernetes.Interface) (*v1alpha1.Traceflow, error) {
func newTraceflow(client kubernetes.Interface) (*v1beta1.Traceflow, error) {
var srcName, dstName string
var src v1alpha1.Source
var src v1beta1.Source

if option.source != "" {
srcIP := net.ParseIP(option.source)
Expand Down Expand Up @@ -240,7 +240,7 @@ func newTraceflow(client kubernetes.Interface) (*v1alpha1.Traceflow, error) {
srcName = "any"
}

var dst v1alpha1.Destination
var dst v1beta1.Destination
if option.destination != "" {
dstIP := net.ParseIP(option.destination)
if dstIP != nil {
Expand Down Expand Up @@ -285,17 +285,17 @@ func newTraceflow(client kubernetes.Interface) (*v1alpha1.Traceflow, error) {
}

name := getTFName(fmt.Sprintf("%s-to-%s", srcName, dstName))
tf := &v1alpha1.Traceflow{
tf := &v1beta1.Traceflow{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1alpha1.TraceflowSpec{
Spec: v1beta1.TraceflowSpec{
Source: src,
Destination: dst,
Packet: *pkt,
LiveTraffic: option.liveTraffic,
DroppedOnly: option.droppedOnly,
Timeout: uint16(option.timeout.Seconds()),
Timeout: int32(option.timeout.Seconds()),
},
}
return tf, nil
Expand All @@ -314,18 +314,20 @@ func dstIsPod(client kubernetes.Interface, ns string, name string) (bool, error)
return true, nil
}

func parseFlow() (*v1alpha1.Packet, error) {
func parseFlow() (*v1beta1.Packet, error) {
cleanFlow := strings.ReplaceAll(option.flow, " ", "")
fields, err := getPortFields(cleanFlow)
if err != nil {
return nil, fmt.Errorf("error when parsing the flow: %w", err)
}

var pkt v1alpha1.Packet
var pkt v1beta1.Packet

_, isIPv6 := fields["ipv6"]
if isIPv6 {
pkt.IPv6Header = new(v1alpha1.IPv6Header)
pkt.IPv6Header = new(v1beta1.IPv6Header)
} else {
pkt.IPHeader = new(v1beta1.IPHeader)
}
for k, v := range protocols {
if _, ok := fields[k]; ok {
Expand All @@ -340,28 +342,29 @@ func parseFlow() (*v1alpha1.Packet, error) {
}

if r, ok := fields["tcp_src"]; ok {
pkt.TransportHeader.TCP = new(v1alpha1.TCPHeader)
pkt.TransportHeader.TCP = new(v1beta1.TCPHeader)
pkt.TransportHeader.TCP.SrcPort = int32(r)
}
if r, ok := fields["tcp_dst"]; ok {
if pkt.TransportHeader.TCP == nil {
pkt.TransportHeader.TCP = new(v1alpha1.TCPHeader)
pkt.TransportHeader.TCP = new(v1beta1.TCPHeader)
}
pkt.TransportHeader.TCP.DstPort = int32(r)
}
if r, ok := fields["tcp_flags"]; ok {
if pkt.TransportHeader.TCP == nil {
pkt.TransportHeader.TCP = new(v1alpha1.TCPHeader)
pkt.TransportHeader.TCP = new(v1beta1.TCPHeader)
}
pkt.TransportHeader.TCP.Flags = int32(r)
tcpFlags := int32(r)
pkt.TransportHeader.TCP.Flags = &tcpFlags
}
if r, ok := fields["udp_src"]; ok {
pkt.TransportHeader.UDP = new(v1alpha1.UDPHeader)
pkt.TransportHeader.UDP = new(v1beta1.UDPHeader)
pkt.TransportHeader.UDP.SrcPort = int32(r)
}
if r, ok := fields["udp_dst"]; ok {
if pkt.TransportHeader.UDP == nil {
pkt.TransportHeader.UDP = new(v1alpha1.UDPHeader)
pkt.TransportHeader.UDP = new(v1beta1.UDPHeader)
}
pkt.TransportHeader.UDP.DstPort = int32(r)
}
Expand Down Expand Up @@ -390,7 +393,7 @@ func getPortFields(cleanFlow string) (map[string]int, error) {
return fields, nil
}

func output(tf *v1alpha1.Traceflow, writer io.Writer) error {
func output(tf *v1beta1.Traceflow, writer io.Writer) error {
r := Response{
Name: tf.Name,
Phase: tf.Status.Phase,
Expand All @@ -410,7 +413,7 @@ func output(tf *v1alpha1.Traceflow, writer io.Writer) error {
if pkt != nil {
r.CapturedPacket = &CapturedPacket{SrcIP: pkt.SrcIP, DstIP: pkt.DstIP, Length: pkt.Length, IPv6Header: pkt.IPv6Header}
if pkt.IPv6Header == nil {
r.CapturedPacket.IPHeader = &pkt.IPHeader
r.CapturedPacket.IPHeader = pkt.IPHeader
}
if pkt.TransportHeader.TCP != nil || pkt.TransportHeader.UDP != nil || pkt.TransportHeader.ICMP != nil {
r.CapturedPacket.TransportHeader = &pkt.TransportHeader
Expand Down
Loading

0 comments on commit aa1c6b0

Please sign in to comment.