From 4772049aca77c0e492340874487165f89286fb07 Mon Sep 17 00:00:00 2001 From: Hongliang Liu Date: Sat, 27 Aug 2022 11:40:48 +0800 Subject: [PATCH] Remove the limit on the number of Endpoints in AntreaProxy Fixes: #2092 Due to the message size of Openflow, the maximum number of Endpoints for a Service in AntreaProxy is 800. Since Openflow 1.5 is used in Antrea, the operation `insert_buckets` introduced in Openflow 1.5 can be used to create a Service with more than 800 Endpoints. To sync Service with more than 800 Endpoints to OVS, multiple Openflow messages will be sent to OVS in a bundle (the first message uses `add` to sync the OVS group and its corresponding buckets to OVS, other messages use `insert_buckets` to sync other buckets to OVS). Signed-off-by: Hongliang Liu --- go.mod | 4 +- go.sum | 8 +-- pkg/agent/openflow/client.go | 19 ++++-- pkg/agent/openflow/client_test.go | 26 ++++---- pkg/agent/openflow/multicast.go | 20 ++++-- pkg/agent/openflow/service.go | 8 ++- pkg/agent/proxy/proxier.go | 80 +++-------------------- pkg/ovs/openflow/interfaces.go | 8 +-- pkg/ovs/openflow/ofctrl_bridge.go | 33 ++++++---- pkg/ovs/openflow/ofctrl_flow.go | 4 +- pkg/ovs/openflow/ofctrl_group.go | 48 +++++++++++--- pkg/ovs/openflow/ofctrl_meter.go | 4 +- pkg/ovs/openflow/testing/mock_openflow.go | 32 ++++----- test/integration/ovs/ofctrl_test.go | 76 +++++++++++++++++++++ 14 files changed, 220 insertions(+), 150 deletions(-) diff --git a/go.mod b/go.mod index 9730eed4182..d5a854cab3f 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module antrea.io/antrea go 1.19 require ( - antrea.io/libOpenflow v0.8.0 - antrea.io/ofnet v0.6.1 + antrea.io/libOpenflow v0.8.1 + antrea.io/ofnet v0.6.2 github.com/ClickHouse/clickhouse-go v1.5.4 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/Mellanox/sriovnet v1.1.0 diff --git a/go.sum b/go.sum index 2b61872c03b..fd986a1b4a9 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ -antrea.io/libOpenflow v0.8.0 h1:Xm6mlSqdXtDD418nf1lndoDvMi8scqUan8pkEUZ2oas= -antrea.io/libOpenflow v0.8.0/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o= -antrea.io/ofnet v0.6.1 h1:w/FIagCrN7dKt2A2R9grlmcSyGrqlCu+uFYPthtfXeg= -antrea.io/ofnet v0.6.1/go.mod h1:qWqi11pI3kBYcS9SYWm92ZOiOPBx04Jx21cDmJlJhOg= +antrea.io/libOpenflow v0.8.1 h1:uxkXvhlPRXAVw26LW6Pt2jCSEh8NR56vQW70YGvy7aU= +antrea.io/libOpenflow v0.8.1/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o= +antrea.io/ofnet v0.6.2 h1:CCW2lOVBtEgpbsIT+DatHcYCfQ5+MDTHSSlQrc2Qelc= +antrea.io/ofnet v0.6.2/go.mod h1:/gjpTqhUpyn8uZnef+ytdCCAeY5oGG1jCr/szPUqVXU= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 2102d8211bb..034f7471bbb 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -623,8 +623,15 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff defer c.replayMutex.RUnlock() group := c.featureService.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...) - if err := group.Add(); err != nil { - return fmt.Errorf("error when installing Service Endpoints Group: %w", err) + _, installed := c.featureService.groupCache.Load(groupID) + if !installed { + if err := group.Add(); err != nil { + return fmt.Errorf("error when installing Service Endpoints Group %d: %w", groupID, err) + } + } else { + if err := group.Modify(); err != nil { + return fmt.Errorf("error when modifying Service Endpoints Group %d: %w", groupID, err) + } } c.featureService.groupCache.Store(groupID, group) return nil @@ -633,8 +640,8 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff func (c *client) UninstallServiceGroup(groupID binding.GroupIDType) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() - if !c.bridge.DeleteGroup(groupID) { - return fmt.Errorf("group %d delete failed", groupID) + if err := c.bridge.DeleteGroup(groupID); err != nil { + return fmt.Errorf("error when deleting Service Endpoints Group %d: %w", groupID, err) } c.featureService.groupCache.Delete(groupID) return nil @@ -1328,8 +1335,8 @@ func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceive func (c *client) UninstallMulticastGroup(groupID binding.GroupIDType) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() - if !c.bridge.DeleteGroup(groupID) { - return fmt.Errorf("group %d delete failed", groupID) + if err := c.bridge.DeleteGroup(groupID); err != nil { + return fmt.Errorf("error when deleting Multicast receiver Group %d: %w", groupID, err) } c.featureMulticast.groupCache.Delete(groupID) return nil diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index c19d698c465..357f51943f7 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -613,12 +613,12 @@ func TestServiceGroupInstallAndUninstall(t *testing.T) { groupID1 := binding.GroupIDType(1) groupID2 := binding.GroupIDType(2) for _, tc := range []struct { - groupID binding.GroupIDType - sessionAffinity bool - deleteSucceeded bool + groupID binding.GroupIDType + sessionAffinity bool + deleteGroupError error }{ - {groupID: groupID1, deleteSucceeded: false, sessionAffinity: true}, - {groupID: groupID2, deleteSucceeded: true, sessionAffinity: false}, + {groupID: groupID1, deleteGroupError: fmt.Errorf("failed to delete"), sessionAffinity: true}, + {groupID: groupID2, deleteGroupError: nil, sessionAffinity: false}, } { mockGroup := ovsoftest.NewMockGroup(ctrl) mockBucketBuilder := ovsoftest.NewMockBucketBuilder(ctrl) @@ -638,10 +638,10 @@ func TestServiceGroupInstallAndUninstall(t *testing.T) { require.NoError(t, err) _, exists := client.featureService.groupCache.Load(tc.groupID) assert.True(t, exists) - mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteSucceeded) + mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteGroupError) err = client.UninstallServiceGroup(tc.groupID) _, exists = client.featureService.groupCache.Load(tc.groupID) - if tc.deleteSucceeded { + if tc.deleteGroupError == nil { assert.NoError(t, err) assert.False(t, exists) } else { @@ -675,11 +675,11 @@ func TestMulticastGroupInstallAndUninstall(t *testing.T) { groupID1 := binding.GroupIDType(1) groupID2 := binding.GroupIDType(2) for _, tc := range []struct { - groupID binding.GroupIDType - deleteSucceeded bool + groupID binding.GroupIDType + deleteGroupError error }{ - {groupID: groupID1, deleteSucceeded: false}, - {groupID: groupID2, deleteSucceeded: true}, + {groupID: groupID1, deleteGroupError: fmt.Errorf("failed to delete")}, + {groupID: groupID2, deleteGroupError: nil}, } { mockGroup := ovsoftest.NewMockGroup(ctrl) mockBucketBuilder := ovsoftest.NewMockBucketBuilder(ctrl) @@ -696,9 +696,9 @@ func TestMulticastGroupInstallAndUninstall(t *testing.T) { require.NoError(t, err) _, exists := client.featureMulticast.groupCache.Load(tc.groupID) assert.True(t, exists) - mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteSucceeded) + mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteGroupError) err = client.UninstallMulticastGroup(tc.groupID) - if tc.deleteSucceeded { + if tc.deleteGroupError == nil { assert.NoError(t, err) _, exists = client.featureMulticast.groupCache.Load(tc.groupID) assert.False(t, exists) diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index f8f9cd2db30..143a711e86e 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -99,8 +99,16 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, ResubmitToTable(MulticastOutputTable.GetID()). Done() } - if err := group.Add(); err != nil { - return fmt.Errorf("error when installing Multicast receiver Group: %w", err) + + _, installed := f.groupCache.Load(groupID) + if !installed { + if err := group.Add(); err != nil { + return fmt.Errorf("error when installing Multicast receiver Group %d: %w", groupID, err) + } + } else { + if err := group.Modify(); err != nil { + return fmt.Errorf("error when modifying Multicast receiver Group %d: %w", groupID, err) + } } f.groupCache.Store(groupID, group) return nil @@ -176,14 +184,16 @@ func (f *featureMulticast) multicastPodMetricFlows(podIP net.IP, podOFPort uint3 } func (f *featureMulticast) replayGroups() { + var groups []binding.OFEntry f.groupCache.Range(func(id, value interface{}) bool { group := value.(binding.Group) group.Reset() - if err := group.Add(); err != nil { - klog.ErrorS(err, "Error when replaying cached group", "group", id) - } + groups = append(groups, group) return true }) + if err := f.bridge.AddOFEntriesInBundle(groups, nil, nil); err != nil { + klog.ErrorS(err, "error when replaying cached groups for Multicast") + } } func (f *featureMulticast) multicastRemoteReportFlows(groupID binding.GroupIDType, firstMulticastTable binding.Table) []binding.Flow { diff --git a/pkg/agent/openflow/service.go b/pkg/agent/openflow/service.go index d4e11e6a915..10976086f28 100644 --- a/pkg/agent/openflow/service.go +++ b/pkg/agent/openflow/service.go @@ -157,12 +157,14 @@ func (f *featureService) replayFlows() []binding.Flow { } func (f *featureService) replayGroups() { + var groups []binding.OFEntry f.groupCache.Range(func(id, value interface{}) bool { group := value.(binding.Group) group.Reset() - if err := group.Add(); err != nil { - klog.Errorf("Error when replaying cached group %d: %v", id, err) - } + groups = append(groups, group) return true }) + if err := f.bridge.AddOFEntriesInBundle(groups, nil, nil); err != nil { + klog.ErrorS(err, "error when replaying cached groups for Service") + } } diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 5cd74b15097..03612479481 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -19,7 +19,6 @@ import ( "math" "net" "reflect" - "sort" "strings" "sync" "time" @@ -29,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" apimachinerytypes "k8s.io/apimachinery/pkg/types" k8sapitypes "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -50,10 +48,6 @@ import ( const ( resyncPeriod = time.Minute componentName = "antrea-agent-proxy" - // Due to the maximum message size in Openflow 1.3 and the implementation of Services in Antrea, the maximum number - // of Endpoints that Antrea can support at the moment is 800. If the number of Endpoints for a given Service exceeds - // 800, extra Endpoints will be dropped. - maxEndpoints = 800 // SessionAffinity timeout is implemented using a hard_timeout in OVS. hard_timeout is // represented by a uint16 in the OpenFlow protocol, maxSupportedAffinityTimeout = math.MaxUint16 @@ -107,8 +101,6 @@ type proxier struct { serviceStringMap map[string]k8sproxy.ServicePortName // serviceStringMapMutex protects serviceStringMap object. serviceStringMapMutex sync.Mutex - // oversizeServiceSet records the Services that have more than 800 Endpoints. - oversizeServiceSet sets.String serviceHealthServer healthcheck.ServiceHealthServer numLocalEndpoints map[apimachinerytypes.NamespacedName]int @@ -156,9 +148,6 @@ func (p *proxier) removeStaleServices() { } svcInfo := svcPort.(*types.ServiceInfo) klog.V(2).Infof("Removing stale Service: %s %s", svcPortName.Name, svcInfo.String()) - if p.oversizeServiceSet.Has(svcPortName.String()) { - p.oversizeServiceSet.Delete(svcPortName.String()) - } if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName) continue @@ -420,66 +409,18 @@ func (p *proxier) installServices() { } var allEndpointUpdateList, localEndpointUpdateList []k8sproxy.Endpoint - if len(endpoints) > maxEndpoints { - if !p.oversizeServiceSet.Has(svcPortName.String()) { - klog.Warningf("Since Endpoints of Service %s exceeds %d, extra Endpoints will be dropped", svcPortName.String(), maxEndpoints) - p.oversizeServiceSet.Insert(svcPortName.String()) - } - // If the length of endpoints > maxEndpoints, endpoints should be cut. However, since endpoints is a map, iterate - // the map and append every Endpoint to a target slice. When the Endpoint is local, append Endpoint to slice - // localEndpointList, otherwise append the Endpoint to slice remoteEndpointList. Since the iteration order - // of map in Golang is not guaranteed, if split the Endpoints directly without any sorting, some Endpoints - // may not be installed, so split the endpointList after sorting. - var remoteEndpointList, localEndpointList []k8sproxy.Endpoint - for _, endpoint := range endpoints { - if endpoint.GetIsLocal() { - localEndpointList = append(localEndpointList, endpoint) - } else { - remoteEndpointList = append(remoteEndpointList, endpoint) + // Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy + // are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy + // is Cluster, all Endpoints should be installed and checked. + for _, endpoint := range endpoints { + if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal { + if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed. + needUpdateEndpoints = true } } - - sort.Sort(byEndpoint(remoteEndpointList)) - sort.Sort(byEndpoint(localEndpointList)) - - if len(localEndpointList) > maxEndpoints { - // When the number of local Endpoints is greater than maxEndpoints, choose maxEndpoints Endpoints from - // localEndpointList to install. - allEndpointUpdateList = localEndpointList[:maxEndpoints] - } else { - // When the number of local Endpoints is smaller than maxEndpoints, choose all Endpoints of localEndpointList - // and part of remoteEndpointList to install. - localEndpointUpdateList = localEndpointList - allEndpointUpdateList = append(localEndpointList, remoteEndpointList[:maxEndpoints-len(localEndpointList)]...) - } - // Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy - // are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy - // is Cluster, all Endpoints should be installed and checked. - for _, endpoint := range allEndpointUpdateList { - if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal { - if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed. - needUpdateEndpoints = true - break - } - } - } - } else { - if p.oversizeServiceSet.Has(svcPortName.String()) { - p.oversizeServiceSet.Delete(svcPortName.String()) - } - // Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy - // are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy - // is Cluster, all Endpoints should be installed and checked. - for _, endpoint := range endpoints { - if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal { - if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed. - needUpdateEndpoints = true - } - } - allEndpointUpdateList = append(allEndpointUpdateList, endpoint) - if endpoint.GetIsLocal() { - localEndpointUpdateList = append(localEndpointUpdateList, endpoint) - } + allEndpointUpdateList = append(allEndpointUpdateList, endpoint) + if endpoint.GetIsLocal() { + localEndpointUpdateList = append(localEndpointUpdateList, endpoint) } } @@ -1011,7 +952,6 @@ func NewProxier( endpointReferenceCounter: map[string]int{}, nodeLabels: map[string]string{}, serviceStringMap: map[string]k8sproxy.ServicePortName{}, - oversizeServiceSet: sets.NewString(), groupCounter: groupCounter, ofClient: ofClient, routeClient: routeClient, diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 2a9076f3ea8..1e517de05a9 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -96,7 +96,7 @@ type Bridge interface { DeleteTable(id uint8) bool CreateGroupTypeAll(id GroupIDType) Group CreateGroup(id GroupIDType) Group - DeleteGroup(id GroupIDType) bool + DeleteGroup(id GroupIDType) error CreateMeter(id MeterIDType, flags ofctrl.MeterFlag) Meter DeleteMeter(id MeterIDType) bool DeleteMeterAll() error @@ -183,9 +183,9 @@ type OFEntry interface { // Modify / Delete methods can be called on this object. This method // should be called if a reconnection event happened. Reset() - // GetBundleMessage returns ofctrl.OpenFlowModMessage which can be used in Bundle messages. operation specifies what - // operation is expected to be taken on the OFEntry. - GetBundleMessage(operation OFOperation) (ofctrl.OpenFlowModMessage, error) + // GetBundleMessages returns a slice of ofctrl.OpenFlowModMessage which can be used in Bundle messages. operation + // specifies what operation is expected to be taken on the OFEntry. + GetBundleMessages(operation OFOperation) ([]ofctrl.OpenFlowModMessage, error) } type Flow interface { diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go index 225e8f9a4e5..3ef02a42342 100644 --- a/pkg/ovs/openflow/ofctrl_bridge.go +++ b/pkg/ovs/openflow/ofctrl_bridge.go @@ -223,15 +223,16 @@ func (b *OFBridge) createGroupWithType(id GroupIDType, groupType ofctrl.GroupTyp return g } -func (b *OFBridge) DeleteGroup(id GroupIDType) bool { - g := b.ofSwitch.GetGroup(uint32(id)) - if g == nil { - return true +func (b *OFBridge) DeleteGroup(id GroupIDType) error { + ofctrlGroup := b.ofSwitch.GetGroup(uint32(id)) + if ofctrlGroup == nil { + return nil } + g := &ofGroup{bridge: b, ofctrl: ofctrlGroup} if err := g.Delete(); err != nil { - return false + return fmt.Errorf("failed to delete the group: %w", err) } - return true + return b.ofSwitch.DeleteGroup(uint32(id)) } func (b *OFBridge) CreateMeter(id MeterIDType, flags ofctrl.MeterFlag) Meter { @@ -552,26 +553,30 @@ func (b *OFBridge) AddOFEntriesInBundle(addEntries []OFEntry, modEntries []OFEnt return err } + var sentMessages int addMessage := func(entrySet []entryOperation) error { if entrySet == nil { return nil } for _, e := range entrySet { - msg, err := e.entry.GetBundleMessage(e.operation) + messages, err := e.entry.GetBundleMessages(e.operation) if err != nil { return err } + sentMessages += len(messages) // "AddMessage" operation is async, the function only returns error which occur when constructing and sending // the BundleAdd message. An absence of error does not mean that all OpenFlow entries are added into the // bundle by the switch. The number of entries successfully added to the bundle by the switch will be // returned by function "Complete". - if err := tx.AddMessage(msg); err != nil { - // Close the bundle and cancel it if there is error when adding the FlowMod message. - _, err := tx.Complete() - if err == nil { - tx.Abort() + for _, message := range messages { + if err := tx.AddMessage(message); err != nil { + // Close the bundle and cancel it if there is error when adding the FlowMod message. + _, err := tx.Complete() + if err == nil { + tx.Abort() + } + return err } - return err } } return nil @@ -592,7 +597,7 @@ func (b *OFBridge) AddOFEntriesInBundle(addEntries []OFEntry, modEntries []OFEnt count, err := tx.Complete() if err != nil { return err - } else if count != len(addEntries)+len(modEntries)+len(delEntries) { + } else if count != sentMessages { // This case should not be possible if all the calls to "tx.AddMessage" returned nil. This is just a sanity check. tx.Abort() return errors.New("failed to add all Openflow entries in one transaction, cancelling it") diff --git a/pkg/ovs/openflow/ofctrl_flow.go b/pkg/ovs/openflow/ofctrl_flow.go index 5428bb423ef..48430a50ad4 100644 --- a/pkg/ovs/openflow/ofctrl_flow.go +++ b/pkg/ovs/openflow/ofctrl_flow.go @@ -124,7 +124,7 @@ func (f *ofFlow) FlowProtocol() Protocol { return f.protocol } -func (f *ofFlow) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMessage, error) { +func (f *ofFlow) GetBundleMessages(entryOper OFOperation) ([]ofctrl.OpenFlowModMessage, error) { var operation int switch entryOper { case AddMessage: @@ -138,7 +138,7 @@ func (f *ofFlow) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMess if err != nil { return nil, err } - return message, nil + return []ofctrl.OpenFlowModMessage{message}, nil } // CopyToBuilder returns a new FlowBuilder that copies the table, protocols, diff --git a/pkg/ovs/openflow/ofctrl_group.go b/pkg/ovs/openflow/ofctrl_group.go index 1d5cbe606db..6bc0e769823 100644 --- a/pkg/ovs/openflow/ofctrl_group.go +++ b/pkg/ovs/openflow/ofctrl_group.go @@ -23,10 +23,13 @@ import ( "antrea.io/ofnet/ofctrl" ) +var ( + MaxBucketsPerMessage = 800 +) + type ofGroup struct { - ofctrl *ofctrl.Group - bridge *OFBridge - bucketsCount int + ofctrl *ofctrl.Group + bridge *OFBridge } // Reset creates a new ofctrl.Group object for the updated ofSwitch. The @@ -44,15 +47,15 @@ func (g *ofGroup) Reset() { } func (g *ofGroup) Add() error { - return g.ofctrl.Install() + return g.bridge.AddOFEntriesInBundle([]OFEntry{g}, nil, nil) } func (g *ofGroup) Modify() error { - return g.ofctrl.Install() + return g.bridge.AddOFEntriesInBundle(nil, []OFEntry{g}, nil) } func (g *ofGroup) Delete() error { - return g.ofctrl.Delete() + return g.bridge.AddOFEntriesInBundle(nil, nil, []OFEntry{g}) } func (g *ofGroup) Type() EntryType { @@ -71,7 +74,7 @@ func (g *ofGroup) Bucket() BucketBuilder { } } -func (g *ofGroup) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMessage, error) { +func (g *ofGroup) GetBundleMessages(entryOper OFOperation) ([]ofctrl.OpenFlowModMessage, error) { var operation int switch entryOper { case AddMessage: @@ -80,9 +83,36 @@ func (g *ofGroup) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMes operation = openflow15.OFPGC_MODIFY case DeleteMessage: operation = openflow15.OFPGC_DELETE + // If the operation is to delete the group, empty the slice storing buckets since the number of buckets could + // be greater than MaxBucketsPerMessage. + g.ofctrl.Buckets = nil + } + + var messages []ofctrl.OpenFlowModMessage + for start := 0; start <= len(g.ofctrl.Buckets); start += MaxBucketsPerMessage { + // Get the range of buckets to generate a temp group. + end := start + MaxBucketsPerMessage + if end > len(g.ofctrl.Buckets) { + end = len(g.ofctrl.Buckets) + } + // Generate a temp group to get an OVS message. Note that, the original group should not be modified since it is + // also stored in group cache, and the group cache is used when replying groups. + groupMessage := &ofctrl.Group{ + ID: g.ofctrl.ID, + GroupType: g.ofctrl.GroupType, + Buckets: g.ofctrl.Buckets[start:end], + } + // For the message which is not the first, insert_buckets is used to add buckets to the group on OVS. + if start != 0 { + operation = openflow15.OFPGC_INSERT_BUCKET + // There is no need to generate an insert_buckets message without bucket. + if start == end { + break + } + } + messages = append(messages, groupMessage.GetBundleMessage(operation)) } - message := g.ofctrl.GetBundleMessage(operation) - return message, nil + return messages, nil } func (g *ofGroup) ResetBuckets() Group { diff --git a/pkg/ovs/openflow/ofctrl_meter.go b/pkg/ovs/openflow/ofctrl_meter.go index a6af7e46f3e..299315fff61 100644 --- a/pkg/ovs/openflow/ofctrl_meter.go +++ b/pkg/ovs/openflow/ofctrl_meter.go @@ -51,7 +51,7 @@ func (m *ofMeter) KeyString() string { return fmt.Sprintf("meter_id:%d", m.ofctrl.ID) } -func (m *ofMeter) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMessage, error) { +func (m *ofMeter) GetBundleMessages(entryOper OFOperation) ([]ofctrl.OpenFlowModMessage, error) { var operation int switch entryOper { case AddMessage: @@ -62,7 +62,7 @@ func (m *ofMeter) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMes operation = openflow15.MC_DELETE } message := m.ofctrl.GetBundleMessage(operation) - return message, nil + return []ofctrl.OpenFlowModMessage{message}, nil } func (m *ofMeter) ResetMeterBands() Meter { diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index ee337edb3eb..efa6a99e3cc 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -191,10 +191,10 @@ func (mr *MockBridgeMockRecorder) DeleteFlowsByCookie(arg0, arg1 interface{}) *g } // DeleteGroup mocks base method -func (m *MockBridge) DeleteGroup(arg0 openflow.GroupIDType) bool { +func (m *MockBridge) DeleteGroup(arg0 openflow.GroupIDType) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteGroup", arg0) - ret0, _ := ret[0].(bool) + ret0, _ := ret[0].(error) return ret0 } @@ -569,19 +569,19 @@ func (mr *MockFlowMockRecorder) FlowProtocol() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlowProtocol", reflect.TypeOf((*MockFlow)(nil).FlowProtocol)) } -// GetBundleMessage mocks base method -func (m *MockFlow) GetBundleMessage(arg0 openflow.OFOperation) (ofctrl.OpenFlowModMessage, error) { +// GetBundleMessages mocks base method +func (m *MockFlow) GetBundleMessages(arg0 openflow.OFOperation) ([]ofctrl.OpenFlowModMessage, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetBundleMessage", arg0) - ret0, _ := ret[0].(ofctrl.OpenFlowModMessage) + ret := m.ctrl.Call(m, "GetBundleMessages", arg0) + ret0, _ := ret[0].([]ofctrl.OpenFlowModMessage) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetBundleMessage indicates an expected call of GetBundleMessage -func (mr *MockFlowMockRecorder) GetBundleMessage(arg0 interface{}) *gomock.Call { +// GetBundleMessages indicates an expected call of GetBundleMessages +func (mr *MockFlowMockRecorder) GetBundleMessages(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBundleMessage", reflect.TypeOf((*MockFlow)(nil).GetBundleMessage), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBundleMessages", reflect.TypeOf((*MockFlow)(nil).GetBundleMessages), arg0) } // IsDropFlow mocks base method @@ -2234,19 +2234,19 @@ func (mr *MockGroupMockRecorder) Delete() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockGroup)(nil).Delete)) } -// GetBundleMessage mocks base method -func (m *MockGroup) GetBundleMessage(arg0 openflow.OFOperation) (ofctrl.OpenFlowModMessage, error) { +// GetBundleMessages mocks base method +func (m *MockGroup) GetBundleMessages(arg0 openflow.OFOperation) ([]ofctrl.OpenFlowModMessage, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetBundleMessage", arg0) - ret0, _ := ret[0].(ofctrl.OpenFlowModMessage) + ret := m.ctrl.Call(m, "GetBundleMessages", arg0) + ret0, _ := ret[0].([]ofctrl.OpenFlowModMessage) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetBundleMessage indicates an expected call of GetBundleMessage -func (mr *MockGroupMockRecorder) GetBundleMessage(arg0 interface{}) *gomock.Call { +// GetBundleMessages indicates an expected call of GetBundleMessages +func (mr *MockGroupMockRecorder) GetBundleMessages(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBundleMessage", reflect.TypeOf((*MockGroup)(nil).GetBundleMessage), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBundleMessages", reflect.TypeOf((*MockGroup)(nil).GetBundleMessages), arg0) } // KeyString mocks base method diff --git a/test/integration/ovs/ofctrl_test.go b/test/integration/ovs/ofctrl_test.go index 908703eb73e..9be5f8a7185 100644 --- a/test/integration/ovs/ofctrl_test.go +++ b/test/integration/ovs/ofctrl_test.go @@ -867,6 +867,82 @@ func TestNoteAction(t *testing.T) { CheckFlowExists(t, ofctlClient, "", table.GetID(), false, expectFlows) } +func TestBundleWithGroupInsertBucket(t *testing.T) { + br := "br12" + err := PrepareOVSBridge(br) + require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) + defer DeleteOVSBridge(br) + + bridge := newOFBridge(br) + table = bridge.CreateTable(t2, t3.GetID(), binding.TableMissActionNext) + // Set the maximum of buckets in a message to test insert_buckets. + binding.MaxBucketsPerMessage = 2 + + err = bridge.Connect(maxRetry, make(chan struct{})) + require.Nil(t, err, "Failed to start OFService") + defer bridge.Disconnect() + + ovsCtlClient := ovsctl.NewClient(br) + groupID := binding.GroupIDType(4) + + // Create a group without buckets. + group := bridge.CreateGroup(groupID) + expectedGroupBuckets := []string{} + err = bridge.AddOFEntriesInBundle([]binding.OFEntry{group}, nil, nil) + require.Nil(t, err) + CheckGroupExists(t, ovsCtlClient, groupID, "select", expectedGroupBuckets, true) + + // Update the group, then the group has three buckets, and two OVS messages will be generated. + field1 := binding.NewRegField(1, 0, 31, "field1") + field2 := binding.NewRegField(2, 0, 31, "field2") + field3 := binding.NewRegField(3, 0, 31, "field3") + group = group. + Bucket().Weight(100). + LoadToRegField(field1, uint32(0xa0a0002)). + LoadToRegField(field2, uint32(0x1)). + LoadToRegField(field3, uint32(0xfff1)). + ResubmitToTable(table.GetNext()).Done(). + Bucket().Weight(100). + LoadToRegField(field1, uint32(0xa0a0202)). + LoadToRegField(field2, uint32(0x2)). + LoadToRegField(field3, uint32(0xfff1)). + ResubmitToTable(table.GetNext()).Done(). + Bucket().Weight(100). + LoadToRegField(field1, uint32(0xa0a0202)). + LoadToRegField(field2, uint32(0x3)). + LoadToRegField(field3, uint32(0xfff1)). + ResubmitToTable(table.GetNext()).Done() + + bucket1 := "weight:100,actions=set_field:0xa0a0002->reg1,set_field:0x1->reg2,set_field:0xfff1->reg3,resubmit(,3)" + bucket2 := "weight:100,actions=set_field:0xa0a0202->reg1,set_field:0x2->reg2,set_field:0xfff1->reg3,resubmit(,3)" + bucket3 := "weight:100,actions=set_field:0xa0a0202->reg1,set_field:0x3->reg2,set_field:0xfff1->reg3,resubmit(,3)" + expectedGroupBuckets = []string{bucket1, bucket2, bucket3} + err = bridge.AddOFEntriesInBundle(nil, []binding.OFEntry{group}, nil) + require.Nil(t, err) + CheckGroupExists(t, ovsCtlClient, groupID, "select", expectedGroupBuckets, true) + + group = group. + Bucket().Weight(100). + LoadToRegField(field1, uint32(0xa0a0202)). + LoadToRegField(field2, uint32(0x4)). + LoadToRegField(field3, uint32(0xfff1)). + ResubmitToTable(table.GetNext()).Done() + + bucket4 := "weight:100,actions=set_field:0xa0a0202->reg1,set_field:0x4->reg2,set_field:0xfff1->reg3,resubmit(,3)" + expectedGroupBuckets = []string{bucket1, bucket2, bucket3, bucket4} + err = bridge.AddOFEntriesInBundle(nil, []binding.OFEntry{group}, nil) + require.Nil(t, err) + CheckGroupExists(t, ovsCtlClient, groupID, "select", expectedGroupBuckets, true) + + group.ResetBuckets() + expectedGroupBuckets = []string{} + err = bridge.AddOFEntriesInBundle(nil, []binding.OFEntry{group}, nil) + require.Nil(t, err) + CheckGroupExists(t, ovsCtlClient, groupID, "select", expectedGroupBuckets, true) + // In case it affects other tests, set the maximum of buckets in a message back to 800. + binding.MaxBucketsPerMessage = 800 +} + func prepareFlows(table binding.Table) ([]binding.Flow, []*ExpectFlow) { var flows []binding.Flow _, AllIPs, _ := net.ParseCIDR("0.0.0.0/0")