Skip to content

Commit

Permalink
Support for batch OVS flow updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Dyanngg committed Jul 16, 2020
1 parent 412587d commit 8f50c85
Show file tree
Hide file tree
Showing 12 changed files with 467 additions and 41 deletions.
2 changes: 2 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ func (i *Initializer) initOpenFlowPipeline() error {
// TODO: introduce a deterministic mechanism through which the different entities
// responsible for installing flows can notify the agent that this deletion
// operation can take place.
// TODO: For deterministic mechanism, restartFullSyncWaitGroup could be part of the solution
// for determining if NP flows are synced once. Need other mechanisms for node flows though.
time.Sleep(10 * time.Second)
klog.Info("Deleting stale flows from previous round if any")
if err := i.ofClient.DeleteStaleFlows(); err != nil {
Expand Down
77 changes: 77 additions & 0 deletions pkg/agent/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type Controller struct {
networkPolicyWatcher *watcher
appliedToGroupWatcher *watcher
addressGroupWatcher *watcher
appliedToSyncGroup sync.WaitGroup
addressSyncGroup sync.WaitGroup
internalNPSyncGroup sync.WaitGroup
restartSynced bool
}

// NewNetworkPolicyController returns a new *Controller.
Expand All @@ -85,8 +89,16 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
antreaClientProvider: antreaClientGetter,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "networkpolicyrule"),
reconciler: newReconciler(ofClient, ifaceStore),
restartSynced: false,
}
c.ruleCache = newRuleCache(c.enqueueRule, podUpdates)
// Create a WaitGroup that is used to block network policy workers from asynchronously processing
// NP rules until the events preceding bookmark are synced. It can also be used as part of the
// solution to a determinstic mechanism for when to cleanup flows from previous round.
// Wait until appliedToGroupWatcher, addressGroupWatcher and networkPolicyWatcher to receive bookmark event.
c.appliedToSyncGroup.Add(1)
c.addressSyncGroup.Add(1)
c.internalNPSyncGroup.Add(1)

// Use nodeName to filter resources when watching resources.
options := metav1.ListOptions{
Expand Down Expand Up @@ -141,6 +153,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
c.ruleCache.ReplaceNetworkPolicies(policies)
return nil
},
restartFullSyncGroup: &c.internalNPSyncGroup,
restartSynced: c.restartSynced,
}

c.appliedToGroupWatcher = &watcher{
Expand Down Expand Up @@ -188,6 +202,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
c.ruleCache.ReplaceAppliedToGroups(groups)
return nil
},
restartFullSyncGroup: &c.appliedToSyncGroup,
restartSynced: c.restartSynced,
}

c.addressGroupWatcher = &watcher{
Expand Down Expand Up @@ -235,6 +251,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
c.ruleCache.ReplaceAddressGroups(groups)
return nil
},
restartFullSyncGroup: &c.addressSyncGroup,
restartSynced: c.restartSynced,
}
return c
}
Expand Down Expand Up @@ -294,6 +312,15 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
go wait.NonSlidingUntil(c.addressGroupWatcher.watch, 5*time.Second, stopCh)
go wait.NonSlidingUntil(c.networkPolicyWatcher.watch, 5*time.Second, stopCh)

klog.Infof("Waiting for all watchers to receive bookmark event")
c.appliedToSyncGroup.Wait()
c.addressSyncGroup.Wait()
c.internalNPSyncGroup.Wait()
klog.Infof("All watchers have received bookmark event")
c.processAllItemsInQueue()

klog.Infof("Staring workers!")

for i := 0; i < defaultWorkers; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}
Expand Down Expand Up @@ -328,6 +355,28 @@ func (c *Controller) processNextWorkItem() bool {
return true
}

func (c *Controller) processAllItemsInQueue() {
klog.Infof("Installing all network policy flows before bookmark event")
numRules := c.queue.Len()
batchSyncRuleKeys := make([]string, numRules)
for i := 0; i < numRules; i++ {
ruleKey, _ := c.queue.Get()
batchSyncRuleKeys[i] = ruleKey.(string)
// set key to done to prevent missing watched updates between here and fullSync finish
c.queue.Done(ruleKey)
}
// Reconcile all rule keys at once
if err := c.syncRules(batchSyncRuleKeys); err != nil {
klog.Errorf("Error occurred when reconciling all rules before bookmark event %v", err)
for _, k := range batchSyncRuleKeys {
c.queue.AddRateLimited(k)
}
}
c.appliedToGroupWatcher.restartSynced = true
c.addressGroupWatcher.restartSynced = true
c.networkPolicyWatcher.restartSynced = true
}

func (c *Controller) syncRule(key string) error {
startTime := time.Now()
defer func() {
Expand All @@ -354,6 +403,27 @@ func (c *Controller) syncRule(key string) error {
return nil
}

func (c *Controller) syncRules(keys []string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing all rules before bookmark event (%v)", time.Since(startTime))
}()

var allRules []*CompletedRule
for _, key := range keys {
rule, exists, completed := c.ruleCache.GetCompletedRule(key)
if !exists || !completed {
klog.Errorf("Rule %v is not complete or does not exist in cache", key)
} else {
allRules = append(allRules, rule)
}
}
if err := c.reconciler.BatchReconcile(allRules); err != nil {
return err
}
return nil
}

func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
c.queue.Forget(key)
Expand Down Expand Up @@ -383,6 +453,9 @@ type watcher struct {
connected bool
// lock protects connected.
lock sync.RWMutex
// group to be notified when each watcher receives bookmark event
restartFullSyncGroup *sync.WaitGroup
restartSynced bool
}

func (w *watcher) isConnected() bool {
Expand Down Expand Up @@ -442,6 +515,10 @@ loop:
klog.Errorf("Failed to handle init events: %v", err)
return
}
if !w.restartSynced {
// Notify restartFullSyncGroup that all events before bookmark is handled
w.restartFullSyncGroup.Done()
}

for {
select {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ func (r *mockReconciler) Reconcile(rule *CompletedRule) error {
return nil
}

func (r *mockReconciler) BatchReconcile(rules []*CompletedRule) error {
r.Lock()
defer r.Unlock()
for _, rule := range rules {
r.lastRealized[rule.ID] = rule
r.updated <- rule.ID
}
return nil
}

func (r *mockReconciler) Forget(ruleID string) error {
r.Lock()
defer r.Unlock()
Expand Down
11 changes: 11 additions & 0 deletions pkg/agent/controller/networkpolicy/priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,17 @@ func (pa *priorityAssigner) GetOFPriority(p types.Priority) (*uint16, map[uint16
return &ofPriority, map[uint16]uint16{}, nil
}

// RegisterPriorities registers a list of Priorities to be created with priorityMap.
// It is used to populate the priorityMap in case of batch rule adds.
func (pa *priorityAssigner) RegisterPriorities(priorities []types.Priority) error {
for _, p := range priorities {
if _, _, err := pa.GetOFPriority(p); err != nil {
return err
}
}
return nil
}

// Release removes the priority that currently corresponds to the input OFPriority from the priorityMap.
func (pa *priorityAssigner) Release(priorityNum uint16) error {
for priorityKey, p := range pa.priorityMap {
Expand Down
120 changes: 110 additions & 10 deletions pkg/agent/controller/networkpolicy/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ type Reconciler interface {
// with the actual state of Openflow entries.
Reconcile(rule *CompletedRule) error

// BatchReconcile reconciles the desired state of the provided CompletedRules
// with the actual state of Openflow entries in batch. It should only be evoked
// if all rules are newly added without last realized status.
BatchReconcile(rules []*CompletedRule) error

// Forget cleanups the actual state of Openflow entries of the specified ruleID.
Forget(ruleID string) error
}
Expand Down Expand Up @@ -231,9 +236,66 @@ func (r *reconciler) getOFPriority(rule *CompletedRule) (*uint16, error) {
return ofPriority, nil
}

func (r *reconciler) BatchReconcile(rules []*CompletedRule) error {
var rulesToInstall []*CompletedRule
var priorities []*uint16
for _, rule := range rules {
if _, exists := r.lastRealizeds.Load(rule.ID); exists {
klog.Infof("rule %s already realized during the initialization phase", rule.ID)
} else {
rulesToInstall = append(rulesToInstall, rule)
}
}
if err := r.registerOFPriorities(rulesToInstall); err != nil {
return err
}
for _, rule := range rulesToInstall {
klog.Infof("Adding rule %s of NetworkPolicy %s/%s to be reconciled in batch", rule.ID, rule.PolicyNamespace, rule.PolicyName)
ofPriority, _ := r.getOFPriority(rule)
priorities = append(priorities, ofPriority)
}
ofRuleInstallErr := r.batchAdd(rulesToInstall, priorities)
if ofRuleInstallErr != nil {
for _, ofPriority := range priorities {
if ofPriority != nil {
r.priorityAssigner.Release(*ofPriority)
}
}
}
return ofRuleInstallErr
}

func (r *reconciler) registerOFPriorities(rules []*CompletedRule) error {
r.priorityMutex.Lock()
defer r.priorityMutex.Unlock()
var prioritiesToRegister []types.Priority
for _, r := range rules {
if r.PolicyPriority != nil {
p := types.Priority{PolicyPriority: *r.PolicyPriority, RulePriority: r.Priority}
prioritiesToRegister = append(prioritiesToRegister, p)
}
}
return r.priorityAssigner.RegisterPriorities(prioritiesToRegister)
}

// add converts CompletedRule to PolicyRule(s) and invokes installOFRule to install them.
func (r *reconciler) add(rule *CompletedRule, ofPriority *uint16) error {
klog.V(2).Infof("Adding new rule %v", rule)
ofRuleByServicesMap, lastRealized := r.computeOFRulesForAdd(rule, ofPriority)
for svcHash, ofRule := range ofRuleByServicesMap {
npName := lastRealized.CompletedRule.PolicyName
npNamespace := lastRealized.CompletedRule.PolicyNamespace
ofID, err := r.installOFRule(ofRule, npName, npNamespace)
if err != nil {
return err
}
// Record ofID only if its Openflow is installed successfully.
lastRealized.ofIDs[svcHash] = ofID
}
return nil
}

func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint16) (map[servicesHash]*types.PolicyRule, *lastRealized) {
lastRealized := newLastRealized(rule)
// TODO: Handle the case that the following processing fails or partially succeeds.
r.lastRealizeds.Store(rule.ID, lastRealized)
Expand Down Expand Up @@ -304,18 +366,50 @@ func (r *reconciler) add(rule *CompletedRule, ofPriority *uint16) error {
}
}
}
return ofRuleByServicesMap, lastRealized
}

for svcHash, ofRule := range ofRuleByServicesMap {
npName := lastRealized.CompletedRule.PolicyName
npNamespace := lastRealized.CompletedRule.PolicyNamespace
ofID, err := r.installOFRule(ofRule, npName, npNamespace)
if err != nil {
return err
func (r *reconciler) batchAdd(rules []*CompletedRule, ofPriorities []*uint16) error {
lastRealizeds := make([]*lastRealized, len(rules))
ofIDUpdateMaps := make([]map[servicesHash]uint32, len(rules))

var allOFRules []types.OFPolicyRule

for idx, rule := range rules {
ofRuleByServicesMap, lastRealized := r.computeOFRulesForAdd(rule, ofPriorities[idx])
lastRealizeds[idx] = lastRealized
for svcHash, ofRule := range ofRuleByServicesMap {
npName := lastRealized.CompletedRule.PolicyName
npNamespace := lastRealized.CompletedRule.PolicyNamespace
ofID, err := r.idAllocator.allocate()
if err != nil {
return fmt.Errorf("error allocating Openflow ID")
}
ofPolicyRule := types.OFPolicyRule{
OfID: ofID,
OfRule: ofRule,
NpName: npName,
NpNamespace: npNamespace,
}
allOFRules = append(allOFRules, ofPolicyRule)
if ofIDUpdateMaps[idx] == nil {
ofIDUpdateMaps[idx] = make(map[servicesHash]uint32)
}
ofIDUpdateMaps[idx][svcHash] = ofID
}
}
if err := r.ofClient.BatchInstallPolicyRuleFlows(allOFRules); err != nil {
for _, rule := range allOFRules {
r.idAllocator.release(rule.OfID)
}
return err
}
for i, lastRealized := range lastRealizeds {
ofIDUpdatesByRule := ofIDUpdateMaps[i]
for svcHash, ofID := range ofIDUpdatesByRule {
lastRealized.ofIDs[svcHash] = ofID
}
// Record ofID only if its Openflow is installed successfully.
lastRealized.ofIDs[svcHash] = ofID
}

return nil
}

Expand Down Expand Up @@ -430,7 +524,13 @@ func (r *reconciler) installOFRule(ofRule *types.PolicyRule, npName, npNamespace
}
klog.V(2).Infof("Installing ofRule %d (Direction: %v, From: %d, To: %d, Service: %d)",
ofID, ofRule.Direction, len(ofRule.From), len(ofRule.To), len(ofRule.Service))
if err := r.ofClient.InstallPolicyRuleFlows(ofID, ofRule, npName, npNamespace); err != nil {
ofPolicyRule := types.OFPolicyRule{
OfID: ofID,
OfRule: ofRule,
NpName: npName,
NpNamespace: npNamespace,
}
if err := r.ofClient.InstallPolicyRuleFlows(ofPolicyRule); err != nil {
r.idAllocator.release(ofID)
return 0, fmt.Errorf("error installing ofRule %v: %v", ofID, err)
}
Expand Down
Loading

0 comments on commit 8f50c85

Please sign in to comment.