Support per-Node max-egress-ips annotation
A global max-egress-ips may not work when the cluster consists of Nodes
of different instance types. This patch adds support for a per-Node
max-egress-ips annotation, with which Nodes can be configured with
different capacities via their annotations. It also makes it possible to
adjust a Node's capacity dynamically at runtime and to configure Node
capacity post-deployment.
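
For example (the Node name and value here are illustrative), a Node's
capacity can be set or changed post-deployment with:

    kubectl annotate --overwrite node node1 node.antrea.io/max-egress-ips=2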

Signed-off-by: Quan Tian <qtian@vmware.com>
tnqn committed Feb 23, 2023
1 parent 04ff40d commit 7337199
Showing 6 changed files with 196 additions and 9 deletions.
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
@@ -466,7 +466,7 @@ func run(o *Options) error {
if egressEnabled {
egressController, err = egress.NewEgressController(
ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName,
memberlistCluster, egressInformer, podUpdateChannel, o.config.Egress.MaxEgressIPsPerNode,
memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, o.config.Egress.MaxEgressIPsPerNode,
)
if err != nil {
return fmt.Errorf("error creating new Egress controller: %v", err)
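The nodeInformer argument referenced above is created earlier in agent.go, outside this hunk. As a rough, self-contained sketch (not part of the commit; a fake clientset merely keeps it runnable), a Node informer is typically obtained from a shared informer factory like so:

package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// The agent would use its real Kubernetes clientset here; the fake one
	// keeps this sketch self-contained.
	k8sClient := fake.NewSimpleClientset()
	informerFactory := informers.NewSharedInformerFactory(k8sClient, 12*time.Hour)
	nodeInformer := informerFactory.Core().V1().Nodes()
	_ = nodeInformer // This is what would be passed to egress.NewEgressController.

	stopCh := make(chan struct{})
	defer close(stopCh)
	// The factory must be started (and its caches synced) for the informer
	// to receive Node events.
	informerFactory.Start(stopCh)
	informerFactory.WaitForCacheSync(stopCh)
}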
4 changes: 3 additions & 1 deletion pkg/agent/controller/egress/egress_controller.go
@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
@@ -158,6 +159,7 @@ func NewEgressController(
nodeTransportInterface string,
cluster memberlist.Interface,
egressInformer crdinformers.EgressInformer,
nodeInformers coreinformers.NodeInformer,
podUpdateSubscriber channel.Subscriber,
maxEgressIPsPerNode int,
) (*EgressController, error) {
@@ -186,7 +188,7 @@
}
c.ipAssigner = ipAssigner

c.egressIPScheduler = NewEgressIPScheduler(cluster, egressInformer, maxEgressIPsPerNode)
c.egressIPScheduler = NewEgressIPScheduler(cluster, egressInformer, nodeInformers, maxEgressIPsPerNode)

c.egressInformer.AddIndexers(
cache.Indexers{
6 changes: 6 additions & 0 deletions pkg/agent/controller/egress/egress_controller_test.go
@@ -30,6 +30,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/util/workqueue"

@@ -148,6 +150,9 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
crdClient := fakeversioned.NewSimpleClientset(initObjects...)
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0)
egressInformer := crdInformerFactory.Crd().V1alpha2().Egresses()
k8sClient := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
nodeInformer := informerFactory.Core().V1().Nodes()
localIPDetector := &fakeLocalIPDetector{localIPs: sets.NewString(fakeLocalEgressIP1, fakeLocalEgressIP2)}

ifaceStore := interfacestore.NewInterfaceStore()
@@ -167,6 +172,7 @@
"eth0",
mockCluster,
egressInformer,
nodeInformer,
podUpdateChannel,
255,
)
120 changes: 117 additions & 3 deletions pkg/agent/controller/egress/ip_scheduler.go
@@ -16,16 +16,20 @@ package egress

import (
"sort"
"strconv"
"sync"
"sync/atomic"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/memberlist"
"antrea.io/antrea/pkg/agent/types"
crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2"
crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha2"
@@ -67,18 +71,23 @@ type egressIPScheduler struct {
// eventHandlers is the registered callbacks.
eventHandlers []scheduleEventHandler

// The maximum number of Egress IPs a Node can accommodate.
// The global maximum number of Egress IPs a Node can accommodate.
maxEgressIPsPerNode int
// nodeToMaxEgressIPs caches the maximum number of Egress IPs of each Node, obtained from the Node's annotation.
// It takes precedence over the global value.
nodeToMaxEgressIPs map[string]int
nodeToMaxEgressIPsMutex sync.RWMutex
}

func NewEgressIPScheduler(cluster memberlist.Interface, egressInformer crdinformers.EgressInformer, maxEgressIPsPerNode int) *egressIPScheduler {
func NewEgressIPScheduler(cluster memberlist.Interface, egressInformer crdinformers.EgressInformer, nodeInformer corev1informers.NodeInformer, maxEgressIPsPerNode int) *egressIPScheduler {
s := &egressIPScheduler{
cluster: cluster,
egressLister: egressInformer.Lister(),
egressListerSynced: egressInformer.Informer().HasSynced,
scheduleResults: map[string]*scheduleResult{},
scheduledOnce: &atomic.Bool{},
maxEgressIPsPerNode: maxEgressIPsPerNode,
nodeToMaxEgressIPs: map[string]int{},
queue: workqueue.New(),
}
egressInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -89,6 +98,16 @@ func NewEgressIPScheduler(cluster memberlist.Interface, egressInformer crdinform
},
resyncPeriod,
)
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: s.updateNode,
UpdateFunc: func(_, newObj interface{}) {
s.updateNode(newObj)
},
DeleteFunc: s.deleteNode,
},
resyncPeriod,
)

s.cluster.AddClusterEventHandler(func(poolName string) {
// Trigger scheduling regardless of which pool is changed.
@@ -97,6 +116,60 @@ func NewEgressIPScheduler(cluster memberlist.Interface, egressInformer crdinform
return s
}

func getMaxEgressIPsFromAnnotation(node *corev1.Node) (int, bool, error) {
maxEgressIPsStr, exists := node.Annotations[types.NodeMaxEgressIPsAnnotationKey]
if !exists {
return 0, false, nil
}
maxEgressIPs, err := strconv.Atoi(maxEgressIPsStr)
if err != nil {
return 0, false, err
}
return maxEgressIPs, true, nil
}

// updateNode processes Node ADD and UPDATE events.
func (s *egressIPScheduler) updateNode(obj interface{}) {
node := obj.(*corev1.Node)
maxEgressIPs, found, err := getMaxEgressIPsFromAnnotation(node)
if err != nil {
klog.ErrorS(err, "The Node's max-egress-ips annotation was invalid", "node", node.Name)
if s.deleteMaxEgressIPsByNode(node.Name) {
s.queue.Add(workItem)
}
return
}
if !found {
if s.deleteMaxEgressIPsByNode(node.Name) {
s.queue.Add(workItem)
}
return
}
if s.updateMaxEgressIPsByNode(node.Name, maxEgressIPs) {
s.queue.Add(workItem)
}
}

// deleteNode processes Node DELETE events.
func (s *egressIPScheduler) deleteNode(obj interface{}) {
node, ok := obj.(*corev1.Node)
if !ok {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Errorf("Received unexpected object: %v", obj)
return
}
node, ok = deletedState.Obj.(*corev1.Node)
if !ok {
klog.Errorf("DeletedFinalStateUnknown contains non-Node object: %v", deletedState.Obj)
return
}
}
if s.deleteMaxEgressIPsByNode(node.Name) {
s.queue.Add(workItem)
}
}

// addEgress processes Egress ADD events.
func (s *egressIPScheduler) addEgress(obj interface{}) {
egress := obj.(*crdv1a2.Egress)
@@ -200,6 +273,47 @@ func (o EgressesByCreationTimestamp) Less(i, j int) bool {
return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
}

// updateMaxEgressIPsByNode updates the maxEgressIPs for a given Node in the cache.
// It returns whether there is a real change, which indicates if rescheduling is required.
func (s *egressIPScheduler) updateMaxEgressIPsByNode(nodeName string, maxEgressIPs int) bool {
s.nodeToMaxEgressIPsMutex.Lock()
defer s.nodeToMaxEgressIPsMutex.Unlock()

oldMaxEgressIPs, exists := s.nodeToMaxEgressIPs[nodeName]
if exists && oldMaxEgressIPs == maxEgressIPs {
return false
}
s.nodeToMaxEgressIPs[nodeName] = maxEgressIPs
return true
}

// deleteMaxEgressIPsByNode deletes the maxEgressIPs for a given Node in the cache.
// It returns whether there is a real change, which indicates if rescheduling is required.
func (s *egressIPScheduler) deleteMaxEgressIPsByNode(nodeName string) bool {
s.nodeToMaxEgressIPsMutex.Lock()
defer s.nodeToMaxEgressIPsMutex.Unlock()

_, exists := s.nodeToMaxEgressIPs[nodeName]
if !exists {
return false
}
delete(s.nodeToMaxEgressIPs, nodeName)
return true
}

// getMaxEgressIPsByNode gets the maxEgressIPs for a given Node.
// If there isn't a value for the Node, the global value will be returned.
func (s *egressIPScheduler) getMaxEgressIPsByNode(nodeName string) int {
s.nodeToMaxEgressIPsMutex.RLock()
defer s.nodeToMaxEgressIPsMutex.RUnlock()

maxEgressIPs, exists := s.nodeToMaxEgressIPs[nodeName]
if exists {
return maxEgressIPs
}
return s.maxEgressIPsPerNode
}

// schedule takes the spec of Egress and ExternalIPPool and the state of memberlist cluster as inputs, generates
// scheduling results deterministically. When every Node's capacity is sufficient, each Egress's schedule is independent
// and is only determined by the consistent hash map. When any Node's capacity is insufficient, one Egress's schedule
@@ -231,7 +345,7 @@ func (s *egressIPScheduler) schedule() {
if !ipsOnNode.Has(egress.Spec.EgressIP) {
numIPs += 1
}
return numIPs <= s.maxEgressIPsPerNode
return numIPs <= s.getMaxEgressIPsByNode(node)
}
node, err := s.cluster.SelectNodeForIP(egress.Spec.EgressIP, egress.Spec.ExternalIPPool, maxEgressIPsFilter)
if err != nil {
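Taken together, these helpers resolve each Node's effective capacity: a valid per-Node annotation overrides the global maxEgressIPsPerNode, and the filter in schedule() rejects Nodes that are already full. The following standalone sketch (not part of the commit; the Node name and values are hypothetical) reproduces that resolution logic:

package main

import (
	"fmt"
	"strconv"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// nodeMaxEgressIPsKey mirrors types.NodeMaxEgressIPsAnnotationKey.
const nodeMaxEgressIPsKey = "node.antrea.io/max-egress-ips"

// capacityFor mirrors getMaxEgressIPsByNode: a valid per-Node annotation
// takes precedence; a missing or invalid one falls back to the global limit.
func capacityFor(node *corev1.Node, globalMax int) int {
	v, ok := node.Annotations[nodeMaxEgressIPsKey]
	if !ok {
		return globalMax
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		// Invalid annotations are ignored, as updateNode does above.
		return globalMax
	}
	return n
}

func main() {
	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{
		Name:        "node1",
		Annotations: map[string]string{nodeMaxEgressIPsKey: "2"},
	}}
	fmt.Println(capacityFor(node, 3)) // Prints 2: the annotation wins over the global limit of 3.
}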
70 changes: 66 additions & 4 deletions pkg/agent/controller/egress/ip_scheduler_test.go
@@ -22,14 +22,18 @@ import (
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"

"antrea.io/antrea/pkg/agent/consistenthash"
"antrea.io/antrea/pkg/agent/memberlist"
agenttypes "antrea.io/antrea/pkg/agent/types"
crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions"
@@ -98,6 +102,7 @@ func TestSchedule(t *testing.T) {
name string
nodes []string
maxEgressIPsPerNode int
nodeToMaxEgressIPs map[string]int
expectedResults map[string]*scheduleResult
}{
{
@@ -119,6 +124,26 @@
},
},
},
{
name: "node specific limit",
nodes: []string{"node1", "node2", "node3"},
maxEgressIPsPerNode: 3,
nodeToMaxEgressIPs: map[string]int{
"node1": 0,
"node2": 2,
"node3": 0,
},
expectedResults: map[string]*scheduleResult{
"egressA": {
node: "node2",
ip: "1.1.1.1",
},
"egressB": {
node: "node2",
ip: "1.1.1.11",
},
},
},
{
name: "insufficient node capacity",
nodes: []string{"node1", "node2", "node3"},
@@ -162,9 +187,12 @@
crdClient := fakeversioned.NewSimpleClientset(egresses...)
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0)
egressInformer := crdInformerFactory.Crd().V1alpha2().Egresses()
clientset := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(clientset, 0)
nodeInformer := informerFactory.Core().V1().Nodes()

s := NewEgressIPScheduler(fakeCluster, egressInformer, tt.maxEgressIPsPerNode)

s := NewEgressIPScheduler(fakeCluster, egressInformer, nodeInformer, tt.maxEgressIPsPerNode)
s.nodeToMaxEgressIPs = tt.nodeToMaxEgressIPs
stopCh := make(chan struct{})
defer close(stopCh)
crdInformerFactory.Start(stopCh)
@@ -192,8 +220,11 @@ func BenchmarkSchedule(b *testing.B) {
crdClient := fakeversioned.NewSimpleClientset(egresses...)
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0)
egressInformer := crdInformerFactory.Crd().V1alpha2().Egresses()
clientset := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(clientset, 0)
nodeInformer := informerFactory.Core().V1().Nodes()

s := NewEgressIPScheduler(fakeCluster, egressInformer, 10)
s := NewEgressIPScheduler(fakeCluster, egressInformer, nodeInformer, 10)
stopCh := make(chan struct{})
defer close(stopCh)
crdInformerFactory.Start(stopCh)
@@ -222,12 +253,27 @@ func TestRun(t *testing.T) {
Spec: crdv1a2.EgressSpec{EgressIP: "1.1.1.21", ExternalIPPool: "pool1"},
},
}
node1 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Annotations: map[string]string{},
},
}
node2 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
Annotations: map[string]string{},
},
}
fakeCluster := newFakeMemberlistCluster([]string{"node1", "node2"})
crdClient := fakeversioned.NewSimpleClientset(egresses...)
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0)
egressInformer := crdInformerFactory.Crd().V1alpha2().Egresses()
clientset := fake.NewSimpleClientset(node1, node2)
informerFactory := informers.NewSharedInformerFactory(clientset, 0)
nodeInformer := informerFactory.Core().V1().Nodes()

s := NewEgressIPScheduler(fakeCluster, egressInformer, 2)
s := NewEgressIPScheduler(fakeCluster, egressInformer, nodeInformer, 2)
egressUpdates := make(chan string, 10)
s.AddEventHandler(func(egress string) {
egressUpdates <- egress
@@ -287,6 +333,22 @@
assertScheduleResult(t, s, "egressB", "1.1.1.11", "node2", true)
assertScheduleResult(t, s, "egressC", "1.1.1.21", "node1", true)
assertScheduleResult(t, s, "egressD", "1.1.1.1", "node1", true)

// Set node1's max-egress-ips annotation to an invalid value; nothing should happen.
updatedNode1 := node1.DeepCopy()
updatedNode1.Annotations[agenttypes.NodeMaxEgressIPsAnnotationKey] = "invalid-value"
clientset.CoreV1().Nodes().Update(ctx, updatedNode1, metav1.UpdateOptions{})
assertReceivedItems(t, egressUpdates, sets.NewString())
// Set node1's max-egress-ips annotation to 1; egressD should be moved to node2.
updatedNode1 = node1.DeepCopy()
updatedNode1.Annotations[agenttypes.NodeMaxEgressIPsAnnotationKey] = "1"
clientset.CoreV1().Nodes().Update(ctx, updatedNode1, metav1.UpdateOptions{})
assertReceivedItems(t, egressUpdates, sets.NewString("egressD"))
assertScheduleResult(t, s, "egressD", "1.1.1.1", "node2", true)
// Unset node1's max-egress-ips annotation; egressD should be moved back to node1.
clientset.CoreV1().Nodes().Update(ctx, node1, metav1.UpdateOptions{})
assertReceivedItems(t, egressUpdates, sets.NewString("egressD"))
assertScheduleResult(t, s, "egressD", "1.1.1.1", "node1", true)
}

func assertReceivedItems(t *testing.T, ch <-chan string, expectedItems sets.String) {
3 changes: 3 additions & 0 deletions pkg/agent/types/annotations.go
@@ -24,6 +24,9 @@ const (
// NodeWireGuardPublicAnnotationKey represents the key of the Node's WireGuard public key in the Annotations of the Node.
NodeWireGuardPublicAnnotationKey string = "node.antrea.io/wireguard-public-key"

// NodeMaxEgressIPsAnnotationKey represents the key of the maximum number of Egress IPs in the Annotations of the Node.
NodeMaxEgressIPsAnnotationKey string = "node.antrea.io/max-egress-ips"

// ServiceExternalIPPoolAnnotationKey is the key of the Service annotation that specifies the Service's desired external IP pool.
ServiceExternalIPPoolAnnotationKey string = "service.antrea.io/external-ip-pool"
)
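
Because the scheduler watches Nodes, this annotation can be added, changed, or removed at any time, and Egress IPs are rescheduled accordingly. A hedged client-go sketch of setting it at runtime (not part of the commit; the in-cluster config and the Node name "node1" are assumptions), mirroring the Update calls in the test above:

package main

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	cfg, err := rest.InClusterConfig() // Assumes the program runs inside the cluster.
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)
	ctx := context.Background()

	node, err := client.CoreV1().Nodes().Get(ctx, "node1", metav1.GetOptions{}) // "node1" is a placeholder.
	if err != nil {
		panic(err)
	}
	node = node.DeepCopy()
	if node.Annotations == nil {
		node.Annotations = map[string]string{}
	}
	// Same key as NodeMaxEgressIPsAnnotationKey defined above.
	node.Annotations["node.antrea.io/max-egress-ips"] = "2"
	if _, err := client.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}); err != nil {
		panic(err)
	}
}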
