Improve Egress IP scheduling #4627

Merged
merged 1 commit on Feb 24, 2023
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
@@ -466,7 +466,7 @@ func run(o *Options) error {
if egressEnabled {
egressController, err = egress.NewEgressController(
ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName,
memberlistCluster, egressInformer, podUpdateChannel, o.config.Egress.MaxEgressIPsPerNode,
memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, o.config.Egress.MaxEgressIPsPerNode,
)
if err != nil {
return fmt.Errorf("error creating new Egress controller: %v", err)
136 changes: 55 additions & 81 deletions pkg/agent/controller/egress/egress_controller.go
@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
@@ -66,9 +67,7 @@ const (
// maxEgressMark is the maximum mark of Egress IPs that can be configured on a Node.
maxEgressMark = 255

egressIPIndex = "egressIP"
externalIPPoolIndex = "externalIPPool"
egressNodeIndex = "egressNode"
egressIPIndex = "egressIP"

// egressDummyDevice is the dummy device that holds the Egress IPs configured to the system by antrea-agent.
egressDummyDevice = "antrea-egress0"
@@ -147,7 +146,7 @@ type EgressController struct {
cluster memberlist.Interface
ipAssigner ipassigner.IPAssigner

maxEgressIPsPerNode int
egressIPScheduler *egressIPScheduler
}

func NewEgressController(
@@ -160,6 +159,7 @@ func NewEgressController(
nodeTransportInterface string,
cluster memberlist.Interface,
egressInformer crdinformers.EgressInformer,
nodeInformers coreinformers.NodeInformer,
podUpdateSubscriber channel.Subscriber,
maxEgressIPsPerNode int,
) (*EgressController, error) {
@@ -181,14 +181,15 @@
localIPDetector: ipassigner.NewLocalIPDetector(),
idAllocator: newIDAllocator(minEgressMark, maxEgressMark),
cluster: cluster,
maxEgressIPsPerNode: maxEgressIPsPerNode,
}
ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice)
if err != nil {
return nil, fmt.Errorf("initializing egressIP assigner failed: %v", err)
}
c.ipAssigner = ipAssigner

c.egressIPScheduler = NewEgressIPScheduler(cluster, egressInformer, nodeInformers, maxEgressIPsPerNode)

c.egressInformer.AddIndexers(
cache.Indexers{
// egressIPIndex will be used to get all Egresses sharing the same Egress IP.
@@ -199,28 +200,6 @@
}
return []string{egress.Spec.EgressIP}, nil
},
// externalIPPoolIndex will be used to get all Egresses associated with a given ExternalIPPool.
externalIPPoolIndex: func(obj interface{}) (strings []string, e error) {
egress, ok := obj.(*crdv1a2.Egress)
if !ok {
return nil, fmt.Errorf("obj is not Egress: %+v", obj)
}
if egress.Spec.ExternalIPPool == "" {
return nil, nil
}
return []string{egress.Spec.ExternalIPPool}, nil
},
// egressNodeIndex will be used to get all Egresses assigned to a given Node.
egressNodeIndex: func(obj interface{}) ([]string, error) {
egress, ok := obj.(*crdv1a2.Egress)
if !ok {
return nil, fmt.Errorf("obj is not Egress: %+v", obj)
}
if egress.Status.EgressNode == "" {
return nil, nil
}
return []string{egress.Status.EgressNode}, nil
},
})
c.egressInformer.AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
@@ -234,10 +213,15 @@ func NewEgressController(
// reported to kube-apiserver and processed by antrea-controller.
podUpdateSubscriber.Subscribe(c.processPodUpdate)
c.localIPDetector.AddEventHandler(c.onLocalIPUpdate)
c.cluster.AddClusterEventHandler(c.enqueueEgressesByExternalIPPool)
c.egressIPScheduler.AddEventHandler(c.onEgressIPSchedule)
return c, nil
}

// onEgressIPSchedule will be called when EgressIPScheduler reschedules an Egress's IP.
func (c *EgressController) onEgressIPSchedule(egress string) {
c.queue.Add(egress)
}
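
The egressIPScheduler used above is added by this PR in a separate file that is not part of this diff. Inferred from the call sites in this file (NewEgressIPScheduler, AddEventHandler, Run, HasScheduled, GetEgressIPAndNode), the controller relies roughly on the surface sketched below; the interface name and exact signatures are assumptions for illustration, not the PR's actual definitions.

package sketch

// egressIPSchedulerAPI captures, as an assumption, the scheduler surface that
// EgressController depends on in this diff.
type egressIPSchedulerAPI interface {
	// AddEventHandler registers a callback invoked with an Egress name whenever
	// the scheduling result for that Egress changes.
	AddEventHandler(handler func(egressName string))
	// Run starts the scheduling loop and blocks until stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasScheduled reports whether the first scheduling pass has completed, so it
	// can be passed to cache.WaitForNamedCacheSync along with informer HasSynced funcs.
	HasScheduled() bool
	// GetEgressIPAndNode returns the scheduled Egress IP and owner Node for the
	// given Egress, and whether a scheduling result exists yet.
	GetEgressIPAndNode(egressName string) (egressIP string, node string, scheduled bool)
}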

// processPodUpdate will be called when CNIServer publishes a Pod update event.
// It triggers reconciling the effective Egress of the Pod.
func (c *EgressController) processPodUpdate(e interface{}) {
@@ -263,11 +247,13 @@ func (c *EgressController) addEgress(obj interface{}) {
}

// updateEgress processes Egress UPDATE events.
func (c *EgressController) updateEgress(_, cur interface{}) {
func (c *EgressController) updateEgress(old, cur interface{}) {
oldEgress := old.(*crdv1a2.Egress)
curEgress := cur.(*crdv1a2.Egress)
// We need to sync the Egress once even if its spec doesn't change as a Node's EgressIP capacity may be exceeded
// when multiple Egresses were processed in parallel and were assigned to the same Node.
// Re-sync after status change could correct the assignment eventually.
// Ignore the Egress Status change if the Egress IP has already been assigned on the current Node.
if curEgress.Status.EgressNode == c.nodeName && oldEgress.GetGeneration() == curEgress.GetGeneration() {
return
}
c.queue.Add(curEgress.Name)
klog.V(2).InfoS("Processed Egress UPDATE event", "egress", klog.KObj(curEgress))
}
@@ -307,18 +293,6 @@ func (c *EgressController) onLocalIPUpdate(ip string, added bool) {
}
}

// enqueueEgressesByExternalIPPool enqueues all Egresses that refer to the provided ExternalIPPool,
// the ExternalIPPool is affected by a Node update/create/delete event or
// Node leaves/join cluster event or ExternalIPPool changed.
func (c *EgressController) enqueueEgressesByExternalIPPool(eipName string) {
objects, _ := c.egressInformer.GetIndexer().ByIndex(externalIPPoolIndex, eipName)
for _, object := range objects {
egress := object.(*crdv1a2.Egress)
c.queue.Add(egress.Name)
}
klog.InfoS("Detected ExternalIPPool event", "ExternalIPPool", eipName, "enqueueEgressNum", len(objects))
}

// Run will create defaultWorkers workers (go routines) which will process the Egress events from the
// workqueue.
func (c *EgressController) Run(stopCh <-chan struct{}) {
@@ -328,9 +302,9 @@ func (c *EgressController) Run(stopCh <-chan struct{}) {
defer klog.Infof("Shutting down %s", controllerName)

go c.localIPDetector.Run(stopCh)

go c.egressIPScheduler.Run(stopCh)
go c.ipAssigner.Run(stopCh)
if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.egressListerSynced, c.localIPDetector.HasSynced) {
if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.egressListerSynced, c.localIPDetector.HasSynced, c.egressIPScheduler.HasScheduled) {
return
}

@@ -353,8 +327,11 @@ func (c *EgressController) replaceEgressIPs() error {
desiredLocalEgressIPs := sets.NewString()
egresses, _ := c.egressLister.List(labels.Everything())
for _, egress := range egresses {
if egress.Spec.EgressIP != "" && egress.Spec.ExternalIPPool != "" && egress.Status.EgressNode == c.nodeName {
if isEgressSchedulable(egress) && egress.Status.EgressNode == c.nodeName {
desiredLocalEgressIPs.Insert(egress.Spec.EgressIP)
// Record the Egresses' states as we assign their IPs to this Node in the following call. This ensures these
// Egress IPs will be unassigned when the Egresses are deleted.
c.newEgressState(egress.Name, egress.Spec.EgressIP)
}
}
if err := c.ipAssigner.InitIPs(desiredLocalEgressIPs); err != nil {
@@ -632,61 +609,53 @@ func (c *EgressController) syncEgress(egressName string) error {
return err
}

var desiredEgressIP string
var desiredNode string
// Only check whether the Egress IP should be assigned to this Node when the Egress is schedulable.
// Otherwise, users are responsible for assigning the Egress IP to Nodes.
if isEgressSchedulable(egress) {
egressIP, egressNode, scheduled := c.egressIPScheduler.GetEgressIPAndNode(egressName)
if scheduled {
desiredEgressIP = egressIP
desiredNode = egressNode
}
} else {
desiredEgressIP = egress.Spec.EgressIP
}

eState, exist := c.getEgressState(egressName)
// If the EgressIP changes, uninstall this Egress first.
if exist && eState.egressIP != egress.Spec.EgressIP {
if exist && eState.egressIP != desiredEgressIP {
if err := c.uninstallEgress(egressName, eState); err != nil {
return err
}
exist = false
}
// Do not proceed if EgressIP is empty.
if egress.Spec.EgressIP == "" {
if desiredEgressIP == "" {
if err := c.updateEgressStatus(egress, false); err != nil {
return fmt.Errorf("update Egress %s status error: %v", egressName, err)
}
return nil
}
if !exist {
eState = c.newEgressState(egressName, egress.Spec.EgressIP)
}

localNodeSelected := false
// Only check whether the Egress IP should be assigned to this Node when ExternalIPPool is set.
// If ExternalIPPool is empty, users are responsible for assigning the Egress IPs to Nodes.
if egress.Spec.ExternalIPPool != "" {
maxEgressIPsFilter := func(node string) bool {
// Assuming this Egress IP is assigned to this Node.
egressIPsOnNode := sets.NewString(egress.Spec.EgressIP)
// Add the Egress IPs that are already assigned to this Node.
egressesOnNode, _ := c.egressInformer.GetIndexer().ByIndex(egressNodeIndex, node)
for _, obj := range egressesOnNode {
egressOnNode := obj.(*crdv1a2.Egress)
// We don't count manually managed Egress IPs.
if egressOnNode.Spec.ExternalIPPool == "" {
continue
}
egressIPsOnNode.Insert(egressOnNode.Spec.EgressIP)
}
// Check if this Node can accommodate all Egress IPs.
return egressIPsOnNode.Len() <= c.maxEgressIPsPerNode
}
localNodeSelected, err = c.cluster.ShouldSelectIP(egress.Spec.EgressIP, egress.Spec.ExternalIPPool, maxEgressIPsFilter)
if err != nil {
return err
}
eState = c.newEgressState(egressName, desiredEgressIP)
}
if localNodeSelected {

if desiredNode == c.nodeName {
// Ensure the Egress IP is assigned to the system.
if err := c.ipAssigner.AssignIP(egress.Spec.EgressIP); err != nil {
if err := c.ipAssigner.AssignIP(desiredEgressIP); err != nil {
return err
}
} else {
// Unassign the Egress IP from the local Node if it was assigned by the agent.
if err := c.ipAssigner.UnassignIP(egress.Spec.EgressIP); err != nil {
if err := c.ipAssigner.UnassignIP(desiredEgressIP); err != nil {
return err
}
}

// Realize the latest EgressIP and get the desired mark.
mark, err := c.realizeEgressIP(egressName, egress.Spec.EgressIP)
mark, err := c.realizeEgressIP(egressName, desiredEgressIP)
if err != nil {
return err
}
@@ -701,7 +670,7 @@ func (c *EgressController) syncEgress(egressName string) error {
eState.mark = mark
}

if err := c.updateEgressStatus(egress, c.localIPDetector.IsLocalIP(egress.Spec.EgressIP)); err != nil {
if err := c.updateEgressStatus(egress, c.localIPDetector.IsLocalIP(desiredEgressIP)); err != nil {
return fmt.Errorf("update Egress %s status error: %v", egressName, err)
}

@@ -970,3 +939,8 @@ func (c *EgressController) GetEgress(ns, podName string) (string, error) {
}
return binding.effectiveEgress, nil
}

// An Egress is schedulable if its Egress IP is allocated from an ExternalIPPool.
func isEgressSchedulable(egress *crdv1a2.Egress) bool {
return egress.Spec.EgressIP != "" && egress.Spec.ExternalIPPool != ""
}
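
Before this PR, the per-Node limit was enforced independently inside each syncEgress call (the maxEgressIPsFilter removed above), so Egresses processed in parallel could be assigned to the same Node and exceed its capacity, as the removed comment in updateEgress pointed out. The new egressIPScheduler centralizes that decision. As a rough illustration only (the real implementation is in a file not shown in this diff), a single-pass, capacity-aware assignment could look like the sketch below, where selectNode stands in for the memberlist-based Node selection and all names are hypothetical.

package sketch

import "sort"

// scheduleResult pairs an Egress with the Node chosen to host its IP.
type scheduleResult struct {
	EgressName string
	EgressIP   string
	Node       string
}

// scheduleEgressIPs shows the idea of applying maxEgressIPsPerNode in one deterministic
// pass over all schedulable Egresses. It is a simplified sketch under assumptions
// (e.g. Egresses sharing an IP are not deduplicated), not the scheduler added by this PR.
func scheduleEgressIPs(
	egressIPs map[string]string, // Egress name -> Egress IP, schedulable Egresses only
	selectNode func(egressIP string, filter func(node string) bool) (string, bool),
	maxEgressIPsPerNode int,
) []scheduleResult {
	// A deterministic order keeps the outcome stable across agents and runs.
	names := make([]string, 0, len(egressIPs))
	for name := range egressIPs {
		names = append(names, name)
	}
	sort.Strings(names)

	ipsPerNode := map[string]int{}
	results := make([]scheduleResult, 0, len(names))
	for _, name := range names {
		ip := egressIPs[name]
		// Only Nodes with spare capacity are eligible to host this IP.
		hasCapacity := func(node string) bool { return ipsPerNode[node] < maxEgressIPsPerNode }
		node, ok := selectNode(ip, hasCapacity)
		if !ok {
			continue // No eligible Node; this Egress stays unscheduled for now.
		}
		ipsPerNode[node]++
		results = append(results, scheduleResult{EgressName: name, EgressIP: ip, Node: node})
	}
	return results
}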
25 changes: 21 additions & 4 deletions pkg/agent/controller/egress/egress_controller_test.go
@@ -30,6 +30,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/util/workqueue"

@@ -131,6 +133,7 @@ type fakeController struct {
mockRouteClient *routetest.MockInterface
crdClient *fakeversioned.Clientset
crdInformerFactory crdinformers.SharedInformerFactory
informerFactory informers.SharedInformerFactory
mockIPAssigner *ipassignertest.MockIPAssigner
podUpdateChannel *channel.SubscribableChannel
}
@@ -148,6 +151,9 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
crdClient := fakeversioned.NewSimpleClientset(initObjects...)
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0)
egressInformer := crdInformerFactory.Crd().V1alpha2().Egresses()
k8sClient := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
nodeInformer := informerFactory.Core().V1().Nodes()
localIPDetector := &fakeLocalIPDetector{localIPs: sets.NewString(fakeLocalEgressIP1, fakeLocalEgressIP2)}

ifaceStore := interfacestore.NewInterfaceStore()
@@ -167,6 +173,7 @@
"eth0",
mockCluster,
egressInformer,
nodeInformer,
podUpdateChannel,
255,
)
@@ -178,6 +185,7 @@
mockRouteClient: mockRouteClient,
crdClient: crdClient,
crdInformerFactory: crdInformerFactory,
informerFactory: informerFactory,
mockIPAssigner: mockIPAssigner,
podUpdateChannel: podUpdateChannel,
}
@@ -639,16 +647,19 @@ func TestSyncEgress(t *testing.T) {
defer c.mockController.Finish()

if tt.maxEgressIPsPerNode > 0 {
c.maxEgressIPsPerNode = tt.maxEgressIPsPerNode
c.egressIPScheduler.maxEgressIPsPerNode = tt.maxEgressIPsPerNode
}

stopCh := make(chan struct{})
defer close(stopCh)
c.crdInformerFactory.Start(stopCh)
c.informerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)
c.informerFactory.WaitForCacheSync(stopCh)
c.addEgressGroup(tt.existingEgressGroup)

tt.expectedCalls(c.mockOFClient, c.mockRouteClient, c.mockIPAssigner)
c.egressIPScheduler.schedule()
err := c.syncEgress(tt.existingEgress.Name)
assert.NoError(t, err)

@@ -666,6 +677,7 @@
egress, _ := c.egressLister.Get(tt.newEgress.Name)
return reflect.DeepEqual(egress, tt.newEgress), nil
}))
c.egressIPScheduler.schedule()
err = c.syncEgress(tt.newEgress.Name)
assert.NoError(t, err)
// Call it one more time to ensure it's idempotent, no extra datapath calls are supposed to be made.
@@ -698,7 +710,9 @@ func TestPodUpdateShouldSyncEgress(t *testing.T) {
defer close(stopCh)
go c.podUpdateChannel.Run(stopCh)
c.crdInformerFactory.Start(stopCh)
c.informerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)
c.informerFactory.WaitForCacheSync(stopCh)

c.mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1))
@@ -770,7 +784,9 @@ func TestSyncOverlappingEgress(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
c.crdInformerFactory.Start(stopCh)
c.informerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)
c.informerFactory.WaitForCacheSync(stopCh)
c.addEgressGroup(egressGroup1)
c.addEgressGroup(egressGroup2)
c.addEgressGroup(egressGroup3)
@@ -796,9 +812,6 @@
err = c.syncEgress(egress3.Name)
assert.NoError(t, err)

// egress1 and egress3 are expected to be triggered for resync because their status is updated.
checkQueueItemExistence(t, c.queue, egress1.Name, egress3.Name)

// After deleting egress1, pod1 and pod2 no longer enforces egress1. The Egress IP shouldn't be released as egress3
// is still referring to it.
// egress2 and egress3 are expected to be triggered for resync.
@@ -976,7 +989,9 @@ func TestGetEgress(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
c.crdInformerFactory.Start(stopCh)
c.informerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)
c.informerFactory.WaitForCacheSync(stopCh)
c.addEgressGroup(egressGroup)
c.mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1))
@@ -1037,7 +1052,9 @@ func TestGetEgressIPByMark(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
c.crdInformerFactory.Start(stopCh)
c.informerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)
c.informerFactory.WaitForCacheSync(stopCh)
c.mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1)