Skip to content

Commit

Permalink
Fix name collision between Antrea NetworkPolicy and K8s NetworkPolicy
Browse files Browse the repository at this point in the history
NetworkPolicy in controlplane API group is the object that consumed by
antrea-agents. Both Antrea NetworkPolicy and K8s NetworkPolicy will be
converted to it. Currently, the namespace and name of the original
NetworkPolicy are copied to the controlplane NetworkPolicy and
<Namespace>/<Name> is used as the key func. Therefore, one K8s
NetworkPolicy may overwrite the controlplane NetworkPolicy mapping to a
Antrea NetworkPolicy that has the same namespace and name.

This patch changes to use the UID of the original NetworkPolicy as the
name and UID of the controlplane NetworkPolicy and keeps the namespace,
name, UID, and type of the original NetworkPolicy in a new field
"SourceRef". Besides, the controlplane NetworkPolicy is changed to
cluster scoped like AppliedToGroups and AddressGroups.
  • Loading branch information
tnqn committed Sep 17, 2020
1 parent fd47097 commit 0633e8f
Show file tree
Hide file tree
Showing 38 changed files with 1,408 additions and 645 deletions.
137 changes: 58 additions & 79 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
Expand All @@ -38,6 +37,7 @@ const (
appliedToGroupIndex = "appliedToGroup"
addressGroupIndex = "addressGroup"
policyIndex = "policy"
namespaceIndex = "namespace"
)

// rule is the struct stored in ruleCache, it contains necessary information
Expand All @@ -52,6 +52,8 @@ const (
type rule struct {
// ID is calculated from the hash value of all other fields.
ID string
// Reference to the original NetworkPolicy that the rule belongs to.
PolicyRef *v1beta1.NetworkPolicyReference
// Direction of this rule.
Direction v1beta1.Direction
// Source Address of this rule, can't coexist with To.
Expand All @@ -70,14 +72,6 @@ type rule struct {
TierPriority *v1beta1.TierPriority
// Targets of this rule.
AppliedToGroups []string
// The parent Policy ID. Used to identify rules belong to a specified
// policy for deletion.
PolicyUID types.UID
// The metadata of parent Policy. Used to associate the rule with Policy
// for troubleshooting purpose (logging and CLI).
PolicyName string
// PolicyNamespace is empty for ClusterNetworkPolicy.
PolicyNamespace string
}

// hashRule calculates a string based on the rule's content.
Expand Down Expand Up @@ -113,9 +107,9 @@ func (r *CompletedRule) String() string {
r.ID, r.Direction, len(r.Pods), addressString, len(r.Services), r.PolicyPriority, r.Priority)
}

// isAntreaNetworkPolicyRule returns true if the rule is part of a ClusterNetworkPolicy.
func (r *CompletedRule) isAntreaNetworkPolicyRule() bool {
return r.PolicyPriority != nil
// isAntreaPolicyRule returns true if the rule is part of a Antrea policy.
func (r *CompletedRule) isAntreaPolicyRule() bool {
return r.PolicyRef.Type != v1beta1.K8sNetworkPolicy
}

// ruleCache caches Antrea AddressGroups, AppliedToGroups and NetworkPolicies,
Expand All @@ -131,10 +125,6 @@ type ruleCache struct {
// It is a mapping from group name to a set of GroupMembers.
addressSetByGroup map[string]v1beta1.GroupMemberSet

policyMapLock sync.RWMutex
// policyMap is a map using NetworkPolicy UID as the key.
policyMap map[string]*types.NamespacedName

// rules is a storage that supports listing rules using multiple indexing functions.
// rules is thread-safe.
rules cache.Indexer
Expand All @@ -146,48 +136,51 @@ type ruleCache struct {
}

func (c *ruleCache) getNetworkPolicies(namespace string) []v1beta1.NetworkPolicy {
ret := []v1beta1.NetworkPolicy{}
c.policyMapLock.RLock()
defer c.policyMapLock.RUnlock()
for uid, np := range c.policyMap {
if namespace == "" || np.Namespace == namespace {
ret = append(ret, *c.buildNetworkPolicyFromRules(uid))
}
var ret []v1beta1.NetworkPolicy
rulesByPolicy := map[v1beta1.NetworkPolicyReference][]*rule{}
var objs []interface{}
if namespace == "" {
objs = c.rules.List()
} else {
objs, _ = c.rules.ByIndex(namespaceIndex, namespace)
}
for _, obj := range objs {
rule := obj.(*rule)
rulesByPolicy[*rule.PolicyRef] = append(rulesByPolicy[*rule.PolicyRef], rule)
}
for _, rules := range rulesByPolicy {
ret = append(ret, *c.buildNetworkPolicyFromRules(rules))
}
return ret
}

// getNetworkPolicy looks up and returns the cached NetworkPolicy.
// nil is returned if the specified NetworkPolicy is not found.
func (c *ruleCache) getNetworkPolicy(npName, npNamespace string) *v1beta1.NetworkPolicy {
var npUID string
c.policyMapLock.Lock()
defer c.policyMapLock.Unlock()
for uid, np := range c.policyMap {
if np.Name == npName && np.Namespace == npNamespace {
npUID = uid
break
objs, _ := c.rules.ByIndex(namespaceIndex, npNamespace)
var rules []*rule
for _, obj := range objs {
rule := obj.(*rule)
if rule.PolicyRef.Name == npName {
rules = append(rules, rule)
}
}

if npUID == "" {
if len(rules) == 0 {
// NetworkPolicy not found.
return nil
}
return c.buildNetworkPolicyFromRules(npUID)
return c.buildNetworkPolicyFromRules(rules)
}

func (c *ruleCache) buildNetworkPolicyFromRules(uid string) *v1beta1.NetworkPolicy {
func (c *ruleCache) buildNetworkPolicyFromRules(rules []*rule) *v1beta1.NetworkPolicy {
var np *v1beta1.NetworkPolicy
rules, _ := c.rules.ByIndex(policyIndex, uid)
// Sort the rules by priority
sort.Slice(rules, func(i, j int) bool {
r1 := rules[i].(*rule)
r2 := rules[j].(*rule)
return r1.Priority < r2.Priority
return rules[i].Priority < rules[j].Priority
})
for _, ruleObj := range rules {
np = addRuleToNetworkPolicy(np, ruleObj.(*rule))
for _, rule := range rules {
np = addRuleToNetworkPolicy(np, rule)
}
return np
}
Expand All @@ -197,9 +190,11 @@ func (c *ruleCache) buildNetworkPolicyFromRules(uid string) *v1beta1.NetworkPoli
func addRuleToNetworkPolicy(np *v1beta1.NetworkPolicy, rule *rule) *v1beta1.NetworkPolicy {
if np == nil {
np = &v1beta1.NetworkPolicy{
ObjectMeta: metav1.ObjectMeta{UID: rule.PolicyUID,
Name: rule.PolicyName,
Namespace: rule.PolicyNamespace},
ObjectMeta: metav1.ObjectMeta{
UID: rule.PolicyRef.UID,
Name: string(rule.PolicyRef.UID),
},
SourceRef: rule.PolicyRef,
AppliedToGroups: rule.AppliedToGroups,
Priority: rule.PolicyPriority,
TierPriority: rule.TierPriority,
Expand Down Expand Up @@ -232,11 +227,11 @@ func (c *ruleCache) getAppliedNetworkPolicies(pod, namespace string) []v1beta1.N
rules, _ := c.rules.ByIndex(appliedToGroupIndex, group)
for _, ruleObj := range rules {
rule := ruleObj.(*rule)
np, ok := npMap[string(rule.PolicyUID)]
np, ok := npMap[string(rule.PolicyRef.UID)]
np = addRuleToNetworkPolicy(np, rule)
if !ok {
// First rule for this NetworkPolicy
npMap[string(rule.PolicyUID)] = np
npMap[string(rule.PolicyRef.UID)] = np
}
}
}
Expand Down Expand Up @@ -316,19 +311,25 @@ func appliedToGroupIndexFunc(obj interface{}) ([]string, error) {
// It's provided to cache.Indexer to build an index of NetworkPolicy.
func policyIndexFunc(obj interface{}) ([]string, error) {
rule := obj.(*rule)
return []string{string(rule.PolicyUID)}, nil
return []string{string(rule.PolicyRef.UID)}, nil
}

// namespaceIndexFunc knows how to get NetworkPolicy Namespace of a *rule.
// It's provided to cache.Indexer to build an index of NetworkPolicy.
func namespaceIndexFunc(obj interface{}) ([]string, error) {
rule := obj.(*rule)
return []string{rule.PolicyRef.Namespace}, nil
}

// newRuleCache returns a new *ruleCache.
func newRuleCache(dirtyRuleHandler func(string), podUpdate <-chan v1beta1.PodReference) *ruleCache {
rules := cache.NewIndexer(
ruleKeyFunc,
cache.Indexers{addressGroupIndex: addressGroupIndexFunc, appliedToGroupIndex: appliedToGroupIndexFunc, policyIndex: policyIndexFunc},
cache.Indexers{addressGroupIndex: addressGroupIndexFunc, appliedToGroupIndex: appliedToGroupIndexFunc, policyIndex: policyIndexFunc, namespaceIndex: namespaceIndexFunc},
)
cache := &ruleCache{
podSetByGroup: make(map[string]v1beta1.GroupMemberPodSet),
addressSetByGroup: make(map[string]v1beta1.GroupMemberSet),
policyMap: make(map[string]*types.NamespacedName),
rules: rules,
dirtyRuleHandler: dirtyRuleHandler,
podUpdates: podUpdate,
Expand Down Expand Up @@ -563,41 +564,31 @@ func toRule(r *v1beta1.NetworkPolicyRule, policy *v1beta1.NetworkPolicy) *rule {
PolicyPriority: policy.Priority,
TierPriority: policy.TierPriority,
AppliedToGroups: policy.AppliedToGroups,
PolicyUID: policy.UID,
PolicyRef: policy.SourceRef,
}
rule.ID = hashRule(rule)
rule.PolicyNamespace = policy.Namespace
rule.PolicyName = policy.Name
return rule
}

// GetNetworkPolicyNum gets the number of NetworkPolicy.
func (c *ruleCache) GetNetworkPolicyNum() int {
c.policyMapLock.RLock()
defer c.policyMapLock.RUnlock()

return len(c.policyMap)
return len(c.rules.ListIndexFuncValues(policyIndex))
}

// ReplaceNetworkPolicies atomically adds the given policies to the cache and deletes
// the pre-existing policies that are not in the given policies from the cache.
// It makes the cache in sync with the apiserver when restarting a watch.
func (c *ruleCache) ReplaceNetworkPolicies(policies []*v1beta1.NetworkPolicy) {
c.policyMapLock.Lock()
defer c.policyMapLock.Unlock()

oldKeys := make(sets.String, len(c.policyMap))
for key := range c.policyMap {
oldKeys.Insert(key)
}
oldUIDs := c.rules.ListIndexFuncValues(policyIndex)
oldUIDSet := sets.NewString(oldUIDs...)

for i := range policies {
oldKeys.Delete(string(policies[i].UID))
c.addNetworkPolicyLocked(policies[i])
oldUIDSet.Delete(string(policies[i].UID))
c.AddNetworkPolicy(policies[i])
}

for key := range oldKeys {
c.deleteNetworkPolicyLocked(key)
for key := range oldUIDSet {
c.deleteNetworkPolicy(key)
}
return
}
Expand All @@ -607,14 +598,6 @@ func (c *ruleCache) ReplaceNetworkPolicies(policies []*v1beta1.NetworkPolicy) {
// watcher reconnects to the Apiserver, we use the same processing as
// UpdateNetworkPolicy to ensure orphan rules are removed.
func (c *ruleCache) AddNetworkPolicy(policy *v1beta1.NetworkPolicy) error {
c.policyMapLock.Lock()
defer c.policyMapLock.Unlock()

return c.addNetworkPolicyLocked(policy)
}

func (c *ruleCache) addNetworkPolicyLocked(policy *v1beta1.NetworkPolicy) error {
c.policyMap[string(policy.UID)] = &types.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}
metrics.NetworkPolicyCount.Inc()
return c.UpdateNetworkPolicy(policy)
}
Expand Down Expand Up @@ -664,14 +647,10 @@ func (c *ruleCache) UpdateNetworkPolicy(policy *v1beta1.NetworkPolicy) error {
// DeleteNetworkPolicy deletes a cached *v1beta1.NetworkPolicy.
// All its rules will be regarded as dirty.
func (c *ruleCache) DeleteNetworkPolicy(policy *v1beta1.NetworkPolicy) error {
c.policyMapLock.Lock()
defer c.policyMapLock.Unlock()

return c.deleteNetworkPolicyLocked(string(policy.UID))
return c.deleteNetworkPolicy(string(policy.UID))
}

func (c *ruleCache) deleteNetworkPolicyLocked(uid string) error {
delete(c.policyMap, uid)
func (c *ruleCache) deleteNetworkPolicy(uid string) error {
existingRules, _ := c.rules.ByIndex(policyIndex, uid)
for _, r := range existingRules {
ruleID := r.(*rule).ID
Expand Down
Loading

0 comments on commit 0633e8f

Please sign in to comment.