Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graphite Scaler + e2e tests #2092

Merged
merged 4 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))

### Improvements

Expand Down
6 changes: 6 additions & 0 deletions CREATE-NEW-SCALER.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,9 @@ The constructor should have the following parameters:
## Lifecycle of a scaler

The scaler is created and closed everytime KEDA or HPA wants to call `GetMetrics`, and everytime a new ScaledObject is created or updated that has a trigger for that scaler. Thus, a developer of a scaler should not assume that the scaler will maintain any state between these calls.

## Note
The scaler code is embedded into the two separate binaries comprising KEDA, the operator and the custom metrics server component. The metrics server must be occasionally rebuilt published and deployed to k8s for it to have the same code as your operator.

GetMetricSpecForScaling() is executed by the operator for the purposes of scaling up to and down to 0 replicas.
GetMetrics() is executed by the custom metrics server in response to a calls against the external metrics api, whether by the HPA loop or by curl
211 changes: 211 additions & 0 deletions pkg/scalers/graphite_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package scalers

import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
url_pkg "net/url"
"strconv"

v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
grapServerAddress = "serverAddress"
grapMetricName = "metricName"
grapQuery = "query"
grapThreshold = "threshold"
grapQueryTime = "queryTime"
)

type graphiteScaler struct {
metadata *graphiteMetadata
httpClient *http.Client
}

type graphiteMetadata struct {
serverAddress string
metricName string
query string
threshold int
from string

// basic auth
enableBasicAuth bool
username string
password string // +optional
}

type grapQueryResult []struct {
Target string `json:"target"`
Tags map[string]interface{} `json:"tags"`
Datapoints [][]float64 `json:"datapoints"`
}

var graphiteLog = logf.Log.WithName("graphite_scaler")

// NewGraphiteScaler creates a new graphiteScaler
func NewGraphiteScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseGraphiteMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing graphite metadata: %s", err)
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout)

return &graphiteScaler{
metadata: meta,
httpClient: httpClient,
}, nil
}

func parseGraphiteMetadata(config *ScalerConfig) (*graphiteMetadata, error) {
meta := graphiteMetadata{}

if val, ok := config.TriggerMetadata[grapServerAddress]; ok && val != "" {
meta.serverAddress = val
} else {
return nil, fmt.Errorf("no %s given", grapServerAddress)
}

if val, ok := config.TriggerMetadata[grapQuery]; ok && val != "" {
meta.query = val
} else {
return nil, fmt.Errorf("no %s given", grapQuery)
}

if val, ok := config.TriggerMetadata[grapMetricName]; ok && val != "" {
meta.metricName = val
} else {
return nil, fmt.Errorf("no %s given", grapMetricName)
}

if val, ok := config.TriggerMetadata[grapQueryTime]; ok && val != "" {
meta.from = val
} else {
return nil, fmt.Errorf("no %s given", grapQueryTime)
}

if val, ok := config.TriggerMetadata[grapThreshold]; ok && val != "" {
t, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %s", grapThreshold, err)
}

meta.threshold = t
}

_, ok := config.TriggerMetadata["authMode"]
bpinske marked this conversation as resolved.
Show resolved Hide resolved
// no authMode specified
if !ok {
return &meta, nil
}

if len(config.AuthParams["username"]) == 0 {
return nil, errors.New("no username given")
}

meta.username = config.AuthParams["username"]
// password is optional. For convenience, many application implement basic auth with
// username as apikey and password as empty
meta.password = config.AuthParams["password"]
meta.enableBasicAuth = true

return &meta, nil
}

func (s *graphiteScaler) IsActive(ctx context.Context) (bool, error) {
val, err := s.ExecuteGrapQuery()
if err != nil {
graphiteLog.Error(err, "error executing graphite query")
return false, err
}

return val > 0, nil
}

func (s *graphiteScaler) Close() error {
return nil
}

func (s *graphiteScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(s.metadata.threshold), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "graphite", s.metadata.serverAddress, s.metadata.metricName)),
bpinske marked this conversation as resolved.
Show resolved Hide resolved
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetMetricValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

func (s *graphiteScaler) ExecuteGrapQuery() (float64, error) {
queryEscaped := url_pkg.QueryEscape(s.metadata.query)
url := fmt.Sprintf("%s/render?from=%s&target=%s&format=json", s.metadata.serverAddress, s.metadata.from, queryEscaped)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return -1, err
}
if s.metadata.enableBasicAuth {
req.SetBasicAuth(s.metadata.username, s.metadata.password)
}
r, err := s.httpClient.Do(req)
if err != nil {
return -1, err
}

b, err := ioutil.ReadAll(r.Body)
if err != nil {
return -1, err
}
r.Body.Close()

var result grapQueryResult
err = json.Unmarshal(b, &result)
if err != nil {
return -1, err
}

if len(result) == 0 {
return 0, nil
} else if len(result) > 1 {
return -1, fmt.Errorf("graphite query %s returned multiple series", s.metadata.query)
}

// https://graphite-api.readthedocs.io/en/latest/api.html#json
datapoint := result[0].Datapoints[0][0]

return datapoint, nil
}

func (s *graphiteScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
val, err := s.ExecuteGrapQuery()
if err != nil {
graphiteLog.Error(err, "error executing graphite query")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(val), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
67 changes: 67 additions & 0 deletions pkg/scalers/graphite_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package scalers

import (
"testing"
)

type parseGraphiteMetadataTestData struct {
metadata map[string]string
isError bool
}

type graphiteMetricIdentifier struct {
metadataTestData *parseGraphiteMetadataTestData
name string
}

var testGrapMetadata = []parseGraphiteMetadataTestData{
{map[string]string{}, true},
// all properly formed
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, false},
// missing serverAddress
{map[string]string{"serverAddress": "", "grapmetricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true},
// missing metricName
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true},
// malformed threshold
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "one", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true},
// missing query
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true},
// missing queryTime
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "", "disableScaleToZero": "true"}, true},
// all properly formed, default disableScaleToZero
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "queryTime": "-30Seconds", "query": "stats.counters.http.hello-world.request.count.count"}, false},
bpinske marked this conversation as resolved.
Show resolved Hide resolved
}

var graphiteMetricIdentifiers = []graphiteMetricIdentifier{
{&testGrapMetadata[1], "graphite-http---localhost-81-request-count"},
}

func TestGraphiteParseMetadata(t *testing.T) {
for _, testData := range testGrapMetadata {
_, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadata})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

func TestGraphiteGetMetricSpecForScaling(t *testing.T) {
for _, testData := range graphiteMetricIdentifiers {
meta, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockGraphiteScaler := graphiteScaler{
metadata: meta,
}

metricSpec := mockGraphiteScaler.GetMetricSpecForScaling()
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ func buildScaler(client client.Client, triggerType string, config *scalers.Scale
return scalers.NewExternalPushScaler(config)
case "gcp-pubsub":
return scalers.NewPubSubScaler(config)
case "graphite":
return scalers.NewGraphiteScaler(config)
case "huawei-cloudeye":
return scalers.NewHuaweiCloudeyeScaler(config)
case "ibmmq":
Expand Down
Loading