Skip to content

Commit

Permalink
Remove the limit on the number of Endpoints in AntreaProxy
Browse files Browse the repository at this point in the history
Fixes: antrea-io#2092

Due to the message size of Openflow, the maximum number of Endpoints for
a Service in AntreaProxy is 800. Since Openflow 1.5 is used in Antrea,
the operation `insert_buckets` introduced in Openflow 1.5 can be used to
create a Service with more than 800 Endpoints. To sync Service with more
than 800 Endpoints to OVS, multiple Openflow messages will be sent to
OVS in a bundle (the first message uses `add` to sync the OVS group and
its corresponding buckets to OVS, other messages use `insert_buckets` to
sync other buckets to OVS).

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Aug 31, 2022
1 parent 7afa4fb commit 981e466
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 121 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,7 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
)

replace antrea.io/ofnet v0.6.0 => github.com/wenyingd/ofnet v0.0.0-20220817031400-cb451467adc1
replace (
antrea.io/libOpenflow v0.8.0 => github.com/hongliangl/libOpenflow v0.0.0-20220826044458-d89f3d570e60
antrea.io/ofnet v0.6.1 => github.com/hongliangl/ofnet v0.0.0-20220829012559-0c68f9278ec0
)
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
antrea.io/libOpenflow v0.8.0 h1:Xm6mlSqdXtDD418nf1lndoDvMi8scqUan8pkEUZ2oas=
antrea.io/libOpenflow v0.8.0/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o=
antrea.io/ofnet v0.6.1 h1:w/FIagCrN7dKt2A2R9grlmcSyGrqlCu+uFYPthtfXeg=
antrea.io/ofnet v0.6.1/go.mod h1:qWqi11pI3kBYcS9SYWm92ZOiOPBx04Jx21cDmJlJhOg=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -523,6 +519,12 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p
github.com/hashicorp/memberlist v0.3.1 h1:MXgUXLqva1QvpVEDQW1IQLG0wivQAtmFlHRQ+1vWZfM=
github.com/hashicorp/memberlist v0.3.1/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hongliangl/libOpenflow v0.0.0-20220826044458-d89f3d570e60 h1:SDez/OaeO7nwMQg4d5qXuQsrxybp9XXMxICSSBt0vPo=
github.com/hongliangl/libOpenflow v0.0.0-20220826044458-d89f3d570e60/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o=
github.com/hongliangl/ofnet v0.0.0-20220829005218-b3e30a94edd3 h1:hXwOp1zQ7vJjBaBqWnbgNOPcUncUODp1PcaHgaSgMJ0=
github.com/hongliangl/ofnet v0.0.0-20220829005218-b3e30a94edd3/go.mod h1:Dbzusz1t4m7pFCvltb5wfX2NUEm9m18dDdEt57LBPNU=
github.com/hongliangl/ofnet v0.0.0-20220829012559-0c68f9278ec0 h1:NVDHPsxlCCgbHAp8UKdfQRciviSMzFNrnsWwZ4ABMks=
github.com/hongliangl/ofnet v0.0.0-20220829012559-0c68f9278ec0/go.mod h1:Dbzusz1t4m7pFCvltb5wfX2NUEm9m18dDdEt57LBPNU=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down
15 changes: 11 additions & 4 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,15 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff
defer c.replayMutex.RUnlock()

group := c.featureService.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Service Endpoints Group: %w", err)
_, installed := c.featureService.groupCache.Load(groupID)
if !installed {
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Service Endpoints Group %d: %w", groupID, err)
}
} else {
if err := group.Modify(); err != nil {
return fmt.Errorf("error when modifying Service Endpoints Group %d: %w", groupID, err)
}
}
c.featureService.groupCache.Store(groupID, group)
return nil
Expand All @@ -630,8 +637,8 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff
func (c *client) UninstallGroup(groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
if !c.bridge.DeleteGroup(groupID) {
return fmt.Errorf("group %d delete failed", groupID)
if err := c.bridge.DeleteGroup(groupID); err != nil {
return fmt.Errorf("error when deleting Service Endpoints Group %d: %w", groupID, err)
}
c.featureService.groupCache.Delete(groupID)
return nil
Expand Down
12 changes: 10 additions & 2 deletions pkg/agent/openflow/multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,16 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType,
ResubmitToTable(MulticastOutputTable.GetID()).
Done()
}
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Multicast receiver Group: %w", err)

_, installed := f.groupCache.Load(groupID)
if !installed {
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Multicast receiver Group %d: %w", groupID, err)
}
} else {
if err := group.Modify(); err != nil {
return fmt.Errorf("error when modifying Multicast receiver Group %d: %w", groupID, err)
}
}
f.groupCache.Store(groupID, group)
return nil
Expand Down
80 changes: 10 additions & 70 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"math"
"net"
"reflect"
"sort"
"strings"
"sync"
"time"
Expand All @@ -28,7 +27,6 @@ import (
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/runtime"
k8sapitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
Expand All @@ -48,10 +46,6 @@ import (
const (
resyncPeriod = time.Minute
componentName = "antrea-agent-proxy"
// Due to the maximum message size in Openflow 1.3 and the implementation of Services in Antrea, the maximum number
// of Endpoints that Antrea can support at the moment is 800. If the number of Endpoints for a given Service exceeds
// 800, extra Endpoints will be dropped.
maxEndpoints = 800
// SessionAffinity timeout is implemented using a hard_timeout in OVS. hard_timeout is
// represented by a uint16 in the OpenFlow protocol,
maxSupportedAffinityTimeout = math.MaxUint16
Expand Down Expand Up @@ -105,8 +99,6 @@ type proxier struct {
serviceStringMap map[string]k8sproxy.ServicePortName
// serviceStringMapMutex protects serviceStringMap object.
serviceStringMapMutex sync.Mutex
// oversizeServiceSet records the Services that have more than 800 Endpoints.
oversizeServiceSet sets.String

// syncedOnce returns true if the proxier has synced rules at least once.
syncedOnce bool
Expand Down Expand Up @@ -151,9 +143,6 @@ func (p *proxier) removeStaleServices() {
}
svcInfo := svcPort.(*types.ServiceInfo)
klog.V(2).Infof("Removing stale Service: %s %s", svcPortName.Name, svcInfo.String())
if p.oversizeServiceSet.Has(svcPortName.String()) {
p.oversizeServiceSet.Delete(svcPortName.String())
}
if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil {
klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName)
continue
Expand Down Expand Up @@ -415,66 +404,18 @@ func (p *proxier) installServices() {
}

var allEndpointUpdateList, localEndpointUpdateList []k8sproxy.Endpoint
if len(endpoints) > maxEndpoints {
if !p.oversizeServiceSet.Has(svcPortName.String()) {
klog.Warningf("Since Endpoints of Service %s exceeds %d, extra Endpoints will be dropped", svcPortName.String(), maxEndpoints)
p.oversizeServiceSet.Insert(svcPortName.String())
}
// If the length of endpoints > maxEndpoints, endpoints should be cut. However, since endpoints is a map, iterate
// the map and append every Endpoint to a target slice. When the Endpoint is local, append Endpoint to slice
// localEndpointList, otherwise append the Endpoint to slice remoteEndpointList. Since the iteration order
// of map in Golang is not guaranteed, if split the Endpoints directly without any sorting, some Endpoints
// may not be installed, so split the endpointList after sorting.
var remoteEndpointList, localEndpointList []k8sproxy.Endpoint
for _, endpoint := range endpoints {
if endpoint.GetIsLocal() {
localEndpointList = append(localEndpointList, endpoint)
} else {
remoteEndpointList = append(remoteEndpointList, endpoint)
// Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy
// are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy
// is Cluster, all Endpoints should be installed and checked.
for _, endpoint := range endpoints {
if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal {
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
}
}

sort.Sort(byEndpoint(remoteEndpointList))
sort.Sort(byEndpoint(localEndpointList))

if len(localEndpointList) > maxEndpoints {
// When the number of local Endpoints is greater than maxEndpoints, choose maxEndpoints Endpoints from
// localEndpointList to install.
allEndpointUpdateList = localEndpointList[:maxEndpoints]
} else {
// When the number of local Endpoints is smaller than maxEndpoints, choose all Endpoints of localEndpointList
// and part of remoteEndpointList to install.
localEndpointUpdateList = localEndpointList
allEndpointUpdateList = append(localEndpointList, remoteEndpointList[:maxEndpoints-len(localEndpointList)]...)
}
// Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy
// are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy
// is Cluster, all Endpoints should be installed and checked.
for _, endpoint := range allEndpointUpdateList {
if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal {
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
break
}
}
}
} else {
if p.oversizeServiceSet.Has(svcPortName.String()) {
p.oversizeServiceSet.Delete(svcPortName.String())
}
// Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy
// are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy
// is Cluster, all Endpoints should be installed and checked.
for _, endpoint := range endpoints {
if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal {
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
}
}
allEndpointUpdateList = append(allEndpointUpdateList, endpoint)
if endpoint.GetIsLocal() {
localEndpointUpdateList = append(localEndpointUpdateList, endpoint)
}
allEndpointUpdateList = append(allEndpointUpdateList, endpoint)
if endpoint.GetIsLocal() {
localEndpointUpdateList = append(localEndpointUpdateList, endpoint)
}
}

Expand Down Expand Up @@ -988,7 +929,6 @@ func NewProxier(
endpointReferenceCounter: map[string]int{},
nodeLabels: map[string]string{},
serviceStringMap: map[string]k8sproxy.ServicePortName{},
oversizeServiceSet: sets.NewString(),
groupCounter: groupCounter,
ofClient: ofClient,
routeClient: routeClient,
Expand Down
8 changes: 4 additions & 4 deletions pkg/ovs/openflow/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Bridge interface {
DeleteTable(id uint8) bool
CreateGroupTypeAll(id GroupIDType) Group
CreateGroup(id GroupIDType) Group
DeleteGroup(id GroupIDType) bool
DeleteGroup(id GroupIDType) error
CreateMeter(id MeterIDType, flags ofctrl.MeterFlag) Meter
DeleteMeter(id MeterIDType) bool
DeleteMeterAll() error
Expand Down Expand Up @@ -183,9 +183,9 @@ type OFEntry interface {
// Modify / Delete methods can be called on this object. This method
// should be called if a reconnection event happened.
Reset()
// GetBundleMessage returns ofctrl.OpenFlowModMessage which can be used in Bundle messages. operation specifies what
// operation is expected to be taken on the OFEntry.
GetBundleMessage(operation OFOperation) (ofctrl.OpenFlowModMessage, error)
// GetBundleMessages returns a slice of ofctrl.OpenFlowModMessage which can be used in Bundle messages. operation
// specifies what operation is expected to be taken on the OFEntry.
GetBundleMessages(operation OFOperation) ([]ofctrl.OpenFlowModMessage, error)
}

type Flow interface {
Expand Down
33 changes: 19 additions & 14 deletions pkg/ovs/openflow/ofctrl_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,16 @@ func (b *OFBridge) createGroupWithType(id GroupIDType, groupType ofctrl.GroupTyp
return g
}

func (b *OFBridge) DeleteGroup(id GroupIDType) bool {
g := b.ofSwitch.GetGroup(uint32(id))
if g == nil {
return true
func (b *OFBridge) DeleteGroup(id GroupIDType) error {
ofctrlGroup := b.ofSwitch.GetGroup(uint32(id))
if ofctrlGroup == nil {
return nil
}
g := &ofGroup{bridge: b, ofctrl: ofctrlGroup}
if err := g.Delete(); err != nil {
return false
return fmt.Errorf("failed to delete the group in bundle: %w", err)
}
return true
return b.ofSwitch.DeleteGroup(uint32(id))
}

func (b *OFBridge) CreateMeter(id MeterIDType, flags ofctrl.MeterFlag) Meter {
Expand Down Expand Up @@ -552,26 +553,30 @@ func (b *OFBridge) AddOFEntriesInBundle(addEntries []OFEntry, modEntries []OFEnt
return err
}

var sentMessages int
addMessage := func(entrySet []entryOperation) error {
if entrySet == nil {
return nil
}
for _, e := range entrySet {
msg, err := e.entry.GetBundleMessage(e.operation)
messages, err := e.entry.GetBundleMessages(e.operation)
if err != nil {
return err
}
sentMessages += len(messages)
// "AddMessage" operation is async, the function only returns error which occur when constructing and sending
// the BundleAdd message. An absence of error does not mean that all OpenFlow entries are added into the
// bundle by the switch. The number of entries successfully added to the bundle by the switch will be
// returned by function "Complete".
if err := tx.AddMessage(msg); err != nil {
// Close the bundle and cancel it if there is error when adding the FlowMod message.
_, err := tx.Complete()
if err == nil {
tx.Abort()
for _, message := range messages {
if err := tx.AddMessage(message); err != nil {
// Close the bundle and cancel it if there is error when adding the FlowMod message.
_, err := tx.Complete()
if err == nil {
tx.Abort()
}
return err
}
return err
}
}
return nil
Expand All @@ -592,7 +597,7 @@ func (b *OFBridge) AddOFEntriesInBundle(addEntries []OFEntry, modEntries []OFEnt
count, err := tx.Complete()
if err != nil {
return err
} else if count != len(addEntries)+len(modEntries)+len(delEntries) {
} else if count != sentMessages {
// This case should not be possible if all the calls to "tx.AddMessage" returned nil. This is just a sanity check.
tx.Abort()
return errors.New("failed to add all Openflow entries in one transaction, cancelling it")
Expand Down
4 changes: 2 additions & 2 deletions pkg/ovs/openflow/ofctrl_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (f *ofFlow) FlowProtocol() Protocol {
return f.protocol
}

func (f *ofFlow) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMessage, error) {
func (f *ofFlow) GetBundleMessages(entryOper OFOperation) ([]ofctrl.OpenFlowModMessage, error) {
var operation int
switch entryOper {
case AddMessage:
Expand All @@ -138,7 +138,7 @@ func (f *ofFlow) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMess
if err != nil {
return nil, err
}
return message, nil
return []ofctrl.OpenFlowModMessage{message}, nil
}

// CopyToBuilder returns a new FlowBuilder that copies the table, protocols,
Expand Down
Loading

0 comments on commit 981e466

Please sign in to comment.