diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 82d4b2d3e01..92b7ae82710 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -20,6 +20,7 @@ import ( "net" "time" + "github.com/spf13/afero" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/sets" @@ -459,6 +460,7 @@ func run(o *Options) error { antreaClientProvider, ofClient, ifaceStore, + afero.NewOsFs(), nodeKey, podUpdateChannel, externalEntityUpdateChannel, diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go index efe4d254cb7..370af96cf23 100644 --- a/pkg/agent/controller/networkpolicy/cache.go +++ b/pkg/agent/controller/networkpolicy/cache.go @@ -551,13 +551,13 @@ func (c *ruleCache) addAddressGroupLocked(group *v1beta.AddressGroup) error { // PatchAddressGroup updates a cached *v1beta.AddressGroup. // The rules referencing it will be regarded as dirty. -func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) error { +func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) (*v1beta.AddressGroup, error) { c.addressSetLock.Lock() defer c.addressSetLock.Unlock() groupMemberSet, exists := c.addressSetByGroup[patch.Name] if !exists { - return fmt.Errorf("AddressGroup %v doesn't exist in cache, can't be patched", patch.Name) + return nil, fmt.Errorf("AddressGroup %v doesn't exist in cache, can't be patched", patch.Name) } for i := range patch.AddedGroupMembers { groupMemberSet.Insert(&patch.AddedGroupMembers[i]) @@ -567,7 +567,16 @@ func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) error { } c.onAddressGroupUpdate(patch.Name) - return nil + + members := make([]v1beta.GroupMember, 0, len(groupMemberSet)) + for _, member := range groupMemberSet { + members = append(members, *member) + } + group := &v1beta.AddressGroup{ + ObjectMeta: patch.ObjectMeta, + GroupMembers: members, + } + return group, nil } // 
DeleteAddressGroup deletes a cached *v1beta.AddressGroup. @@ -639,13 +648,13 @@ func (c *ruleCache) addAppliedToGroupLocked(group *v1beta.AppliedToGroup) error // PatchAppliedToGroup updates a cached *v1beta.AppliedToGroupPatch. // The rules referencing it will be regarded as dirty. -func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error { +func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) (*v1beta.AppliedToGroup, error) { c.appliedToSetLock.Lock() defer c.appliedToSetLock.Unlock() memberSet, exists := c.appliedToSetByGroup[patch.Name] if !exists { - return fmt.Errorf("AppliedToGroup %v doesn't exist in cache, can't be patched", patch.Name) + return nil, fmt.Errorf("AppliedToGroup %v doesn't exist in cache, can't be patched", patch.Name) } for i := range patch.AddedGroupMembers { memberSet.Insert(&patch.AddedGroupMembers[i]) @@ -654,7 +663,16 @@ func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error memberSet.Delete(&patch.RemovedGroupMembers[i]) } c.onAppliedToGroupUpdate(patch.Name) - return nil + + members := make([]v1beta.GroupMember, 0, len(memberSet)) + for _, member := range memberSet { + members = append(members, *member) + } + group := &v1beta.AppliedToGroup{ + ObjectMeta: patch.ObjectMeta, + GroupMembers: members, + } + return group, nil } // DeleteAppliedToGroup deletes a cached *v1beta.AppliedToGroup. 
diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go index 0ced8235e26..dc68f2f5b13 100644 --- a/pkg/agent/controller/networkpolicy/cache_test.go +++ b/pkg/agent/controller/networkpolicy/cache_test.go @@ -1039,7 +1039,7 @@ func TestRuleCachePatchAppliedToGroup(t *testing.T) { for _, rule := range tt.rules { c.rules.Add(rule) } - err := c.PatchAppliedToGroup(tt.args) + ret, err := c.PatchAppliedToGroup(tt.args) if (err == nil) == tt.expectedErr { t.Fatalf("Got error %v, expected %t", err, tt.expectedErr) } @@ -1048,6 +1048,9 @@ func TestRuleCachePatchAppliedToGroup(t *testing.T) { } actualPods, _ := c.appliedToSetByGroup[tt.args.Name] assert.ElementsMatch(t, tt.expectedPods, actualPods.Items(), "stored Pods not equal") + if !tt.expectedErr { + assert.Equal(t, len(ret.GroupMembers), len(actualPods)) + } }) } } @@ -1116,7 +1119,7 @@ func TestRuleCachePatchAddressGroup(t *testing.T) { for _, rule := range tt.rules { c.rules.Add(rule) } - err := c.PatchAddressGroup(tt.args) + ret, err := c.PatchAddressGroup(tt.args) if (err == nil) == tt.expectedErr { t.Fatalf("Got error %v, expected %t", err, tt.expectedErr) } @@ -1125,6 +1128,9 @@ func TestRuleCachePatchAddressGroup(t *testing.T) { } actualAddresses, _ := c.addressSetByGroup[tt.args.Name] assert.ElementsMatch(t, tt.expectedAddresses, actualAddresses.Items(), "stored addresses not equal") + if !tt.expectedErr { + assert.Equal(t, len(ret.GroupMembers), len(actualAddresses)) + } }) } } diff --git a/pkg/agent/controller/networkpolicy/filestore.go b/pkg/agent/controller/networkpolicy/filestore.go new file mode 100644 index 00000000000..6bbed204bd7 --- /dev/null +++ b/pkg/agent/controller/networkpolicy/filestore.go @@ -0,0 +1,133 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkpolicy + +import ( + "fmt" + "io" + "io/fs" + "os" + + "github.com/spf13/afero" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" +) + +// fileStore encodes and stores runtime.Objects in files. Each object will be stored in a separate file under the given +// directory. +type fileStore struct { + fs afero.Fs + // The directory to store the files. + dir string + // serializer knows how to encode and decode the objects. + serializer runtime.Serializer +} + +func newFileStore(fs afero.Fs, dir string, serializer runtime.Serializer) (*fileStore, error) { + s := &fileStore{ + fs: fs, + dir: dir, + serializer: serializer, + } + klog.V(2).InfoS("Creating directory for NetworkPolicy cache", "dir", dir) + if err := s.fs.MkdirAll(dir, 0o700); err != nil { + return nil, err + } + return s, nil +} + +// save stores the given object in file with the object's UID as the file name, overwriting any existing content if the +// file already exists. Note the method may update the object's GroupVersionKind in-place during serialization. +func (s fileStore) save(item runtime.Object) error { + object := item.(metav1.Object) + path := fmt.Sprintf("%s/%s", s.dir, object.GetUID()) + file, err := s.fs.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600) + if err != nil { + return fmt.Errorf("error opening file for writing object %v: %w", object.GetUID(), err) + } + defer file.Close() + // Encode may update the object's GroupVersionKind in-place during serialization.
+ err = s.serializer.Encode(item, file) + if err != nil { + return fmt.Errorf("error writing object %v to file: %w", object.GetUID(), err) + } + return nil +} + +// delete removes the file with the object's UID as the file name if it exists. +func (s fileStore) delete(item runtime.Object) error { + object := item.(metav1.Object) + path := fmt.Sprintf("%s/%s", s.dir, object.GetUID()) + err := s.fs.Remove(path) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + return nil +} + +// replaceAll replaces all files under the directory with the given objects. Existing files not in the given objects +// will be removed. Note the method may update the object's GroupVersionKind in-place during serialization. +func (s fileStore) replaceAll(items []runtime.Object) error { + if err := s.fs.RemoveAll(s.dir); err != nil { + return err + } + if err := s.fs.MkdirAll(s.dir, 0o700); err != nil { + return err + } + for _, item := range items { + if err := s.save(item); err != nil { + return err + } + } + return nil +} + +func (s fileStore) loadAll() ([]runtime.Object, error) { + var objects []runtime.Object + err := afero.Walk(s.fs, s.dir, func(path string, info fs.FileInfo, err error) error { + if info.IsDir() { + return nil + } + file, err2 := s.fs.Open(path) + if err2 != nil { + return err2 + } + defer file.Close() + data, err2 := io.ReadAll(file) + if err2 != nil { + return err2 + } + + object, gkv, err2 := s.serializer.Decode(data, nil, nil) + // If the data is corrupted somehow, we still want to load other data and continue the process. + if err2 != nil { + klog.ErrorS(err2, "Failed to decode data from file, ignore it", "file", path) + return nil + } + // Note: we haven't stored a different version so far but version conversion should be performed when the used + // version is upgraded in the future.
+ klog.V(2).InfoS("Loaded object from file", "gkv", gkv, "object", object) + objects = append(objects, object) + return nil + }) + if err != nil { + return nil, err + } + return objects, nil +} diff --git a/pkg/agent/controller/networkpolicy/filestore_test.go b/pkg/agent/controller/networkpolicy/filestore_test.go new file mode 100644 index 00000000000..bcca4dc1f99 --- /dev/null +++ b/pkg/agent/controller/networkpolicy/filestore_test.go @@ -0,0 +1,190 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkpolicy + +import ( + "fmt" + "testing" + + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer/protobuf" + "k8s.io/apimachinery/pkg/types" + + "antrea.io/antrea/pkg/apis/controlplane/v1beta2" +) + +const ( + testDirNetworkPolicy = "/var/run/antrea-test/file-store/network-policies" + testDirAppliedToGroup = "/var/run/antrea-test/file-store/applied-to-groups" + testDirAddressGroup = "/var/run/antrea-test/file-store/address-groups" +) + +// Set it to MemMapFs as the file system may be not writable. +// Change it to NewOsFs to evaluate performance when writing to disk. 
+var newFS = afero.NewMemMapFs + +func newFakeFileStore(tb testing.TB, path string) *fileStore { + serializer := protobuf.NewSerializer(scheme, scheme) + codec := codecs.CodecForVersions(serializer, serializer, v1beta2.SchemeGroupVersion, v1beta2.SchemeGroupVersion) + // Create a new FS for every fileStore in case of interaction between tests. + s, err := newFileStore(newFS(), path, codec) + assert.NoError(tb, err) + return s +} + +func TestFileStore(t *testing.T) { + policy1 := newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, nil, []string{"appliedToGroup1"}, nil) + policy2 := newNetworkPolicy("policy2", "uid2", []string{"addressGroup2"}, nil, []string{"appliedToGroup2"}, nil) + policy3 := newNetworkPolicy("policy3", "uid3", []string{"addressGroup3"}, nil, []string{"appliedToGroup3"}, nil) + updatedPolicy2 := policy2.DeepCopy() + updatedPolicy2.AppliedToGroups = []string{"foo"} + + tests := []struct { + name string + ops func(*fileStore) + expectedObjects []runtime.Object + }{ + { + name: "add", + ops: func(store *fileStore) { + store.save(policy1) + store.save(policy2) + store.save(policy3) + }, + expectedObjects: []runtime.Object{policy1, policy2, policy3}, + }, + { + name: "update", + ops: func(store *fileStore) { + store.save(policy1) + store.save(policy2) + store.save(updatedPolicy2) + }, + expectedObjects: []runtime.Object{policy1, updatedPolicy2}, + }, + { + name: "delete", + ops: func(store *fileStore) { + store.save(policy1) + store.save(policy2) + store.delete(policy2) + }, + expectedObjects: []runtime.Object{policy1}, + }, + { + name: "replace", + ops: func(store *fileStore) { + store.save(policy1) + store.save(policy2) + store.replaceAll([]runtime.Object{updatedPolicy2, policy3}) + }, + expectedObjects: []runtime.Object{updatedPolicy2, policy3}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newFakeFileStore(t, testDirNetworkPolicy) + tt.ops(s) + gotObjects, err := s.loadAll() + require.NoError(t, 
err) + assert.Equal(t, tt.expectedObjects, gotObjects) + }) + } +} + +func BenchmarkFileStoreAddNetworkPolicy(b *testing.B) { + policy1 := newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, nil, []string{"appliedToGroup1"}, nil) + s := newFakeFileStore(b, testDirNetworkPolicy) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + s.save(policy1) + } +} + +func BenchmarkFileStoreAddAppliedToGroup(b *testing.B) { + members := make([]v1beta2.GroupMember, 0, 100) + for i := 0; i < 100; i++ { + members = append(members, *newAppliedToGroupMemberPod(fmt.Sprintf("pod-%d", i), "namespace")) + } + atg := newAppliedToGroup("test", members) + s := newFakeFileStore(b, testDirAppliedToGroup) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + s.save(atg) + } +} + +func BenchmarkFileStoreAddAddressGroup(b *testing.B) { + members := make([]v1beta2.GroupMember, 0, 1000) + for i := 0; i < 1000; i++ { + members = append(members, *newAddressGroupPodMember(fmt.Sprintf("pod-%d", i), "namespace", "192.168.0.1")) + } + ag := newAddressGroup("test", members) + s := newFakeFileStore(b, testDirAddressGroup) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + s.save(ag) + } +} + +func BenchmarkFileStoreReplaceAll(b *testing.B) { + nps := make([]runtime.Object, 0, 1000) + atgs := make([]runtime.Object, 0, 1000) + ags := make([]runtime.Object, 0, 1000) + for i := 0; i < 1000; i++ { + policyName := fmt.Sprintf("policy-%d", i) + addressGroupName := fmt.Sprintf("address-group-%d", i) + appliedToGroupName := fmt.Sprintf("applied-to-group-%d", i) + nps = append(nps, newNetworkPolicy(policyName, types.UID(policyName), []string{addressGroupName}, nil, []string{appliedToGroupName}, nil)) + + var atgMembers []v1beta2.GroupMember + for j := 0; j < 100; j++ { + atgMembers = append(atgMembers, *newAppliedToGroupMemberPod(fmt.Sprintf("pod-%d", j), "namespace")) + } + atg := newAppliedToGroup(appliedToGroupName, atgMembers) + atgs = append(atgs, atg) + + 
var agMembers []v1beta2.GroupMember + podNum := 100 + if i < 10 { + podNum = 10000 + } else if i < 110 { + podNum = 1000 + } + for j := 0; j < podNum; j++ { + agMembers = append(agMembers, *newAddressGroupPodMember(fmt.Sprintf("pod-%d", j), "namespace", "192.168.0.1")) + } + ag := newAddressGroup(addressGroupName, agMembers) + ags = append(ags, ag) + } + + networkPolicyStore := newFakeFileStore(b, testDirNetworkPolicy) + appliedToGroupStore := newFakeFileStore(b, testDirAppliedToGroup) + addressGroupStore := newFakeFileStore(b, testDirAddressGroup) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + networkPolicyStore.replaceAll(nps) + appliedToGroupStore.replaceAll(atgs) + addressGroupStore.replaceAll(ags) + } +} diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index f25446e30d2..227a2dc03ac 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -23,9 +23,12 @@ import ( "time" "antrea.io/ofnet/ofctrl" + "github.com/spf13/afero" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/runtime/serializer/protobuf" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/workqueue" @@ -39,6 +42,7 @@ import ( "antrea.io/antrea/pkg/agent/openflow" proxytypes "antrea.io/antrea/pkg/agent/proxy/types" "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/apis/controlplane/install" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/querier" "antrea.io/antrea/pkg/util/channel" @@ -58,6 +62,13 @@ const ( dnsInterceptRuleID = uint32(1) ) +const ( + dataDir = "/var/run/antrea/networkpolicy" + networkPoliciesDir = dataDir + "/network-policies" + appliedToGroupsDir = dataDir + "/applied-to-groups" 
+ addressGroupsDir = dataDir + "/address-groups" +) + type L7RuleReconciler interface { AddRule(ruleID, policyName string, vlanID uint32, l7Protocols []v1beta2.L7Protocol, enableLogging bool) error DeleteRule(ruleID string, vlanID uint32) error @@ -65,6 +76,15 @@ type L7RuleReconciler interface { var emptyWatch = watch.NewEmptyWatch() +var ( + scheme = runtime.NewScheme() + codecs = serializer.NewCodecFactory(scheme) +) + +func init() { + install.Install(scheme) +} + type packetInAction func(*ofctrl.PacketIn) error // Controller is responsible for watching Antrea AddressGroups, AppliedToGroups, @@ -128,6 +148,12 @@ type Controller struct { tunPort uint32 nodeConfig *config.NodeConfig + // The fileStores store runtime.Objects in files and use them as the fallback data source when agent can't connect + // to antrea-controller on startup. + networkPolicyStore *fileStore + appliedToGroupStore *fileStore + addressGroupStore *fileStore + logPacketAction packetInAction rejectRequestAction packetInAction storeDenyConnectionAction packetInAction @@ -137,6 +163,7 @@ type Controller struct { func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, + fs afero.Fs, nodeName string, podUpdateSubscriber channel.Subscriber, externalEntityUpdateSubscriber channel.Subscriber, @@ -176,8 +203,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.l7VlanIDAllocator = newL7VlanIDAllocator() } + var err error if antreaPolicyEnabled { - var err error if c.fqdnController, err = newFQDNController(ofClient, idAllocator, dnsServerOverride, c.enqueueRule, v4Enabled, v6Enabled, gwPort); err != nil { return nil, err } @@ -189,6 +216,22 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController, groupCounters, v4Enabled, v6Enabled, antreaPolicyEnabled, multicastEnabled) 
c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, externalEntityUpdateSubscriber, groupIDUpdates, nodeType) + + serializer := protobuf.NewSerializer(scheme, scheme) + codec := codecs.CodecForVersions(serializer, serializer, v1beta2.SchemeGroupVersion, v1beta2.SchemeGroupVersion) + c.networkPolicyStore, err = newFileStore(fs, networkPoliciesDir, codec) + if err != nil { + return nil, fmt.Errorf("error creating file store for NetworkPolicy: %v", err) + } + c.appliedToGroupStore, err = newFileStore(fs, appliedToGroupsDir, codec) + if err != nil { + return nil, fmt.Errorf("error creating file store for AppliedToGroup: %v", err) + } + c.addressGroupStore, err = newFileStore(fs, addressGroupsDir, codec) + if err != nil { + return nil, fmt.Errorf("error creating file store for AddressGroup: %v", err) + } + if statusManagerEnabled { c.statusManager = newStatusController(antreaClientGetter, nodeName, c.ruleCache) } @@ -235,6 +278,11 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, "policyName", policy.SourceRef.ToString()) return nil } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. + if err := c.networkPolicyStore.save(policy); err != nil { + klog.ErrorS(err, "Failed to store the NetworkPolicy to file", "policyName", policy.SourceRef.ToString()) + } c.ruleCache.AddNetworkPolicy(policy) klog.InfoS("NetworkPolicy applied to Pods on this Node", "policyName", policy.SourceRef.ToString()) return nil @@ -249,6 +297,11 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, "policyName", policy.SourceRef.ToString()) return nil } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. 
+ if err := c.networkPolicyStore.save(policy); err != nil { + klog.ErrorS(err, "Failed to store the NetworkPolicy to file", "policyName", policy.SourceRef.ToString()) + } updated := c.ruleCache.UpdateNetworkPolicy(policy) // If any rule or the generation changes, we ensure statusManager will resync the policy's status once, in // case the changes don't cause any actual rule update but the whole policy's generation is changed. @@ -269,6 +322,9 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, } c.ruleCache.DeleteNetworkPolicy(policy) klog.InfoS("NetworkPolicy no longer applied to Pods on this Node", "policyName", policy.SourceRef.ToString()) + if err := c.networkPolicyStore.delete(policy); err != nil { + klog.ErrorS(err, "Failed to delete the NetworkPolicy from file", "policyName", policy.SourceRef.ToString()) + } return nil }, ReplaceFunc: func(objs []runtime.Object) error { @@ -293,9 +349,15 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.statusManager.Resync(policies[i].UID) } } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. + if err := c.networkPolicyStore.replaceAll(objs); err != nil { + klog.ErrorS(err, "Failed to store the NetworkPolicies to files") + } c.ruleCache.ReplaceNetworkPolicies(policies) return nil }, + FallbackFunc: c.networkPolicyStore.loadAll, fullSyncWaitGroup: &c.fullSyncGroup, fullSynced: false, } @@ -314,15 +376,28 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, if !ok { return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", obj) } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first.
+ if err := c.appliedToGroupStore.save(group); err != nil { + klog.ErrorS(err, "Failed to store the AppliedToGroup to file", "groupName", group.Name) + } c.ruleCache.AddAppliedToGroup(group) return nil }, UpdateFunc: func(obj runtime.Object) error { - group, ok := obj.(*v1beta2.AppliedToGroupPatch) + patch, ok := obj.(*v1beta2.AppliedToGroupPatch) if !ok { - return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", obj) + return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroupPatch: %v", obj) + } + group, err := c.ruleCache.PatchAppliedToGroup(patch) + if err != nil { + return err + } + // It's fine to store the object to file after applying the patch to ruleCache because the returned object + // is newly created, and ruleCache itself doesn't use it. + if err := c.appliedToGroupStore.save(group); err != nil { + klog.ErrorS(err, "Failed to store the AppliedToGroup to file", "groupName", group.Name) } - c.ruleCache.PatchAppliedToGroup(group) return nil }, DeleteFunc: func(obj runtime.Object) error { @@ -331,6 +406,9 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", obj) } c.ruleCache.DeleteAppliedToGroup(group) + if err := c.appliedToGroupStore.delete(group); err != nil { + klog.ErrorS(err, "Failed to delete the AppliedToGroup from file", "groupName", group.Name) + } return nil }, ReplaceFunc: func(objs []runtime.Object) error { @@ -342,9 +420,15 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", objs[i]) } } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. 
+ if err := c.appliedToGroupStore.replaceAll(objs); err != nil { + klog.ErrorS(err, "Failed to store the AppliedToGroups to files") + } c.ruleCache.ReplaceAppliedToGroups(groups) return nil }, + FallbackFunc: c.appliedToGroupStore.loadAll, fullSyncWaitGroup: &c.fullSyncGroup, fullSynced: false, } @@ -363,15 +447,28 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, if !ok { return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", obj) } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. + if err := c.addressGroupStore.save(group); err != nil { + klog.ErrorS(err, "Failed to store the AddressGroup to file", "groupName", group.Name) + } c.ruleCache.AddAddressGroup(group) return nil }, UpdateFunc: func(obj runtime.Object) error { - group, ok := obj.(*v1beta2.AddressGroupPatch) + patch, ok := obj.(*v1beta2.AddressGroupPatch) if !ok { - return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", obj) + return fmt.Errorf("cannot convert to *v1beta1.AddressGroupPatch: %v", obj) + } + group, err := c.ruleCache.PatchAddressGroup(patch) + if err != nil { + return err + } + // It's fine to store the object to file after applying the patch to ruleCache because the returned object + // is newly created, and ruleCache itself doesn't use it.
+ if err := c.addressGroupStore.save(group); err != nil { + klog.ErrorS(err, "Failed to store the AddressGroup to file", "groupName", group.Name) + } - c.ruleCache.PatchAddressGroup(group) return nil }, DeleteFunc: func(obj runtime.Object) error { @@ -380,6 +477,9 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", obj) } c.ruleCache.DeleteAddressGroup(group) + if err := c.addressGroupStore.delete(group); err != nil { + klog.ErrorS(err, "Failed to delete the AddressGroup from file", "groupName", group.Name) + } return nil }, ReplaceFunc: func(objs []runtime.Object) error { @@ -391,9 +491,15 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", objs[i]) } } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. + if err := c.addressGroupStore.replaceAll(objs); err != nil { + klog.ErrorS(err, "Failed to store the AddressGroups to files") + } c.ruleCache.ReplaceAddressGroups(groups) return nil }, + FallbackFunc: c.addressGroupStore.loadAll, fullSyncWaitGroup: &c.fullSyncGroup, fullSynced: false, } @@ -741,6 +847,8 @@ type watcher struct { DeleteFunc func(obj runtime.Object) error // ReplaceFunc is the function that handles init events. ReplaceFunc func(objs []runtime.Object) error + // FallbackFunc is the function that provides the data when it can't start the watch successfully. + FallbackFunc func() ([]runtime.Object, error) // connected represents whether the watch has connected to apiserver successfully. connected bool // lock protects connected. @@ -763,17 +871,45 @@ func (w *watcher) setConnected(connected bool) { w.connected = connected } +// fallback gets init events from the FallbackFunc if the watcher hasn't been synced once.
+func (w *watcher) fallback() { + // If the watcher has been synced once, the fallback data source doesn't have newer data, do nothing. + if w.fullSynced { + return + } + objects, err := w.FallbackFunc() + if err != nil { + klog.ErrorS(err, "Failed to get init events from fallback") + return + } + if err := w.ReplaceFunc(objects); err != nil { + klog.ErrorS(err, "Failed to handle init events") + return + } + w.onFullSync() +} + +func (w *watcher) onFullSync() { + if !w.fullSynced { + w.fullSynced = true + // Notify fullSyncWaitGroup that all events before bookmark is handled + w.fullSyncWaitGroup.Done() + } +} + func (w *watcher) watch() { klog.Infof("Starting watch for %s", w.objectType) watcher, err := w.watchFunc() if err != nil { klog.Warningf("Failed to start watch for %s: %v", w.objectType, err) + w.fallback() return } // Watch method doesn't return error but "emptyWatch" in case of some partial data errors, // e.g. timeout error. Make sure that watcher is not empty and log warning otherwise. 
if reflect.TypeOf(watcher) == reflect.TypeOf(emptyWatch) { klog.Warningf("Failed to start watch for %s, please ensure antrea service is reachable for the agent", w.objectType) + w.fallback() return } @@ -814,11 +950,7 @@ loop: klog.Errorf("Failed to handle init events: %v", err) return } - if !w.fullSynced { - w.fullSynced = true - // Notify fullSyncWaitGroup that all events before bookmark is handled - w.fullSyncWaitGroup.Done() - } + w.onFullSync() for { select { diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go index d2179ce6a88..5ee9e30842e 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go @@ -23,9 +23,11 @@ import ( "time" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/watch" @@ -71,7 +73,8 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) { ch2 := make(chan string, 100) groupIDAllocator := openflow.NewGroupAllocator() groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)} - controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, nil, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{}) + fs := afero.NewMemMapFs() + controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, fs, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, nil, testAsyncDeleteInterval, 
"8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{}) reconciler := newMockReconciler() controller.reconciler = reconciler controller.auditLogger = nil @@ -146,14 +149,16 @@ var _ Reconciler = &mockReconciler{} func newAddressGroup(name string, addresses []v1beta2.GroupMember) *v1beta2.AddressGroup { return &v1beta2.AddressGroup{ - ObjectMeta: v1.ObjectMeta{Name: name}, + TypeMeta: v1.TypeMeta{Kind: "AddressGroup", APIVersion: "controlplane.antrea.io/v1beta2"}, + ObjectMeta: v1.ObjectMeta{Name: name, UID: types.UID(name)}, GroupMembers: addresses, } } func newAppliedToGroup(name string, pods []v1beta2.GroupMember) *v1beta2.AppliedToGroup { return &v1beta2.AppliedToGroup{ - ObjectMeta: v1.ObjectMeta{Name: name}, + TypeMeta: v1.TypeMeta{Kind: "AppliedToGroup", APIVersion: "controlplane.antrea.io/v1beta2"}, + ObjectMeta: v1.ObjectMeta{Name: name, UID: types.UID(name)}, GroupMembers: pods, } } @@ -165,6 +170,7 @@ func newNetworkPolicy(name string, uid types.UID, from, to, appliedTo []string, } networkPolicyRule1 := newPolicyRule(dir, from, to, services) return &v1beta2.NetworkPolicy{ + TypeMeta: v1.TypeMeta{Kind: "NetworkPolicy", APIVersion: "controlplane.antrea.io/v1beta2"}, ObjectMeta: v1.ObjectMeta{UID: uid, Name: string(uid)}, Rules: []v1beta2.NetworkPolicyRule{networkPolicyRule1}, AppliedToGroups: appliedTo, @@ -507,6 +513,96 @@ func TestAddNetworkPolicyWithMultipleRules(t *testing.T) { assert.Equal(t, 1, controller.GetAppliedToGroupNum()) } +func TestFallbackToFileStore(t *testing.T) { + prepareMockTables() + controller, clientset, reconciler := newTestController() + addressGroupWatcher := watch.NewFake() + appliedToGroupWatcher := watch.NewFake() + networkPolicyWatcher := watch.NewFake() + clientset.AddWatchReactor("addressgroups", k8stesting.DefaultWatchReactor(addressGroupWatcher, fmt.Errorf("network unavailable"))) + clientset.AddWatchReactor("appliedtogroups", 
k8stesting.DefaultWatchReactor(appliedToGroupWatcher, fmt.Errorf("network unavailable"))) + clientset.AddWatchReactor("networkpolicies", k8stesting.DefaultWatchReactor(networkPolicyWatcher, fmt.Errorf("network unavailable"))) + + policy := newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, nil, []string{"appliedToGroup1"}, nil) + atgMember := newAppliedToGroupMemberPod("pod1", "namespace") + agMember := newAddressGroupPodMember("pod2", "namespace", "192.168.0.1") + atg := newAppliedToGroup("appliedToGroup1", []v1beta2.GroupMember{*atgMember}) + ag := newAddressGroup("addressGroup1", []v1beta2.GroupMember{*agMember}) + controller.networkPolicyStore.save(policy) + controller.appliedToGroupStore.save(atg) + controller.addressGroupStore.save(ag) + + stopCh := make(chan struct{}) + defer close(stopCh) + go controller.Run(stopCh) + + select { + case ruleID := <-reconciler.updated: + actualRule, _ := reconciler.getLastRealized(ruleID) + assert.Equal(t, v1beta2.NewGroupMemberSet(atgMember), actualRule.TargetMembers) + assert.Equal(t, v1beta2.NewGroupMemberSet(agMember), actualRule.FromAddresses) + assert.Equal(t, policy.SourceRef, actualRule.SourceRef) + case <-time.After(time.Millisecond * 100): + t.Fatal("Expected one rule update, got timeout") + } +} + +func TestOverrideFileStore(t *testing.T) { + prepareMockTables() + controller, clientset, reconciler := newTestController() + addressGroupWatcher := watch.NewFake() + appliedToGroupWatcher := watch.NewFake() + networkPolicyWatcher := watch.NewFake() + clientset.AddWatchReactor("addressgroups", k8stesting.DefaultWatchReactor(addressGroupWatcher, nil)) + clientset.AddWatchReactor("appliedtogroups", k8stesting.DefaultWatchReactor(appliedToGroupWatcher, nil)) + clientset.AddWatchReactor("networkpolicies", k8stesting.DefaultWatchReactor(networkPolicyWatcher, nil)) + + policy1 := newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, nil, []string{"appliedToGroup1"}, nil) + policy2 := 
newNetworkPolicy("policy2", "uid2", []string{"addressGroup2"}, nil, []string{"appliedToGroup2"}, nil) + atgMember1 := newAppliedToGroupMemberPod("pod1", "namespace") + atgMember2 := newAppliedToGroupMemberPod("pod2", "namespace") + agMember1 := newAddressGroupPodMember("pod3", "namespace", "192.168.0.1") + agMember2 := newAddressGroupPodMember("pod4", "namespace", "192.168.0.2") + atg1 := newAppliedToGroup("appliedToGroup1", []v1beta2.GroupMember{*atgMember1}) + atg2 := newAppliedToGroup("appliedToGroup2", []v1beta2.GroupMember{*atgMember2}) + ag1 := newAddressGroup("addressGroup1", []v1beta2.GroupMember{*agMember1}) + ag2 := newAddressGroup("addressGroup2", []v1beta2.GroupMember{*agMember2}) + controller.networkPolicyStore.save(policy1) + controller.appliedToGroupStore.save(atg1) + controller.addressGroupStore.save(ag1) + + stopCh := make(chan struct{}) + defer close(stopCh) + go controller.Run(stopCh) + + networkPolicyWatcher.Add(policy2) + networkPolicyWatcher.Action(watch.Bookmark, nil) + addressGroupWatcher.Add(ag2) + addressGroupWatcher.Action(watch.Bookmark, nil) + appliedToGroupWatcher.Add(atg2) + appliedToGroupWatcher.Action(watch.Bookmark, nil) + + select { + case ruleID := <-reconciler.updated: + actualRule, _ := reconciler.getLastRealized(ruleID) + assert.Equal(t, v1beta2.NewGroupMemberSet(atgMember2), actualRule.TargetMembers) + assert.Equal(t, v1beta2.NewGroupMemberSet(agMember2), actualRule.FromAddresses) + assert.Equal(t, policy2.SourceRef, actualRule.SourceRef) + case <-time.After(time.Millisecond * 100): + t.Fatal("Expected one rule update, got timeout") + } + + objects, err := controller.appliedToGroupStore.loadAll() + require.NoError(t, err) + assert.Equal(t, []runtime.Object{atg2}, objects) + objects, err = controller.addressGroupStore.loadAll() + require.NoError(t, err) + assert.Equal(t, []runtime.Object{ag2}, objects) + objects, err = controller.networkPolicyStore.loadAll() + require.NoError(t, err) + assert.Equal(t, 
[]runtime.Object{policy2}, objects) +} + func TestNetworkPolicyMetrics(t *testing.T) { prepareMockTables() // Initialize NetworkPolicy metrics (prometheus) diff --git a/test/e2e/networkpolicy_test.go b/test/e2e/networkpolicy_test.go index 70afc10b763..56f5b615821 100644 --- a/test/e2e/networkpolicy_test.go +++ b/test/e2e/networkpolicy_test.go @@ -96,6 +96,10 @@ func TestNetworkPolicy(t *testing.T) { skipIfProxyDisabled(t, data) testAllowHairpinService(t, data) }) + t.Run("testNetworkPolicyAfterAgentRestart", func(t *testing.T) { + t.Cleanup(exportLogsForSubtest(t, data)) + testNetworkPolicyAfterAgentRestart(t, data) + }) } func testNetworkPolicyStats(t *testing.T, data *TestData) { @@ -704,6 +708,94 @@ func testNetworkPolicyResyncAfterRestart(t *testing.T, data *TestData) { } } +// The test validates that Pods can't bypass NetworkPolicy when antrea-agent restarts. +func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) { + workerNode := workerNodeName(1) + var isolatedPod, deniedPod, allowedPod string + var isolatedPodIPs, deniedPodIPs, allowedPodIPs *PodIPs + var wg sync.WaitGroup + createTestPod := func(prefix string) (string, *PodIPs) { + defer wg.Done() + podName, podIPs, cleanup := createAndWaitForPod(t, data, data.createNginxPodOnNode, prefix, workerNode, data.testNamespace, false) + t.Cleanup(cleanup) + return podName, podIPs + } + wg.Add(3) + go func() { + isolatedPod, isolatedPodIPs = createTestPod("test-isolated") + }() + go func() { + deniedPod, deniedPodIPs = createTestPod("test-denied") + }() + go func() { + allowedPod, allowedPodIPs = createTestPod("test-allowed") + }() + wg.Wait() + + allowedPeer := networkingv1.NetworkPolicyPeer{ + PodSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"antrea-e2e": allowedPod}}, + } + netpol, err := data.createNetworkPolicy("test-isolated", &networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"antrea-e2e": isolatedPod}}, + Ingress: 
[]networkingv1.NetworkPolicyIngressRule{{From: []networkingv1.NetworkPolicyPeer{allowedPeer}}}, + Egress: []networkingv1.NetworkPolicyEgressRule{{To: []networkingv1.NetworkPolicyPeer{allowedPeer}}}, + }) + require.NoError(t, err) + t.Cleanup(func() { data.deleteNetworkpolicy(netpol) }) + + checkFunc := func(testPod string, testPodIPs *PodIPs, expectErr bool) { + var wg sync.WaitGroup + checkOne := func(clientPod, serverPod string, serverIP *net.IP) { + defer wg.Done() + if serverIP != nil { + _, _, err := data.runWgetCommandFromTestPodWithRetry(clientPod, data.testNamespace, nginxContainerName, serverIP.String(), 1) + if expectErr && err == nil { + t.Errorf("Pod %s should not be able to connect %s, but was able to connect", clientPod, serverPod) + } else if !expectErr && err != nil { + t.Errorf("Pod %s should be able to connect %s, but was not able to connect, err: %v", clientPod, serverPod, err) + } + } + } + wg.Add(4) + go checkOne(isolatedPod, testPod, testPodIPs.IPv4) + go checkOne(isolatedPod, testPod, testPodIPs.IPv6) + go checkOne(testPod, isolatedPod, isolatedPodIPs.IPv4) + go checkOne(testPod, isolatedPod, isolatedPodIPs.IPv6) + wg.Wait() + } + + scaleFunc := func(replicas int32) { + scale, err := data.clientset.AppsV1().Deployments(antreaNamespace).GetScale(context.TODO(), antreaDeployment, metav1.GetOptions{}) + require.NoError(t, err) + scale.Spec.Replicas = replicas + _, err = data.clientset.AppsV1().Deployments(antreaNamespace).UpdateScale(context.TODO(), antreaDeployment, scale, metav1.UpdateOptions{}) + require.NoError(t, err) + } + + // Scale antrea-controller to 0 so antrea-agent will lose connection with antrea-controller. + scaleFunc(0) + t.Cleanup(func() { scaleFunc(1) }) + + // Restart the antrea-agent. 
+ _, err = data.deleteAntreaAgentOnNode(workerNode, 30, defaultTimeout) + require.NoError(t, err) + antreaPod, err := data.getAntreaPodOnNode(workerNode) + require.NoError(t, err) + // Make sure the new antrea-agent disconnects from antrea-controller but connects to OVS. + waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionFalse) + waitForAgentCondition(t, data, antreaPod, v1beta1.OpenflowConnectionUp, corev1.ConditionTrue) + // Even if the new antrea-agent can't connect to antrea-controller, the previous policy should continue working. + checkFunc(deniedPod, deniedPodIPs, true) + checkFunc(allowedPod, allowedPodIPs, false) + + // Scale antrea-controller to 1 so antrea-agent will connect to antrea-controller. + scaleFunc(1) + // Make sure antrea-agent connects to antrea-controller. + waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionTrue) + checkFunc(deniedPod, deniedPodIPs, true) + checkFunc(allowedPod, allowedPodIPs, false) +} + func testIngressPolicyWithoutPortNumber(t *testing.T, data *TestData) { serverPort := int32(80) _, serverIPs, cleanupFunc := createAndWaitForPod(t, data, data.createNginxPodOnNode, "test-server-", "", data.testNamespace, false) @@ -1039,8 +1131,9 @@ func waitForAgentCondition(t *testing.T, data *TestData, podName string, conditi t.Logf("cmds: %s", cmds) stdout, _, err := runAntctl(podName, cmds, data) + // The server may be unavailable. if err != nil { - return true, err + return false, nil } var agentInfo agentinfo.AntreaAgentInfoResponse err = json.Unmarshal([]byte(stdout), &agentInfo)