Skip to content

Commit

Permalink
Merge pull request #1256 from PiotrProkop/fix-topo-updater-policy-and…
Browse files Browse the repository at this point in the history
…-scope-advertisment

Fix Topology Manager policy and scope not being updated after NRT creation
  • Loading branch information
k8s-ci-robot committed Jul 28, 2023
2 parents 65b7216 + 6d98b61 commit e0f10a8
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 72 deletions.
42 changes: 1 addition & 41 deletions cmd/nfd-topology-updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,16 @@ package main
import (
"flag"
"fmt"
"net/url"
"os"
"path"
"time"

"k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"

topology "sigs.k8s.io/node-feature-discovery/pkg/nfd-topology-updater"
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/utils/hostpath"
"sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf"
"sigs.k8s.io/node-feature-discovery/pkg/version"
)

Expand Down Expand Up @@ -63,14 +60,8 @@ func main() {
// Plug klog into grpc logging infrastructure
utils.ConfigureGrpcKlog()

klConfig, err := getKubeletConfig(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile)
if err != nil {
klog.ErrorS(err, "failed to get kubelet configuration")
os.Exit(1)
}

// Get new TopologyUpdater instance
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope)
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs)
if err != nil {
klog.ErrorS(err, "failed to initialize topology updater instance")
os.Exit(1)
Expand Down Expand Up @@ -134,34 +125,3 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {

return args, resourcemonitorArgs
}

func getKubeletConfig(uri, apiAuthTokenFile string) (*kubeletconfigv1beta1.KubeletConfiguration, error) {
u, err := url.ParseRequestURI(uri)
if err != nil {
return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err)
}

// init kubelet API client
var klConfig *kubeletconfigv1beta1.KubeletConfiguration
switch u.Scheme {
case "file":
klConfig, err = kubeconf.GetKubeletConfigFromLocalFile(u.Path)
if err != nil {
return nil, fmt.Errorf("failed to read kubelet config: %w", err)
}
return klConfig, err
case "https":
restConfig, err := kubeconf.InsecureConfig(u.String(), apiAuthTokenFile)
if err != nil {
return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err)
}

klConfig, err = kubeconf.GetKubeletConfiguration(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err)
}
return klConfig, nil
}

return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme)
}
133 changes: 102 additions & 31 deletions pkg/nfd-topology-updater/nfd-topology-updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package nfdtopologyupdater

import (
"fmt"
"net/url"
"os"
"path/filepath"

Expand All @@ -26,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"

"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
"sigs.k8s.io/node-feature-discovery/pkg/apihelper"
Expand All @@ -34,6 +36,7 @@ import (
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
"sigs.k8s.io/node-feature-discovery/pkg/topologypolicy"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf"
"sigs.k8s.io/node-feature-discovery/pkg/version"
"sigs.k8s.io/yaml"
)
Expand Down Expand Up @@ -66,35 +69,20 @@ type NfdTopologyUpdater interface {
Stop()
}

type staticNodeInfo struct {
nodeName string
tmPolicy string
tmScope string
}

func newStaticNodeInfo(policy, scope string) staticNodeInfo {
nodeName := utils.NodeName()
klog.InfoS("detected kubelet Topology Manager configuration", "policy", policy, "scope", scope, "nodeName", nodeName)
return staticNodeInfo{
nodeName: nodeName,
tmPolicy: policy,
tmScope: scope,
}
}

type nfdTopologyUpdater struct {
nodeInfo staticNodeInfo
nodeName string
args Args
apihelper apihelper.APIHelpers
resourcemonitorArgs resourcemonitor.Args
stop chan struct{} // channel for signaling stop
eventSource <-chan kubeletnotifier.Info
configFilePath string
config *NFDConfig
kubeletConfigFunc func() (*kubeletconfigv1beta1.KubeletConfiguration, error)
}

// NewTopologyUpdater creates a new NfdTopologyUpdater instance.
func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, policy, scope string) (NfdTopologyUpdater, error) {
func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args) (NfdTopologyUpdater, error) {
eventSource := make(chan kubeletnotifier.Info)
if args.KubeletStateDir != "" {
ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir)
Expand All @@ -103,24 +91,40 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, pol
}
go ntf.Run()
}

kubeletConfigFunc, err := getKubeletConfigFunc(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile)
if err != nil {
return nil, err
}

nfd := &nfdTopologyUpdater{
args: args,
resourcemonitorArgs: resourcemonitorArgs,
nodeInfo: newStaticNodeInfo(policy, scope),
stop: make(chan struct{}, 1),
nodeName: utils.NodeName(),
eventSource: eventSource,
config: &NFDConfig{},
kubeletConfigFunc: kubeletConfigFunc,
}
if args.ConfigFile != "" {
nfd.configFilePath = filepath.Clean(args.ConfigFile)
}
return nfd, nil
}

func (w *nfdTopologyUpdater) detectTopologyPolicyAndScope() (string, string, error) {
klConfig, err := w.kubeletConfigFunc()
if err != nil {
return "", "", err
}

return klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope, nil
}

// Run nfdTopologyUpdater. Returns if a fatal error is encountered, or, after
// one request if OneShot is set to 'true' in the updater args.
func (w *nfdTopologyUpdater) Run() error {
klog.InfoS("Node Feature Discovery Topology Updater", "version", version.Get(), "nodeName", w.nodeInfo.nodeName)
klog.InfoS("Node Feature Discovery Topology Updater", "version", version.Get(), "nodeName", w.nodeName)

podResClient, err := podres.GetPodResClient(w.resourcemonitorArgs.PodResourceSocketPath)
if err != nil {
Expand Down Expand Up @@ -151,7 +155,7 @@ func (w *nfdTopologyUpdater) Run() error {
// zonesChannel := make(chan v1alpha1.ZoneList)
var zones v1alpha2.ZoneList

excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeInfo.nodeName)
excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeName)
resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient, excludeList)
if err != nil {
return fmt.Errorf("failed to obtain node resource information: %w", err)
Expand All @@ -169,8 +173,13 @@ func (w *nfdTopologyUpdater) Run() error {
}
zones = resAggr.Aggregate(scanResponse.PodResources)
klog.V(1).InfoS("aggregated resources identified", "resourceZones", utils.DelayedDumper(zones))
readKubeletConfig := false
if info.Event == kubeletnotifier.IntervalBased {
readKubeletConfig = true
}

if !w.args.NoPublish {
if err = w.updateNodeResourceTopology(zones, scanResponse); err != nil {
if err = w.updateNodeResourceTopology(zones, scanResponse, readKubeletConfig); err != nil {
return err
}
}
Expand All @@ -195,27 +204,29 @@ func (w *nfdTopologyUpdater) Stop() {
}
}

func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList, scanResponse resourcemonitor.ScanResponse) error {
func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList, scanResponse resourcemonitor.ScanResponse, readKubeletConfig bool) error {
cli, err := w.apihelper.GetTopologyClient()
if err != nil {
return err
}

nrt, err := cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), w.nodeInfo.nodeName, metav1.GetOptions{})
nrt, err := cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), w.nodeName, metav1.GetOptions{})
if errors.IsNotFound(err) {
nrtNew := v1alpha2.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{
Name: w.nodeInfo.nodeName,
Name: w.nodeName,
},
Zones: zoneInfo,
TopologyPolicies: []string{string(topologypolicy.DetectTopologyPolicy(w.nodeInfo.tmPolicy, w.nodeInfo.tmScope))},
Attributes: createTopologyAttributes(w.nodeInfo.tmPolicy, w.nodeInfo.tmScope),
Zones: zoneInfo,
Attributes: v1alpha2.AttributeList{},
}

if err := w.updateNRTTopologyManagerInfo(&nrtNew); err != nil {
return err
}

updateAttributes(&nrtNew.Attributes, scanResponse.Attributes)

_, err := cli.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrtNew, metav1.CreateOptions{})
if err != nil {
if _, err := cli.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrtNew, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create NodeResourceTopology: %w", err)
}
return nil
Expand All @@ -225,16 +236,41 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi

nrtMutated := nrt.DeepCopy()
nrtMutated.Zones = zoneInfo
updateAttributes(&nrtMutated.Attributes, scanResponse.Attributes)

attributes := scanResponse.Attributes

if readKubeletConfig {
if err := w.updateNRTTopologyManagerInfo(nrtMutated); err != nil {
return err
}
}

updateAttributes(&nrtMutated.Attributes, attributes)

nrtUpdated, err := cli.TopologyV1alpha2().NodeResourceTopologies().Update(context.TODO(), nrtMutated, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update NodeResourceTopology: %w", err)
}

klog.V(4).InfoS("NodeResourceTopology object updated", "nodeResourceTopology", utils.DelayedDumper(nrtUpdated))
return nil
}

func (w *nfdTopologyUpdater) updateNRTTopologyManagerInfo(nrt *v1alpha2.NodeResourceTopology) error {
policy, scope, err := w.detectTopologyPolicyAndScope()
if err != nil {
return fmt.Errorf("failed to detect TopologyManager's policy and scope: %w", err)
}

tmAttributes := createTopologyAttributes(policy, scope)
deprecatedTopologyPolicies := []string{string(topologypolicy.DetectTopologyPolicy(policy, scope))}

updateAttributes(&nrt.Attributes, tmAttributes)
nrt.TopologyPolicies = deprecatedTopologyPolicies

return nil
}

func (w *nfdTopologyUpdater) configure() error {
if w.configFilePath == "" {
klog.InfoS("no configuration file specified")
Expand Down Expand Up @@ -290,3 +326,38 @@ func updateAttributes(lhs *v1alpha2.AttributeList, rhs v1alpha2.AttributeList) {
updateAttribute(lhs, attr)
}
}

func getKubeletConfigFunc(uri, apiAuthTokenFile string) (func() (*kubeletconfigv1beta1.KubeletConfiguration, error), error) {
u, err := url.ParseRequestURI(uri)
if err != nil {
return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err)
}

// init kubelet API client
var klConfig *kubeletconfigv1beta1.KubeletConfiguration
switch u.Scheme {
case "file":
return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) {
klConfig, err = kubeconf.GetKubeletConfigFromLocalFile(u.Path)
if err != nil {
return nil, fmt.Errorf("failed to read kubelet config: %w", err)
}
return klConfig, err
}, nil
case "https":
restConfig, err := kubeconf.InsecureConfig(u.String(), apiAuthTokenFile)
if err != nil {
return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err)
}

return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) {
klConfig, err = kubeconf.GetKubeletConfiguration(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err)
}
return klConfig, nil
}, nil
}

return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme)
}

0 comments on commit e0f10a8

Please sign in to comment.