Skip to content

Commit

Permalink
Add Graphite Scaler + e2e tests (#2092)
Browse files Browse the repository at this point in the history
Co-authored-by: shuoliu1 <shuo_liu1@163.com>
Co-authored-by: 刘烁 <shuo.liu@leyantech.com>
Co-authored-by: Tom Kerkhove <kerkhove.tom@gmail.com>
Co-authored-by: Brandon Pinske <brandon.pinske@crowdstrike.com>

Signed-off-by: shuoliu1 shuo_liu1@163.com
Signed-off-by: Brandon Pinske <brandon@pinske.info>
Signed-off-by: Brandon Pinske <brandon.pinske@crowdstrike.com>
  • Loading branch information
bpinske committed Sep 15, 2021
1 parent af5981a commit ebd8e28
Show file tree
Hide file tree
Showing 7 changed files with 836 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))

### Improvements

Expand Down
6 changes: 6 additions & 0 deletions CREATE-NEW-SCALER.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,9 @@ The constructor should have the following parameters:
## Lifecycle of a scaler

The scaler is created and closed everytime KEDA or HPA wants to call `GetMetrics`, and everytime a new ScaledObject is created or updated that has a trigger for that scaler. Thus, a developer of a scaler should not assume that the scaler will maintain any state between these calls.

## Note
The scaler code is embedded into the two separate binaries comprising KEDA, the operator and the custom metrics server component. The metrics server must be occasionally rebuilt published and deployed to k8s for it to have the same code as your operator.

GetMetricSpecForScaling() is executed by the operator for the purposes of scaling up to and down to 0 replicas.
GetMetrics() is executed by the custom metrics server in response to a calls against the external metrics api, whether by the HPA loop or by curl
213 changes: 213 additions & 0 deletions pkg/scalers/graphite_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package scalers

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
url_pkg "net/url"
"strconv"

v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
grapServerAddress = "serverAddress"
grapMetricName = "metricName"
grapQuery = "query"
grapThreshold = "threshold"
grapQueryTime = "queryTime"
)

type graphiteScaler struct {
metadata *graphiteMetadata
httpClient *http.Client
}

type graphiteMetadata struct {
serverAddress string
metricName string
query string
threshold int
from string

// basic auth
enableBasicAuth bool
username string
password string // +optional
}

type grapQueryResult []struct {
Target string `json:"target"`
Tags map[string]interface{} `json:"tags"`
Datapoints [][]float64 `json:"datapoints"`
}

var graphiteLog = logf.Log.WithName("graphite_scaler")

// NewGraphiteScaler creates a new graphiteScaler
func NewGraphiteScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseGraphiteMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing graphite metadata: %s", err)
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout)

return &graphiteScaler{
metadata: meta,
httpClient: httpClient,
}, nil
}

func parseGraphiteMetadata(config *ScalerConfig) (*graphiteMetadata, error) {
meta := graphiteMetadata{}

if val, ok := config.TriggerMetadata[grapServerAddress]; ok && val != "" {
meta.serverAddress = val
} else {
return nil, fmt.Errorf("no %s given", grapServerAddress)
}

if val, ok := config.TriggerMetadata[grapQuery]; ok && val != "" {
meta.query = val
} else {
return nil, fmt.Errorf("no %s given", grapQuery)
}

if val, ok := config.TriggerMetadata[grapMetricName]; ok && val != "" {
meta.metricName = val
} else {
return nil, fmt.Errorf("no %s given", grapMetricName)
}

if val, ok := config.TriggerMetadata[grapQueryTime]; ok && val != "" {
meta.from = val
} else {
return nil, fmt.Errorf("no %s given", grapQueryTime)
}

if val, ok := config.TriggerMetadata[grapThreshold]; ok && val != "" {
t, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %s", grapThreshold, err)
}

meta.threshold = t
}

val, ok := config.TriggerMetadata["authMode"]
// no authMode specified
if !ok {
return &meta, nil
}
if val != "basic" {
return nil, fmt.Errorf("authMode must be 'basic'")
}

if len(config.AuthParams["username"]) == 0 {
return nil, fmt.Errorf("no username given")
}

meta.username = config.AuthParams["username"]
// password is optional. For convenience, many application implement basic auth with
// username as apikey and password as empty
meta.password = config.AuthParams["password"]
meta.enableBasicAuth = true

return &meta, nil
}

func (s *graphiteScaler) IsActive(ctx context.Context) (bool, error) {
val, err := s.ExecuteGrapQuery()
if err != nil {
graphiteLog.Error(err, "error executing graphite query")
return false, err
}

return val > 0, nil
}

func (s *graphiteScaler) Close() error {
return nil
}

func (s *graphiteScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(s.metadata.threshold), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "graphite", s.metadata.metricName)),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetMetricValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

func (s *graphiteScaler) ExecuteGrapQuery() (float64, error) {
queryEscaped := url_pkg.QueryEscape(s.metadata.query)
url := fmt.Sprintf("%s/render?from=%s&target=%s&format=json", s.metadata.serverAddress, s.metadata.from, queryEscaped)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return -1, err
}
if s.metadata.enableBasicAuth {
req.SetBasicAuth(s.metadata.username, s.metadata.password)
}
r, err := s.httpClient.Do(req)
if err != nil {
return -1, err
}

b, err := ioutil.ReadAll(r.Body)
if err != nil {
return -1, err
}
r.Body.Close()

var result grapQueryResult
err = json.Unmarshal(b, &result)
if err != nil {
return -1, err
}

if len(result) == 0 {
return 0, nil
} else if len(result) > 1 {
return -1, fmt.Errorf("graphite query %s returned multiple series", s.metadata.query)
}

// https://graphite-api.readthedocs.io/en/latest/api.html#json
datapoint := result[0].Datapoints[0][0]

return datapoint, nil
}

func (s *graphiteScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
val, err := s.ExecuteGrapQuery()
if err != nil {
graphiteLog.Error(err, "error executing graphite query")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(val), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
100 changes: 100 additions & 0 deletions pkg/scalers/graphite_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package scalers

import (
"strings"
"testing"
)

type parseGraphiteMetadataTestData struct {
metadata map[string]string
isError bool
}

type graphiteMetricIdentifier struct {
metadataTestData *parseGraphiteMetadataTestData
name string
}

var testGrapMetadata = []parseGraphiteMetadataTestData{
{map[string]string{}, true},
// all properly formed
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds"}, false},
// missing serverAddress
{map[string]string{"serverAddress": "", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds"}, true},
// missing metricName
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds"}, true},
// malformed threshold
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "one", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds"}, true},
// missing query
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true},
// missing queryTime
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": ""}, true},
}

var graphiteMetricIdentifiers = []graphiteMetricIdentifier{
{&testGrapMetadata[1], "graphite-request-count"},
}

type graphiteAuthMetadataTestData struct {
metadata map[string]string
authParams map[string]string
isError bool
}

var testGraphiteAuthMetadata = []graphiteAuthMetadataTestData{
// success basicAuth
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "authMode": "basic"}, map[string]string{"username": "user", "password": "pass"}, false},
// fail basicAuth with no username
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "authMode": "basic"}, map[string]string{}, true},
// fail if using non-basicAuth authMode
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "authMode": "tls"}, map[string]string{"username": "user"}, true},
}

func TestGraphiteParseMetadata(t *testing.T) {
for _, testData := range testGrapMetadata {
_, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadata})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

func TestGraphiteGetMetricSpecForScaling(t *testing.T) {
for _, testData := range graphiteMetricIdentifiers {
meta, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockGraphiteScaler := graphiteScaler{
metadata: meta,
}

metricSpec := mockGraphiteScaler.GetMetricSpecForScaling()
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
}
}

func TestGraphiteScalerAuthParams(t *testing.T) {
for _, testData := range testGraphiteAuthMetadata {
meta, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams})

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}

if err == nil {
if meta.enableBasicAuth && !strings.Contains(testData.metadata["authMode"], "basic") {
t.Error("wrong auth mode detected")
}
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ func buildScaler(client client.Client, triggerType string, config *scalers.Scale
return scalers.NewExternalPushScaler(config)
case "gcp-pubsub":
return scalers.NewPubSubScaler(config)
case "graphite":
return scalers.NewGraphiteScaler(config)
case "huawei-cloudeye":
return scalers.NewHuaweiCloudeyeScaler(config)
case "ibmmq":
Expand Down
Loading

0 comments on commit ebd8e28

Please sign in to comment.