diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index c11a923371f..ea01f5b4f2b 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -307,7 +307,9 @@ func (c *Controller) syncGroup(groupKey string) error { klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey) return err } + c.installedGroupsMutex.Lock() err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) + c.installedGroupsMutex.Unlock() if err != nil { klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey) return err diff --git a/pkg/agent/multicast/mcast_route.go b/pkg/agent/multicast/mcast_route.go index 279c11c4f19..6135c700257 100644 --- a/pkg/agent/multicast/mcast_route.go +++ b/pkg/agent/multicast/mcast_route.go @@ -17,6 +17,8 @@ import ( "fmt" "net" "strings" + "sync" + "time" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" @@ -38,6 +40,7 @@ func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, mul nodeConfig: nodeconfig, groupCache: groupCache, inboundRouteCache: cache.NewIndexer(getMulticastInboundEntryKey, cache.Indexers{GroupNameIndexName: inboundGroupIndexFunc}), + outboundRouteCache: cache.NewIndexer(getMulticastOutboundEntryKey, cache.Indexers{}), multicastInterfaces: multicastInterfaces.List(), socket: multicastSocket, } @@ -72,6 +75,8 @@ type MRouteClient struct { nodeConfig *config.NodeConfig multicastInterfaces []string inboundRouteCache cache.Indexer + inboundRouteCacheMutex sync.RWMutex + outboundRouteCache cache.Indexer groupCache cache.Indexer socket RouteInterface multicastInterfaceConfigs []multicastInterfaceConfig @@ -148,15 +153,32 @@ func (c *MRouteClient) deleteInboundMrouteEntryByGroup(group net.IP) (err error) mEntries, _ := c.inboundRouteCache.ByIndex(GroupNameIndexName, group.String()) for _, route := range mEntries { entry := route.(*inboundMulticastRouteEntry) - err := c.socket.DelMrouteEntry(net.ParseIP(entry.src), net.ParseIP(entry.group), entry.vif) + err := c.deleteInboundMRoute(entry) if err != nil { return err } - c.inboundRouteCache.Delete(route) } return nil } +func (c *MRouteClient) deleteInboundMRoute(mRoute *inboundMulticastRouteEntry) (err error) { + err = c.socket.DelMrouteEntry(net.ParseIP(mRoute.src).To4(), net.ParseIP(mRoute.group).To4(), mRoute.vif) + if err != nil { + return err + } + c.inboundRouteCache.Delete(mRoute) + return nil +} + +func (c *MRouteClient) deleteOutboundMRoute(mRoute *outboundMulticastRouteEntry) (err error) { + err = c.socket.DelMrouteEntry(net.ParseIP(mRoute.src).To4(), net.ParseIP(mRoute.group).To4(), c.internalInterfaceVIF) + if err != nil { + return err + } + c.outboundRouteCache.Delete(mRoute) + return nil +} + // addOutboundMrouteEntry configures multicast route from Antrea gateway to all the multicast interfaces, // allowing multicast sender Pods to send multicast traffic to external. func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err error) { @@ -165,6 +187,11 @@ func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err err if err != nil { return err } + routeEntry := &outboundMulticastRouteEntry{} + routeEntry.group = group.String() + routeEntry.src = src.String() + routeEntry.createdTime = time.Now() + c.outboundRouteCache.Add(routeEntry) return nil } @@ -177,15 +204,38 @@ func (c *MRouteClient) addInboundMrouteEntry(src net.IP, group net.IP, inboundVI return err } routeEntry := &inboundMulticastRouteEntry{ - group: group.String(), - src: src.String(), - vif: inboundVIF, + vif: inboundVIF, } + routeEntry.group = group.String() + routeEntry.src = src.String() + routeEntry.createdTime = time.Now() c.inboundRouteCache.Add(routeEntry) return nil } +type multicastRouteEntry struct { + group string + src string + pktCount uint32 + createdTime time.Time +} + +// outboundMulticastRouteEntry encodes the outbound multicast routing entry. +// For example, +// type inboundMulticastRouteEntry struct { +// group "226.94.9.9" +// src "10.0.0.55" +// } encodes the multicast route entry from Antrea gateway to multicast interfaces +// (10.0.0.55,226.94.9.9) Iif: antrea-gw0 Oifs: list of multicastInterfaces. +// The iif is always Antrea gateway and oifs are always outbound interfaces +// so we do not put them in the struct. +// Field pktCount and createdTime are used for removing staled multicast routes. +type outboundMulticastRouteEntry struct { + multicastRouteEntry +} + // inboundMulticastRouteEntry encodes the inbound multicast routing entry. +// It has extra field Iif to reprent inbound interface VIF. // For example, // type inboundMulticastRouteEntry struct { // group "226.94.9.9" @@ -193,11 +243,9 @@ func (c *MRouteClient) addInboundMrouteEntry(src net.IP, group net.IP, inboundVI // vif vif of wlan0 // } encodes the multicast route entry from wlan0 to Antrea gateway // (10.0.0.55,226.94.9.9) Iif: wlan0 Oifs: antrea-gw0. -// The oif is always Antrea gateway so we do not put it in the struct. type inboundMulticastRouteEntry struct { - group string - src string - vif uint16 + multicastRouteEntry + vif uint16 } func getMulticastInboundEntryKey(obj interface{}) (string, error) { @@ -205,6 +253,11 @@ func getMulticastInboundEntryKey(obj interface{}) (string, error) { return entry.group + "/" + entry.src + "/" + fmt.Sprint(entry.vif), nil } +func getMulticastOutboundEntryKey(obj interface{}) (string, error) { + entry := obj.(*outboundMulticastRouteEntry) + return entry.group + "/" + entry.src, nil +} + func inboundGroupIndexFunc(obj interface{}) ([]string, error) { entry, ok := obj.(*inboundMulticastRouteEntry) if !ok { @@ -272,6 +325,8 @@ type RouteInterface interface { // AddMrouteEntry adds multicast route with specified source(src), multicast group IP(group), // inbound multicast interface(iif) and outbound multicast interfaces(oifs). AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) (err error) + // GetMroutePacketCount returns number of routed packets by multicast route entry. + GetMroutePacketCount(src net.IP, group net.IP) (pktCount uint32, err error) // DelMrouteEntry deletes multicast route with specified source(src), multicast group IP(group), // inbound multicast interface(iif). DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) diff --git a/pkg/agent/multicast/mcast_route_linux.go b/pkg/agent/multicast/mcast_route_linux.go index 9495a007647..36b8b8474ac 100644 --- a/pkg/agent/multicast/mcast_route_linux.go +++ b/pkg/agent/multicast/mcast_route_linux.go @@ -21,12 +21,18 @@ import ( "fmt" "net" "syscall" + "time" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "antrea.io/antrea/pkg/util/runtime" ) +const ( + outboundMRouteTimeout = time.Minute * 1 +) + // parseIGMPMsg parses the kernel version into parsedIGMPMsg. Note we need to consider the change // after linux 5.9 in the igmpmsg struct when parsing vif. Please check // https://github.com/torvalds/linux/commit/c8715a8e9f38906e73d6d78764216742db13ba0e. @@ -71,6 +77,8 @@ func (c *MRouteClient) run(stopCh <-chan struct{}) { } }() + go wait.NonSlidingUntil(c.updateMrouteStats, mRouteTimeout, stopCh) + for i := 0; i < int(workerCount); i++ { go c.worker(stopCh) } @@ -78,3 +86,67 @@ func (c *MRouteClient) run(stopCh <-chan struct{}) { c.socket.FlushMRoute() syscall.Close(c.socket.GetFD()) } + +func (c *MRouteClient) updateMulticastRouteStatsEntry(entry *multicastRouteEntry) (isUpdated bool) { + packetCount, err := c.socket.GetMroutePacketCount(net.ParseIP(entry.src).To4(), net.ParseIP(entry.group).To4()) + if err != nil { + klog.ErrorS(err, "Failed to get packet count for outbound multicast route", "outboundRoute", entry) + return + } + packetCountDiff := packetCount - entry.pktCount + now := time.Now() + klog.V(4).Infof("Outbound multicast route %v routes %d packets in last %s", entry, packetCountDiff, mRouteTimeout.String()) + if packetCountDiff == uint32(0) && now.Sub(entry.createdTime) > mRouteTimeout { + return true + } + entry.pktCount = packetCount + return false +} + +func (c *MRouteClient) updateInboundMrouteStats() { + deletedInboundRoutes := make([]*inboundMulticastRouteEntry, 0) + for _, obj := range c.inboundRouteCache.List() { + entry := obj.(*inboundMulticastRouteEntry) + if c.updateMulticastRouteStatsEntry(&entry.multicastRouteEntry) { + deletedInboundRoutes = append(deletedInboundRoutes, entry) + } else { + c.inboundRouteCache.Update(entry) + } + } + for _, inboundRoute := range deletedInboundRoutes { + klog.V(2).InfoS("Deleting staled inbound multicast route", "group", inboundRoute.group, "source", inboundRoute.src) + err := c.deleteInboundMRoute(inboundRoute) + if err != nil { + klog.ErrorS(err, "Failed to delete inbound multicast route", "group", inboundRoute.group, "source", inboundRoute.src, "VIF", inboundRoute.vif) + return + } + } +} + +func (c *MRouteClient) updateOutboundMrouteStats() { + deletedOutboundRoutes := make([]*outboundMulticastRouteEntry, 0) + for _, obj := range c.outboundRouteCache.List() { + entry := obj.(*outboundMulticastRouteEntry) + if c.updateMulticastRouteStatsEntry(&entry.multicastRouteEntry) { + deletedOutboundRoutes = append(deletedOutboundRoutes, entry) + } else { + c.outboundRouteCache.Update(entry) + } + } + for _, outboundRoute := range deletedOutboundRoutes { + klog.V(2).InfoS("Deleting staled outbound multicast route", "group", outboundRoute.group, "source", outboundRoute.src) + err := c.deleteOutboundMRoute(outboundRoute) + if err != nil { + klog.ErrorS(err, "Failed to delete outbound multicast route", "group", outboundRoute.group, "source", outboundRoute.src) + return + } + } +} + +func (c *MRouteClient) updateMrouteStats() { + klog.V(2).InfoS("Updating multicast route statistics and removing staled multicast routes") + c.inboundRouteCacheMutex.Lock() + c.updateInboundMrouteStats() + c.inboundRouteCacheMutex.Unlock() + c.updateOutboundMrouteStats() +} diff --git a/pkg/agent/multicast/mcast_route_test.go b/pkg/agent/multicast/mcast_route_test.go index 081171e7c00..0f1818a9101 100644 --- a/pkg/agent/multicast/mcast_route_test.go +++ b/pkg/agent/multicast/mcast_route_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net" "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -64,6 +65,113 @@ func TestParseIGMPMsg(t *testing.T) { } } +func TestUpdateOutboundMrouteStats(t *testing.T) { + mRoute := newMockMulticastRouteClient(t) + err := mRoute.initialize(t) + assert.Nil(t, err) + now := time.Now() + for _, m := range []struct { + isStaled bool + currStats uint32 + group string + source string + packetCount uint32 + createdTime time.Time + }{ + { + group: "224.3.5.7", + source: "10.1.2.3", + createdTime: now, + isStaled: false, + currStats: 0, + }, + { + group: "224.3.5.8", + source: "10.1.2.4", + createdTime: now.Add(time.Duration(-mRouteTimeout)), + packetCount: 10, + isStaled: false, + currStats: 9, + }, + { + group: "224.3.5.9", + source: "10.1.2.5", + createdTime: now.Add(time.Duration(-mRouteTimeout)), + packetCount: 0, + isStaled: true, + currStats: 0, + }, + } { + outboundMrouteEntry := &outboundMulticastRouteEntry{} + outboundMrouteEntry.src = m.source + outboundMrouteEntry.group = m.group + outboundMrouteEntry.pktCount = m.packetCount + outboundMrouteEntry.createdTime = m.createdTime + mRoute.outboundRouteCache.Add(outboundMrouteEntry) + mockMulticastSocket.EXPECT().GetMroutePacketCount(net.ParseIP(m.source).To4(), net.ParseIP(m.group).To4()).Times(1).Return(m.currStats, nil) + if m.isStaled { + mockMulticastSocket.EXPECT().DelMrouteEntry(net.ParseIP(m.source).To4(), net.ParseIP(m.group).To4(), uint16(0)).Times(1) + } + } + mRoute.updateMrouteStats() +} + +func TestUpdateInboundMrouteStats(t *testing.T) { + mRoute := newMockMulticastRouteClient(t) + err := mRoute.initialize(t) + assert.Nil(t, err) + now := time.Now() + for _, m := range []struct { + isStaled bool + currPacketCount uint32 + vif uint16 + group string + source string + packetCount uint32 + createdTime time.Time + }{ + { + group: "224.3.5.7", + source: "192.168.50.60", + createdTime: now, + isStaled: false, + currPacketCount: 0, + vif: 3, + }, + { + group: "224.3.5.8", + source: "192.168.50.61", + createdTime: now.Add(time.Duration(-mRouteTimeout)), + packetCount: 10, + isStaled: false, + currPacketCount: 9, + vif: 4, + }, + { + group: "224.3.5.9", + source: "192.168.50.62", + createdTime: now.Add(time.Duration(-mRouteTimeout)), + packetCount: 5, + isStaled: true, + currPacketCount: 5, + vif: 5, + }, + } { + inboundMrouteEntry := &inboundMulticastRouteEntry{} + inboundMrouteEntry.src = m.source + inboundMrouteEntry.group = m.group + inboundMrouteEntry.vif = m.vif + inboundMrouteEntry.pktCount = m.packetCount + inboundMrouteEntry.createdTime = m.createdTime + mRoute.inboundRouteCache.Add(inboundMrouteEntry) + mockMulticastSocket.EXPECT().GetMroutePacketCount(net.ParseIP(m.source).To4(), net.ParseIP(m.group).To4()).Times(1).Return(m.currPacketCount, nil) + if m.isStaled { + mockMulticastSocket.EXPECT().DelMrouteEntry(net.ParseIP(m.source).To4(), net.ParseIP(m.group).To4(), m.vif).Times(1) + } + } + mRoute.updateMrouteStats() +} + func TestProcessIGMPNocacheMsg(t *testing.T) { mRoute := newMockMulticastRouteClient(t) err := mRoute.initialize(t) diff --git a/pkg/agent/multicast/mcast_socket_linux.go b/pkg/agent/multicast/mcast_socket_linux.go index f8c946b06cb..87495e9ae8a 100644 --- a/pkg/agent/multicast/mcast_socket_linux.go +++ b/pkg/agent/multicast/mcast_socket_linux.go @@ -60,6 +60,18 @@ func (s *Socket) AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifVIFs [] return multicastsyscall.SetsockoptMfcctl(s.GetFD(), syscall.IPPROTO_IP, multicastsyscall.MRT_ADD_MFC, mc) } +func (s *Socket) GetoutboundMroutePacketCount(src net.IP, group net.IP) (pktCount uint32, err error) { + siocSgReq := multicastsyscall.SiocSgReq{ + Src: [4]byte{src[0], src[1], src[2], src[3]}, + Grp: [4]byte{group[0], group[1], group[2], group[3]}, + } + stats, err := multicastsyscall.IoctlGetSiocSgReq(s.GetFD(), &siocSgReq) + if err != nil { + return 0, err + } + return stats.Pktcnt, nil +} + func (s *Socket) DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) { mc := &multicastsyscall.Mfcctl{} origin := src.To4() diff --git a/pkg/agent/multicast/mcast_socket_others.go b/pkg/agent/multicast/mcast_socket_others.go index f60848c1f8a..b78c5930d56 100644 --- a/pkg/agent/multicast/mcast_socket_others.go +++ b/pkg/agent/multicast/mcast_socket_others.go @@ -31,6 +31,10 @@ func (s *Socket) AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifVIFs [] return nil } +func (s *Socket) GetMroutePacketCount(src net.IP, group net.IP) (pktCount uint32, err error) { + return 0, nil +} + func (s *Socket) DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) { return nil } diff --git a/pkg/agent/multicast/testing/mock_multicast.go b/pkg/agent/multicast/testing/mock_multicast.go index 79a2701ebe3..4fbe77d35f4 100644 --- a/pkg/agent/multicast/testing/mock_multicast.go +++ b/pkg/agent/multicast/testing/mock_multicast.go @@ -117,6 +117,21 @@ func (mr *MockRouteInterfaceMockRecorder) GetFD() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFD", reflect.TypeOf((*MockRouteInterface)(nil).GetFD)) } +// GetMroutePacketCount mocks base method +func (m *MockRouteInterface) GetMroutePacketCount(arg0, arg1 net.IP) (uint32, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMroutePacketCount", arg0, arg1) + ret0, _ := ret[0].(uint32) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMroutePacketCount indicates an expected call of GetMroutePacketCount +func (mr *MockRouteInterfaceMockRecorder) GetMroutePacketCount(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMroutePacketCount", reflect.TypeOf((*MockRouteInterface)(nil).GetMroutePacketCount), arg0, arg1) +} + // MulticastInterfaceJoinMgroup mocks base method func (m *MockRouteInterface) MulticastInterfaceJoinMgroup(arg0, arg1 net.IP, arg2 string) error { m.ctrl.T.Helper() diff --git a/pkg/agent/util/syscall/linux/types.go b/pkg/agent/util/syscall/linux/types.go index fc1fc877855..a81f5c2b16c 100644 --- a/pkg/agent/util/syscall/linux/types.go +++ b/pkg/agent/util/syscall/linux/types.go @@ -47,6 +47,7 @@ const ( type Mfcctl C.struct_mfcctl type Vifctl C.struct_vifctl_with_ifindex +type SiocSgReq C.struct_siocsgreq const SizeofMfcctl = C.sizeof_struct_mfcctl const SizeofVifctl = C.sizeof_struct_vifctl_with_ifindex diff --git a/pkg/agent/util/syscall/syscall_unix.go b/pkg/agent/util/syscall/syscall_unix.go index 6a1a494f9cd..6f85281d1a5 100644 --- a/pkg/agent/util/syscall/syscall_unix.go +++ b/pkg/agent/util/syscall/syscall_unix.go @@ -19,6 +19,7 @@ package syscall import ( + "runtime" "syscall" "unsafe" ) @@ -34,7 +35,15 @@ func setsockopt(s int, level int, name int, val unsafe.Pointer, vallen uintptr) return } -// Please add your wrapped syscall functions below +func ioctl(fd int, req uint, arg uintptr) (err error) { + _, _, e1 := syscall.Syscall(syscall.SYS_IOCTL, uintptr(fd), uintptr(req), uintptr(arg)) + if e1 != 0 { + return e1 + } + return +} + +// Please add your wrapped syscall functions below. func SetsockoptMfcctl(fd, level, opt int, mfcctl *Mfcctl) error { return setsockopt(fd, level, opt, unsafe.Pointer(mfcctl), SizeofMfcctl) @@ -43,3 +52,9 @@ func SetsockoptMfcctl(fd, level, opt int, mfcctl *Mfcctl) error { func SetsockoptVifctl(fd, level, opt int, vifctl *Vifctl) error { return setsockopt(fd, level, opt, unsafe.Pointer(vifctl), SizeofVifctl) } + +func IoctlGetSiocSgReq(fd int, siocsgreq *SiocSgReq) (*SiocSgReq, error) { + err := ioctl(fd, SIOCGETSGCNT, uintptr(unsafe.Pointer(siocsgreq))) + runtime.KeepAlive(siocsgreq) + return siocsgreq, err +} diff --git a/pkg/agent/util/syscall/ztypes_linux.go b/pkg/agent/util/syscall/ztypes_linux.go index 2d064ccfe3c..2664ca9ba38 100644 --- a/pkg/agent/util/syscall/ztypes_linux.go +++ b/pkg/agent/util/syscall/ztypes_linux.go @@ -26,6 +26,7 @@ const ( MRT_INIT = 0xc8 MRT_FLUSH = 0xd4 MAXVIFS = 0x20 + SIOCGETSGCNT = 0x89e1 ) type Mfcctl struct { @@ -48,6 +49,14 @@ type Vifctl struct { Rmt_addr [4]byte /* in_addr */ } +type SiocSgReq = struct { + Src [4]byte /* in_addr */ + Grp [4]byte /* in_addr */ + Pktcnt uint32 + Bytecnt uint32 + If uint32 +} + const SizeofMfcctl = 0x3c const SizeofVifctl = 0x10 const SizeofIgmpmsg = 0x14