Skip to content

Commit

Permalink
[Multicast] Support removal of stale multicast routes
Browse files Browse the repository at this point in the history
Check packet count difference every minute
for each multicast route and remove ones that have
identical packet count in past mRouteTimeout.

Signed-off-by: ceclinux <src655@gmail.com>
  • Loading branch information
ceclinux committed Feb 12, 2023
1 parent 9df5a81 commit 555050f
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 18 deletions.
85 changes: 71 additions & 14 deletions pkg/agent/multicast/mcast_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net"
"strings"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
Expand All @@ -40,6 +41,7 @@ func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, mul
nodeConfig: nodeconfig,
groupCache: groupCache,
inboundRouteCache: cache.NewIndexer(getMulticastInboundEntryKey, cache.Indexers{GroupNameIndexName: inboundGroupIndexFunc}),
outboundRouteCache: cache.NewIndexer(getMulticastOutboundEntryKey, cache.Indexers{}),
multicastInterfaces: multicastInterfaces.List(),
socket: multicastSocket,
}
Expand Down Expand Up @@ -74,6 +76,7 @@ type MRouteClient struct {
nodeConfig *config.NodeConfig
multicastInterfaces []string
inboundRouteCache cache.Indexer
outboundRouteCache cache.Indexer
groupCache cache.Indexer
socket RouteInterface
multicastInterfaceConfigs []multicastInterfaceConfig
Expand Down Expand Up @@ -150,23 +153,47 @@ func (c *MRouteClient) deleteInboundMrouteEntryByGroup(group net.IP) (err error)
mEntries, _ := c.inboundRouteCache.ByIndex(GroupNameIndexName, group.String())
for _, route := range mEntries {
entry := route.(*inboundMulticastRouteEntry)
err := c.socket.DelMrouteEntry(net.ParseIP(entry.src), net.ParseIP(entry.group), entry.vif)
err := c.deleteInboundMRoute(entry)
if err != nil {
return err
}
c.inboundRouteCache.Delete(route)
}
return nil
}

func (c *MRouteClient) deleteInboundMRoute(mRoute *inboundMulticastRouteEntry) error {
err := c.socket.DelMrouteEntry(net.ParseIP(mRoute.src), net.ParseIP(mRoute.group), mRoute.vif)
if err != nil {
return err
}
c.inboundRouteCache.Delete(mRoute)
return nil
}

func (c *MRouteClient) deleteOutboundMRoute(mRoute *outboundMulticastRouteEntry) error {
err := c.socket.DelMrouteEntry(net.ParseIP(mRoute.src), net.ParseIP(mRoute.group), c.internalInterfaceVIF)
if err != nil {
return err
}
c.outboundRouteCache.Delete(mRoute)
return nil
}

// addOutboundMrouteEntry configures multicast route from Antrea gateway to all the multicast interfaces,
// allowing multicast srcNode Pods to send multicast traffic to external.
func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err error) {
func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) error {
klog.V(2).InfoS("Adding outbound multicast route entry", "src", src, "group", group, "outboundVIFs", c.externalInterfaceVIFs)
err = c.socket.AddMrouteEntry(src, group, c.internalInterfaceVIF, c.externalInterfaceVIFs)
err := c.socket.AddMrouteEntry(src, group, c.internalInterfaceVIF, c.externalInterfaceVIFs)
if err != nil {
return err
}
routeEntry := &outboundMulticastRouteEntry{
multicastRouteEntry: multicastRouteEntry{
group: group.String(),
src: src.String(),
updatedTime: time.Now(),
},
}
c.outboundRouteCache.Add(routeEntry)
return nil
}

Expand All @@ -179,15 +206,40 @@ func (c *MRouteClient) addInboundMrouteEntry(src net.IP, group net.IP, inboundVI
return err
}
routeEntry := &inboundMulticastRouteEntry{
group: group.String(),
src: src.String(),
vif: inboundVIF,
vif: inboundVIF,
multicastRouteEntry: multicastRouteEntry{
group: group.String(),
src: src.String(),
updatedTime: time.Now(),
},
}
c.inboundRouteCache.Add(routeEntry)
return nil
}

type multicastRouteEntry struct {
group string
src string
pktCount uint32
updatedTime time.Time
}

// outboundMulticastRouteEntry encodes the outbound multicast routing entry.
// For example,
// type inboundMulticastRouteEntry struct {
// group "226.94.9.9"
// src "10.0.0.55"
// } encodes the multicast route entry from Antrea gateway to multicast interfaces
// (10.0.0.55,226.94.9.9) Iif: antrea-gw0 Oifs: list of multicastInterfaces.
// The iif is always Antrea gateway and oifs are always outbound interfaces
// so we do not put them in the struct.
// Field pktCount and updatedTime are used for removing stale multicast routes.
type outboundMulticastRouteEntry struct {
multicastRouteEntry
}

// inboundMulticastRouteEntry encodes the inbound multicast routing entry.
// It has extra field Iif to represent inbound interface VIF.
// For example,
//
// type inboundMulticastRouteEntry struct {
Expand All @@ -197,18 +249,21 @@ func (c *MRouteClient) addInboundMrouteEntry(src net.IP, group net.IP, inboundVI
// } encodes the multicast route entry from wlan0 to Antrea gateway
//
// (10.0.0.55,226.94.9.9) Iif: wlan0 Oifs: antrea-gw0.
// The oif is always Antrea gateway so we do not put it in the struct.
type inboundMulticastRouteEntry struct {
group string
src string
vif uint16
multicastRouteEntry
vif uint16
}

func getMulticastInboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*inboundMulticastRouteEntry)
return entry.group + "/" + entry.src + "/" + fmt.Sprint(entry.vif), nil
}

func getMulticastOutboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*outboundMulticastRouteEntry)
return entry.group + "/" + entry.src, nil
}

func inboundGroupIndexFunc(obj interface{}) ([]string, error) {
entry, ok := obj.(*inboundMulticastRouteEntry)
if !ok {
Expand Down Expand Up @@ -275,10 +330,12 @@ type RouteInterface interface {
MulticastInterfaceLeaveMgroup(mgroup net.IP, ifaceIP net.IP, ifaceName string) error
// AddMrouteEntry adds multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif) and outbound multicast interfaces(oifs).
AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) (err error)
AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) error
// GetMroutePacketCount returns the number of routed packets by the multicast route entry.
GetMroutePacketCount(src net.IP, group net.IP) (uint32, error)
// DelMrouteEntry deletes multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif).
DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error)
DelMrouteEntry(src net.IP, group net.IP, iif uint16) error
// FlushMRoute flushes static multicast routing entries.
FlushMRoute()
// GetFD returns socket file descriptor.
Expand Down
76 changes: 76 additions & 0 deletions pkg/agent/multicast/mcast_route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ import (
"fmt"
"net"
"syscall"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/util/runtime"
)

const (
mRouteTimeout = time.Minute * 10
)

// parseIGMPMsg parses the kernel version into parsedIGMPMsg. Note we need to consider the change
// after linux 5.9 in the igmpmsg struct when parsing vif. Please check
// https://github.com/torvalds/linux/commit/c8715a8e9f38906e73d6d78764216742db13ba0e.
Expand Down Expand Up @@ -71,10 +77,80 @@ func (c *MRouteClient) run(stopCh <-chan struct{}) {
}
}()

// Check packet count difference every minute for each multicast route and
// remove ones that do not route any packets in past mRouteTimeout.
// The remaining multicast routes' statistics are getting updated by
// this process as well.
go wait.NonSlidingUntil(c.updateMrouteStats, time.Minute, stopCh)

for i := 0; i < int(workerCount); i++ {
go c.worker(stopCh)
}
<-stopCh
c.socket.FlushMRoute()
syscall.Close(c.socket.GetFD())
}

func (c *MRouteClient) updateMulticastRouteStatsEntry(entry *multicastRouteEntry) (isDeleted bool, newEntry *multicastRouteEntry) {
packetCount, err := c.socket.GetMroutePacketCount(net.ParseIP(entry.src), net.ParseIP(entry.group))
if err != nil {
klog.ErrorS(err, "Failed to get packet count for outbound multicast route", "outboundRoute", entry)
return false, nil
}
packetCountDiff := packetCount - entry.pktCount
now := time.Now()
klog.V(4).Infof("Multicast route %v routes %d packets in last %s", entry, packetCountDiff, time.Minute)
if packetCountDiff == uint32(0) {
return now.Sub(entry.updatedTime) > mRouteTimeout, nil
}
newEntry = &multicastRouteEntry{group: entry.group, src: entry.src, pktCount: packetCount, updatedTime: now}
return false, newEntry
}

func (c *MRouteClient) updateInboundMrouteStats() {
deletedInboundRoutes := make([]*inboundMulticastRouteEntry, 0)
for _, obj := range c.inboundRouteCache.List() {
entry := obj.(*inboundMulticastRouteEntry)
isDeleted, newEntry := c.updateMulticastRouteStatsEntry(&entry.multicastRouteEntry)
if isDeleted {
deletedInboundRoutes = append(deletedInboundRoutes, entry)
} else if newEntry != nil {
newInboundEntry := inboundMulticastRouteEntry{*newEntry, entry.vif}
c.inboundRouteCache.Update(&newInboundEntry)
}
}
for _, inboundRoute := range deletedInboundRoutes {
klog.V(2).InfoS("Deleting stale inbound multicast route", "group", inboundRoute.group, "source", inboundRoute.src)
err := c.deleteInboundMRoute(inboundRoute)
if err != nil {
klog.ErrorS(err, "Failed to delete inbound multicast route", "group", inboundRoute.group, "source", inboundRoute.src, "VIF", inboundRoute.vif)
}
}
}

func (c *MRouteClient) updateOutboundMrouteStats() {
deletedOutboundRoutes := make([]*outboundMulticastRouteEntry, 0)
for _, obj := range c.outboundRouteCache.List() {
entry := obj.(*outboundMulticastRouteEntry)
isDeleted, newEntry := c.updateMulticastRouteStatsEntry(&entry.multicastRouteEntry)
if isDeleted {
deletedOutboundRoutes = append(deletedOutboundRoutes, entry)
} else if newEntry != nil {
newOutboundEntry := outboundMulticastRouteEntry{*newEntry}
c.outboundRouteCache.Update(&newOutboundEntry)
}
}
for _, outboundRoute := range deletedOutboundRoutes {
klog.V(2).InfoS("Deleting stale outbound multicast route", "group", outboundRoute.group, "source", outboundRoute.src)
err := c.deleteOutboundMRoute(outboundRoute)
if err != nil {
klog.ErrorS(err, "Failed to delete outbound multicast route", "group", outboundRoute.group, "source", outboundRoute.src)
}
}
}

func (c *MRouteClient) updateMrouteStats() {
klog.V(2).InfoS("Updating multicast route statistics and removing stale multicast routes")
c.updateInboundMrouteStats()
c.updateOutboundMrouteStats()
}
113 changes: 113 additions & 0 deletions pkg/agent/multicast/mcast_route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,119 @@ func TestDeleteInboundMrouteEntryByGroup(t *testing.T) {
}
}

func TestUpdateOutboundMrouteStats(t *testing.T) {
mRoute := newMockMulticastRouteClient(t)
err := mRoute.initialize(t)
assert.Nil(t, err)
now := time.Now()
for _, m := range []struct {
isStale bool
currStats uint32
group string
source string
packetCount uint32
createdTime time.Time
}{
{
group: "224.3.5.7",
source: "10.1.2.3",
createdTime: now,
isStale: false,
currStats: 0,
},
{
group: "224.3.5.8",
source: "10.1.2.4",
createdTime: now.Add(time.Duration(-mRouteTimeout)),
packetCount: 10,
isStale: false,
currStats: 9,
},
{
group: "224.3.5.9",
source: "10.1.2.5",
createdTime: now.Add(time.Duration(-mRouteTimeout)),
packetCount: 0,
isStale: true,
currStats: 0,
},
} {
outboundMrouteEntry := &outboundMulticastRouteEntry{
multicastRouteEntry: multicastRouteEntry{
src: m.source,
group: m.group,
pktCount: m.packetCount,
updatedTime: m.createdTime,
},
}
mRoute.outboundRouteCache.Add(outboundMrouteEntry)
mockMulticastSocket.EXPECT().GetMroutePacketCount(net.ParseIP(m.source), net.ParseIP(m.group)).Times(1).Return(m.currStats, nil)
if m.isStale {
mockMulticastSocket.EXPECT().DelMrouteEntry(net.ParseIP(m.source), net.ParseIP(m.group), uint16(0)).Times(1)
}
}
mRoute.updateMrouteStats()
}

func TestUpdateInboundMrouteStats(t *testing.T) {
mRoute := newMockMulticastRouteClient(t)
err := mRoute.initialize(t)
assert.Nil(t, err)
now := time.Now()
for _, m := range []struct {
isStale bool
currPacketCount uint32
vif uint16
group string
source string
packetCount uint32
updatedTime time.Time
}{
{
group: "224.3.5.7",
source: "192.168.50.60",
updatedTime: now,
isStale: false,
currPacketCount: 0,
vif: 3,
},
{
group: "224.3.5.8",
source: "192.168.50.61",
updatedTime: now.Add(time.Duration(-mRouteTimeout)),
packetCount: 10,
isStale: false,
currPacketCount: 9,
vif: 4,
},
{
group: "224.3.5.9",
source: "192.168.50.62",
updatedTime: now.Add(time.Duration(-mRouteTimeout)),
packetCount: 5,
isStale: true,
currPacketCount: 5,
vif: 5,
},
} {
inboundMrouteEntry := &inboundMulticastRouteEntry{
multicastRouteEntry: multicastRouteEntry{
src: m.source,
group: m.group,
pktCount: m.packetCount,
updatedTime: m.updatedTime,
},
vif: m.vif,
}
mRoute.inboundRouteCache.Add(inboundMrouteEntry)
mockMulticastSocket.EXPECT().GetMroutePacketCount(net.ParseIP(m.source), net.ParseIP(m.group)).Times(1).Return(m.currPacketCount, nil)
if m.isStale {
mockMulticastSocket.EXPECT().DelMrouteEntry(net.ParseIP(m.source), net.ParseIP(m.group), m.vif).Times(1)
}
}
mRoute.updateMrouteStats()
}

func TestProcessIGMPNocacheMsg(t *testing.T) {
mRoute := newMockMulticastRouteClient(t)
err := mRoute.initialize(t)
Expand Down
Loading

0 comments on commit 555050f

Please sign in to comment.