Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graphite Scaler + e2e tests #2092

Merged
merged 4 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))

### Improvements

Expand Down
6 changes: 6 additions & 0 deletions CREATE-NEW-SCALER.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,9 @@ The constructor should have the following parameters:
## Lifecycle of a scaler

The scaler is created and closed everytime KEDA or HPA wants to call `GetMetrics`, and everytime a new ScaledObject is created or updated that has a trigger for that scaler. Thus, a developer of a scaler should not assume that the scaler will maintain any state between these calls.

## Note
The scaler code is embedded into the two separate binaries comprising KEDA, the operator and the custom metrics server component. The metrics server must be occasionally rebuilt published and deployed to k8s for it to have the same code as your operator.

GetMetricSpecForScaling() is executed by the operator for the purposes of scaling up to and down to 0 replicas.
GetMetrics() is executed by the custom metrics server in response to a calls against the external metrics api, whether by the HPA loop or by curl
213 changes: 213 additions & 0 deletions pkg/scalers/graphite_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package scalers

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
url_pkg "net/url"
"strconv"

v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
grapServerAddress = "serverAddress"
grapMetricName = "metricName"
grapQuery = "query"
grapThreshold = "threshold"
grapQueryTime = "queryTime"
)

type graphiteScaler struct {
metadata *graphiteMetadata
httpClient *http.Client
}

type graphiteMetadata struct {
serverAddress string
metricName string
query string
threshold int
from string

// basic auth
enableBasicAuth bool
username string
password string // +optional
}

type grapQueryResult []struct {
Target string `json:"target"`
Tags map[string]interface{} `json:"tags"`
Datapoints [][]float64 `json:"datapoints"`
}

var graphiteLog = logf.Log.WithName("graphite_scaler")

// NewGraphiteScaler creates a new graphiteScaler
func NewGraphiteScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseGraphiteMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing graphite metadata: %s", err)
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout)

return &graphiteScaler{
metadata: meta,
httpClient: httpClient,
}, nil
}

func parseGraphiteMetadata(config *ScalerConfig) (*graphiteMetadata, error) {
meta := graphiteMetadata{}

if val, ok := config.TriggerMetadata[grapServerAddress]; ok && val != "" {
meta.serverAddress = val
} else {
return nil, fmt.Errorf("no %s given", grapServerAddress)
}

if val, ok := config.TriggerMetadata[grapQuery]; ok && val != "" {
meta.query = val
} else {
return nil, fmt.Errorf("no %s given", grapQuery)
}

if val, ok := config.TriggerMetadata[grapMetricName]; ok && val != "" {
meta.metricName = val
} else {
return nil, fmt.Errorf("no %s given", grapMetricName)
}

if val, ok := config.TriggerMetadata[grapQueryTime]; ok && val != "" {
meta.from = val
} else {
return nil, fmt.Errorf("no %s given", grapQueryTime)
}

if val, ok := config.TriggerMetadata[grapThreshold]; ok && val != "" {
t, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %s", grapThreshold, err)
}

meta.threshold = t
}

val, ok := config.TriggerMetadata["authMode"]
// no authMode specified
if !ok {
return &meta, nil
}
if val != "basic" {
return nil, fmt.Errorf("authMode must be 'basic'")
}

if len(config.AuthParams["username"]) == 0 {
return nil, fmt.Errorf("no username given")
}

meta.username = config.AuthParams["username"]
// password is optional. For convenience, many application implement basic auth with
// username as apikey and password as empty
meta.password = config.AuthParams["password"]
meta.enableBasicAuth = true

return &meta, nil
}

func (s *graphiteScaler) IsActive(ctx context.Context) (bool, error) {
val, err := s.ExecuteGrapQuery()
if err != nil {
graphiteLog.Error(err, "error executing graphite query")
return false, err
}

return val > 0, nil
}

func (s *graphiteScaler) Close() error {
return nil
}

func (s *graphiteScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(s.metadata.threshold), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "graphite", s.metadata.metricName)),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetMetricValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

func (s *graphiteScaler) ExecuteGrapQuery() (float64, error) {
queryEscaped := url_pkg.QueryEscape(s.metadata.query)
url := fmt.Sprintf("%s/render?from=%s&target=%s&format=json", s.metadata.serverAddress, s.metadata.from, queryEscaped)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return -1, err
}
if s.metadata.enableBasicAuth {
req.SetBasicAuth(s.metadata.username, s.metadata.password)
}
r, err := s.httpClient.Do(req)
if err != nil {
return -1, err
}

b, err := ioutil.ReadAll(r.Body)
if err != nil {
return -1, err
}
r.Body.Close()

var result grapQueryResult
err = json.Unmarshal(b, &result)
if err != nil {
return -1, err
}

if len(result) == 0 {
return 0, nil
} else if len(result) > 1 {
return -1, fmt.Errorf("graphite query %s returned multiple series", s.metadata.query)
}

// https://graphite-api.readthedocs.io/en/latest/api.html#json
datapoint := result[0].Datapoints[0][0]

return datapoint, nil
}

func (s *graphiteScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
val, err := s.ExecuteGrapQuery()
if err != nil {
graphiteLog.Error(err, "error executing graphite query")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(val), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
100 changes: 100 additions & 0 deletions pkg/scalers/graphite_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package scalers

import (
"strings"
"testing"
)

type parseGraphiteMetadataTestData struct {
metadata map[string]string
isError bool
}

type graphiteMetricIdentifier struct {
metadataTestData *parseGraphiteMetadataTestData
name string
}

var testGrapMetadata = []parseGraphiteMetadataTestData{
{map[string]string{}, true},
// all properly formed
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds"}, false},
// missing serverAddress
{map[string]string{"serverAddress": "", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds"}, true},
// missing metricName
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds"}, true},
// malformed threshold
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "one", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds"}, true},
// missing query
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true},
// missing queryTime
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": ""}, true},
}

var graphiteMetricIdentifiers = []graphiteMetricIdentifier{
{&testGrapMetadata[1], "graphite-request-count"},
}

type graphiteAuthMetadataTestData struct {
metadata map[string]string
authParams map[string]string
isError bool
}

var testGraphiteAuthMetadata = []graphiteAuthMetadataTestData{
// success basicAuth
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "authMode": "basic"}, map[string]string{"username": "user", "password": "pass"}, false},
// fail basicAuth with no username
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "authMode": "basic"}, map[string]string{}, true},
// fail if using non-basicAuth authMode
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "authMode": "tls"}, map[string]string{"username": "user"}, true},
}

func TestGraphiteParseMetadata(t *testing.T) {
for _, testData := range testGrapMetadata {
_, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadata})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

func TestGraphiteGetMetricSpecForScaling(t *testing.T) {
for _, testData := range graphiteMetricIdentifiers {
meta, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockGraphiteScaler := graphiteScaler{
metadata: meta,
}

metricSpec := mockGraphiteScaler.GetMetricSpecForScaling()
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
}
}

func TestGraphiteScalerAuthParams(t *testing.T) {
for _, testData := range testGraphiteAuthMetadata {
meta, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams})

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}

if err == nil {
if meta.enableBasicAuth && !strings.Contains(testData.metadata["authMode"], "basic") {
t.Error("wrong auth mode detected")
}
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ func buildScaler(client client.Client, triggerType string, config *scalers.Scale
return scalers.NewExternalPushScaler(config)
case "gcp-pubsub":
return scalers.NewPubSubScaler(config)
case "graphite":
return scalers.NewGraphiteScaler(config)
case "huawei-cloudeye":
return scalers.NewHuaweiCloudeyeScaler(config)
case "ibmmq":
Expand Down
Loading