Periodically check timeout of running Traceflow requests (#1179)
* Periodically check timeout of running Traceflow requests

Check whether running Traceflow requests have timed out every minute.
Change the Traceflow timeout period to 2 minutes.
Deallocate the DP tag after a Traceflow request succeeds or times out (see the sketch after this commit message for the general pattern).

* Unit test for traceflow controller

* Look up dataplane tag from cache by Traceflow name in Antrea agent
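
Below is a minimal, self-contained Go sketch of the periodic timeout-check pattern this commit describes: a goroutine re-posts every running request to the work queue at a fixed interval so that workers can decide whether a request has exceeded its timeout. The requestTracker type, checkTimeouts function, and the running map are illustrative names only, not the Antrea controller's actual API; wait.Until and client-go's workqueue are the building blocks the real controller uses.

package main

import (
    "fmt"
    "sync"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/util/workqueue"
)

// timeoutDuration mirrors the 2-minute Traceflow timeout set by this commit.
const timeoutDuration = 2 * time.Minute

// requestTracker is a hypothetical stand-in for the controller's bookkeeping
// of running requests.
type requestTracker struct {
    mu      sync.Mutex
    running map[string]time.Time // request name -> creation time
    queue   workqueue.Interface
}

// checkTimeouts re-posts every running request to the work queue; a worker
// would then compare the request's creation time against timeoutDuration,
// mark it Failed if exceeded, and deallocate its dataplane tag.
func (r *requestTracker) checkTimeouts() {
    r.mu.Lock()
    names := make([]string, 0, len(r.running))
    for name := range r.running {
        names = append(names, name)
    }
    r.mu.Unlock()
    for _, name := range names {
        r.queue.Add(name)
    }
}

func main() {
    r := &requestTracker{
        running: map[string]time.Time{"tf1": time.Now()},
        queue:   workqueue.New(),
    }
    stopCh := make(chan struct{})
    // Check at half the timeout period, as the commit does (every minute).
    go wait.Until(r.checkTimeouts, timeoutDuration/2, stopCh)
    time.Sleep(100 * time.Millisecond) // let the first check run
    fmt.Println("items queued for timeout check:", r.queue.Len())
    close(stopCh)
}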
jianjuns committed Sep 18, 2020
1 parent fd47097 commit eb2de15
Showing 4 changed files with 241 additions and 28 deletions.
35 changes: 19 additions & 16 deletions pkg/agent/controller/traceflow/traceflow_controller.go
@@ -419,31 +419,34 @@ func (c *Controller) errorTraceflowCRD(tf *opsv1alpha1.Traceflow, reason string)
 	return c.traceflowClient.OpsV1alpha1().Traceflows().Patch(context.TODO(), tf.Name, types.MergePatchType, payloads, metav1.PatchOptions{}, "status")
 }
 
-// Deallocate tag from cache. Ignore DataplaneTag == 0 which is invalid case.
+// Deallocate tag from cache.
 func (c *Controller) deallocateTag(tf *opsv1alpha1.Traceflow) {
-    if tf.Status.DataplaneTag == 0 {
-        return
-    }
-    c.injectedTagsMutex.Lock()
-    if existingTraceflowName, ok := c.injectedTags[tf.Status.DataplaneTag]; ok {
+    dataplaneTag := uint8(0)
+    c.runningTraceflowsMutex.Lock()
+    // Controller could have deallocated the tag and cleared the DataplaneTag
+    // field in the Traceflow Status, so try looking up the tag from the
+    // cache by Traceflow name.
+    for tag, existingTraceflowName := range c.runningTraceflows {
         if tf.Name == existingTraceflowName {
-            delete(c.injectedTags, tf.Status.DataplaneTag)
-        } else {
-            klog.Warningf("injectedTags cache mismatch tag: %d name: %s existingName: %s",
-                tf.Status.DataplaneTag, tf.Name, existingTraceflowName)
+            delete(c.runningTraceflows, tag)
+            dataplaneTag = tag
+            break
         }
     }
-    c.injectedTagsMutex.Unlock()
-    c.runningTraceflowsMutex.Lock()
-    if existingTraceflowName, ok := c.runningTraceflows[tf.Status.DataplaneTag]; ok {
+    c.runningTraceflowsMutex.Unlock()
+    if dataplaneTag == 0 {
+        return
+    }
+    c.injectedTagsMutex.Lock()
+    if existingTraceflowName, ok := c.injectedTags[dataplaneTag]; ok {
         if tf.Name == existingTraceflowName {
-            delete(c.runningTraceflows, tf.Status.DataplaneTag)
+            delete(c.injectedTags, dataplaneTag)
         } else {
             klog.Warningf("runningTraceflows cache mismatch tag: %d name: %s existingName: %s",
-                tf.Status.DataplaneTag, tf.Name, existingTraceflowName)
+                dataplaneTag, tf.Name, existingTraceflowName)
         }
     }
-    c.runningTraceflowsMutex.Unlock()
+    c.injectedTagsMutex.Unlock()
 }
 
 func (c *Controller) isSender(tag uint8) bool {
41 changes: 34 additions & 7 deletions pkg/controller/traceflow/controller.go
@@ -40,9 +40,11 @@ import (
const (
// Set resyncPeriod to 0 to disable resyncing.
resyncPeriod time.Duration = 0

// How long to wait before retrying the processing of a traceflow.
minRetryDelay = 5 * time.Second
maxRetryDelay = 300 * time.Second

// Default number of workers processing traceflow request.
defaultWorkers = 4

@@ -53,11 +55,15 @@

 	// PodIP index name for Pod cache.
 	podIPIndex = "podIP"
+
+	// String set to TraceflowStatus.Reason.
+	traceflowTimeout = "Traceflow timeout"
 )
 
 var (
 	// Traceflow timeout period.
-	timeout = (300 * time.Second).Seconds()
+	timeoutDuration = 2 * time.Minute
+	timeoutCheckInterval = timeoutDuration / 2
 )

// Controller is for traceflow.
@@ -139,6 +145,10 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}
}

go func() {
wait.Until(c.checkTraceflowTimeout, timeoutCheckInterval, stopCh)
}()

for i := 0; i < defaultWorkers; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}
@@ -170,6 +180,21 @@ func (c *Controller) worker() {
}
}

func (c *Controller) checkTraceflowTimeout() {
c.runningTraceflowsMutex.Lock()
tfs := make([]string, 0, len(c.runningTraceflows))
for _, tfName := range c.runningTraceflows {
tfs = append(tfs, tfName)
}
c.runningTraceflowsMutex.Unlock()

for _, tfName := range tfs {
// Re-post all running Traceflow requests to the work queue to
// be processed and checked for timeout.
c.queue.Add(tfName)
}
}

// processTraceflowItem processes an item in the "traceflow" work queue, by calling syncTraceflow
// after casting the item to a string (Traceflow name). If syncTraceflow returns an error, this
// function logs the error and adds the Traceflow request back to the queue with a rate limit. If
@@ -226,7 +251,8 @@ func (c *Controller) syncTraceflow(traceflowName string) error {
 		err = c.startTraceflow(tf)
 	case opsv1alpha1.Running:
 		err = c.checkTraceflowStatus(tf)
-	default:
+	case opsv1alpha1.Failed:
+		// Deallocate tag when agent set Traceflow status to Failed.
 		c.deallocateTagForTF(tf)
 	}
 	return err
@@ -277,20 +303,21 @@ func (c *Controller) checkTraceflowStatus(tf *opsv1alpha1.Traceflow) error {
 		}
 	}
 	if sender && receiver {
+		c.deallocateTagForTF(tf)
 		return c.updateTraceflowStatus(tf, opsv1alpha1.Succeeded, "", 0)
 	}
-	if time.Now().UTC().Sub(tf.CreationTimestamp.UTC()).Seconds() > timeout {
-		return c.updateTraceflowStatus(tf, opsv1alpha1.Failed, "traceflow timeout", 0)
+	// CreationTimestamp is of second accuracy.
+	if time.Now().Unix() > tf.CreationTimestamp.Unix()+int64(timeoutDuration.Seconds()) {
+		c.deallocateTagForTF(tf)
+		return c.updateTraceflowStatus(tf, opsv1alpha1.Failed, traceflowTimeout, 0)
 	}
 	return nil
 }
 
 func (c *Controller) updateTraceflowStatus(tf *opsv1alpha1.Traceflow, phase opsv1alpha1.TraceflowPhase, reason string, dataPlaneTag uint8) error {
 	update := tf.DeepCopy()
 	update.Status.Phase = phase
-	if phase == opsv1alpha1.Running {
-		update.Status.DataplaneTag = dataPlaneTag
-	}
+	update.Status.DataplaneTag = dataPlaneTag
 	if reason != "" {
 		update.Status.Reason = reason
 	}
159 changes: 159 additions & 0 deletions pkg/controller/traceflow/controller_test.go
@@ -0,0 +1,159 @@
// Copyright 2020 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package traceflow

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"

ops "github.com/vmware-tanzu/antrea/pkg/apis/ops/v1alpha1"
"github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned"
fakeversioned "github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned/fake"
crdinformers "github.com/vmware-tanzu/antrea/pkg/client/informers/externalversions"
)

var alwaysReady = func() bool { return true }

const informerDefaultResync time.Duration = 30 * time.Second

type traceflowController struct {
*Controller
client versioned.Interface
informerFactory informers.SharedInformerFactory
crdInformerFactory crdinformers.SharedInformerFactory
}

func newController() *traceflowController {
client := fake.NewSimpleClientset()
crdClient := newCRDClientset()
informerFactory := informers.NewSharedInformerFactory(client, informerDefaultResync)
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync)
controller := NewTraceflowController(crdClient,
informerFactory.Core().V1().Pods(),
crdInformerFactory.Ops().V1alpha1().Traceflows())
controller.traceflowListerSynced = alwaysReady
return &traceflowController{
controller,
crdClient,
informerFactory,
crdInformerFactory,
}
}

func TestTraceflow(t *testing.T) {
// Use shorter timeout.
timeoutDuration = 2 * time.Second
timeoutCheckInterval = timeoutDuration / 2

tfc := newController()
stopCh := make(chan struct{})
tfc.crdInformerFactory.Start(stopCh)
go tfc.Run(stopCh)

numRunningTraceflows := func() int {
tfc.runningTraceflowsMutex.Lock()
defer tfc.runningTraceflowsMutex.Unlock()
return len(tfc.runningTraceflows)
}

tf1 := ops.Traceflow{
ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"},
Spec: ops.TraceflowSpec{
Source: ops.Source{Namespace: "ns1", Pod: "pod1"},
Destination: ops.Destination{Namespace: "ns2", Pod: "pod2"},
},
}

tfc.client.OpsV1alpha1().Traceflows().Create(context.TODO(), &tf1, metav1.CreateOptions{})
res, _ := tfc.waitForTraceflow("tf1", ops.Running, time.Second)
assert.NotNil(t, res)
// DataplaneTag should be allocated by Controller.
assert.True(t, res.Status.DataplaneTag > 0)
assert.Equal(t, numRunningTraceflows(), 1)

// Test Controller handling of successful Traceflow.
res.Status.Results = []ops.NodeResult{
// Sender
{
Observations: []ops.Observation{{Component: ops.SpoofGuard}},
},
// Receiver
{
Observations: []ops.Observation{{Action: ops.Delivered}},
},
}
tfc.client.OpsV1alpha1().Traceflows().Update(context.TODO(), res, metav1.UpdateOptions{})
res, _ = tfc.waitForTraceflow("tf1", ops.Succeeded, time.Second)
assert.NotNil(t, res)
// DataplaneTag should be deallocated by Controller.
assert.True(t, res.Status.DataplaneTag == 0)
assert.Equal(t, numRunningTraceflows(), 0)
tfc.client.OpsV1alpha1().Traceflows().Delete(context.TODO(), "tf1", metav1.DeleteOptions{})

// Test Traceflow timeout.
startTime := time.Now()
tfc.client.OpsV1alpha1().Traceflows().Create(context.TODO(), &tf1, metav1.CreateOptions{})
res, _ = tfc.waitForTraceflow("tf1", ops.Running, time.Second)
assert.NotNil(t, res)
res, _ = tfc.waitForTraceflow("tf1", ops.Failed, timeoutDuration*2)
assert.NotNil(t, res)
assert.True(t, time.Now().Sub(startTime) >= timeoutDuration)
assert.Equal(t, res.Status.Reason, traceflowTimeout)
assert.True(t, res.Status.DataplaneTag == 0)
assert.Equal(t, numRunningTraceflows(), 0)

close(stopCh)
}

func (tfc *traceflowController) waitForTraceflow(name string, phase ops.TraceflowPhase, timeout time.Duration) (*ops.Traceflow, error) {
var tf *ops.Traceflow
var err error
if err = wait.Poll(100*time.Millisecond, timeout, func() (bool, error) {
tf, err = tfc.client.OpsV1alpha1().Traceflows().Get(context.TODO(), name, metav1.GetOptions{})
if err != nil || tf.Status.Phase != phase {
return false, nil
}
return true, nil
}); err != nil {
return nil, err
}
return tf, nil
}

func newCRDClientset() *fakeversioned.Clientset {
client := fakeversioned.NewSimpleClientset()

client.PrependReactor("create", "traceflows", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
tf := action.(k8stesting.CreateAction).GetObject().(*ops.Traceflow)

// Fake client does not set CreationTimestamp.
if tf.ObjectMeta.CreationTimestamp == (metav1.Time{}) {
tf.ObjectMeta.CreationTimestamp.Time = time.Now()
}

return false, tf, nil
}))

return client
}
34 changes: 29 additions & 5 deletions test/e2e/traceflow_test.go
@@ -98,12 +98,13 @@ func TestTraceflow(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	// Creates 4 traceflows:
+	// Creates 6 traceflows:
 	// 1. node1Pods[0] -> node1Pods[1], intra node1.
 	// 2. node1Pods[0] -> node2Pods[0], inter node1 and node2.
 	// 3. node1Pods[0] -> node1IPs[1], intra node1.
 	// 4. node1Pods[0] -> node2IPs[0], inter node1 and node2.
 	// 5. node1Pods[0] -> service, inter node1 and node2.
+	// 6. node1Pods[0] -> non-existing Pod
 	testcases := []testcase{
 		{
 			name: "intraNodeTraceflow",
@@ -402,6 +403,25 @@
},
},
},
{
name: "nonExistingDstPod",
tf: &v1alpha1.Traceflow{
ObjectMeta: metav1.ObjectMeta{
Name: randName(fmt.Sprintf("%s-%s-to-%s-%s-", testNamespace, node1Pods[0], testNamespace, "non-existing-pod")),
},
Spec: v1alpha1.TraceflowSpec{
Source: v1alpha1.Source{
Namespace: testNamespace,
Pod: node1Pods[0],
},
Destination: v1alpha1.Destination{
Namespace: testNamespace,
Pod: "non-existing-pod",
},
},
},
expectedPhase: v1alpha1.Failed,
},
}

t.Run("traceflowGroupTest", func(t *testing.T) {
@@ -417,7 +437,7 @@
 				}
 			}()
 
-			tf, err := data.waitForTraceflow(tc.tf.Name, tc.expectedPhase)
+			tf, err := data.waitForTraceflow(t, tc.tf.Name, tc.expectedPhase)
 			if err != nil {
 				t.Fatalf("Error: Get Traceflow failed: %v", err)
 				return
@@ -431,7 +451,7 @@
 					t.Fatal(err)
 					return
 				}
-			} else {
+			} else if len(tc.expectedResults) > 0 {
 				if tf.Status.Results[0].Observations[0].Component == v1alpha1.SpoofGuard {
 					if err = compareObservations(tc.expectedResults[0], tf.Status.Results[0]); err != nil {
 						t.Fatal(err)
@@ -457,16 +477,20 @@
 	})
 }
 
-func (data *TestData) waitForTraceflow(name string, phase v1alpha1.TraceflowPhase) (*v1alpha1.Traceflow, error) {
+func (data *TestData) waitForTraceflow(t *testing.T, name string, phase v1alpha1.TraceflowPhase) (*v1alpha1.Traceflow, error) {
 	var tf *v1alpha1.Traceflow
 	var err error
-	if err = wait.PollImmediate(1*time.Second, 15*time.Second, func() (bool, error) {
+	timeout := 15 * time.Second
+	if err = wait.PollImmediate(1*time.Second, timeout, func() (bool, error) {
 		tf, err = data.crdClient.OpsV1alpha1().Traceflows().Get(context.TODO(), name, metav1.GetOptions{})
 		if err != nil || tf.Status.Phase != phase {
 			return false, nil
 		}
 		return true, nil
 	}); err != nil {
+		if tf != nil {
+			t.Errorf("Latest Traceflow status: %v", tf.Status)
+		}
 		return nil, err
 	}
 	return tf, nil