diff --git a/pkg/controller/traceflow/controller.go b/pkg/controller/traceflow/controller.go index 25563e67f3e..d3a40f22ce9 100644 --- a/pkg/controller/traceflow/controller.go +++ b/pkg/controller/traceflow/controller.go @@ -48,10 +48,6 @@ const ( // Default number of workers processing traceflow request. defaultWorkers = 4 - // Traceflow timeout period. - timeoutDuration = 2 * time.Minute - timeoutCheckInterval = timeoutDuration / 2 - // Min and max data plane tag for traceflow. dataplaneTag=0 means it's not a Traceflow packet. // dataplaneTag=15 is reserved. minTagNum uint8 = 1 @@ -59,6 +55,15 @@ const ( // PodIP index name for Pod cache. podIPIndex = "podIP" + + // String set to TraceflowStatus.Reason. + traceflowTimeout = "Traceflow timeout" +) + +var ( + // Traceflow timeout period. + timeoutDuration = 2 * time.Minute + timeoutCheckInterval = timeoutDuration / 2 ) // Controller is for traceflow. @@ -246,6 +251,9 @@ func (c *Controller) syncTraceflow(traceflowName string) error { err = c.startTraceflow(tf) case opsv1alpha1.Running: err = c.checkTraceflowStatus(tf) + case opsv1alpha1.Failed: + // Deallocate tag when agent set Traceflow status to Failed. + c.deallocateTagForTF(tf) } return err } @@ -298,9 +306,10 @@ func (c *Controller) checkTraceflowStatus(tf *opsv1alpha1.Traceflow) error { c.deallocateTagForTF(tf) return c.updateTraceflowStatus(tf, opsv1alpha1.Succeeded, "", 0) } - if time.Now().UTC().Sub(tf.CreationTimestamp.UTC()) > timeoutDuration { + // CreationTimestamp is of second accuracy. + if time.Now().Unix() > tf.CreationTimestamp.Unix()+int64(timeoutDuration.Seconds()) { c.deallocateTagForTF(tf) - return c.updateTraceflowStatus(tf, opsv1alpha1.Failed, "Traceflow timeout", 0) + return c.updateTraceflowStatus(tf, opsv1alpha1.Failed, traceflowTimeout, 0) } return nil } diff --git a/pkg/controller/traceflow/controller_test.go b/pkg/controller/traceflow/controller_test.go new file mode 100644 index 00000000000..6af1336dc16 --- /dev/null +++ b/pkg/controller/traceflow/controller_test.go @@ -0,0 +1,159 @@ +// Copyright 2020 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traceflow + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" + + ops "github.com/vmware-tanzu/antrea/pkg/apis/ops/v1alpha1" + "github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned" + fakeversioned "github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned/fake" + crdinformers "github.com/vmware-tanzu/antrea/pkg/client/informers/externalversions" +) + +var alwaysReady = func() bool { return true } + +const informerDefaultResync time.Duration = 30 * time.Second + +type traceflowController struct { + *Controller + client versioned.Interface + informerFactory informers.SharedInformerFactory + crdInformerFactory crdinformers.SharedInformerFactory +} + +func newController() *traceflowController { + client := fake.NewSimpleClientset() + crdClient := newCRDClientset() + informerFactory := informers.NewSharedInformerFactory(client, informerDefaultResync) + crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync) + controller := NewTraceflowController(crdClient, + informerFactory.Core().V1().Pods(), + crdInformerFactory.Ops().V1alpha1().Traceflows()) + controller.traceflowListerSynced = alwaysReady + return &traceflowController{ + controller, + crdClient, + informerFactory, + crdInformerFactory, + } +} + +func TestTraceflow(t *testing.T) { + // Use shorter timeout. + timeoutDuration = 2 * time.Second + timeoutCheckInterval = timeoutDuration / 2 + + tfc := newController() + stopCh := make(chan struct{}) + tfc.crdInformerFactory.Start(stopCh) + go tfc.Run(stopCh) + + numRunningTraceflows := func() int { + tfc.runningTraceflowsMutex.Lock() + defer tfc.runningTraceflowsMutex.Unlock() + return len(tfc.runningTraceflows) + } + + tf1 := ops.Traceflow{ + ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"}, + Spec: ops.TraceflowSpec{ + Source: ops.Source{Namespace: "ns1", Pod: "pod1"}, + Destination: ops.Destination{Namespace: "ns2", Pod: "pod2"}, + }, + } + + tfc.client.OpsV1alpha1().Traceflows().Create(context.TODO(), &tf1, metav1.CreateOptions{}) + res, _ := tfc.waitForTraceflow("tf1", ops.Running, time.Second) + assert.NotNil(t, res) + // DataplaneTag should be allocated by Controller. + assert.True(t, res.Status.DataplaneTag > 0) + assert.Equal(t, numRunningTraceflows(), 1) + + // Test Controller handling of successful Traceflow. + res.Status.Results = []ops.NodeResult{ + // Sender + { + Observations: []ops.Observation{{Component: ops.SpoofGuard}}, + }, + // Receiver + { + Observations: []ops.Observation{{Action: ops.Delivered}}, + }, + } + tfc.client.OpsV1alpha1().Traceflows().Update(context.TODO(), res, metav1.UpdateOptions{}) + res, _ = tfc.waitForTraceflow("tf1", ops.Succeeded, time.Second) + assert.NotNil(t, res) + // DataplaneTag should be deallocated by Controller. + assert.True(t, res.Status.DataplaneTag == 0) + assert.Equal(t, numRunningTraceflows(), 0) + tfc.client.OpsV1alpha1().Traceflows().Delete(context.TODO(), "tf1", metav1.DeleteOptions{}) + + // Test Traceflow timeout. + startTime := time.Now() + tfc.client.OpsV1alpha1().Traceflows().Create(context.TODO(), &tf1, metav1.CreateOptions{}) + res, _ = tfc.waitForTraceflow("tf1", ops.Running, time.Second) + assert.NotNil(t, res) + res, _ = tfc.waitForTraceflow("tf1", ops.Failed, timeoutDuration+timeoutCheckInterval) + assert.NotNil(t, res) + assert.True(t, time.Now().Sub(startTime) >= timeoutDuration) + assert.Equal(t, res.Status.Reason, traceflowTimeout) + assert.True(t, res.Status.DataplaneTag == 0) + assert.Equal(t, numRunningTraceflows(), 0) + + close(stopCh) +} + +func (tfc *traceflowController) waitForTraceflow(name string, phase ops.TraceflowPhase, timeout time.Duration) (*ops.Traceflow, error) { + var tf *ops.Traceflow + var err error + if err = wait.Poll(100*time.Millisecond, timeout, func() (bool, error) { + tf, err = tfc.client.OpsV1alpha1().Traceflows().Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil || tf.Status.Phase != phase { + return false, nil + } + return true, nil + }); err != nil { + return nil, err + } + return tf, nil +} + +func newCRDClientset() *fakeversioned.Clientset { + client := fakeversioned.NewSimpleClientset() + + client.PrependReactor("create", "traceflows", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) { + tf := action.(k8stesting.CreateAction).GetObject().(*ops.Traceflow) + + // Fake client does not set CreationTimestamp. + if tf.ObjectMeta.CreationTimestamp == (metav1.Time{}) { + tf.ObjectMeta.CreationTimestamp.Time = time.Now() + } + + return false, tf, nil + })) + + return client +} diff --git a/test/e2e/traceflow_test.go b/test/e2e/traceflow_test.go index c588072758e..d7bb9fc080f 100644 --- a/test/e2e/traceflow_test.go +++ b/test/e2e/traceflow_test.go @@ -104,7 +104,7 @@ func TestTraceflow(t *testing.T) { // 3. node1Pods[0] -> node1IPs[1], intra node1. // 4. node1Pods[0] -> node2IPs[0], inter node1 and node2. // 5. node1Pods[0] -> service, inter node1 and node2. - // 6. node1Pods[0] -> not existed Pod + // 6. node1Pods[0] -> non-existing Pod testcases := []testcase{ { name: "intraNodeTraceflow", @@ -404,10 +404,10 @@ func TestTraceflow(t *testing.T) { }, }, { - name: "notExistedDstPod", + name: "nonExistingDstPod", tf: &v1alpha1.Traceflow{ ObjectMeta: metav1.ObjectMeta{ - Name: randName(fmt.Sprintf("%s-%s-to-%s-%s-", testNamespace, node1Pods[0], testNamespace, "not-existed-pod")), + Name: randName(fmt.Sprintf("%s-%s-to-%s-%s-", testNamespace, node1Pods[0], testNamespace, "non-existing-pod")), }, Spec: v1alpha1.TraceflowSpec{ Source: v1alpha1.Source{ @@ -416,11 +416,10 @@ func TestTraceflow(t *testing.T) { }, Destination: v1alpha1.Destination{ Namespace: testNamespace, - Pod: "not-existed-pod", + Pod: "non-existing-pod", }, }, }, - // Traceflow should fail with timeout. expectedPhase: v1alpha1.Failed, }, } @@ -482,10 +481,6 @@ func (data *TestData) waitForTraceflow(name string, phase v1alpha1.TraceflowPhas var tf *v1alpha1.Traceflow var err error timeout := 15 * time.Second - if phase == v1alpha1.Failed { - // 2 minutes timeout + 1 minute check interval - timeout = 3 * time.Minute - } if err = wait.PollImmediate(1*time.Second, timeout, func() (bool, error) { tf, err = data.crdClient.OpsV1alpha1().Traceflows().Get(context.TODO(), name, metav1.GetOptions{}) if err != nil || tf.Status.Phase != phase {