From 8f50c858bfc4d4e396b8ab745781ca52c83f4f1f Mon Sep 17 00:00:00 2001 From: Yang Ding Date: Mon, 6 Jul 2020 17:28:34 -0700 Subject: [PATCH] Support for batch OVS flow updates --- pkg/agent/agent.go | 2 + .../networkpolicy/networkpolicy_controller.go | 77 +++++++++++ .../networkpolicy_controller_test.go | 10 ++ .../controller/networkpolicy/priority.go | 11 ++ .../controller/networkpolicy/reconciler.go | 120 +++++++++++++++-- .../networkpolicy/reconciler_test.go | 125 +++++++++++++++++- pkg/agent/openflow/client.go | 5 +- pkg/agent/openflow/network_policy.go | 87 +++++++++--- pkg/agent/openflow/network_policy_test.go | 16 ++- pkg/agent/openflow/testing/mock_openflow.go | 22 ++- pkg/agent/types/networkpolicy.go | 8 ++ test/integration/agent/openflow_test.go | 25 +++- 12 files changed, 467 insertions(+), 41 deletions(-) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 9d0ee46c8e1..3a6ffe9d79c 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -336,6 +336,8 @@ func (i *Initializer) initOpenFlowPipeline() error { // TODO: introduce a deterministic mechanism through which the different entities // responsible for installing flows can notify the agent that this deletion // operation can take place. + // TODO: For deterministic mechanism, restartFullSyncWaitGroup could be part of the solution + // for determining if NP flows are synced once. Need other mechanisms for node flows though. time.Sleep(10 * time.Second) klog.Info("Deleting stale flows from previous round if any") if err := i.ofClient.DeleteStaleFlows(); err != nil { diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index 51eb3c249c0..ea8a73c255e 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -73,6 +73,10 @@ type Controller struct { networkPolicyWatcher *watcher appliedToGroupWatcher *watcher addressGroupWatcher *watcher + appliedToSyncGroup sync.WaitGroup + addressSyncGroup sync.WaitGroup + internalNPSyncGroup sync.WaitGroup + restartSynced bool } // NewNetworkPolicyController returns a new *Controller. @@ -85,8 +89,16 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, antreaClientProvider: antreaClientGetter, queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "networkpolicyrule"), reconciler: newReconciler(ofClient, ifaceStore), + restartSynced: false, } c.ruleCache = newRuleCache(c.enqueueRule, podUpdates) + // Create a WaitGroup that is used to block network policy workers from asynchronously processing + // NP rules until the events preceding bookmark are synced. It can also be used as part of the + // solution to a determinstic mechanism for when to cleanup flows from previous round. + // Wait until appliedToGroupWatcher, addressGroupWatcher and networkPolicyWatcher to receive bookmark event. + c.appliedToSyncGroup.Add(1) + c.addressSyncGroup.Add(1) + c.internalNPSyncGroup.Add(1) // Use nodeName to filter resources when watching resources. options := metav1.ListOptions{ @@ -141,6 +153,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.ruleCache.ReplaceNetworkPolicies(policies) return nil }, + restartFullSyncGroup: &c.internalNPSyncGroup, + restartSynced: c.restartSynced, } c.appliedToGroupWatcher = &watcher{ @@ -188,6 +202,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.ruleCache.ReplaceAppliedToGroups(groups) return nil }, + restartFullSyncGroup: &c.appliedToSyncGroup, + restartSynced: c.restartSynced, } c.addressGroupWatcher = &watcher{ @@ -235,6 +251,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.ruleCache.ReplaceAddressGroups(groups) return nil }, + restartFullSyncGroup: &c.addressSyncGroup, + restartSynced: c.restartSynced, } return c } @@ -294,6 +312,15 @@ func (c *Controller) Run(stopCh <-chan struct{}) error { go wait.NonSlidingUntil(c.addressGroupWatcher.watch, 5*time.Second, stopCh) go wait.NonSlidingUntil(c.networkPolicyWatcher.watch, 5*time.Second, stopCh) + klog.Infof("Waiting for all watchers to receive bookmark event") + c.appliedToSyncGroup.Wait() + c.addressSyncGroup.Wait() + c.internalNPSyncGroup.Wait() + klog.Infof("All watchers have received bookmark event") + c.processAllItemsInQueue() + + klog.Infof("Staring workers!") + for i := 0; i < defaultWorkers; i++ { go wait.Until(c.worker, time.Second, stopCh) } @@ -328,6 +355,28 @@ func (c *Controller) processNextWorkItem() bool { return true } +func (c *Controller) processAllItemsInQueue() { + klog.Infof("Installing all network policy flows before bookmark event") + numRules := c.queue.Len() + batchSyncRuleKeys := make([]string, numRules) + for i := 0; i < numRules; i++ { + ruleKey, _ := c.queue.Get() + batchSyncRuleKeys[i] = ruleKey.(string) + // set key to done to prevent missing watched updates between here and fullSync finish + c.queue.Done(ruleKey) + } + // Reconcile all rule keys at once + if err := c.syncRules(batchSyncRuleKeys); err != nil { + klog.Errorf("Error occurred when reconciling all rules before bookmark event %v", err) + for _, k := range batchSyncRuleKeys { + c.queue.AddRateLimited(k) + } + } + c.appliedToGroupWatcher.restartSynced = true + c.addressGroupWatcher.restartSynced = true + c.networkPolicyWatcher.restartSynced = true +} + func (c *Controller) syncRule(key string) error { startTime := time.Now() defer func() { @@ -354,6 +403,27 @@ func (c *Controller) syncRule(key string) error { return nil } +func (c *Controller) syncRules(keys []string) error { + startTime := time.Now() + defer func() { + klog.V(4).Infof("Finished syncing all rules before bookmark event (%v)", time.Since(startTime)) + }() + + var allRules []*CompletedRule + for _, key := range keys { + rule, exists, completed := c.ruleCache.GetCompletedRule(key) + if !exists || !completed { + klog.Errorf("Rule %v is not complete or does not exist in cache", key) + } else { + allRules = append(allRules, rule) + } + } + if err := c.reconciler.BatchReconcile(allRules); err != nil { + return err + } + return nil +} + func (c *Controller) handleErr(err error, key interface{}) { if err == nil { c.queue.Forget(key) @@ -383,6 +453,9 @@ type watcher struct { connected bool // lock protects connected. lock sync.RWMutex + // group to be notified when each watcher receives bookmark event + restartFullSyncGroup *sync.WaitGroup + restartSynced bool } func (w *watcher) isConnected() bool { @@ -442,6 +515,10 @@ loop: klog.Errorf("Failed to handle init events: %v", err) return } + if !w.restartSynced { + // Notify restartFullSyncGroup that all events before bookmark is handled + w.restartFullSyncGroup.Done() + } for { select { diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go index 9cc15311ae3..2d0ac991801 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go @@ -76,6 +76,16 @@ func (r *mockReconciler) Reconcile(rule *CompletedRule) error { return nil } +func (r *mockReconciler) BatchReconcile(rules []*CompletedRule) error { + r.Lock() + defer r.Unlock() + for _, rule := range rules { + r.lastRealized[rule.ID] = rule + r.updated <- rule.ID + } + return nil +} + func (r *mockReconciler) Forget(ruleID string) error { r.Lock() defer r.Unlock() diff --git a/pkg/agent/controller/networkpolicy/priority.go b/pkg/agent/controller/networkpolicy/priority.go index 3ed9cb745d6..0d50d91ddc9 100644 --- a/pkg/agent/controller/networkpolicy/priority.go +++ b/pkg/agent/controller/networkpolicy/priority.go @@ -144,6 +144,17 @@ func (pa *priorityAssigner) GetOFPriority(p types.Priority) (*uint16, map[uint16 return &ofPriority, map[uint16]uint16{}, nil } +// RegisterPriorities registers a list of Priorities to be created with priorityMap. +// It is used to populate the priorityMap in case of batch rule adds. +func (pa *priorityAssigner) RegisterPriorities(priorities []types.Priority) error { + for _, p := range priorities { + if _, _, err := pa.GetOFPriority(p); err != nil { + return err + } + } + return nil +} + // Release removes the priority that currently corresponds to the input OFPriority from the priorityMap. func (pa *priorityAssigner) Release(priorityNum uint16) error { for priorityKey, p := range pa.priorityMap { diff --git a/pkg/agent/controller/networkpolicy/reconciler.go b/pkg/agent/controller/networkpolicy/reconciler.go index d03ea5065bc..6f252a382cc 100644 --- a/pkg/agent/controller/networkpolicy/reconciler.go +++ b/pkg/agent/controller/networkpolicy/reconciler.go @@ -50,6 +50,11 @@ type Reconciler interface { // with the actual state of Openflow entries. Reconcile(rule *CompletedRule) error + // BatchReconcile reconciles the desired state of the provided CompletedRules + // with the actual state of Openflow entries in batch. It should only be evoked + // if all rules are newly added without last realized status. + BatchReconcile(rules []*CompletedRule) error + // Forget cleanups the actual state of Openflow entries of the specified ruleID. Forget(ruleID string) error } @@ -231,9 +236,66 @@ func (r *reconciler) getOFPriority(rule *CompletedRule) (*uint16, error) { return ofPriority, nil } +func (r *reconciler) BatchReconcile(rules []*CompletedRule) error { + var rulesToInstall []*CompletedRule + var priorities []*uint16 + for _, rule := range rules { + if _, exists := r.lastRealizeds.Load(rule.ID); exists { + klog.Infof("rule %s already realized during the initialization phase", rule.ID) + } else { + rulesToInstall = append(rulesToInstall, rule) + } + } + if err := r.registerOFPriorities(rulesToInstall); err != nil { + return err + } + for _, rule := range rulesToInstall { + klog.Infof("Adding rule %s of NetworkPolicy %s/%s to be reconciled in batch", rule.ID, rule.PolicyNamespace, rule.PolicyName) + ofPriority, _ := r.getOFPriority(rule) + priorities = append(priorities, ofPriority) + } + ofRuleInstallErr := r.batchAdd(rulesToInstall, priorities) + if ofRuleInstallErr != nil { + for _, ofPriority := range priorities { + if ofPriority != nil { + r.priorityAssigner.Release(*ofPriority) + } + } + } + return ofRuleInstallErr +} + +func (r *reconciler) registerOFPriorities(rules []*CompletedRule) error { + r.priorityMutex.Lock() + defer r.priorityMutex.Unlock() + var prioritiesToRegister []types.Priority + for _, r := range rules { + if r.PolicyPriority != nil { + p := types.Priority{PolicyPriority: *r.PolicyPriority, RulePriority: r.Priority} + prioritiesToRegister = append(prioritiesToRegister, p) + } + } + return r.priorityAssigner.RegisterPriorities(prioritiesToRegister) +} + // add converts CompletedRule to PolicyRule(s) and invokes installOFRule to install them. func (r *reconciler) add(rule *CompletedRule, ofPriority *uint16) error { klog.V(2).Infof("Adding new rule %v", rule) + ofRuleByServicesMap, lastRealized := r.computeOFRulesForAdd(rule, ofPriority) + for svcHash, ofRule := range ofRuleByServicesMap { + npName := lastRealized.CompletedRule.PolicyName + npNamespace := lastRealized.CompletedRule.PolicyNamespace + ofID, err := r.installOFRule(ofRule, npName, npNamespace) + if err != nil { + return err + } + // Record ofID only if its Openflow is installed successfully. + lastRealized.ofIDs[svcHash] = ofID + } + return nil +} + +func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint16) (map[servicesHash]*types.PolicyRule, *lastRealized) { lastRealized := newLastRealized(rule) // TODO: Handle the case that the following processing fails or partially succeeds. r.lastRealizeds.Store(rule.ID, lastRealized) @@ -304,18 +366,50 @@ func (r *reconciler) add(rule *CompletedRule, ofPriority *uint16) error { } } } + return ofRuleByServicesMap, lastRealized +} - for svcHash, ofRule := range ofRuleByServicesMap { - npName := lastRealized.CompletedRule.PolicyName - npNamespace := lastRealized.CompletedRule.PolicyNamespace - ofID, err := r.installOFRule(ofRule, npName, npNamespace) - if err != nil { - return err +func (r *reconciler) batchAdd(rules []*CompletedRule, ofPriorities []*uint16) error { + lastRealizeds := make([]*lastRealized, len(rules)) + ofIDUpdateMaps := make([]map[servicesHash]uint32, len(rules)) + + var allOFRules []types.OFPolicyRule + + for idx, rule := range rules { + ofRuleByServicesMap, lastRealized := r.computeOFRulesForAdd(rule, ofPriorities[idx]) + lastRealizeds[idx] = lastRealized + for svcHash, ofRule := range ofRuleByServicesMap { + npName := lastRealized.CompletedRule.PolicyName + npNamespace := lastRealized.CompletedRule.PolicyNamespace + ofID, err := r.idAllocator.allocate() + if err != nil { + return fmt.Errorf("error allocating Openflow ID") + } + ofPolicyRule := types.OFPolicyRule{ + OfID: ofID, + OfRule: ofRule, + NpName: npName, + NpNamespace: npNamespace, + } + allOFRules = append(allOFRules, ofPolicyRule) + if ofIDUpdateMaps[idx] == nil { + ofIDUpdateMaps[idx] = make(map[servicesHash]uint32) + } + ofIDUpdateMaps[idx][svcHash] = ofID + } + } + if err := r.ofClient.BatchInstallPolicyRuleFlows(allOFRules); err != nil { + for _, rule := range allOFRules { + r.idAllocator.release(rule.OfID) + } + return err + } + for i, lastRealized := range lastRealizeds { + ofIDUpdatesByRule := ofIDUpdateMaps[i] + for svcHash, ofID := range ofIDUpdatesByRule { + lastRealized.ofIDs[svcHash] = ofID } - // Record ofID only if its Openflow is installed successfully. - lastRealized.ofIDs[svcHash] = ofID } - return nil } @@ -430,7 +524,13 @@ func (r *reconciler) installOFRule(ofRule *types.PolicyRule, npName, npNamespace } klog.V(2).Infof("Installing ofRule %d (Direction: %v, From: %d, To: %d, Service: %d)", ofID, ofRule.Direction, len(ofRule.From), len(ofRule.To), len(ofRule.Service)) - if err := r.ofClient.InstallPolicyRuleFlows(ofID, ofRule, npName, npNamespace); err != nil { + ofPolicyRule := types.OFPolicyRule{ + OfID: ofID, + OfRule: ofRule, + NpName: npName, + NpNamespace: npNamespace, + } + if err := r.ofClient.InstallPolicyRuleFlows(ofPolicyRule); err != nil { r.idAllocator.release(ofID) return 0, fmt.Errorf("error installing ofRule %v: %v", ofID, err) } diff --git a/pkg/agent/controller/networkpolicy/reconciler_test.go b/pkg/agent/controller/networkpolicy/reconciler_test.go index e1228031038..2c87da9f724 100644 --- a/pkg/agent/controller/networkpolicy/reconciler_test.go +++ b/pkg/agent/controller/networkpolicy/reconciler_test.go @@ -428,8 +428,9 @@ func TestReconcilerReconcile(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() mockOFClient := openflowtest.NewMockClient(controller) - for _, ofRule := range tt.expectedOFRules { - mockOFClient.EXPECT().InstallPolicyRuleFlows(gomock.Any(), gomock.Eq(ofRule), "", "") + // TODO: mock idAllocator and priorityAssigner + for i := 0; i < len(tt.expectedOFRules); i++ { + mockOFClient.EXPECT().InstallPolicyRuleFlows(gomock.Any()) } r := newReconciler(mockOFClient, ifaceStore) if err := r.Reconcile(tt.args); (err != nil) != tt.wantErr { @@ -439,6 +440,124 @@ func TestReconcilerReconcile(t *testing.T) { } } +func TestReconcilerBatchReconcile(t *testing.T) { + ifaceStore := interfacestore.NewInterfaceStore() + ifaceStore.AddInterface(&interfacestore.InterfaceConfig{ + InterfaceName: util.GenerateContainerInterfaceName("pod1", "ns1", "container1"), + IP: net.ParseIP("2.2.2.2"), + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{PodName: "pod1", PodNamespace: "ns1", ContainerID: "container1"}, + OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: 1}, + }) + ifaceStore.AddInterface(&interfacestore.InterfaceConfig{ + InterfaceName: util.GenerateContainerInterfaceName("pod3", "ns1", "container3"), + IP: net.ParseIP("3.3.3.3"), + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{PodName: "pod3", PodNamespace: "ns1", ContainerID: "container3"}, + OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: 3}, + }) + completedRules := []*CompletedRule{ + { + rule: &rule{ID: "ingress-rule", Direction: v1beta1.DirectionIn, Services: []v1beta1.Service{serviceTCP80, serviceTCP}}, + FromAddresses: addressGroup1, + ToAddresses: nil, + Pods: appliedToGroup1, + }, + { + rule: &rule{ID: "ingress-rule-no-ports", Direction: v1beta1.DirectionIn, Services: []v1beta1.Service{}}, + Pods: appliedToGroup1, + }, + { + rule: &rule{ID: "ingress-rule-diff-named-port", Direction: v1beta1.DirectionIn, Services: []v1beta1.Service{serviceHTTP}}, + Pods: appliedToGroupWithDiffContainerPort, + }, + { + rule: &rule{ID: "egress-rule", Direction: v1beta1.DirectionOut}, + FromAddresses: nil, + ToAddresses: addressGroup1, + Pods: appliedToGroup1, + }, + } + expectedOFRules := []*types.PolicyRule{ + { + Direction: v1beta1.DirectionIn, + From: ipsToOFAddresses(sets.NewString("1.1.1.1")), + To: ofPortsToOFAddresses(sets.NewInt32(1)), + Service: []v1beta1.Service{serviceTCP80, serviceTCP}, + }, + { + Direction: v1beta1.DirectionIn, + From: []types.Address{}, + To: ofPortsToOFAddresses(sets.NewInt32(1)), + Service: nil, + }, + { + Direction: v1beta1.DirectionIn, + From: []types.Address{}, + To: ofPortsToOFAddresses(sets.NewInt32(1)), + Service: []v1beta1.Service{serviceTCP80}, + }, + { + Direction: v1beta1.DirectionIn, + From: []types.Address{}, + To: ofPortsToOFAddresses(sets.NewInt32(3)), + Service: []v1beta1.Service{serviceTCP443}, + }, + { + Direction: v1beta1.DirectionOut, + From: ipsToOFAddresses(sets.NewString("2.2.2.2")), + To: ipsToOFAddresses(sets.NewString("1.1.1.1")), + Service: nil, + }, + } + tests := []struct { + name string + args []*CompletedRule + expectedOFRules []*types.PolicyRule + numInstalledRules int + wantErr bool + }{ + { + "batch-install", + completedRules, + expectedOFRules, + 0, + false, + }, + { + "batch-install-partial", + completedRules, + expectedOFRules, + 1, + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + controller := gomock.NewController(t) + defer controller.Finish() + mockOFClient := openflowtest.NewMockClient(controller) + r := newReconciler(mockOFClient, ifaceStore) + if tt.numInstalledRules > 0 { + // BatchInstall should skip rules already installed + r.lastRealizeds.Store(tt.args[0].ID, newLastRealized(tt.args[0])) + } + // TODO: mock idAllocator and priorityAssigner + mockOFClient.EXPECT().BatchInstallPolicyRuleFlows(gomock.Any()). + Do(func(rules []types.OFPolicyRule) { + if tt.numInstalledRules == 0 && len(rules) != len(tt.expectedOFRules) { + t.Fatalf("Expect to install %v flows while %v flows were installed", + len(tt.expectedOFRules), len(rules)) + } else if tt.numInstalledRules > 0 && len(rules) != len(tt.expectedOFRules)-tt.numInstalledRules { + t.Fatalf("Expect to install %v flows while %v flows were installed", + len(tt.expectedOFRules)-tt.numInstalledRules, len(rules)) + } + }) + if err := r.BatchReconcile(tt.args); (err != nil) != tt.wantErr { + t.Fatalf("BatchReconcile() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + func TestReconcilerUpdate(t *testing.T) { ifaceStore := interfacestore.NewInterfaceStore() ifaceStore.AddInterface( @@ -559,7 +678,7 @@ func TestReconcilerUpdate(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() mockOFClient := openflowtest.NewMockClient(controller) - mockOFClient.EXPECT().InstallPolicyRuleFlows(gomock.Any(), gomock.Any(), "", "") + mockOFClient.EXPECT().InstallPolicyRuleFlows(gomock.Any()) if len(tt.expectedAddedFrom) > 0 { mockOFClient.EXPECT().AddPolicyRuleAddress(gomock.Any(), types.SrcAddress, gomock.Eq(tt.expectedAddedFrom), nil) } diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index adae1122f31..97738d59697 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -122,7 +122,10 @@ type Client interface { // NetworkPolicy rule. Each ingress/egress policy rule installs Openflow entries on two tables, one for // ruleTable and the other for dropTable. If a packet does not pass the ruleTable, it will be dropped by the // dropTable. - InstallPolicyRuleFlows(ruleID uint32, rule *types.PolicyRule, npName, npNamespace string) error + InstallPolicyRuleFlows(ofPolicyRule types.OFPolicyRule) error + + // BatchInstallPolicyRuleFlows installs multiple flows for NetworkPolicy rules in batch. + BatchInstallPolicyRuleFlows(ofPolicyRules []types.OFPolicyRule) error // UninstallPolicyRuleFlows removes the Openflow entry relevant to the specified NetworkPolicy rule. // It also returns a slice of stale ofPriorities used by ClusterNetworkPolicies. diff --git a/pkg/agent/openflow/network_policy.go b/pkg/agent/openflow/network_policy.go index fb42fa8b0de..336790f7934 100644 --- a/pkg/agent/openflow/network_policy.go +++ b/pkg/agent/openflow/network_policy.go @@ -711,21 +711,41 @@ func (c *policyRuleConjunction) getAddressClause(addrType types.AddressType) *cl // If there is an error in any clause's addAddrFlows or addServiceFlows, the conjunction action flow will never be hit. // If the default drop flow is already installed before this error, all packets will be dropped by the default drop flow, // Otherwise all packets will be allowed. -func (c *client) InstallPolicyRuleFlows(ruleID uint32, rule *types.PolicyRule, npName, npNamespace string) error { +func (c *client) InstallPolicyRuleFlows(ofPolicyRule types.OFPolicyRule) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() + conj := c.calculateActionFlowChangesForRule(ofPolicyRule) + + c.conjMatchFlowLock.Lock() + defer c.conjMatchFlowLock.Unlock() + ctxChanges := c.calculateMatchFlowChangesForRule(conj, ofPolicyRule.OfRule, false) + + if err := c.ofEntryOperations.AddAll(conj.actionFlows); err != nil { + return err + } + if err := c.applyConjunctiveMatchFlows(ctxChanges); err != nil { + return err + } + // Add the policyRuleConjunction into policyCache + c.policyCache.Add(conj) + return nil +} + +// calculateActionFlowChangesForRule calculates and updates the actionFlows for the conjunction corresponded to the ofPolicyRule. +func (c *client) calculateActionFlowChangesForRule(ofPolicyRule types.OFPolicyRule) *policyRuleConjunction { + rule := ofPolicyRule.OfRule + ruleID := ofPolicyRule.OfID // Check if the policyRuleConjunction is added into cache or not. If yes, return nil. conj := c.getPolicyRuleConjunction(ruleID) if conj != nil { klog.V(2).Infof("PolicyRuleConjunction %d is already added in cache", ruleID) return nil } - conj = &policyRuleConjunction{ id: ruleID, - npName: npName, - npNamespace: npNamespace} + npName: ofPolicyRule.NpName, + npNamespace: ofPolicyRule.NpNamespace} nClause, ruleTable, dropTable := conj.calculateClauses(rule, c) // Conjunction action flows are installed only if the number of clauses in the conjunction is > 1. It should be a rule @@ -739,31 +759,61 @@ func (c *client) InstallPolicyRuleFlows(ruleID uint32, rule *types.PolicyRule, n } else { actionFlows = append(actionFlows, c.conjunctionActionFlow(ruleID, ruleTable.GetID(), dropTable.GetNext(), rule.Priority)) } - if err := c.ofEntryOperations.AddAll(actionFlows); err != nil { - return nil - } - // Add the action flows after the Openflow entries are installed on the OVS bridge successfully. conj.actionFlows = actionFlows } - c.conjMatchFlowLock.Lock() - defer c.conjMatchFlowLock.Unlock() + return conj +} +// calculateMatchFlowChangesForRule calculates the contextChanges for the policyRule, and updates the context status in case of batch install. +func (c *client) calculateMatchFlowChangesForRule(conj *policyRuleConjunction, rule *types.PolicyRule, isBatchInstall bool) []*conjMatchFlowContextChange { // Calculate the conjMatchFlowContext changes. The changed Openflow entries are included in the conjMatchFlowContext change. ctxChanges := conj.calculateChangesForRuleCreation(c, rule) + // Update conjunctiveMatchContext if during batch flow install, otherwise the subsequent contextChange + // calculations will not be based on the previous flowChanges that have not been sent to OVS bridge. + // TODO: roll back if batch flow install fails? + if isBatchInstall { + for _, ctxChange := range ctxChanges { + ctxChange.updateContextStatus() + } + } + return ctxChanges +} - // Send the changed Openflow entries to the OVS bridge, and then update the conjMatchFlowContext as the expected status. - if err := c.applyConjunctiveMatchFlows(ctxChanges); err != nil { +// BatchInstallPolicyRuleFlows installs flows for NetworkPolicy rules in case of agent restart. It calculates and +// accumulates all Openflow entry updates required and installs all of them on OVS bridge in one bundle. +func (c *client) BatchInstallPolicyRuleFlows(ofPolicyRules []types.OFPolicyRule) error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + + var allCtxChanges []*conjMatchFlowContextChange + var allActionFlowChanges []*binding.Flow + var updatedConjunctions []*policyRuleConjunction + + for _, rule := range ofPolicyRules { + conj := c.calculateActionFlowChangesForRule(rule) + ctxChanges := c.calculateMatchFlowChangesForRule(conj, rule.OfRule, true) + for _, actionFlow := range conj.actionFlows { + allActionFlowChanges = append(allActionFlowChanges, &actionFlow) + } + allCtxChanges = append(allCtxChanges, ctxChanges...) + updatedConjunctions = append(updatedConjunctions, conj) + } + // Send the changed Openflow entries to the OVS bridge. + if err := c.sendConjunctiveFlows(allCtxChanges, allActionFlowChanges); err != nil { return err } - // Add the policyRuleConjunction into policyCache. - c.policyCache.Add(conj) + // Update conjMatchFlowContexts as the expected status. + for _, conj := range updatedConjunctions { + // Add the policyRuleConjunction into policyCache + c.policyCache.Add(conj) + } return nil } // applyConjunctiveMatchFlows installs OpenFlow entries on the OVS bridge, and then updates the conjMatchFlowContext. func (c *client) applyConjunctiveMatchFlows(flowChanges []*conjMatchFlowContextChange) error { // Send the OpenFlow entries to the OVS bridge. - if err := c.sendConjunctiveMatchFlows(flowChanges); err != nil { + if err := c.sendConjunctiveFlows(flowChanges, nil); err != nil { return err } // Update conjunctiveMatchContext. @@ -773,9 +823,12 @@ func (c *client) applyConjunctiveMatchFlows(flowChanges []*conjMatchFlowContextC return nil } -// sendConjunctiveMatchFlows sends all the changed OpenFlow entries to the OVS bridge in a single Bundle. -func (c *client) sendConjunctiveMatchFlows(changes []*conjMatchFlowContextChange) error { +// sendConjunctiveFlows sends all the changed OpenFlow entries to the OVS bridge in a single Bundle. +func (c *client) sendConjunctiveFlows(changes []*conjMatchFlowContextChange, actionFlows []*binding.Flow) error { var addFlows, modifyFlows, deleteFlows []binding.Flow + for _, actionFlow := range actionFlows { + addFlows = append(addFlows, *actionFlow) + } var flowChanges []*flowChange for _, flowChange := range changes { if flowChange.matchFlow != nil { diff --git a/pkg/agent/openflow/network_policy_test.go b/pkg/agent/openflow/network_policy_test.go index 66d684cba7b..d9e7630ec35 100644 --- a/pkg/agent/openflow/network_policy_test.go +++ b/pkg/agent/openflow/network_policy_test.go @@ -168,7 +168,13 @@ func TestInstallPolicyRuleFlows(t *testing.T) { require.Nil(t, err) assert.Equal(t, 0, len(c.GetNetworkPolicyFlowKeys("np1", "ns1"))) - err = c.InstallPolicyRuleFlows(ruleID2, rule2, "np1", "ns1") + ofPolicyRule2 := types.OFPolicyRule{ + OfID: ruleID2, + OfRule: rule2, + NpName: "np1", + NpNamespace: "ns1", + } + err = c.InstallPolicyRuleFlows(ofPolicyRule2) require.Nil(t, err) checkConjunctionConfig(t, ruleID2, 1, 2, 1, 0) assert.Equal(t, 6, len(c.GetNetworkPolicyFlowKeys("np1", "ns1"))) @@ -203,7 +209,13 @@ func TestInstallPolicyRuleFlows(t *testing.T) { err = c.applyConjunctiveMatchFlows(ctxChanges3) require.Nil(t, err) - err = c.InstallPolicyRuleFlows(ruleID3, rule3, "np1", "ns1") + ofPolicyRule3 := types.OFPolicyRule{ + OfID: ruleID3, + OfRule: rule3, + NpName: "np1", + NpNamespace: "ns1", + } + err = c.InstallPolicyRuleFlows(ofPolicyRule3) require.Nil(t, err, "Failed to invoke InstallPolicyRuleFlows") checkConjunctionConfig(t, ruleID3, 1, 2, 1, 2) assert.Equal(t, 14, len(c.GetNetworkPolicyFlowKeys("np1", "ns1"))) diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 6e1d1925a19..27e802569f7 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -67,6 +67,20 @@ func (mr *MockClientMockRecorder) AddPolicyRuleAddress(arg0, arg1, arg2, arg3 in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPolicyRuleAddress", reflect.TypeOf((*MockClient)(nil).AddPolicyRuleAddress), arg0, arg1, arg2, arg3) } +// BatchInstallPolicyRuleFlows mocks base method +func (m *MockClient) BatchInstallPolicyRuleFlows(arg0 []types.OFPolicyRule) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BatchInstallPolicyRuleFlows", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// BatchInstallPolicyRuleFlows indicates an expected call of BatchInstallPolicyRuleFlows +func (mr *MockClientMockRecorder) BatchInstallPolicyRuleFlows(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchInstallPolicyRuleFlows", reflect.TypeOf((*MockClient)(nil).BatchInstallPolicyRuleFlows), arg0) +} + // DeletePolicyRuleAddress mocks base method func (m *MockClient) DeletePolicyRuleAddress(arg0 uint32, arg1 types.AddressType, arg2 []types.Address, arg3 *uint16) error { m.ctrl.T.Helper() @@ -336,17 +350,17 @@ func (mr *MockClientMockRecorder) InstallPodFlows(arg0, arg1, arg2, arg3, arg4 i } // InstallPolicyRuleFlows mocks base method -func (m *MockClient) InstallPolicyRuleFlows(arg0 uint32, arg1 *types.PolicyRule, arg2, arg3 string) error { +func (m *MockClient) InstallPolicyRuleFlows(arg0 types.OFPolicyRule) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallPolicyRuleFlows", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "InstallPolicyRuleFlows", arg0) ret0, _ := ret[0].(error) return ret0 } // InstallPolicyRuleFlows indicates an expected call of InstallPolicyRuleFlows -func (mr *MockClientMockRecorder) InstallPolicyRuleFlows(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallPolicyRuleFlows(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallPolicyRuleFlows", reflect.TypeOf((*MockClient)(nil).InstallPolicyRuleFlows), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallPolicyRuleFlows", reflect.TypeOf((*MockClient)(nil).InstallPolicyRuleFlows), arg0) } // InstallServiceFlows mocks base method diff --git a/pkg/agent/types/networkpolicy.go b/pkg/agent/types/networkpolicy.go index 76bba316d6c..5edf33b72d7 100644 --- a/pkg/agent/types/networkpolicy.go +++ b/pkg/agent/types/networkpolicy.go @@ -61,3 +61,11 @@ type Priority struct { PolicyPriority float64 RulePriority int32 } + +// OFPolicyRule groups all configurations that the openflow module needs to install flow for PolicyRule. +type OFPolicyRule struct { + OfID uint32 + OfRule *PolicyRule + NpName string + NpNamespace string +} diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index 7d85c23ef0d..93827e95e6a 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -169,7 +169,13 @@ func TestReplayFlowsNetworkPolicyFlows(t *testing.T) { Action: &defaultAction, } - err = c.InstallPolicyRuleFlows(ruleID, rule, "np1", "ns1") + ofPolicyRule := types.OFPolicyRule{ + OfID: ruleID, + OfRule: rule, + NpName: "np1", + NpNamespace: "ns1", + } + err = c.InstallPolicyRuleFlows(ofPolicyRule) require.Nil(t, err, "Failed to InstallPolicyRuleFlows") err = c.AddPolicyRuleAddress(ruleID, types.SrcAddress, prepareIPNetAddresses([]string{"192.168.5.0/24", "192.169.1.0/24"}), nil) @@ -325,8 +331,13 @@ func TestNetworkPolicyFlows(t *testing.T) { Service: []v1beta1.Service{npPort1}, Action: &defaultAction, } - - err = c.InstallPolicyRuleFlows(ruleID, rule, "np1", "ns1") + ofPolicyRule := types.OFPolicyRule{ + OfID: ruleID, + OfRule: rule, + NpName: "np1", + NpNamespace: "ns1", + } + err = c.InstallPolicyRuleFlows(ofPolicyRule) require.Nil(t, err, "Failed to InstallPolicyRuleFlows") checkConjunctionFlows(t, ingressRuleTable, ingressDefaultTable, contrackCommitTable, priorityNormal, ruleID, rule, assert.True) checkDefaultDropFlows(t, ingressDefaultTable, priorityNormal, types.DstAddress, toIPList, true) @@ -361,7 +372,13 @@ func TestNetworkPolicyFlows(t *testing.T) { Service: []v1beta1.Service{npPort2}, Action: &defaultAction, } - err = c.InstallPolicyRuleFlows(ruleID2, rule2, "np1", "ns1") + ofPolicyRule2 := types.OFPolicyRule{ + OfID: ruleID2, + OfRule: rule2, + NpName: "np1", + NpNamespace: "ns1", + } + err = c.InstallPolicyRuleFlows(ofPolicyRule2) require.Nil(t, err, "Failed to InstallPolicyRuleFlows") // Dump flows