From 981e466d2746aa3b33cd9c005fb5be8ecb31a362 Mon Sep 17 00:00:00 2001 From: Hongliang Liu Date: Sat, 27 Aug 2022 11:40:48 +0800 Subject: [PATCH] Remove the limit on the number of Endpoints in AntreaProxy Fixes: #2092 Due to the message size of Openflow, the maximum number of Endpoints for a Service in AntreaProxy is 800. Since Openflow 1.5 is used in Antrea, the operation `insert_buckets` introduced in Openflow 1.5 can be used to create a Service with more than 800 Endpoints. To sync Service with more than 800 Endpoints to OVS, multiple Openflow messages will be sent to OVS in a bundle (the first message uses `add` to sync the OVS group and its corresponding buckets to OVS, other messages use `insert_buckets` to sync other buckets to OVS). Signed-off-by: Hongliang Liu --- go.mod | 5 +- go.sum | 10 +-- pkg/agent/openflow/client.go | 15 +++-- pkg/agent/openflow/multicast.go | 12 +++- pkg/agent/proxy/proxier.go | 80 +++-------------------- pkg/ovs/openflow/interfaces.go | 8 +-- pkg/ovs/openflow/ofctrl_bridge.go | 33 ++++++---- pkg/ovs/openflow/ofctrl_flow.go | 4 +- pkg/ovs/openflow/ofctrl_group.go | 51 ++++++++++++--- pkg/ovs/openflow/ofctrl_meter.go | 4 +- pkg/ovs/openflow/testing/mock_openflow.go | 18 ++--- test/integration/ovs/ofctrl_test.go | 65 ++++++++++++++++++ 12 files changed, 184 insertions(+), 121 deletions(-) diff --git a/go.mod b/go.mod index 26629c4006a..e826b58b05c 100644 --- a/go.mod +++ b/go.mod @@ -190,4 +190,7 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect ) -replace antrea.io/ofnet v0.6.0 => github.com/wenyingd/ofnet v0.0.0-20220817031400-cb451467adc1 +replace ( + antrea.io/libOpenflow v0.8.0 => github.com/hongliangl/libOpenflow v0.0.0-20220826044458-d89f3d570e60 + antrea.io/ofnet v0.6.1 => github.com/hongliangl/ofnet v0.0.0-20220829012559-0c68f9278ec0 +) diff --git a/go.sum b/go.sum index dc8645c3605..65bba67e085 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,3 @@ -antrea.io/libOpenflow v0.8.0 h1:Xm6mlSqdXtDD418nf1lndoDvMi8scqUan8pkEUZ2oas= -antrea.io/libOpenflow v0.8.0/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o= -antrea.io/ofnet v0.6.1 h1:w/FIagCrN7dKt2A2R9grlmcSyGrqlCu+uFYPthtfXeg= -antrea.io/ofnet v0.6.1/go.mod h1:qWqi11pI3kBYcS9SYWm92ZOiOPBx04Jx21cDmJlJhOg= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -523,6 +519,12 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p github.com/hashicorp/memberlist v0.3.1 h1:MXgUXLqva1QvpVEDQW1IQLG0wivQAtmFlHRQ+1vWZfM= github.com/hashicorp/memberlist v0.3.1/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= +github.com/hongliangl/libOpenflow v0.0.0-20220826044458-d89f3d570e60 h1:SDez/OaeO7nwMQg4d5qXuQsrxybp9XXMxICSSBt0vPo= +github.com/hongliangl/libOpenflow v0.0.0-20220826044458-d89f3d570e60/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o= +github.com/hongliangl/ofnet v0.0.0-20220829005218-b3e30a94edd3 h1:hXwOp1zQ7vJjBaBqWnbgNOPcUncUODp1PcaHgaSgMJ0= +github.com/hongliangl/ofnet v0.0.0-20220829005218-b3e30a94edd3/go.mod h1:Dbzusz1t4m7pFCvltb5wfX2NUEm9m18dDdEt57LBPNU= +github.com/hongliangl/ofnet v0.0.0-20220829012559-0c68f9278ec0 h1:NVDHPsxlCCgbHAp8UKdfQRciviSMzFNrnsWwZ4ABMks= +github.com/hongliangl/ofnet v0.0.0-20220829012559-0c68f9278ec0/go.mod h1:Dbzusz1t4m7pFCvltb5wfX2NUEm9m18dDdEt57LBPNU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index af55e43e1c1..2e93ae586c8 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -620,8 +620,15 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff defer c.replayMutex.RUnlock() group := c.featureService.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...) - if err := group.Add(); err != nil { - return fmt.Errorf("error when installing Service Endpoints Group: %w", err) + _, installed := c.featureService.groupCache.Load(groupID) + if !installed { + if err := group.Add(); err != nil { + return fmt.Errorf("error when installing Service Endpoints Group %d: %w", groupID, err) + } + } else { + if err := group.Modify(); err != nil { + return fmt.Errorf("error when modifying Service Endpoints Group %d: %w", groupID, err) + } } c.featureService.groupCache.Store(groupID, group) return nil @@ -630,8 +637,8 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff func (c *client) UninstallGroup(groupID binding.GroupIDType) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() - if !c.bridge.DeleteGroup(groupID) { - return fmt.Errorf("group %d delete failed", groupID) + if err := c.bridge.DeleteGroup(groupID); err != nil { + return fmt.Errorf("error when deleting Service Endpoints Group %d: %w", groupID, err) } c.featureService.groupCache.Delete(groupID) return nil diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index f8f9cd2db30..c553cf5bc53 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -99,8 +99,16 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, ResubmitToTable(MulticastOutputTable.GetID()). Done() } - if err := group.Add(); err != nil { - return fmt.Errorf("error when installing Multicast receiver Group: %w", err) + + _, installed := f.groupCache.Load(groupID) + if !installed { + if err := group.Add(); err != nil { + return fmt.Errorf("error when installing Multicast receiver Group %d: %w", groupID, err) + } + } else { + if err := group.Modify(); err != nil { + return fmt.Errorf("error when modifying Multicast receiver Group %d: %w", groupID, err) + } } f.groupCache.Store(groupID, group) return nil diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 2c22baeb7b7..84d55195a40 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -19,7 +19,6 @@ import ( "math" "net" "reflect" - "sort" "strings" "sync" "time" @@ -28,7 +27,6 @@ import ( discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime" k8sapitypes "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -48,10 +46,6 @@ import ( const ( resyncPeriod = time.Minute componentName = "antrea-agent-proxy" - // Due to the maximum message size in Openflow 1.3 and the implementation of Services in Antrea, the maximum number - // of Endpoints that Antrea can support at the moment is 800. If the number of Endpoints for a given Service exceeds - // 800, extra Endpoints will be dropped. - maxEndpoints = 800 // SessionAffinity timeout is implemented using a hard_timeout in OVS. hard_timeout is // represented by a uint16 in the OpenFlow protocol, maxSupportedAffinityTimeout = math.MaxUint16 @@ -105,8 +99,6 @@ type proxier struct { serviceStringMap map[string]k8sproxy.ServicePortName // serviceStringMapMutex protects serviceStringMap object. serviceStringMapMutex sync.Mutex - // oversizeServiceSet records the Services that have more than 800 Endpoints. - oversizeServiceSet sets.String // syncedOnce returns true if the proxier has synced rules at least once. syncedOnce bool @@ -151,9 +143,6 @@ func (p *proxier) removeStaleServices() { } svcInfo := svcPort.(*types.ServiceInfo) klog.V(2).Infof("Removing stale Service: %s %s", svcPortName.Name, svcInfo.String()) - if p.oversizeServiceSet.Has(svcPortName.String()) { - p.oversizeServiceSet.Delete(svcPortName.String()) - } if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName) continue @@ -415,66 +404,18 @@ func (p *proxier) installServices() { } var allEndpointUpdateList, localEndpointUpdateList []k8sproxy.Endpoint - if len(endpoints) > maxEndpoints { - if !p.oversizeServiceSet.Has(svcPortName.String()) { - klog.Warningf("Since Endpoints of Service %s exceeds %d, extra Endpoints will be dropped", svcPortName.String(), maxEndpoints) - p.oversizeServiceSet.Insert(svcPortName.String()) - } - // If the length of endpoints > maxEndpoints, endpoints should be cut. However, since endpoints is a map, iterate - // the map and append every Endpoint to a target slice. When the Endpoint is local, append Endpoint to slice - // localEndpointList, otherwise append the Endpoint to slice remoteEndpointList. Since the iteration order - // of map in Golang is not guaranteed, if split the Endpoints directly without any sorting, some Endpoints - // may not be installed, so split the endpointList after sorting. - var remoteEndpointList, localEndpointList []k8sproxy.Endpoint - for _, endpoint := range endpoints { - if endpoint.GetIsLocal() { - localEndpointList = append(localEndpointList, endpoint) - } else { - remoteEndpointList = append(remoteEndpointList, endpoint) + // Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy + // are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy + // is Cluster, all Endpoints should be installed and checked. + for _, endpoint := range endpoints { + if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal { + if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed. + needUpdateEndpoints = true } } - - sort.Sort(byEndpoint(remoteEndpointList)) - sort.Sort(byEndpoint(localEndpointList)) - - if len(localEndpointList) > maxEndpoints { - // When the number of local Endpoints is greater than maxEndpoints, choose maxEndpoints Endpoints from - // localEndpointList to install. - allEndpointUpdateList = localEndpointList[:maxEndpoints] - } else { - // When the number of local Endpoints is smaller than maxEndpoints, choose all Endpoints of localEndpointList - // and part of remoteEndpointList to install. - localEndpointUpdateList = localEndpointList - allEndpointUpdateList = append(localEndpointList, remoteEndpointList[:maxEndpoints-len(localEndpointList)]...) - } - // Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy - // are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy - // is Cluster, all Endpoints should be installed and checked. - for _, endpoint := range allEndpointUpdateList { - if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal { - if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed. - needUpdateEndpoints = true - break - } - } - } - } else { - if p.oversizeServiceSet.Has(svcPortName.String()) { - p.oversizeServiceSet.Delete(svcPortName.String()) - } - // Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy - // are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy - // is Cluster, all Endpoints should be installed and checked. - for _, endpoint := range endpoints { - if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal { - if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed. - needUpdateEndpoints = true - } - } - allEndpointUpdateList = append(allEndpointUpdateList, endpoint) - if endpoint.GetIsLocal() { - localEndpointUpdateList = append(localEndpointUpdateList, endpoint) - } + allEndpointUpdateList = append(allEndpointUpdateList, endpoint) + if endpoint.GetIsLocal() { + localEndpointUpdateList = append(localEndpointUpdateList, endpoint) } } @@ -988,7 +929,6 @@ func NewProxier( endpointReferenceCounter: map[string]int{}, nodeLabels: map[string]string{}, serviceStringMap: map[string]k8sproxy.ServicePortName{}, - oversizeServiceSet: sets.NewString(), groupCounter: groupCounter, ofClient: ofClient, routeClient: routeClient, diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 2a9076f3ea8..1e517de05a9 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -96,7 +96,7 @@ type Bridge interface { DeleteTable(id uint8) bool CreateGroupTypeAll(id GroupIDType) Group CreateGroup(id GroupIDType) Group - DeleteGroup(id GroupIDType) bool + DeleteGroup(id GroupIDType) error CreateMeter(id MeterIDType, flags ofctrl.MeterFlag) Meter DeleteMeter(id MeterIDType) bool DeleteMeterAll() error @@ -183,9 +183,9 @@ type OFEntry interface { // Modify / Delete methods can be called on this object. This method // should be called if a reconnection event happened. Reset() - // GetBundleMessage returns ofctrl.OpenFlowModMessage which can be used in Bundle messages. operation specifies what - // operation is expected to be taken on the OFEntry. - GetBundleMessage(operation OFOperation) (ofctrl.OpenFlowModMessage, error) + // GetBundleMessages returns a slice of ofctrl.OpenFlowModMessage which can be used in Bundle messages. operation + // specifies what operation is expected to be taken on the OFEntry. + GetBundleMessages(operation OFOperation) ([]ofctrl.OpenFlowModMessage, error) } type Flow interface { diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go index 225e8f9a4e5..a7c62601738 100644 --- a/pkg/ovs/openflow/ofctrl_bridge.go +++ b/pkg/ovs/openflow/ofctrl_bridge.go @@ -223,15 +223,16 @@ func (b *OFBridge) createGroupWithType(id GroupIDType, groupType ofctrl.GroupTyp return g } -func (b *OFBridge) DeleteGroup(id GroupIDType) bool { - g := b.ofSwitch.GetGroup(uint32(id)) - if g == nil { - return true +func (b *OFBridge) DeleteGroup(id GroupIDType) error { + ofctrlGroup := b.ofSwitch.GetGroup(uint32(id)) + if ofctrlGroup == nil { + return nil } + g := &ofGroup{bridge: b, ofctrl: ofctrlGroup} if err := g.Delete(); err != nil { - return false + return fmt.Errorf("failed to delete the group in bundle: %w", err) } - return true + return b.ofSwitch.DeleteGroup(uint32(id)) } func (b *OFBridge) CreateMeter(id MeterIDType, flags ofctrl.MeterFlag) Meter { @@ -552,26 +553,30 @@ func (b *OFBridge) AddOFEntriesInBundle(addEntries []OFEntry, modEntries []OFEnt return err } + var sentMessages int addMessage := func(entrySet []entryOperation) error { if entrySet == nil { return nil } for _, e := range entrySet { - msg, err := e.entry.GetBundleMessage(e.operation) + messages, err := e.entry.GetBundleMessages(e.operation) if err != nil { return err } + sentMessages += len(messages) // "AddMessage" operation is async, the function only returns error which occur when constructing and sending // the BundleAdd message. An absence of error does not mean that all OpenFlow entries are added into the // bundle by the switch. The number of entries successfully added to the bundle by the switch will be // returned by function "Complete". - if err := tx.AddMessage(msg); err != nil { - // Close the bundle and cancel it if there is error when adding the FlowMod message. - _, err := tx.Complete() - if err == nil { - tx.Abort() + for _, message := range messages { + if err := tx.AddMessage(message); err != nil { + // Close the bundle and cancel it if there is error when adding the FlowMod message. + _, err := tx.Complete() + if err == nil { + tx.Abort() + } + return err } - return err } } return nil @@ -592,7 +597,7 @@ func (b *OFBridge) AddOFEntriesInBundle(addEntries []OFEntry, modEntries []OFEnt count, err := tx.Complete() if err != nil { return err - } else if count != len(addEntries)+len(modEntries)+len(delEntries) { + } else if count != sentMessages { // This case should not be possible if all the calls to "tx.AddMessage" returned nil. This is just a sanity check. tx.Abort() return errors.New("failed to add all Openflow entries in one transaction, cancelling it") diff --git a/pkg/ovs/openflow/ofctrl_flow.go b/pkg/ovs/openflow/ofctrl_flow.go index 5428bb423ef..48430a50ad4 100644 --- a/pkg/ovs/openflow/ofctrl_flow.go +++ b/pkg/ovs/openflow/ofctrl_flow.go @@ -124,7 +124,7 @@ func (f *ofFlow) FlowProtocol() Protocol { return f.protocol } -func (f *ofFlow) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMessage, error) { +func (f *ofFlow) GetBundleMessages(entryOper OFOperation) ([]ofctrl.OpenFlowModMessage, error) { var operation int switch entryOper { case AddMessage: @@ -138,7 +138,7 @@ func (f *ofFlow) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMess if err != nil { return nil, err } - return message, nil + return []ofctrl.OpenFlowModMessage{message}, nil } // CopyToBuilder returns a new FlowBuilder that copies the table, protocols, diff --git a/pkg/ovs/openflow/ofctrl_group.go b/pkg/ovs/openflow/ofctrl_group.go index 7402aa8e638..aaeb9eac0b5 100644 --- a/pkg/ovs/openflow/ofctrl_group.go +++ b/pkg/ovs/openflow/ofctrl_group.go @@ -23,10 +23,13 @@ import ( "antrea.io/ofnet/ofctrl" ) +var ( + MaxBucketsPerMessage = 800 +) + type ofGroup struct { - ofctrl *ofctrl.Group - bridge *OFBridge - bucketsCount int + ofctrl *ofctrl.Group + bridge *OFBridge } // Reset creates a new ofctrl.Group object for the updated ofSwitch. The @@ -44,15 +47,15 @@ func (g *ofGroup) Reset() { } func (g *ofGroup) Add() error { - return g.ofctrl.Install() + return g.bridge.AddOFEntriesInBundle([]OFEntry{g}, nil, nil) } func (g *ofGroup) Modify() error { - return g.ofctrl.Install() + return g.bridge.AddOFEntriesInBundle(nil, []OFEntry{g}, nil) } func (g *ofGroup) Delete() error { - return g.ofctrl.Delete() + return g.bridge.AddOFEntriesInBundle(nil, nil, []OFEntry{g}) } func (g *ofGroup) Type() EntryType { @@ -71,7 +74,7 @@ func (g *ofGroup) Bucket() BucketBuilder { } } -func (g *ofGroup) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMessage, error) { +func (g *ofGroup) GetBundleMessages(entryOper OFOperation) ([]ofctrl.OpenFlowModMessage, error) { var operation int switch entryOper { case AddMessage: @@ -81,8 +84,38 @@ func (g *ofGroup) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMes case DeleteMessage: operation = openflow15.OFPGC_DELETE } - message := g.ofctrl.GetBundleMessage(operation) - return message, nil + + // If the operation is to delete the group or empty the group, create the bundle message directly. + if entryOper == DeleteMessage || len(g.ofctrl.Buckets) == 0 { + // If the operation is to delete the group, the slice storing buckets be empty since the number of buckets could + // be more than the maximum number of buckets per message. + g.ofctrl.Buckets = nil + return []ofctrl.OpenFlowModMessage{g.ofctrl.GetBundleMessage(operation)}, nil + } + + var messages []ofctrl.OpenFlowModMessage + // If the operation is to add or modify a group, generate one or more messages according to maxBucketsPerMessage. + // Note that, the command of the messages (except for the first one) should be insert_buckets. + if entryOper == AddMessage || entryOper == ModifyMessage { + messageNum := (len(g.ofctrl.Buckets) + MaxBucketsPerMessage - 1) / MaxBucketsPerMessage + for i := 0; i < messageNum; i++ { + firstBucketIdx := i * MaxBucketsPerMessage + lastBucketIdx := (i + 1) * MaxBucketsPerMessage + if lastBucketIdx > len(g.ofctrl.Buckets) { + lastBucketIdx = len(g.ofctrl.Buckets) + } + tempOfCtrl := &ofctrl.Group{ + ID: g.ofctrl.ID, + GroupType: g.ofctrl.GroupType, + Buckets: g.ofctrl.Buckets[firstBucketIdx:lastBucketIdx], + } + if i != 0 { + operation = openflow15.OFPGC_INSERT_BUCKET + } + messages = append(messages, tempOfCtrl.GetBundleMessage(operation)) + } + } + return messages, nil } func (g *ofGroup) ResetBuckets() Group { diff --git a/pkg/ovs/openflow/ofctrl_meter.go b/pkg/ovs/openflow/ofctrl_meter.go index a6af7e46f3e..299315fff61 100644 --- a/pkg/ovs/openflow/ofctrl_meter.go +++ b/pkg/ovs/openflow/ofctrl_meter.go @@ -51,7 +51,7 @@ func (m *ofMeter) KeyString() string { return fmt.Sprintf("meter_id:%d", m.ofctrl.ID) } -func (m *ofMeter) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMessage, error) { +func (m *ofMeter) GetBundleMessages(entryOper OFOperation) ([]ofctrl.OpenFlowModMessage, error) { var operation int switch entryOper { case AddMessage: @@ -62,7 +62,7 @@ func (m *ofMeter) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMes operation = openflow15.MC_DELETE } message := m.ofctrl.GetBundleMessage(operation) - return message, nil + return []ofctrl.OpenFlowModMessage{message}, nil } func (m *ofMeter) ResetMeterBands() Meter { diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index 6f1b6ef6ed7..6c9b0ca7f46 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -191,10 +191,10 @@ func (mr *MockBridgeMockRecorder) DeleteFlowsByCookie(arg0, arg1 interface{}) *g } // DeleteGroup mocks base method -func (m *MockBridge) DeleteGroup(arg0 openflow.GroupIDType) bool { +func (m *MockBridge) DeleteGroup(arg0 openflow.GroupIDType) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteGroup", arg0) - ret0, _ := ret[0].(bool) + ret0, _ := ret[0].(error) return ret0 } @@ -569,19 +569,19 @@ func (mr *MockFlowMockRecorder) FlowProtocol() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlowProtocol", reflect.TypeOf((*MockFlow)(nil).FlowProtocol)) } -// GetBundleMessage mocks base method -func (m *MockFlow) GetBundleMessage(arg0 openflow.OFOperation) (ofctrl.OpenFlowModMessage, error) { +// GetBundleMessages mocks base method +func (m *MockFlow) GetBundleMessages(arg0 openflow.OFOperation) ([]ofctrl.OpenFlowModMessage, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetBundleMessage", arg0) - ret0, _ := ret[0].(ofctrl.OpenFlowModMessage) + ret := m.ctrl.Call(m, "GetBundleMessages", arg0) + ret0, _ := ret[0].([]ofctrl.OpenFlowModMessage) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetBundleMessage indicates an expected call of GetBundleMessage -func (mr *MockFlowMockRecorder) GetBundleMessage(arg0 interface{}) *gomock.Call { +// GetBundleMessages indicates an expected call of GetBundleMessages +func (mr *MockFlowMockRecorder) GetBundleMessages(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBundleMessage", reflect.TypeOf((*MockFlow)(nil).GetBundleMessage), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBundleMessages", reflect.TypeOf((*MockFlow)(nil).GetBundleMessages), arg0) } // IsDropFlow mocks base method diff --git a/test/integration/ovs/ofctrl_test.go b/test/integration/ovs/ofctrl_test.go index 908703eb73e..fe7a33fab08 100644 --- a/test/integration/ovs/ofctrl_test.go +++ b/test/integration/ovs/ofctrl_test.go @@ -867,6 +867,71 @@ func TestNoteAction(t *testing.T) { CheckFlowExists(t, ofctlClient, "", table.GetID(), false, expectFlows) } +func TestBundleWithGroupInsertBucket(t *testing.T) { + br := "br12" + err := PrepareOVSBridge(br) + require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) + defer DeleteOVSBridge(br) + + bridge := newOFBridge(br) + table = bridge.CreateTable(t2, t3.GetID(), binding.TableMissActionNext) + binding.MaxBucketsPerMessage = 2 + + err = bridge.Connect(maxRetry, make(chan struct{})) + require.Nil(t, err, "Failed to start OFService") + defer bridge.Disconnect() + + ovsCtlClient := ovsctl.NewClient(br) + + groupID := binding.GroupIDType(4) + field1 := binding.NewRegField(1, 0, 31, "field1") + field2 := binding.NewRegField(2, 0, 31, "field2") + field3 := binding.NewRegField(3, 0, 31, "field3") + group := bridge.CreateGroup(groupID). + Bucket().Weight(100). + LoadToRegField(field1, uint32(0xa0a0002)). + LoadToRegField(field2, uint32(0x1)). + LoadToRegField(field3, uint32(0xfff1)). + ResubmitToTable(table.GetNext()).Done(). + Bucket().Weight(100). + LoadToRegField(field1, uint32(0xa0a0202)). + LoadToRegField(field2, uint32(0x2)). + LoadToRegField(field3, uint32(0xfff1)). + ResubmitToTable(table.GetNext()).Done(). + Bucket().Weight(100). + LoadToRegField(field1, uint32(0xa0a0202)). + LoadToRegField(field2, uint32(0x3)). + LoadToRegField(field3, uint32(0xfff1)). + ResubmitToTable(table.GetNext()).Done() + + bucket1 := "weight:100,actions=set_field:0xa0a0002->reg1,set_field:0x1->reg2,set_field:0xfff1->reg3,resubmit(,3)" + bucket2 := "weight:100,actions=set_field:0xa0a0202->reg1,set_field:0x2->reg2,set_field:0xfff1->reg3,resubmit(,3)" + bucket3 := "weight:100,actions=set_field:0xa0a0202->reg1,set_field:0x3->reg2,set_field:0xfff1->reg3,resubmit(,3)" + expectedGroupBuckets := []string{bucket1, bucket2, bucket3} + err = bridge.AddOFEntriesInBundle([]binding.OFEntry{group}, nil, nil) + require.Nil(t, err) + CheckGroupExists(t, ovsCtlClient, groupID, "select", expectedGroupBuckets, true) + + group = group. + Bucket().Weight(100). + LoadToRegField(field1, uint32(0xa0a0202)). + LoadToRegField(field2, uint32(0x4)). + LoadToRegField(field3, uint32(0xfff1)). + ResubmitToTable(table.GetNext()).Done(). + Bucket().Weight(100). + LoadToRegField(field1, uint32(0xa0a0202)). + LoadToRegField(field2, uint32(0x5)). + LoadToRegField(field3, uint32(0xfff1)). + ResubmitToTable(table.GetNext()).Done() + + bucket4 := "weight:100,actions=set_field:0xa0a0202->reg1,set_field:0x4->reg2,set_field:0xfff1->reg3,resubmit(,3)" + bucket5 := "weight:100,actions=set_field:0xa0a0202->reg1,set_field:0x5->reg2,set_field:0xfff1->reg3,resubmit(,3)" + expectedGroupBuckets = []string{bucket1, bucket2, bucket3, bucket4, bucket5} + err = bridge.AddOFEntriesInBundle(nil, []binding.OFEntry{group}, nil) + require.Nil(t, err) + CheckGroupExists(t, ovsCtlClient, groupID, "select", expectedGroupBuckets, true) +} + func prepareFlows(table binding.Table) ([]binding.Flow, []*ExpectFlow) { var flows []binding.Flow _, AllIPs, _ := net.ParseCIDR("0.0.0.0/0")