Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address Excessive API Server calls from CNI Pods #1419

Merged
merged 6 commits into from
May 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions cmd/aws-k8s-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package main
import (
"os"

"github.com/aws/amazon-vpc-cni-k8s/pkg/eniconfig"
"github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd"
"github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi"
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
Expand All @@ -39,18 +38,22 @@ func _main() int {

log.Infof("Starting L-IPAMD %s ...", version)

kubeClient, err := k8sapi.CreateKubeClient()
// Check API server connectivity
if k8sapi.CheckAPIServerConnectivity() != nil {
return 1
}

rawK8SClient, err := k8sapi.CreateKubeClient()
if err != nil {
log.Errorf("Failed to create client: %v", err)
return 1
}

eniConfigController := eniconfig.NewENIConfigController()
if ipamd.UseCustomNetworkCfg() {
go eniConfigController.Start()
cacheK8SClient, err := k8sapi.CreateCachedKubeClient(rawK8SClient)
if err != nil {
return 1
}

ipamContext, err := ipamd.New(kubeClient, eniConfigController)
ipamContext, err := ipamd.New(rawK8SClient, cacheK8SClient)

if err != nil {
log.Errorf("Initialization failure: %v", err)
Expand Down
25 changes: 15 additions & 10 deletions cmd/cni-metrics-helper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func main() {
_, _ = fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
flags.PrintDefaults()
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := flags.Parse(os.Args)
if err != nil {
Expand Down Expand Up @@ -80,20 +82,22 @@ func main() {

log.Infof("Starting CNIMetricsHelper. Sending metrics to CloudWatch: %v, LogLevel %s", options.submitCW, logConfig.LogLevel)

kubeClient, err := k8sapi.CreateKubeClient()
clientSet, err := k8sapi.GetKubeClientSet()

rawK8SClient, err := k8sapi.CreateKubeClient()
if err != nil {
log.Fatalf("Failed to create client: %v", err)
log.Fatalf("Error creating Kubernetes Client: %s", err)
os.Exit(1)
}
k8sClient, err := k8sapi.CreateCachedKubeClient(rawK8SClient)
if err != nil {
log.Fatalf("Error creating Cached Kubernetes Client: %s", err)
os.Exit(1)
}

discoverController := k8sapi.NewController(kubeClient)
go discoverController.DiscoverCNIK8SPods()

var cw publisher.Publisher

if options.submitCW {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cw, err = publisher.New(ctx)
if err != nil {
log.Fatalf("Failed to create publisher: %v", err)
Expand All @@ -102,12 +106,13 @@ func main() {
defer cw.Stop()
}

var cniMetric = metrics.CNIMetricsNew(kubeClient, cw, discoverController, options.submitCW, log)
podWatcher := metrics.NewDefaultPodWatcher(k8sClient, log)
var cniMetric = metrics.CNIMetricsNew(clientSet, cw, options.submitCW, log, podWatcher)

// metric loop
var pullInterval = 30 // seconds
for range time.Tick(time.Duration(pullInterval) * time.Second) {
log.Info("Collecting metrics ...")
metrics.Handler(cniMetric)
metrics.Handler(ctx, cniMetric)
}
}
44 changes: 24 additions & 20 deletions cmd/cni-metrics-helper/metrics/cni_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ package metrics

import (
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
"golang.org/x/net/context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes"

"github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi"
"github.com/aws/amazon-vpc-cni-k8s/pkg/publisher"
)

Expand Down Expand Up @@ -139,28 +139,29 @@ var InterestingCNIMetrics = map[string]metricsConvert{

// CNIMetricsTarget defines data structure for kube-state-metric target
type CNIMetricsTarget struct {
interestingMetrics map[string]metricsConvert
cwMetricsPublisher publisher.Publisher
kubeClient clientset.Interface
discoveryController *k8sapi.Controller
submitCW bool
log logger.Logger
interestingMetrics map[string]metricsConvert
cwMetricsPublisher publisher.Publisher
kubeClient kubernetes.Interface
podWatcher *defaultPodWatcher
submitCW bool
log logger.Logger
}

// CNIMetricsNew creates a new metricsTarget
func CNIMetricsNew(c clientset.Interface, cw publisher.Publisher, d *k8sapi.Controller, submitCW bool, l logger.Logger) *CNIMetricsTarget {
func CNIMetricsNew(k8sClient kubernetes.Interface, cw publisher.Publisher, submitCW bool, l logger.Logger,
watcher *defaultPodWatcher) *CNIMetricsTarget {
return &CNIMetricsTarget{
interestingMetrics: InterestingCNIMetrics,
cwMetricsPublisher: cw,
kubeClient: c,
discoveryController: d,
submitCW: submitCW,
log: l,
interestingMetrics: InterestingCNIMetrics,
cwMetricsPublisher: cw,
kubeClient: k8sClient,
podWatcher: watcher,
submitCW: submitCW,
log: l,
}
}

func (t *CNIMetricsTarget) grabMetricsFromTarget(cniPod string) ([]byte, error) {
output, err := getMetricsFromPod(t.kubeClient, cniPod, metav1.NamespaceSystem, metricsPort)
func (t *CNIMetricsTarget) grabMetricsFromTarget(ctx context.Context, cniPod string) ([]byte, error) {
output, err := getMetricsFromPod(ctx, t.kubeClient, cniPod, metav1.NamespaceSystem, metricsPort)
if err != nil {
t.log.Errorf("grabMetricsFromTarget: Failed to grab CNI endpoint: %v", err)
return nil, err
Expand All @@ -178,9 +179,12 @@ func (t *CNIMetricsTarget) getCWMetricsPublisher() publisher.Publisher {
return t.cwMetricsPublisher
}

func (t *CNIMetricsTarget) getTargetList() []string {
pods := t.discoveryController.GetCNIPods()
return pods
func (t *CNIMetricsTarget) getTargetList(ctx context.Context) ([]string, error) {
pods, err := t.podWatcher.GetCNIPods(ctx)
if err != nil {
return pods, err
}
return pods, nil
}

func (t *CNIMetricsTarget) submitCloudWatch() bool {
Expand Down
39 changes: 24 additions & 15 deletions cmd/cni-metrics-helper/metrics/cni_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,57 @@ package metrics
import (
"testing"

"github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi"
"github.com/aws/amazon-vpc-cni-k8s/pkg/publisher/mock_publisher"
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
k8sfake "k8s.io/client-go/kubernetes/fake"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
testclient "sigs.k8s.io/controller-runtime/pkg/client/fake"

eniconfigscheme "github.com/aws/amazon-vpc-cni-k8s/pkg/apis/crd/v1alpha1"
"github.com/aws/amazon-vpc-cni-k8s/pkg/publisher/mock_publisher"
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
)

// logConfig is the logger configuration used to build the logger for the tests in this
// file: log level "Debug", with output directed to "stdout".
var logConfig = logger.Configuration{
LogLevel: "Debug",
LogLocation: "stdout",
}

var log = logger.New(&logConfig)
var testLog = logger.New(&logConfig)

type testMocks struct {
ctrl *gomock.Controller
clientset *k8sfake.Clientset
discoverController *k8sapi.Controller
mockPublisher *mock_publisher.MockPublisher
clientset *k8sfake.Clientset
podWatcher *defaultPodWatcher
mockPublisher *mock_publisher.MockPublisher
}

func setup(t *testing.T) *testMocks {
ctrl := gomock.NewController(t)
fakeClientset := k8sfake.NewSimpleClientset()
k8sSchema := runtime.NewScheme()
clientgoscheme.AddToScheme(k8sSchema)
eniconfigscheme.AddToScheme(k8sSchema)
podWatcher := NewDefaultPodWatcher(testclient.NewFakeClientWithScheme(k8sSchema), testLog)
return &testMocks{
ctrl: ctrl,
clientset: fakeClientset,
discoverController: k8sapi.NewController(fakeClientset),
mockPublisher: mock_publisher.NewMockPublisher(ctrl),
clientset: fakeClientset,
podWatcher: podWatcher,
mockPublisher: mock_publisher.NewMockPublisher(ctrl),
}
}

func TestCNIMetricsNew(t *testing.T) {
m := setup(t)
_, _ = m.clientset.CoreV1().Pods("kube-system").Create(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "aws-node-1"}})
cniMetric := CNIMetricsNew(m.clientset, m.mockPublisher, m.discoverController, false, log)
ctx := context.Background()
_, _ = m.clientset.CoreV1().Pods("kube-system").Create(ctx, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "aws-node-1"}}, metav1.CreateOptions{})
//cniMetric := CNIMetricsNew(m.clientset, m.mockPublisher, m.discoverController, false, log)
cniMetric := CNIMetricsNew(m.clientset, m.mockPublisher, false, testLog, m.podWatcher)
assert.NotNil(t, cniMetric)
assert.NotNil(t, cniMetric.getCWMetricsPublisher())
assert.NotEmpty(t, cniMetric.getInterestingMetrics())
assert.Equal(t, log, cniMetric.getLogger())
assert.Equal(t, testLog, cniMetric.getLogger())
assert.False(t, cniMetric.submitCloudWatch())
}
31 changes: 18 additions & 13 deletions cmd/cni-metrics-helper/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,28 @@ package metrics

import (
"bytes"
"context"
"fmt"

"github.com/aws/amazon-vpc-cni-k8s/pkg/publisher"
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
clientset "k8s.io/client-go/kubernetes"
"github.com/prometheus/common/log"
"k8s.io/client-go/kubernetes"

"github.com/aws/amazon-vpc-cni-k8s/pkg/publisher"
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
)

// metricMatcher is a predicate evaluated against a single Prometheus metric (*dto.Metric).
type metricMatcher func(metric *dto.Metric) bool
// actionFuncType is the signature of an aggregation step: it receives a pointer to the
// aggregated value and the value of the current sample (metricsMax has this signature).
// NOTE(review): presumably implementations update the aggregate in place — confirm.
type actionFuncType func(aggregatedValue *float64, sampleValue float64)

type metricsTarget interface {
grabMetricsFromTarget(target string) ([]byte, error)
grabMetricsFromTarget(ctx context.Context, target string) ([]byte, error)
getInterestingMetrics() map[string]metricsConvert
getCWMetricsPublisher() publisher.Publisher
getTargetList() []string
getTargetList(ctx context.Context) ([]string, error)
submitCloudWatch() bool
getLogger() logger.Logger
}
Expand Down Expand Up @@ -81,14 +84,15 @@ func metricsMax(aggregatedValue *float64, sampleValue float64) {
}
}

func getMetricsFromPod(client clientset.Interface, podName string, namespace string, port int) ([]byte, error) {
rawOutput, err := client.CoreV1().RESTClient().Get().
func getMetricsFromPod(ctx context.Context, k8sClient kubernetes.Interface, podName string, namespace string, port int) ([]byte, error) {
rawOutput, err := k8sClient.CoreV1().RESTClient().Get().
Namespace(namespace).
Resource("pods").
SubResource("proxy").
Name(fmt.Sprintf("%v:%v", podName, port)).
Suffix("metrics").
Do().Raw()
Do(ctx).Raw()

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -365,16 +369,17 @@ func resetMetrics(interestingMetrics map[string]metricsConvert) {
}
}

func metricsListGrabAggregateConvert(t metricsTarget) (map[string]*dto.MetricFamily, map[string]metricsConvert, bool, error) {
func metricsListGrabAggregateConvert(ctx context.Context, t metricsTarget) (map[string]*dto.MetricFamily, map[string]metricsConvert, bool, error) {
var resetDetected = false
var families map[string]*dto.MetricFamily

interestingMetrics := t.getInterestingMetrics()
resetMetrics(interestingMetrics)

targetList := t.getTargetList()
targetList, _ := t.getTargetList(ctx)
log.Debugf("Total TargetList pod count:- %v", len(targetList))
for _, target := range targetList {
rawOutput, err := t.grabMetricsFromTarget(target)
rawOutput, err := t.grabMetricsFromTarget(ctx, target)
if err != nil {
// it may take time for some metric targets to be removed
continue
Expand Down Expand Up @@ -413,8 +418,8 @@ func metricsListGrabAggregateConvert(t metricsTarget) (map[string]*dto.MetricFam
}

// Handler grabs metrics from target, aggregates the metrics and convert them into cloudwatch metrics
func Handler(t metricsTarget) {
families, interestingMetrics, resetDetected, err := metricsListGrabAggregateConvert(t)
func Handler(ctx context.Context, t metricsTarget) {
families, interestingMetrics, resetDetected, err := metricsListGrabAggregateConvert(ctx, t)

if err != nil || resetDetected {
t.getLogger().Infof("Skipping 1st poll after reset, error: %v", err)
Expand Down
14 changes: 8 additions & 6 deletions cmd/cni-metrics-helper/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package metrics

import (
"context"
"io/ioutil"
"testing"

"github.com/stretchr/testify/assert"

"github.com/aws/amazon-vpc-cni-k8s/pkg/publisher"
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
"github.com/stretchr/testify/assert"
)

type testMetricsTarget struct {
Expand All @@ -37,7 +39,7 @@ func newTestMetricsTarget(metricFile string, interestingMetrics map[string]metri
interestingMetrics: interestingMetrics}
}

func (target *testMetricsTarget) grabMetricsFromTarget(targetName string) ([]byte, error) {
func (target *testMetricsTarget) grabMetricsFromTarget(ctx context.Context, targetName string) ([]byte, error) {
testMetrics, _ := ioutil.ReadFile(target.metricFile)

return testMetrics, nil
Expand All @@ -51,8 +53,8 @@ func (target *testMetricsTarget) getCWMetricsPublisher() publisher.Publisher {
return nil
}

func (target *testMetricsTarget) getTargetList() []string {
return []string{target.metricFile}
func (target *testMetricsTarget) getTargetList(ctx context.Context) ([]string, error) {
return []string{target.metricFile}, nil
}

func (target *testMetricsTarget) submitCloudWatch() bool {
Expand All @@ -61,8 +63,8 @@ func (target *testMetricsTarget) submitCloudWatch() bool {

func TestAPIServerMetric(t *testing.T) {
testTarget := newTestMetricsTarget("cni_test1.data", InterestingCNIMetrics)

_, _, resetDetected, err := metricsListGrabAggregateConvert(testTarget)
ctx := context.Background()
_, _, resetDetected, err := metricsListGrabAggregateConvert(ctx, testTarget)
assert.NoError(t, err)
assert.True(t, resetDetected)

Expand Down
Loading