Skip to content

Commit

Permalink
Use Traceflow API v1beta1
Browse files Browse the repository at this point in the history
Update traceflow api to v1beta1 in antctl traceflow and traceflow e2e tests.

Fixes antrea-io#5656

Signed-off-by: Kumar Atish <atish.iaf@gmail.com>
  • Loading branch information
Atish-iaf committed Nov 16, 2023
1 parent ce28710 commit a854f96
Show file tree
Hide file tree
Showing 5 changed files with 730 additions and 727 deletions.
75 changes: 39 additions & 36 deletions pkg/antctl/raw/traceflow/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/antctl/raw"
"antrea.io/antrea/pkg/apis/crd/v1alpha1"
"antrea.io/antrea/pkg/apis/crd/v1beta1"
antrea "antrea.io/antrea/pkg/client/clientset/versioned"
)

Expand Down Expand Up @@ -64,23 +64,23 @@ var protocols = map[string]int32{
}

type CapturedPacket struct {
SrcIP string `json:"srcIP" yaml:"srcIP"`
DstIP string `json:"dstIP" yaml:"dstIP"`
Length uint16 `json:"length" yaml:"length"`
IPHeader *v1alpha1.IPHeader `json:"ipHeader,omitempty" yaml:"ipHeader,omitempty"`
IPv6Header *v1alpha1.IPv6Header `json:"ipv6Header,omitempty" yaml:"ipv6Header,omitempty"`
TransportHeader *v1alpha1.TransportHeader `json:"transportHeader,omitempty" yaml:"tranportHeader,omitempty"`
SrcIP string `json:"srcIP" yaml:"srcIP"`
DstIP string `json:"dstIP" yaml:"dstIP"`
Length int32 `json:"length" yaml:"length"`
IPHeader *v1beta1.IPHeader `json:"ipHeader,omitempty" yaml:"ipHeader,omitempty"`
IPv6Header *v1beta1.IPv6Header `json:"ipv6Header,omitempty" yaml:"ipv6Header,omitempty"`
TransportHeader *v1beta1.TransportHeader `json:"transportHeader,omitempty" yaml:"tranportHeader,omitempty"`
}

// Response is the response of antctl Traceflow.
type Response struct {
Name string `json:"name" yaml:"name"` // Traceflow name
Phase v1alpha1.TraceflowPhase `json:"phase,omitempty" yaml:"phase,omitempty"` // Traceflow phase
Reason string `json:"reason,omitempty" yaml:"reason,omitempty"` // Traceflow phase reason
Source string `json:"source,omitempty" yaml:"source,omitempty"` // Traceflow source, e.g. "default/pod0"
Destination string `json:"destination,omitempty" yaml:"destination,omitempty"` // Traceflow destination, e.g. "default/pod1"
NodeResults []v1alpha1.NodeResult `json:"results,omitempty" yaml:"results,omitempty"` // Traceflow node results
CapturedPacket *CapturedPacket `json:"capturedPacket,omitempty" yaml:"capturedPacket,omitempty"` // Captured packet in live-traffic Traceflow
Name string `json:"name" yaml:"name"` // Traceflow name
Phase v1beta1.TraceflowPhase `json:"phase,omitempty" yaml:"phase,omitempty"` // Traceflow phase
Reason string `json:"reason,omitempty" yaml:"reason,omitempty"` // Traceflow phase reason
Source string `json:"source,omitempty" yaml:"source,omitempty"` // Traceflow source, e.g. "default/pod0"
Destination string `json:"destination,omitempty" yaml:"destination,omitempty"` // Traceflow destination, e.g. "default/pod1"
NodeResults []v1beta1.NodeResult `json:"results,omitempty" yaml:"results,omitempty"` // Traceflow node results
CapturedPacket *CapturedPacket `json:"capturedPacket,omitempty" yaml:"capturedPacket,omitempty"` // Captured packet in live-traffic Traceflow
}

func init() {
Expand Down Expand Up @@ -168,12 +168,12 @@ func runE(cmd *cobra.Command, _ []string) error {

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err = client.CrdV1alpha1().Traceflows().Create(ctx, tf, metav1.CreateOptions{}); err != nil {
if _, err = client.CrdV1beta1().Traceflows().Create(ctx, tf, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("error when creating Traceflow, is Traceflow feature gate enabled? %w", err)
}
defer func() {
if !option.nowait {
if err = client.CrdV1alpha1().Traceflows().Delete(context.TODO(), tf.Name, metav1.DeleteOptions{}); err != nil {
if err = client.CrdV1beta1().Traceflows().Delete(context.TODO(), tf.Name, metav1.DeleteOptions{}); err != nil {
klog.Errorf("error when deleting Traceflow: %+v", err)
}
}
Expand All @@ -183,13 +183,13 @@ func runE(cmd *cobra.Command, _ []string) error {
return nil
}

var res *v1alpha1.Traceflow
var res *v1beta1.Traceflow
err = wait.Poll(1*time.Second, option.timeout, func() (bool, error) {
res, err = client.CrdV1alpha1().Traceflows().Get(context.TODO(), tf.Name, metav1.GetOptions{})
res, err = client.CrdV1beta1().Traceflows().Get(context.TODO(), tf.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if res.Status.Phase != v1alpha1.Succeeded && res.Status.Phase != v1alpha1.Failed {
if res.Status.Phase != v1beta1.Succeeded && res.Status.Phase != v1beta1.Failed {
return false, nil
}
return true, nil
Expand All @@ -210,9 +210,9 @@ func runE(cmd *cobra.Command, _ []string) error {
return err
}

func newTraceflow(client kubernetes.Interface) (*v1alpha1.Traceflow, error) {
func newTraceflow(client kubernetes.Interface) (*v1beta1.Traceflow, error) {
var srcName, dstName string
var src v1alpha1.Source
var src v1beta1.Source

if option.source != "" {
srcIP := net.ParseIP(option.source)
Expand Down Expand Up @@ -240,7 +240,7 @@ func newTraceflow(client kubernetes.Interface) (*v1alpha1.Traceflow, error) {
srcName = "any"
}

var dst v1alpha1.Destination
var dst v1beta1.Destination
if option.destination != "" {
dstIP := net.ParseIP(option.destination)
if dstIP != nil {
Expand Down Expand Up @@ -285,17 +285,17 @@ func newTraceflow(client kubernetes.Interface) (*v1alpha1.Traceflow, error) {
}

name := getTFName(fmt.Sprintf("%s-to-%s", srcName, dstName))
tf := &v1alpha1.Traceflow{
tf := &v1beta1.Traceflow{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1alpha1.TraceflowSpec{
Spec: v1beta1.TraceflowSpec{
Source: src,
Destination: dst,
Packet: *pkt,
LiveTraffic: option.liveTraffic,
DroppedOnly: option.droppedOnly,
Timeout: uint16(option.timeout.Seconds()),
Timeout: int32(option.timeout.Seconds()),
},
}
return tf, nil
Expand All @@ -314,18 +314,20 @@ func dstIsPod(client kubernetes.Interface, ns string, name string) (bool, error)
return true, nil
}

func parseFlow() (*v1alpha1.Packet, error) {
func parseFlow() (*v1beta1.Packet, error) {
cleanFlow := strings.ReplaceAll(option.flow, " ", "")
fields, err := getPortFields(cleanFlow)
if err != nil {
return nil, fmt.Errorf("error when parsing the flow: %w", err)
}

var pkt v1alpha1.Packet
var pkt v1beta1.Packet

_, isIPv6 := fields["ipv6"]
if isIPv6 {
pkt.IPv6Header = new(v1alpha1.IPv6Header)
pkt.IPv6Header = new(v1beta1.IPv6Header)
} else {
pkt.IPHeader = new(v1beta1.IPHeader)
}
for k, v := range protocols {
if _, ok := fields[k]; ok {
Expand All @@ -340,28 +342,29 @@ func parseFlow() (*v1alpha1.Packet, error) {
}

if r, ok := fields["tcp_src"]; ok {
pkt.TransportHeader.TCP = new(v1alpha1.TCPHeader)
pkt.TransportHeader.TCP = new(v1beta1.TCPHeader)
pkt.TransportHeader.TCP.SrcPort = int32(r)
}
if r, ok := fields["tcp_dst"]; ok {
if pkt.TransportHeader.TCP == nil {
pkt.TransportHeader.TCP = new(v1alpha1.TCPHeader)
pkt.TransportHeader.TCP = new(v1beta1.TCPHeader)
}
pkt.TransportHeader.TCP.DstPort = int32(r)
}
if r, ok := fields["tcp_flags"]; ok {
if pkt.TransportHeader.TCP == nil {
pkt.TransportHeader.TCP = new(v1alpha1.TCPHeader)
pkt.TransportHeader.TCP = new(v1beta1.TCPHeader)
}
pkt.TransportHeader.TCP.Flags = int32(r)
tcpFlags := int32(r)
pkt.TransportHeader.TCP.Flags = &tcpFlags
}
if r, ok := fields["udp_src"]; ok {
pkt.TransportHeader.UDP = new(v1alpha1.UDPHeader)
pkt.TransportHeader.UDP = new(v1beta1.UDPHeader)
pkt.TransportHeader.UDP.SrcPort = int32(r)
}
if r, ok := fields["udp_dst"]; ok {
if pkt.TransportHeader.UDP == nil {
pkt.TransportHeader.UDP = new(v1alpha1.UDPHeader)
pkt.TransportHeader.UDP = new(v1beta1.UDPHeader)
}
pkt.TransportHeader.UDP.DstPort = int32(r)
}
Expand Down Expand Up @@ -390,7 +393,7 @@ func getPortFields(cleanFlow string) (map[string]int, error) {
return fields, nil
}

func output(tf *v1alpha1.Traceflow, writer io.Writer) error {
func output(tf *v1beta1.Traceflow, writer io.Writer) error {
r := Response{
Name: tf.Name,
Phase: tf.Status.Phase,
Expand All @@ -410,7 +413,7 @@ func output(tf *v1alpha1.Traceflow, writer io.Writer) error {
if pkt != nil {
r.CapturedPacket = &CapturedPacket{SrcIP: pkt.SrcIP, DstIP: pkt.DstIP, Length: pkt.Length, IPv6Header: pkt.IPv6Header}
if pkt.IPv6Header == nil {
r.CapturedPacket.IPHeader = &pkt.IPHeader
r.CapturedPacket.IPHeader = pkt.IPHeader
}
if pkt.TransportHeader.TCP != nil || pkt.TransportHeader.UDP != nil || pkt.TransportHeader.ICMP != nil {
r.CapturedPacket.TransportHeader = &pkt.TransportHeader
Expand Down
Loading

0 comments on commit a854f96

Please sign in to comment.