From eb2de15bcffa0564392bda34cea9eb45b3a240a1 Mon Sep 17 00:00:00 2001
From: Jianjun Shen
Date: Thu, 17 Sep 2020 18:14:18 -0700
Subject: [PATCH] Periodically check timeout of running Traceflow requests
 (#1179)

* Periodically check timeout of running Traceflow requests

Check every minute whether running Traceflow requests have timed out.
Change the Traceflow timeout period to 2 minutes.
Deallocate the DP tag after a Traceflow request succeeds or times out.

* Unit test for traceflow controller

* Look up dataplane tag from cache by Traceflow name in Antrea agent
---
 .../traceflow/traceflow_controller.go       |  35 ++--
 pkg/controller/traceflow/controller.go      |  41 ++++-
 pkg/controller/traceflow/controller_test.go | 159 ++++++++++++++++++
 test/e2e/traceflow_test.go                  |  34 +++-
 4 files changed, 241 insertions(+), 28 deletions(-)
 create mode 100644 pkg/controller/traceflow/controller_test.go

diff --git a/pkg/agent/controller/traceflow/traceflow_controller.go b/pkg/agent/controller/traceflow/traceflow_controller.go
index 1c28b8487b9..9db093306bd 100644
--- a/pkg/agent/controller/traceflow/traceflow_controller.go
+++ b/pkg/agent/controller/traceflow/traceflow_controller.go
@@ -419,31 +419,34 @@ func (c *Controller) errorTraceflowCRD(tf *opsv1alpha1.Traceflow, reason string)
 	return c.traceflowClient.OpsV1alpha1().Traceflows().Patch(context.TODO(), tf.Name, types.MergePatchType, payloads, metav1.PatchOptions{}, "status")
 }
 
-// Deallocate tag from cache. Ignore DataplaneTag == 0 which is invalid case.
+// Deallocate tag from cache.
 func (c *Controller) deallocateTag(tf *opsv1alpha1.Traceflow) {
-	if tf.Status.DataplaneTag == 0 {
-		return
-	}
-	c.injectedTagsMutex.Lock()
-	if existingTraceflowName, ok := c.injectedTags[tf.Status.DataplaneTag]; ok {
+	dataplaneTag := uint8(0)
+	c.runningTraceflowsMutex.Lock()
+	// Controller could have deallocated the tag and cleared the DataplaneTag
+	// field in the Traceflow Status, so try looking up the tag from the
+	// cache by Traceflow name.
+	for tag, existingTraceflowName := range c.runningTraceflows {
 		if tf.Name == existingTraceflowName {
-			delete(c.injectedTags, tf.Status.DataplaneTag)
-		} else {
-			klog.Warningf("injectedTags cache mismatch tag: %d name: %s existingName: %s",
-				tf.Status.DataplaneTag, tf.Name, existingTraceflowName)
+			delete(c.runningTraceflows, tag)
+			dataplaneTag = tag
+			break
 		}
 	}
-	c.injectedTagsMutex.Unlock()
-	c.runningTraceflowsMutex.Lock()
-	if existingTraceflowName, ok := c.runningTraceflows[tf.Status.DataplaneTag]; ok {
+	c.runningTraceflowsMutex.Unlock()
+	if dataplaneTag == 0 {
+		return
+	}
+	c.injectedTagsMutex.Lock()
+	if existingTraceflowName, ok := c.injectedTags[dataplaneTag]; ok {
 		if tf.Name == existingTraceflowName {
-			delete(c.runningTraceflows, tf.Status.DataplaneTag)
+			delete(c.injectedTags, dataplaneTag)
 		} else {
 			klog.Warningf("runningTraceflows cache mismatch tag: %d name: %s existingName: %s",
-				tf.Status.DataplaneTag, tf.Name, existingTraceflowName)
+				dataplaneTag, tf.Name, existingTraceflowName)
 		}
 	}
-	c.runningTraceflowsMutex.Unlock()
+	c.injectedTagsMutex.Unlock()
 }
 
 func (c *Controller) isSender(tag uint8) bool {
diff --git a/pkg/controller/traceflow/controller.go b/pkg/controller/traceflow/controller.go
index 3d49f87fa73..d3a40f22ce9 100644
--- a/pkg/controller/traceflow/controller.go
+++ b/pkg/controller/traceflow/controller.go
@@ -40,9 +40,11 @@ import (
 const (
 	// Set resyncPeriod to 0 to disable resyncing.
 	resyncPeriod time.Duration = 0
+
 	// How long to wait before retrying the processing of a traceflow.
 	minRetryDelay = 5 * time.Second
 	maxRetryDelay = 300 * time.Second
+
 	// Default number of workers processing traceflow request.
 	defaultWorkers = 4
 
@@ -53,11 +55,15 @@ const (
 
 	// PodIP index name for Pod cache.
 	podIPIndex = "podIP"
+
+	// String set as TraceflowStatus.Reason when a Traceflow request times out.
+	traceflowTimeout = "Traceflow timeout"
 )
 
 var (
 	// Traceflow timeout period.
-	timeout = (300 * time.Second).Seconds()
+	timeoutDuration = 2 * time.Minute
+	timeoutCheckInterval = timeoutDuration / 2
 )
 
 // Controller is for traceflow.
@@ -139,6 +145,10 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
 		}
 	}
 
+	go func() {
+		wait.Until(c.checkTraceflowTimeout, timeoutCheckInterval, stopCh)
+	}()
+
 	for i := 0; i < defaultWorkers; i++ {
 		go wait.Until(c.worker, time.Second, stopCh)
 	}
@@ -170,6 +180,21 @@ func (c *Controller) worker() {
 	}
 }
 
+func (c *Controller) checkTraceflowTimeout() {
+	c.runningTraceflowsMutex.Lock()
+	tfs := make([]string, 0, len(c.runningTraceflows))
+	for _, tfName := range c.runningTraceflows {
+		tfs = append(tfs, tfName)
+	}
+	c.runningTraceflowsMutex.Unlock()
+
+	for _, tfName := range tfs {
+		// Re-post all running Traceflow requests to the work queue to
+		// be processed and checked for timeout.
+		c.queue.Add(tfName)
+	}
+}
+
 // processTraceflowItem processes an item in the "traceflow" work queue, by calling syncTraceflow
 // after casting the item to a string (Traceflow name). If syncTraceflow returns an error, this
 // function logs the error and adds the Traceflow request back to the queue with a rate limit. If
@@ -226,7 +251,8 @@ func (c *Controller) syncTraceflow(traceflowName string) error {
 		err = c.startTraceflow(tf)
 	case opsv1alpha1.Running:
 		err = c.checkTraceflowStatus(tf)
-	default:
+	case opsv1alpha1.Failed:
+		// Deallocate the tag when the agent has set the Traceflow status to Failed.
 		c.deallocateTagForTF(tf)
 	}
 	return err
@@ -277,10 +303,13 @@ func (c *Controller) checkTraceflowStatus(tf *opsv1alpha1.Traceflow) error {
 		}
 	}
 	if sender && receiver {
+		c.deallocateTagForTF(tf)
 		return c.updateTraceflowStatus(tf, opsv1alpha1.Succeeded, "", 0)
 	}
-	if time.Now().UTC().Sub(tf.CreationTimestamp.UTC()).Seconds() > timeout {
-		return c.updateTraceflowStatus(tf, opsv1alpha1.Failed, "traceflow timeout", 0)
+	// CreationTimestamp has only second precision.
+	if time.Now().Unix() > tf.CreationTimestamp.Unix()+int64(timeoutDuration.Seconds()) {
+		c.deallocateTagForTF(tf)
+		return c.updateTraceflowStatus(tf, opsv1alpha1.Failed, traceflowTimeout, 0)
 	}
 	return nil
 }
@@ -288,9 +317,7 @@ func (c *Controller) checkTraceflowStatus(tf *opsv1alpha1.Traceflow) error {
 func (c *Controller) updateTraceflowStatus(tf *opsv1alpha1.Traceflow, phase opsv1alpha1.TraceflowPhase, reason string, dataPlaneTag uint8) error {
 	update := tf.DeepCopy()
 	update.Status.Phase = phase
-	if phase == opsv1alpha1.Running {
-		update.Status.DataplaneTag = dataPlaneTag
-	}
+	update.Status.DataplaneTag = dataPlaneTag
 	if reason != "" {
 		update.Status.Reason = reason
 	}
diff --git a/pkg/controller/traceflow/controller_test.go b/pkg/controller/traceflow/controller_test.go
new file mode 100644
index 00000000000..34998bc1c5c
--- /dev/null
+++ b/pkg/controller/traceflow/controller_test.go
@@ -0,0 +1,159 @@
+// Copyright 2020 Antrea Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traceflow + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" + + ops "github.com/vmware-tanzu/antrea/pkg/apis/ops/v1alpha1" + "github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned" + fakeversioned "github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned/fake" + crdinformers "github.com/vmware-tanzu/antrea/pkg/client/informers/externalversions" +) + +var alwaysReady = func() bool { return true } + +const informerDefaultResync time.Duration = 30 * time.Second + +type traceflowController struct { + *Controller + client versioned.Interface + informerFactory informers.SharedInformerFactory + crdInformerFactory crdinformers.SharedInformerFactory +} + +func newController() *traceflowController { + client := fake.NewSimpleClientset() + crdClient := newCRDClientset() + informerFactory := informers.NewSharedInformerFactory(client, informerDefaultResync) + crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync) + controller := NewTraceflowController(crdClient, + informerFactory.Core().V1().Pods(), + crdInformerFactory.Ops().V1alpha1().Traceflows()) + controller.traceflowListerSynced = alwaysReady + return &traceflowController{ + controller, + crdClient, + informerFactory, + crdInformerFactory, + } +} + +func TestTraceflow(t *testing.T) { + // Use shorter timeout. + timeoutDuration = 2 * time.Second + timeoutCheckInterval = timeoutDuration / 2 + + tfc := newController() + stopCh := make(chan struct{}) + tfc.crdInformerFactory.Start(stopCh) + go tfc.Run(stopCh) + + numRunningTraceflows := func() int { + tfc.runningTraceflowsMutex.Lock() + defer tfc.runningTraceflowsMutex.Unlock() + return len(tfc.runningTraceflows) + } + + tf1 := ops.Traceflow{ + ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"}, + Spec: ops.TraceflowSpec{ + Source: ops.Source{Namespace: "ns1", Pod: "pod1"}, + Destination: ops.Destination{Namespace: "ns2", Pod: "pod2"}, + }, + } + + tfc.client.OpsV1alpha1().Traceflows().Create(context.TODO(), &tf1, metav1.CreateOptions{}) + res, _ := tfc.waitForTraceflow("tf1", ops.Running, time.Second) + assert.NotNil(t, res) + // DataplaneTag should be allocated by Controller. + assert.True(t, res.Status.DataplaneTag > 0) + assert.Equal(t, numRunningTraceflows(), 1) + + // Test Controller handling of successful Traceflow. + res.Status.Results = []ops.NodeResult{ + // Sender + { + Observations: []ops.Observation{{Component: ops.SpoofGuard}}, + }, + // Receiver + { + Observations: []ops.Observation{{Action: ops.Delivered}}, + }, + } + tfc.client.OpsV1alpha1().Traceflows().Update(context.TODO(), res, metav1.UpdateOptions{}) + res, _ = tfc.waitForTraceflow("tf1", ops.Succeeded, time.Second) + assert.NotNil(t, res) + // DataplaneTag should be deallocated by Controller. 
+ assert.True(t, res.Status.DataplaneTag == 0) + assert.Equal(t, numRunningTraceflows(), 0) + tfc.client.OpsV1alpha1().Traceflows().Delete(context.TODO(), "tf1", metav1.DeleteOptions{}) + + // Test Traceflow timeout. + startTime := time.Now() + tfc.client.OpsV1alpha1().Traceflows().Create(context.TODO(), &tf1, metav1.CreateOptions{}) + res, _ = tfc.waitForTraceflow("tf1", ops.Running, time.Second) + assert.NotNil(t, res) + res, _ = tfc.waitForTraceflow("tf1", ops.Failed, timeoutDuration*2) + assert.NotNil(t, res) + assert.True(t, time.Now().Sub(startTime) >= timeoutDuration) + assert.Equal(t, res.Status.Reason, traceflowTimeout) + assert.True(t, res.Status.DataplaneTag == 0) + assert.Equal(t, numRunningTraceflows(), 0) + + close(stopCh) +} + +func (tfc *traceflowController) waitForTraceflow(name string, phase ops.TraceflowPhase, timeout time.Duration) (*ops.Traceflow, error) { + var tf *ops.Traceflow + var err error + if err = wait.Poll(100*time.Millisecond, timeout, func() (bool, error) { + tf, err = tfc.client.OpsV1alpha1().Traceflows().Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil || tf.Status.Phase != phase { + return false, nil + } + return true, nil + }); err != nil { + return nil, err + } + return tf, nil +} + +func newCRDClientset() *fakeversioned.Clientset { + client := fakeversioned.NewSimpleClientset() + + client.PrependReactor("create", "traceflows", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) { + tf := action.(k8stesting.CreateAction).GetObject().(*ops.Traceflow) + + // Fake client does not set CreationTimestamp. + if tf.ObjectMeta.CreationTimestamp == (metav1.Time{}) { + tf.ObjectMeta.CreationTimestamp.Time = time.Now() + } + + return false, tf, nil + })) + + return client +} diff --git a/test/e2e/traceflow_test.go b/test/e2e/traceflow_test.go index b2e25be184d..bf4130f682c 100644 --- a/test/e2e/traceflow_test.go +++ b/test/e2e/traceflow_test.go @@ -98,12 +98,13 @@ func TestTraceflow(t *testing.T) { t.Fatal(err) } - // Creates 4 traceflows: + // Creates 6 traceflows: // 1. node1Pods[0] -> node1Pods[1], intra node1. // 2. node1Pods[0] -> node2Pods[0], inter node1 and node2. // 3. node1Pods[0] -> node1IPs[1], intra node1. // 4. node1Pods[0] -> node2IPs[0], inter node1 and node2. // 5. node1Pods[0] -> service, inter node1 and node2. + // 6. 
node1Pods[0] -> non-existing Pod testcases := []testcase{ { name: "intraNodeTraceflow", @@ -402,6 +403,25 @@ func TestTraceflow(t *testing.T) { }, }, }, + { + name: "nonExistingDstPod", + tf: &v1alpha1.Traceflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(fmt.Sprintf("%s-%s-to-%s-%s-", testNamespace, node1Pods[0], testNamespace, "non-existing-pod")), + }, + Spec: v1alpha1.TraceflowSpec{ + Source: v1alpha1.Source{ + Namespace: testNamespace, + Pod: node1Pods[0], + }, + Destination: v1alpha1.Destination{ + Namespace: testNamespace, + Pod: "non-existing-pod", + }, + }, + }, + expectedPhase: v1alpha1.Failed, + }, } t.Run("traceflowGroupTest", func(t *testing.T) { @@ -417,7 +437,7 @@ func TestTraceflow(t *testing.T) { } }() - tf, err := data.waitForTraceflow(tc.tf.Name, tc.expectedPhase) + tf, err := data.waitForTraceflow(t, tc.tf.Name, tc.expectedPhase) if err != nil { t.Fatalf("Error: Get Traceflow failed: %v", err) return @@ -431,7 +451,7 @@ func TestTraceflow(t *testing.T) { t.Fatal(err) return } - } else { + } else if len(tc.expectedResults) > 0 { if tf.Status.Results[0].Observations[0].Component == v1alpha1.SpoofGuard { if err = compareObservations(tc.expectedResults[0], tf.Status.Results[0]); err != nil { t.Fatal(err) @@ -457,16 +477,20 @@ func TestTraceflow(t *testing.T) { }) } -func (data *TestData) waitForTraceflow(name string, phase v1alpha1.TraceflowPhase) (*v1alpha1.Traceflow, error) { +func (data *TestData) waitForTraceflow(t *testing.T, name string, phase v1alpha1.TraceflowPhase) (*v1alpha1.Traceflow, error) { var tf *v1alpha1.Traceflow var err error - if err = wait.PollImmediate(1*time.Second, 15*time.Second, func() (bool, error) { + timeout := 15 * time.Second + if err = wait.PollImmediate(1*time.Second, timeout, func() (bool, error) { tf, err = data.crdClient.OpsV1alpha1().Traceflows().Get(context.TODO(), name, metav1.GetOptions{}) if err != nil || tf.Status.Phase != phase { return false, nil } return true, nil }); err != nil { + if tf != nil { + t.Errorf("Latest Traceflow status: %v", tf.Status) + } return nil, err } return tf, nil
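
For reference, below is a minimal client-side sketch (not part of the patch) of how the new behavior can be observed against a live cluster: it creates a Traceflow with the generated Antrea clientset used above and polls it until the controller reports Succeeded, or Failed with reason "Traceflow timeout" after roughly the new 2-minute window. The kubeconfig handling, namespace, Pod names, and poll interval are illustrative assumptions; the loop mirrors the waitForTraceflow helpers in the tests above.

package main

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/clientcmd"

	ops "github.com/vmware-tanzu/antrea/pkg/apis/ops/v1alpha1"
	"github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned"
)

func main() {
	// Assumption: a kubeconfig in the default location; any rest.Config works.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client, err := versioned.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Hypothetical source and destination Pods.
	tf := &ops.Traceflow{
		ObjectMeta: metav1.ObjectMeta{Name: "tf-example"},
		Spec: ops.TraceflowSpec{
			Source:      ops.Source{Namespace: "default", Pod: "pod-a"},
			Destination: ops.Destination{Namespace: "default", Pod: "pod-b"},
		},
	}
	if _, err := client.OpsV1alpha1().Traceflows().Create(context.TODO(), tf, metav1.CreateOptions{}); err != nil {
		panic(err)
	}

	// Poll until the controller reports a terminal phase. With this patch a
	// request that never completes is failed with reason "Traceflow timeout"
	// after about 2 minutes, and its dataplane tag is deallocated.
	err = wait.PollImmediate(time.Second, 3*time.Minute, func() (bool, error) {
		res, err := client.OpsV1alpha1().Traceflows().Get(context.TODO(), "tf-example", metav1.GetOptions{})
		if err != nil {
			return false, nil
		}
		if res.Status.Phase == ops.Succeeded || res.Status.Phase == ops.Failed {
			fmt.Printf("phase=%s reason=%q tag=%d\n", res.Status.Phase, res.Status.Reason, res.Status.DataplaneTag)
			return true, nil
		}
		return false, nil
	})
	if err != nil {
		panic(err)
	}
}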