From 4f7382572a6943b4d7b7f5a0b29cdabed1e58022 Mon Sep 17 00:00:00 2001 From: Bin Liu Date: Mon, 23 May 2022 15:37:44 +0800 Subject: [PATCH] multicast: make igmp query interval configurable Signed-off-by: Bin Liu --- build/charts/antrea/README.md | 3 +- build/charts/antrea/conf/antrea-agent.conf | 15 ++++-- build/charts/antrea/values.yaml | 10 +++- build/yamls/antrea-aks.yml | 11 +++-- build/yamls/antrea-eks.yml | 11 +++-- build/yamls/antrea-gke.yml | 11 +++-- build/yamls/antrea-ipsec.yml | 11 +++-- build/yamls/antrea.yml | 11 +++-- cmd/antrea-agent/agent.go | 5 +- cmd/antrea-agent/options.go | 24 ++++++++++ hack/generate-manifest.sh | 2 +- pkg/agent/multicast/mcast_controller.go | 48 ++++++++++++-------- pkg/agent/multicast/mcast_controller_test.go | 7 +-- pkg/agent/multicast/mcast_discovery.go | 22 ++++----- pkg/config/agent/config.go | 14 ++++-- test/e2e/framework.go | 2 +- 16 files changed, 141 insertions(+), 66 deletions(-) diff --git a/build/charts/antrea/README.md b/build/charts/antrea/README.md index 2b14782d05f..a39fe33f1ca 100644 --- a/build/charts/antrea/README.md +++ b/build/charts/antrea/README.md @@ -75,7 +75,8 @@ Kubernetes: `>= 1.16.0-0` | ipsec.psk | string | `"changeme"` | Preshared Key (PSK) for IKE authentication. It will be stored in a secret and passed to antrea-agent as an environment variable. | | kubeAPIServerOverride | string | `""` | Address of Kubernetes apiserver, to override any value provided in kubeconfig or InClusterConfig. | | logVerbosity | int | `0` | | -| multicastInterfaces | list | `[]` | Names of the interfaces on Nodes that are used to forward multicast traffic. | +| multicast.igmpQueryInterval | string | `"125s"` | The interval at which the antrea-agent sends IGMP queries to Pods. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". | +| multicast.multicastInterfaces | list | `[]` | Names of the interfaces on Nodes that are used to forward multicast traffic. | | noSNAT | bool | `false` | Whether or not to SNAT (using the Node IP) the egress traffic from a Pod to the external network. | | nodeIPAM.clusterCIDRs | list | `[]` | CIDR ranges to use when allocating Pod IP addresses. | | nodeIPAM.enable | bool | `false` | Enable Node IPAM in Antrea | diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index a2395e848b2..8111f6b8a61 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -217,12 +217,19 @@ tlsMinVersion: {{ .Values.tlsMinVersion | quote }} # 3. The Node IP transportInterface: {{ .Values.transportInterface | quote }} +multicast: +{{- with .Values.multicast }} # The names of the interfaces on Nodes that are used to forward multicast traffic. # Defaults to transport interface if not set. -multicastInterfaces: -{{- with .Values.multicastInterfaces }} -{{- toYaml . | nindent 2 }} -{{- end }} + multicastInterfaces: + {{- with .multicastInterfaces }} + {{- toYaml . | nindent 4 }} + {{- end }} + +# The interval at which the antrea-agent sends IGMP queries to Pods. +# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + igmpQueryInterval: {{ .igmpQueryInterval | quote }} +{{- end}} # The network CIDRs of the interface on Node which is used for tunneling or routing the traffic across # Nodes. If there are multiple interfaces configured the same network CIDR, the first one is used. The diff --git a/build/charts/antrea/values.yaml b/build/charts/antrea/values.yaml index 45a1297ba26..6a00193d670 100644 --- a/build/charts/antrea/values.yaml +++ b/build/charts/antrea/values.yaml @@ -28,8 +28,14 @@ transportInterface: "" # -- Network CIDRs of the interface on Node which is used for tunneling or # routing the traffic across Nodes. transportInterfaceCIDRs: [] -# -- Names of the interfaces on Nodes that are used to forward multicast traffic. -multicastInterfaces: [] + +multicast: + # -- Names of the interfaces on Nodes that are used to forward multicast traffic. + multicastInterfaces: [] + # -- The interval at which the antrea-agent sends IGMP queries to Pods. + # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + igmpQueryInterval: "125s" + # -- Default MTU to use for the host gateway interface and the network interface # of each Pod. By default, antrea-agent will discover the MTU of the Node's # primary interface and adjust it to accommodate for tunnel encapsulation diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 72a8a5db275..59fcf4ed69f 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -266,9 +266,14 @@ data: # 3. The Node IP transportInterface: "" + multicast: # The names of the interfaces on Nodes that are used to forward multicast traffic. # Defaults to transport interface if not set. - multicastInterfaces: + multicastInterfaces: + + # The interval at which the antrea-agent sends IGMP queries to Pods. + # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + igmpQueryInterval: "125s" # The network CIDRs of the interface on Node which is used for tunneling or routing the traffic across # Nodes. If there are multiple interfaces configured the same network CIDR, the first one is used. The @@ -3486,7 +3491,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 4554a36b927c6e64fdbc53b4d4c64673d48c9c829ec444e3be6e699ade8481b6 + checksum/config: 0cc20edc3fc882f0ea9bd3450fbab504858feeff47e1d3f09d8f6ebacd741dbe labels: app: antrea component: antrea-agent @@ -3726,7 +3731,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 4554a36b927c6e64fdbc53b4d4c64673d48c9c829ec444e3be6e699ade8481b6 + checksum/config: 0cc20edc3fc882f0ea9bd3450fbab504858feeff47e1d3f09d8f6ebacd741dbe labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index c274ef7b119..e45901409f9 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -266,9 +266,14 @@ data: # 3. The Node IP transportInterface: "" + multicast: # The names of the interfaces on Nodes that are used to forward multicast traffic. # Defaults to transport interface if not set. - multicastInterfaces: + multicastInterfaces: + + # The interval at which the antrea-agent sends IGMP queries to Pods. + # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + igmpQueryInterval: "125s" # The network CIDRs of the interface on Node which is used for tunneling or routing the traffic across # Nodes. If there are multiple interfaces configured the same network CIDR, the first one is used. The @@ -3486,7 +3491,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 4554a36b927c6e64fdbc53b4d4c64673d48c9c829ec444e3be6e699ade8481b6 + checksum/config: 0cc20edc3fc882f0ea9bd3450fbab504858feeff47e1d3f09d8f6ebacd741dbe labels: app: antrea component: antrea-agent @@ -3728,7 +3733,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 4554a36b927c6e64fdbc53b4d4c64673d48c9c829ec444e3be6e699ade8481b6 + checksum/config: 0cc20edc3fc882f0ea9bd3450fbab504858feeff47e1d3f09d8f6ebacd741dbe labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 6c63773e17f..3d63d1b37f2 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -266,9 +266,14 @@ data: # 3. The Node IP transportInterface: "" + multicast: # The names of the interfaces on Nodes that are used to forward multicast traffic. # Defaults to transport interface if not set. - multicastInterfaces: + multicastInterfaces: + + # The interval at which the antrea-agent sends IGMP queries to Pods. + # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + igmpQueryInterval: "125s" # The network CIDRs of the interface on Node which is used for tunneling or routing the traffic across # Nodes. If there are multiple interfaces configured the same network CIDR, the first one is used. The @@ -3486,7 +3491,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: edef4c00e4f28a10dc1e077086ef68641a9a3b53d0fe7d47ff3dafc2ce5d5c9b + checksum/config: 6b6be76fd37d8fdac7783fcd026b6f34e993630c12c339b1dafa99ba5b36cf00 labels: app: antrea component: antrea-agent @@ -3726,7 +3731,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: edef4c00e4f28a10dc1e077086ef68641a9a3b53d0fe7d47ff3dafc2ce5d5c9b + checksum/config: 6b6be76fd37d8fdac7783fcd026b6f34e993630c12c339b1dafa99ba5b36cf00 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 2c8f8b76521..c4c8c0389cf 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -279,9 +279,14 @@ data: # 3. The Node IP transportInterface: "" + multicast: # The names of the interfaces on Nodes that are used to forward multicast traffic. # Defaults to transport interface if not set. - multicastInterfaces: + multicastInterfaces: + + # The interval at which the antrea-agent sends IGMP queries to Pods. + # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + igmpQueryInterval: "125s" # The network CIDRs of the interface on Node which is used for tunneling or routing the traffic across # Nodes. If there are multiple interfaces configured the same network CIDR, the first one is used. The @@ -3499,7 +3504,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 50cc962db93c0354f5eaa088e51d690e779692979cbafac0c3e27a88fc2c0c7c + checksum/config: d289c621cfdc7aee9e8320c0398e76f302591b0adc12156d470320ee9839c073 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -3775,7 +3780,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 50cc962db93c0354f5eaa088e51d690e779692979cbafac0c3e27a88fc2c0c7c + checksum/config: d289c621cfdc7aee9e8320c0398e76f302591b0adc12156d470320ee9839c073 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 8fb2b759fa3..243eea42c73 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -266,9 +266,14 @@ data: # 3. The Node IP transportInterface: "" + multicast: # The names of the interfaces on Nodes that are used to forward multicast traffic. # Defaults to transport interface if not set. - multicastInterfaces: + multicastInterfaces: + + # The interval at which the antrea-agent sends IGMP queries to Pods. + # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + igmpQueryInterval: "125s" # The network CIDRs of the interface on Node which is used for tunneling or routing the traffic across # Nodes. If there are multiple interfaces configured the same network CIDR, the first one is used. The @@ -3486,7 +3491,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: f7414e9171ab246b09dc380bc7934ebac81af7a5ef7bd3f73d661b6301040768 + checksum/config: 976e8c918d8c411df17238dd333a51f9adfdfafe2d6d480d7652f16be02fff3c labels: app: antrea component: antrea-agent @@ -3726,7 +3731,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: f7414e9171ab246b09dc380bc7934ebac81af7a5ef7bd3f73d661b6301040768 + checksum/config: 976e8c918d8c411df17238dd333a51f9adfdfafe2d6d480d7652f16be02fff3c labels: app: antrea component: antrea-controller diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 4dffbb76914..3e9124e7dae 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -589,9 +589,10 @@ func run(o *Options) error { nodeConfig, ifaceStore, multicastSocket, - sets.NewString(append(o.config.MulticastInterfaces, nodeConfig.NodeTransportInterfaceName)...), + sets.NewString(append(o.config.Multicast.MulticastInterfaces, nodeConfig.NodeTransportInterfaceName)...), ovsBridgeClient, - podUpdateChannel) + podUpdateChannel, + o.igmpQueryInterval) if err := mcastController.Initialize(); err != nil { return err } diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index ecc6cb0f389..fbc7c7784d0 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -47,6 +47,7 @@ const ( defaultFlowPollInterval = 5 * time.Second defaultActiveFlowExportTimeout = 30 * time.Second defaultIdleFlowExportTimeout = 15 * time.Second + defaultIGMPQueryInterval = 125 * time.Second defaultStaleConnectionTimeout = 5 * time.Minute defaultNPLPortRange = "61000-62000" ) @@ -68,6 +69,7 @@ type Options struct { idleFlowTimeout time.Duration // Stale connection timeout to delete connections if they are not exported. staleConnectionTimeout time.Duration + igmpQueryInterval time.Duration nplStartPort int nplEndPort int } @@ -161,6 +163,9 @@ func (o *Options) validate(args []string) error { if err := o.validateFlowExporterConfig(); err != nil { return fmt.Errorf("failed to validate flow exporter config: %v", err) } + if err := o.validateMulticastConfig(); err != nil { + return fmt.Errorf("failed to validate multicast config: %v", err) + } if features.DefaultFeatureGate.Enabled(features.Egress) { for _, cidr := range o.config.Egress.ExceptCIDRs { _, _, err := net.ParseCIDR(cidr) @@ -271,6 +276,12 @@ func (o *Options) setDefaults() { o.config.NodePortLocal.PortRange = defaultNPLPortRange } } + + if features.DefaultFeatureGate.Enabled(features.Multicast) { + if o.config.Multicast.IGMPQueryInterval == "" { + o.igmpQueryInterval = defaultIGMPQueryInterval + } + } } func (o *Options) validateAntreaProxyConfig() error { @@ -351,6 +362,19 @@ func (o *Options) validateFlowExporterConfig() error { return nil } +func (o *Options) validateMulticastConfig() error { + if features.DefaultFeatureGate.Enabled(features.Multicast) { + var err error + if o.config.Multicast.IGMPQueryInterval != "" { + o.igmpQueryInterval, err = time.ParseDuration(o.config.Multicast.IGMPQueryInterval) + if err != nil { + return err + } + } + } + return nil +} + func (o *Options) validateAntreaIPAMConfig() error { if !o.config.EnableBridgingMode { return nil diff --git a/hack/generate-manifest.sh b/hack/generate-manifest.sh index 521a9663dc9..a126c20ff3f 100755 --- a/hack/generate-manifest.sh +++ b/hack/generate-manifest.sh @@ -307,7 +307,7 @@ if $FLEXIBLE_IPAM; then fi if $MULTICAST; then - HELM_VALUES+=("trafficEncapMode=noEncap" "featureGates.Multicast=true" "multicastInterfaces={$MULTICAST_INTERFACES}") + HELM_VALUES+=("trafficEncapMode=noEncap" "featureGates.Multicast=true" "multicast.multicastInterfaces={$MULTICAST_INTERFACES}") fi if $ALLFEATURES; then diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index 481dcebb2d9..01188cba172 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -153,20 +153,20 @@ func (c *Controller) checkLastMember(group net.IP) { c.queue.AddAfter(group.String(), igmpMaxResponseTime) } -// clearStaleGroups checks the stale group members which have not been updated for mcastGroupTimeout, and then notifies worker +// clearStaleGroups checks the stale group members which have not been updated for c.mcastGroupTimeout, and then notifies worker // to remove them from groupCache. func (c *Controller) clearStaleGroups() { now := time.Now() for _, obj := range c.groupCache.List() { status := obj.(*GroupMemberStatus) diff := now.Sub(status.lastIGMPReport) - if diff > mcastGroupTimeout { + if diff > c.mcastGroupTimeout { // Notify worker to remove the group from groupCache if all its members are not updated before mcastGroupTimeout. c.queue.Add(status.group.String()) } else { // Create a "leave" event for a local member if it is not updated before mcastGroupTimeout. for member, lastUpdate := range status.localMembers { - if now.Sub(lastUpdate) > mcastGroupTimeout { + if now.Sub(lastUpdate) > c.mcastGroupTimeout { ifConfig := &interfacestore.InterfaceConfig{ InterfaceName: member, } @@ -222,6 +222,11 @@ type Controller struct { installedGroupsMutex sync.RWMutex mRouteClient *MRouteClient ovsBridgeClient ovsconfig.OVSBridgeClient + + // queryInterval is the interval to send IGMP query messages. + queryInterval time.Duration + // mcastGroupTimeout is the timeout to detect a group as stale if no IGMP report is received within the time. + mcastGroupTimeout time.Duration } func NewMulticastController(ofClient openflow.Client, @@ -231,25 +236,28 @@ func NewMulticastController(ofClient openflow.Client, multicastSocket RouteInterface, multicastInterfaces sets.String, ovsBridgeClient ovsconfig.OVSBridgeClient, - podUpdateSubscriber channel.Subscriber) *Controller { + podUpdateSubscriber channel.Subscriber, + igmpQueryInterval time.Duration) *Controller { eventCh := make(chan *mcastGroupEvent, workerCount) - groupSnooper := newSnooper(ofClient, ifaceStore, eventCh) + groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval) groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces) c := &Controller{ - ofClient: ofClient, - ifaceStore: ifaceStore, - v4GroupAllocator: v4GroupAllocator, - nodeConfig: nodeConfig, - igmpSnooper: groupSnooper, - groupEventCh: eventCh, - groupCache: groupCache, - installedGroups: sets.NewString(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), - mRouteClient: multicastRouteClient, - ovsBridgeClient: ovsBridgeClient, + ofClient: ofClient, + ifaceStore: ifaceStore, + v4GroupAllocator: v4GroupAllocator, + nodeConfig: nodeConfig, + igmpSnooper: groupSnooper, + groupEventCh: eventCh, + groupCache: groupCache, + installedGroups: sets.NewString(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), + mRouteClient: multicastRouteClient, + ovsBridgeClient: ovsBridgeClient, + queryInterval: igmpQueryInterval, + mcastGroupTimeout: igmpQueryInterval * 3, } podUpdateSubscriber.Subscribe(c.removeLocalInterface) return c @@ -279,10 +287,10 @@ func (c *Controller) Run(stopCh <-chan struct{}) { if err := c.igmpSnooper.queryIGMP(net.IPv4zero, queryVersions); err != nil { klog.ErrorS(err, "Failed to send IGMP query") } - }, queryInterval, stopCh) + }, c.queryInterval, stopCh) // Periodically check the group member status, and remove the groups in which no members exist - go wait.NonSlidingUntil(c.clearStaleGroups, queryInterval, stopCh) + go wait.NonSlidingUntil(c.clearStaleGroups, c.queryInterval, stopCh) go c.eventHandler(stopCh) for i := 0; i < int(workerCount); i++ { @@ -421,11 +429,11 @@ func (c *Controller) syncGroup(groupKey string) error { return nil } -// groupIsStale returns true if no local members in the group, or there is no IGMP report received after mcastGroupTimeout. +// groupIsStale returns true if no local members in the group, or there is no IGMP report received after c.mcastGroupTimeout. func (c *Controller) groupIsStale(status *GroupMemberStatus) bool { membersCount := len(status.localMembers) diff := time.Now().Sub(status.lastIGMPReport) - if membersCount == 0 || diff > mcastGroupTimeout { + if membersCount == 0 || diff > c.mcastGroupTimeout { return true } return false diff --git a/pkg/agent/multicast/mcast_controller_test.go b/pkg/agent/multicast/mcast_controller_test.go index 0536dfda675..1f57b503f30 100644 --- a/pkg/agent/multicast/mcast_controller_test.go +++ b/pkg/agent/multicast/mcast_controller_test.go @@ -217,7 +217,7 @@ func TestClearStaleGroups(t *testing.T) { wg.Done() }() now := time.Now() - validUpdateTime := now.Add(-queryInterval) + validUpdateTime := now.Add(-mctrl.queryInterval) validGroups := []*GroupMemberStatus{ { group: net.ParseIP("224.96.1.2"), @@ -230,7 +230,7 @@ func TestClearStaleGroups(t *testing.T) { lastIGMPReport: validUpdateTime, }, } - staleUpdateTime := now.Add(-mcastGroupTimeout - time.Second) + staleUpdateTime := now.Add(-mctrl.mcastGroupTimeout - time.Second) staleGroups := []*GroupMemberStatus{ { group: net.ParseIP("224.96.1.4"), @@ -364,7 +364,8 @@ func newMockMulticastController(t *testing.T) *Controller { mockOFClient.EXPECT().RegisterPacketInHandler(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) groupAllocator := openflow.NewGroupAllocator(false) podUpdateSubscriber := channel.NewSubscribableChannel("PodUpdate", 100) - mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient, podUpdateSubscriber) + queryInterval := 5 * time.Second + mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient, podUpdateSubscriber, queryInterval) return mctrl } diff --git a/pkg/agent/multicast/mcast_discovery.go b/pkg/agent/multicast/mcast_discovery.go index 7d4b7fe91fc..e68da77bdf5 100644 --- a/pkg/agent/multicast/mcast_discovery.go +++ b/pkg/agent/multicast/mcast_discovery.go @@ -34,13 +34,6 @@ const ( IGMPProtocolNumber = 2 ) -const ( - // queryInterval is the interval to send IGMP query messages. - queryInterval = time.Second * 125 - // mcastGroupTimeout is the timeout to detect a group as stale if no IGMP report is received within the time. - mcastGroupTimeout = queryInterval * 3 -) - var ( // igmpMaxResponseTime is the maximum time allowed before sending a responding report which is used for the // "Max Resp Code" field in the IGMP query message. It is also the maximum time to wait for the IGMP report message @@ -52,9 +45,10 @@ var ( ) type IGMPSnooper struct { - ofClient openflow.Client - ifaceStore interfacestore.InterfaceStore - eventCh chan *mcastGroupEvent + ofClient openflow.Client + ifaceStore interfacestore.InterfaceStore + eventCh chan *mcastGroupEvent + queryInterval time.Duration } func (s *IGMPSnooper) HandlePacketIn(pktIn *ofctrl.PacketIn) error { @@ -99,7 +93,7 @@ func (s *IGMPSnooper) parseSrcInterface(pktIn *ofctrl.PacketIn) (*interfacestore func (s *IGMPSnooper) queryIGMP(group net.IP, versions []uint8) error { for _, version := range versions { - igmp, err := generateIGMPQueryPacket(group, version) + igmp, err := generateIGMPQueryPacket(group, version, s.queryInterval) if err != nil { return err } @@ -171,7 +165,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { return nil } -func generateIGMPQueryPacket(group net.IP, version uint8) (util.Message, error) { +func generateIGMPQueryPacket(group net.IP, version uint8, queryInterval time.Duration) (util.Message, error) { // The max response time field in IGMP protocol uses a value in units of 1/10 second. // See https://datatracker.ietf.org/doc/html/rfc2236 and https://datatracker.ietf.org/doc/html/rfc3376 respTime := uint8(igmpMaxResponseTime.Seconds() * 10) @@ -242,8 +236,8 @@ func parseIGMPPacket(pkt protocol.Ethernet) (protocol.IGMPMessage, error) { } } -func newSnooper(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, eventCh chan *mcastGroupEvent) *IGMPSnooper { - d := &IGMPSnooper{ofClient: ofClient, ifaceStore: ifaceStore, eventCh: eventCh} +func newSnooper(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, eventCh chan *mcastGroupEvent, queryInterval time.Duration) *IGMPSnooper { + d := &IGMPSnooper{ofClient: ofClient, ifaceStore: ifaceStore, eventCh: eventCh, queryInterval: queryInterval} ofClient.RegisterPacketInHandler(uint8(openflow.PacketInReasonMC), "MulticastGroupDiscovery", d) return d } diff --git a/pkg/config/agent/config.go b/pkg/config/agent/config.go index 690e8491a4f..23109c7353c 100644 --- a/pkg/config/agent/config.go +++ b/pkg/config/agent/config.go @@ -182,9 +182,8 @@ type AgentConfig struct { // 2. TransportInterfaceCIDRs // 3. The Node IP TransportInterfaceCIDRs []string `yaml:"transportInterfaceCIDRs,omitempty"` - // The names of the interfaces on Nodes that are used to forward multicast traffic. - // Defaults to transport interface if not set. - MulticastInterfaces []string `yaml:"multicastInterfaces,omitempty"` + // Multicast configuration options. + Multicast MulticastConfig `yaml:"multicast,omitempty"` // AntreaProxy contains AntreaProxy related configuration options. AntreaProxy AntreaProxyConfig `yaml:"antreaProxy,omitempty"` // Egress related configurations. @@ -231,6 +230,15 @@ type NodePortLocalConfig struct { PortRange string `yaml:"portRange,omitempty"` } +type MulticastConfig struct { + // The names of the interfaces on Nodes that are used to forward multicast traffic. + // Defaults to transport interface if not set. + MulticastInterfaces []string `yaml:"multicastInterfaces,omitempty"` + // The interval for antrea-agent to send IGMP queries to Pods. + // Defaults to 125 seconds. + IGMPQueryInterval string `yaml:"igmpQueryInterval"` +} + type EgressConfig struct { ExceptCIDRs []string `yaml:"exceptCIDRs,omitempty"` } diff --git a/test/e2e/framework.go b/test/e2e/framework.go index b17fe84c5e4..83590118b69 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -2042,7 +2042,7 @@ func (data *TestData) GetMulticastInterfaces(antreaNamespace string) ([]string, if err != nil { return []string{}, err } - return agentConf.MulticastInterfaces, nil + return agentConf.Multicast.MulticastInterfaces, nil } func GetTransportInterface(data *TestData) (string, error) {