Skip to content

Commit

Permalink
Enable Pod network after realizing initial NetworkPolicies
Browse files Browse the repository at this point in the history
Pod network should only be enabled after realizing initial
NetworkPolicies, otherwise traffic from/to Pods could bypass
NetworkPolicy when antrea-agent restarts.

After commit f9fc979 ("Store NetworkPolicy in filesystem as
fallback data source"), antrea-agent can realize either the latest
NetworkPolicies got from antrea-controller or the ones got from
filesystem as fallback. Therefore, waiting for NetworkPolicies to be
realized should not add marked delay or make antrea-controller a failure
point of Pod network.

This commit adds an implementation of wait group capable of waiting with
a timeout, and uses it to wait for common initialization and
NetworkPolicy realization before installing any flows for Pods. More
preconditions can be added via the wait group if needed in the future.

Signed-off-by: Quan Tian <qtian@vmware.com>
  • Loading branch information
tnqn committed Dec 7, 2023
1 parent 615b917 commit 8e706b5
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 64 deletions.
15 changes: 10 additions & 5 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import (
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/util/lazy"
"antrea.io/antrea/pkg/util/podstore"
utilwait "antrea.io/antrea/pkg/util/wait"
"antrea.io/antrea/pkg/version"
)

Expand Down Expand Up @@ -226,9 +227,12 @@ func run(o *Options) error {
// Create an ifaceStore that caches network interfaces managed by this node.
ifaceStore := interfacestore.NewInterfaceStore()

// networkReadyCh is used to notify that the Node's network is ready.
// Functions that rely on the Node's network should wait for the channel to close.
networkReadyCh := make(chan struct{})
// podNetworkWait is used to wait and notify that preconditions for Pod network are ready.
// Processes that are supposed to finish before enabling Pod network should increment the wait group and decrement
// it when finished.
// Processes that enable Pod network should wait for it.
podNetworkWait := utilwait.NewGroup()

// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
Expand Down Expand Up @@ -275,7 +279,7 @@ func run(o *Options) error {
wireguardConfig,
egressConfig,
serviceConfig,
networkReadyCh,
podNetworkWait,
stopCh,
o.nodeType,
o.config.ExternalNode.ExternalNodeNamespace,
Expand Down Expand Up @@ -479,6 +483,7 @@ func run(o *Options) error {
gwPort,
tunPort,
nodeConfig,
podNetworkWait,
)
if err != nil {
return fmt.Errorf("error creating new NetworkPolicy controller: %v", err)
Expand Down Expand Up @@ -550,7 +555,7 @@ func run(o *Options) error {
enableAntreaIPAM,
o.config.DisableTXChecksumOffload,
networkConfig,
networkReadyCh)
podNetworkWait)

err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel)
if err != nil {
Expand Down
23 changes: 7 additions & 16 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/containernetworking/plugins/pkg/ip"
Expand Down Expand Up @@ -57,6 +56,7 @@ import (
"antrea.io/antrea/pkg/util/env"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/k8s"
utilwait "antrea.io/antrea/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -121,9 +121,9 @@ type Initializer struct {
enableL7NetworkPolicy bool
connectUplinkToBridge bool
enableAntreaProxy bool
// networkReadyCh should be closed once the Node's network is ready.
// podNetworkWait should be decremented once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
networkReadyCh chan<- struct{}
podNetworkWait *utilwait.Group
stopCh <-chan struct{}
nodeType config.NodeType
externalNodeNamespace string
Expand All @@ -144,7 +144,7 @@ func NewInitializer(
wireGuardConfig *config.WireGuardConfig,
egressConfig *config.EgressConfig,
serviceConfig *config.ServiceConfig,
networkReadyCh chan<- struct{},
podNetworkWait *utilwait.Group,
stopCh <-chan struct{},
nodeType config.NodeType,
externalNodeNamespace string,
Expand All @@ -168,7 +168,7 @@ func NewInitializer(
egressConfig: egressConfig,
serviceConfig: serviceConfig,
l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{},
networkReadyCh: networkReadyCh,
podNetworkWait: podNetworkWait,
stopCh: stopCh,
nodeType: nodeType,
externalNodeNamespace: externalNodeNamespace,
Expand Down Expand Up @@ -407,9 +407,6 @@ func (i *Initializer) restorePortConfigs() error {
// Initialize sets up agent initial configurations.
func (i *Initializer) Initialize() error {
klog.Info("Setting up node network")
// wg is used to wait for the asynchronous initialization.
var wg sync.WaitGroup

if err := i.initNodeLocalConfig(); err != nil {
return err
}
Expand Down Expand Up @@ -485,23 +482,17 @@ func (i *Initializer) Initialize() error {
}

if i.nodeType == config.K8sNode {
wg.Add(1)
i.podNetworkWait.Increment()
// routeClient.Initialize() should be after i.setupOVSBridge() which
// creates the host gateway interface.
if err := i.routeClient.Initialize(i.nodeConfig, wg.Done); err != nil {
if err := i.routeClient.Initialize(i.nodeConfig, i.podNetworkWait.Done); err != nil {
return err
}

// Install OpenFlow entries on OVS bridge.
if err := i.initOpenFlowPipeline(); err != nil {
return err
}

// The Node's network is ready only when both synchronous and asynchronous initialization are done.
go func() {
wg.Wait()
close(i.networkReadyCh)
}()
} else {
// Install OpenFlow entries on OVS bridge.
if err := i.initOpenFlowPipeline(); err != nil {
Expand Down
46 changes: 30 additions & 16 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/util/wait"
)

type vethPair struct {
Expand Down Expand Up @@ -413,7 +414,7 @@ func parsePrevResult(conf *types.NetworkConfig) error {
return nil
}

func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator) error {
func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait *wait.Group) error {
// desiredPods is the set of Pods that should be present, based on the
// current list of Pods got from the Kubernetes API.
desiredPods := sets.New[string]()
Expand Down Expand Up @@ -445,21 +446,34 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
missingIfConfigs = append(missingIfConfigs, containerConfig)
continue
}
// This interface matches an existing Pod.
// We rely on the interface cache / store - which is initialized from the persistent
// OVSDB - to map the Pod to its interface configuration. The interface
// configuration includes the parameters we need to replay the flows.
klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
if err := pc.ofClient.InstallPodFlows(
containerConfig.InterfaceName,
containerConfig.IPs,
containerConfig.MAC,
uint32(containerConfig.OFPort),
containerConfig.VLANID,
nil,
); err != nil {
klog.ErrorS(err, "Error when re-installing flows for Pod", "Pod", klog.KRef(namespace, name))
}
go func(containerID, pod, namespace string) {
// Do not install Pod flows until all preconditions are met.
podNetworkWait.Wait()
// To avoid race condition with CNIServer CNI event handlers.
containerAccess.lockContainer(containerID)
defer containerAccess.unlockContainer(containerID)

containerConfig, exists := pc.ifaceStore.GetContainerInterface(containerID)
if !exists {
klog.InfoS("The container interface had been deleted, skip installing flows for Pod", "Pod", klog.KRef(namespace, name), "containerID", containerID)
return
}
// This interface matches an existing Pod.
// We rely on the interface cache / store - which is initialized from the persistent
// OVSDB - to map the Pod to its interface configuration. The interface
// configuration includes the parameters we need to replay the flows.
klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
if err := pc.ofClient.InstallPodFlows(
containerConfig.InterfaceName,
containerConfig.IPs,
containerConfig.MAC,
uint32(containerConfig.OFPort),
containerConfig.VLANID,
nil,
); err != nil {
klog.ErrorS(err, "Error when re-installing flows for Pod", "Pod", klog.KRef(namespace, name))
}
}(containerConfig.ContainerID, name, namespace)
} else {
// clean-up and delete interface
klog.V(4).InfoS("Deleting interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
Expand Down
17 changes: 8 additions & 9 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"antrea.io/antrea/pkg/cni"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -115,8 +116,8 @@ type CNIServer struct {
enableSecondaryNetworkIPAM bool
disableTXChecksumOffload bool
networkConfig *config.NetworkConfig
// networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
networkReadyCh <-chan struct{}
// podNetworkWait notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
podNetworkWait *wait.Group
}

var supportedCNIVersionSet map[string]bool
Expand Down Expand Up @@ -434,11 +435,9 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
return resp, err
}

select {
case <-time.After(networkReadyTimeout):
klog.ErrorS(nil, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout)
if err := s.podNetworkWait.WaitWithTimeout(networkReadyTimeout); err != nil {
klog.ErrorS(err, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout)
return s.tryAgainLaterResponse(), nil
case <-s.networkReadyCh:
}

result := &ipam.IPAMResult{Result: current.Result{CNIVersion: current.ImplementedSpecVersion}}
Expand Down Expand Up @@ -610,7 +609,7 @@ func New(
routeClient route.Interface,
isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool,
networkConfig *config.NetworkConfig,
networkReadyCh <-chan struct{},
podNetworkWait *wait.Group,
) *CNIServer {
return &CNIServer{
cniSocket: cniSocket,
Expand All @@ -625,7 +624,7 @@ func New(
disableTXChecksumOffload: disableTXChecksumOffload,
enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM,
networkConfig: networkConfig,
networkReadyCh: networkReadyCh,
podNetworkWait: podNetworkWait,
}
}

Expand Down Expand Up @@ -739,7 +738,7 @@ func (s *CNIServer) reconcile() error {
return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err)
}

return s.podConfigurator.reconcile(pods.Items, s.containerAccess)
return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait)
}

func init() {
Expand Down
5 changes: 2 additions & 3 deletions pkg/agent/cniserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"antrea.io/antrea/pkg/cni"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
"antrea.io/antrea/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -663,15 +664,13 @@ func translateRawPrevResult(prevResult *current.Result, cniVersion string) (map[
}

func newCNIServer(t *testing.T) *CNIServer {
networkReadyCh := make(chan struct{})
cniServer := &CNIServer{
cniSocket: testSocket,
nodeConfig: testNodeConfig,
serverVersion: cni.AntreaCNIVersion,
containerAccess: newContainerAccessArbitrator(),
networkReadyCh: networkReadyCh,
podNetworkWait: wait.NewGroup(),
}
close(networkReadyCh)
cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450}
return cniServer
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/agent/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/querier"
"antrea.io/antrea/pkg/util/channel"
utilwait "antrea.io/antrea/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -143,10 +144,11 @@ type Controller struct {
fullSyncGroup sync.WaitGroup
ifaceStore interfacestore.InterfaceStore
// denyConnStore is for storing deny connections for flow exporter.
denyConnStore *connections.DenyConnectionStore
gwPort uint32
tunPort uint32
nodeConfig *config.NodeConfig
denyConnStore *connections.DenyConnectionStore
gwPort uint32
tunPort uint32
nodeConfig *config.NodeConfig
podNetworkWait *utilwait.Group

// The fileStores store runtime.Objects in files and use them as the fallback data source when agent can't connect
// to antrea-controller on startup.
Expand Down Expand Up @@ -181,7 +183,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
v4Enabled bool,
v6Enabled bool,
gwPort, tunPort uint32,
nodeConfig *config.NodeConfig) (*Controller, error) {
nodeConfig *config.NodeConfig,
podNetworkWait *utilwait.Group) (*Controller, error) {
idAllocator := newIDAllocator(asyncRuleDeleteInterval, dnsInterceptRuleID)
c := &Controller{
antreaClientProvider: antreaClientGetter,
Expand All @@ -196,6 +199,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
gwPort: gwPort,
tunPort: tunPort,
nodeConfig: nodeConfig,
podNetworkWait: podNetworkWait.Increment(),
}

if l7NetworkPolicyEnabled {
Expand Down Expand Up @@ -610,6 +614,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
klog.Infof("All watchers have completed full sync, installing flows for init events")
// Batch install all rules in queue after fullSync is finished.
c.processAllItemsInQueue()
c.podNetworkWait.Done()

klog.Infof("Starting NetworkPolicy workers now")
defer c.queue.ShutDown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"antrea.io/antrea/pkg/client/clientset/versioned/fake"
"antrea.io/antrea/pkg/querier"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/wait"
)

const testNamespace = "ns1"
Expand Down Expand Up @@ -76,7 +77,30 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
groupIDAllocator := openflow.NewGroupAllocator()
groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)}
fs := afero.NewMemMapFs()
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, fs, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, nil, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{})
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset},
nil,
nil,
fs,
"node1",
podUpdateChannel,
nil,
groupCounters,
ch2,
true,
true,
true,
true,
false,
nil,
testAsyncDeleteInterval,
"8.8.8.8:53",
config.K8sNode,
true,
false,
config.HostGatewayOFPort,
config.DefaultTunOFPort,
&config.NodeConfig{},
wait.NewGroup())
reconciler := newMockReconciler()
controller.reconciler = reconciler
controller.auditLogger = nil
Expand Down
Loading

0 comments on commit 8e706b5

Please sign in to comment.