Skip to content

Commit

Permalink
Unit test for traceflow controller
Browse files Browse the repository at this point in the history
  • Loading branch information
jianjuns committed Sep 16, 2020
1 parent ac8f96e commit b01487b
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 15 deletions.
21 changes: 15 additions & 6 deletions pkg/controller/traceflow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,22 @@ const (
// Default number of workers processing traceflow request.
defaultWorkers = 4

// Traceflow timeout period.
timeoutDuration = 2 * time.Minute
timeoutCheckInterval = timeoutDuration / 2

// Min and max data plane tag for traceflow. dataplaneTag=0 means it's not a Traceflow packet.
// dataplaneTag=15 is reserved.
minTagNum uint8 = 1
maxTagNum uint8 = 14

// PodIP index name for Pod cache.
podIPIndex = "podIP"

// String set to TraceflowStatus.Reason.
traceflowTimeout = "Traceflow timeout"
)

var (
// Traceflow timeout period.
timeoutDuration = 2 * time.Minute
timeoutCheckInterval = timeoutDuration / 2
)

// Controller is for traceflow.
Expand Down Expand Up @@ -246,6 +251,9 @@ func (c *Controller) syncTraceflow(traceflowName string) error {
err = c.startTraceflow(tf)
case opsv1alpha1.Running:
err = c.checkTraceflowStatus(tf)
case opsv1alpha1.Failed:
// Deallocate tag when agent set Traceflow status to Failed.
c.deallocateTagForTF(tf)
}
return err
}
Expand Down Expand Up @@ -298,9 +306,10 @@ func (c *Controller) checkTraceflowStatus(tf *opsv1alpha1.Traceflow) error {
c.deallocateTagForTF(tf)
return c.updateTraceflowStatus(tf, opsv1alpha1.Succeeded, "", 0)
}
if time.Now().UTC().Sub(tf.CreationTimestamp.UTC()) > timeoutDuration {
// CreationTimestamp is of second accuracy.
if time.Now().Unix() > tf.CreationTimestamp.Unix()+int64(timeoutDuration.Seconds()) {
c.deallocateTagForTF(tf)
return c.updateTraceflowStatus(tf, opsv1alpha1.Failed, "Traceflow timeout", 0)
return c.updateTraceflowStatus(tf, opsv1alpha1.Failed, traceflowTimeout, 0)
}
return nil
}
Expand Down
159 changes: 159 additions & 0 deletions pkg/controller/traceflow/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2020 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package traceflow

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"

ops "github.com/vmware-tanzu/antrea/pkg/apis/ops/v1alpha1"
"github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned"
fakeversioned "github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned/fake"
crdinformers "github.com/vmware-tanzu/antrea/pkg/client/informers/externalversions"
)

var alwaysReady = func() bool { return true }

const informerDefaultResync time.Duration = 30 * time.Second

type traceflowController struct {
*Controller
client versioned.Interface
informerFactory informers.SharedInformerFactory
crdInformerFactory crdinformers.SharedInformerFactory
}

func newController() *traceflowController {
client := fake.NewSimpleClientset()
crdClient := newCRDClientset()
informerFactory := informers.NewSharedInformerFactory(client, informerDefaultResync)
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync)
controller := NewTraceflowController(crdClient,
informerFactory.Core().V1().Pods(),
crdInformerFactory.Ops().V1alpha1().Traceflows())
controller.traceflowListerSynced = alwaysReady
return &traceflowController{
controller,
crdClient,
informerFactory,
crdInformerFactory,
}
}

func TestTraceflow(t *testing.T) {
// Use shorter timeout.
timeoutDuration = 2 * time.Second
timeoutCheckInterval = timeoutDuration / 2

tfc := newController()
stopCh := make(chan struct{})
tfc.crdInformerFactory.Start(stopCh)
go tfc.Run(stopCh)

numRunningTraceflows := func() int {
tfc.runningTraceflowsMutex.Lock()
defer tfc.runningTraceflowsMutex.Unlock()
return len(tfc.runningTraceflows)
}

tf1 := ops.Traceflow{
ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"},
Spec: ops.TraceflowSpec{
Source: ops.Source{Namespace: "ns1", Pod: "pod1"},
Destination: ops.Destination{Namespace: "ns2", Pod: "pod2"},
},
}

tfc.client.OpsV1alpha1().Traceflows().Create(context.TODO(), &tf1, metav1.CreateOptions{})
res, _ := tfc.waitForTraceflow("tf1", ops.Running, time.Second)
assert.NotNil(t, res)
// DataplaneTag should be allocated by Controller.
assert.True(t, res.Status.DataplaneTag > 0)
assert.Equal(t, numRunningTraceflows(), 1)

// Test Controller handling of successful Traceflow.
res.Status.Results = []ops.NodeResult{
// Sender
{
Observations: []ops.Observation{{Component: ops.SpoofGuard}},
},
// Receiver
{
Observations: []ops.Observation{{Action: ops.Delivered}},
},
}
tfc.client.OpsV1alpha1().Traceflows().Update(context.TODO(), res, metav1.UpdateOptions{})
res, _ = tfc.waitForTraceflow("tf1", ops.Succeeded, time.Second)
assert.NotNil(t, res)
// DataplaneTag should be deallocated by Controller.
assert.True(t, res.Status.DataplaneTag == 0)
assert.Equal(t, numRunningTraceflows(), 0)
tfc.client.OpsV1alpha1().Traceflows().Delete(context.TODO(), "tf1", metav1.DeleteOptions{})

// Test Traceflow timeout.
startTime := time.Now()
tfc.client.OpsV1alpha1().Traceflows().Create(context.TODO(), &tf1, metav1.CreateOptions{})
res, _ = tfc.waitForTraceflow("tf1", ops.Running, time.Second)
assert.NotNil(t, res)
res, _ = tfc.waitForTraceflow("tf1", ops.Failed, timeoutDuration+timeoutCheckInterval)
assert.NotNil(t, res)
assert.True(t, time.Now().Sub(startTime) >= timeoutDuration)
assert.Equal(t, res.Status.Reason, traceflowTimeout)
assert.True(t, res.Status.DataplaneTag == 0)
assert.Equal(t, numRunningTraceflows(), 0)

close(stopCh)
}

func (tfc *traceflowController) waitForTraceflow(name string, phase ops.TraceflowPhase, timeout time.Duration) (*ops.Traceflow, error) {
var tf *ops.Traceflow
var err error
if err = wait.Poll(100*time.Millisecond, timeout, func() (bool, error) {
tf, err = tfc.client.OpsV1alpha1().Traceflows().Get(context.TODO(), name, metav1.GetOptions{})
if err != nil || tf.Status.Phase != phase {
return false, nil
}
return true, nil
}); err != nil {
return nil, err
}
return tf, nil
}

func newCRDClientset() *fakeversioned.Clientset {
client := fakeversioned.NewSimpleClientset()

client.PrependReactor("create", "traceflows", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
tf := action.(k8stesting.CreateAction).GetObject().(*ops.Traceflow)

// Fake client does not set CreationTimestamp.
if tf.ObjectMeta.CreationTimestamp == (metav1.Time{}) {
tf.ObjectMeta.CreationTimestamp.Time = time.Now()
}

return false, tf, nil
}))

return client
}
13 changes: 4 additions & 9 deletions test/e2e/traceflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestTraceflow(t *testing.T) {
// 3. node1Pods[0] -> node1IPs[1], intra node1.
// 4. node1Pods[0] -> node2IPs[0], inter node1 and node2.
// 5. node1Pods[0] -> service, inter node1 and node2.
// 6. node1Pods[0] -> not existed Pod
// 6. node1Pods[0] -> non-existing Pod
testcases := []testcase{
{
name: "intraNodeTraceflow",
Expand Down Expand Up @@ -404,10 +404,10 @@ func TestTraceflow(t *testing.T) {
},
},
{
name: "notExistedDstPod",
name: "nonExistingDstPod",
tf: &v1alpha1.Traceflow{
ObjectMeta: metav1.ObjectMeta{
Name: randName(fmt.Sprintf("%s-%s-to-%s-%s-", testNamespace, node1Pods[0], testNamespace, "not-existed-pod")),
Name: randName(fmt.Sprintf("%s-%s-to-%s-%s-", testNamespace, node1Pods[0], testNamespace, "non-existing-pod")),
},
Spec: v1alpha1.TraceflowSpec{
Source: v1alpha1.Source{
Expand All @@ -416,11 +416,10 @@ func TestTraceflow(t *testing.T) {
},
Destination: v1alpha1.Destination{
Namespace: testNamespace,
Pod: "not-existed-pod",
Pod: "non-existing-pod",
},
},
},
// Traceflow should fail with timeout.
expectedPhase: v1alpha1.Failed,
},
}
Expand Down Expand Up @@ -482,10 +481,6 @@ func (data *TestData) waitForTraceflow(name string, phase v1alpha1.TraceflowPhas
var tf *v1alpha1.Traceflow
var err error
timeout := 15 * time.Second
if phase == v1alpha1.Failed {
// 2 minutes timeout + 1 minute check interval
timeout = 3 * time.Minute
}
if err = wait.PollImmediate(1*time.Second, timeout, func() (bool, error) {
tf, err = data.crdClient.OpsV1alpha1().Traceflows().Get(context.TODO(), name, metav1.GetOptions{})
if err != nil || tf.Status.Phase != phase {
Expand Down

0 comments on commit b01487b

Please sign in to comment.