Skip to content

Commit

Permalink
Refactor arangodb scaler config (kedacore#5808)
Browse files Browse the repository at this point in the history
Signed-off-by: dttung2905 <ttdao.2015@accountancy.smu.edu.sg>
Signed-off-by: uucloud <uucloud@qq.com>
  • Loading branch information
dttung2905 authored and uucloud committed Jul 11, 2024
1 parent 8ca968d commit 14e166e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 94 deletions.
122 changes: 31 additions & 91 deletions pkg/scalers/arangodb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scalers
import (
"context"
"fmt"
"strconv"
"strings"

driver "github.com/arangodb/go-driver"
Expand Down Expand Up @@ -31,37 +30,37 @@ type dbResult struct {

// arangoDBMetadata specify arangoDB scaler params.
type arangoDBMetadata struct {
// Specify arangoDB server endpoint URL or comma separated URL endpoints of all the coordinators.
// Specify arangoDB server endpoint URL or comma separated URL Endpoints of all the coordinators.
// +required
endpoints string
Endpoints string `keda:"name=endpoints, order=authParams;triggerMetadata"`
// Authentication parameters for connecting to the database
// +required
arangoDBAuth *authentication.AuthMeta
ArangoDBAuth *authentication.Config `keda:"optional"`
// Specify the unique arangoDB server ID. Only required if bearer JWT is being used.
// +optional
serverID string

// The name of the database to be queried.
// +required
dbName string
// The name of the collection to be queried.
// The name of the Collection to be queried.
// +required
collection string
// The arangoDB query to be executed.
Collection string `keda:"name=collection, order=triggerMetadata"`
// The arangoDB Query to be executed.
// +required
query string
Query string `keda:"name=query, order=triggerMetadata"`
// A threshold that is used as targetAverageValue in HPA.
// +required
queryValue float64
QueryValue float64 `keda:"name=queryValue, order=triggerMetadata, default=0"`
// A threshold that is used to check if scaler is active.
// +optional
activationQueryValue float64
ActivationQueryValue float64 `keda:"name=activationQueryValue, order=triggerMetadata, default=0"`
// Specify whether to verify the server's certificate chain and host name.
// +optional
unsafeSsl bool
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata ,default=false"`
// Specify the max size of the active connection pool.
// +optional
connectionLimit int64
ConnectionLimit int64 `keda:"name=connectionLimit, order=triggerMetadata, optional"`

// The index of the scaler inside the ScaledObject
// +internal
Expand Down Expand Up @@ -97,17 +96,17 @@ func getNewArangoDBClient(meta *arangoDBMetadata) (driver.Client, error) {
var auth driver.Authentication

conn, err := http.NewConnection(http.ConnectionConfig{
Endpoints: strings.Split(meta.endpoints, ","),
TLSConfig: util.CreateTLSClientConfig(meta.unsafeSsl),
Endpoints: strings.Split(meta.Endpoints, ","),
TLSConfig: util.CreateTLSClientConfig(meta.UnsafeSsl),
})
if err != nil {
return nil, fmt.Errorf("failed to create a new http connection, %w", err)
}

if meta.arangoDBAuth.EnableBasicAuth {
auth = driver.BasicAuthentication(meta.arangoDBAuth.Username, meta.arangoDBAuth.Password)
} else if meta.arangoDBAuth.EnableBearerAuth {
hdr, err := jwt.CreateArangodJwtAuthorizationHeader(meta.arangoDBAuth.BearerToken, meta.serverID)
if meta.ArangoDBAuth.EnabledBasicAuth() {
auth = driver.BasicAuthentication(meta.ArangoDBAuth.Username, meta.ArangoDBAuth.Password)
} else if meta.ArangoDBAuth.EnabledBearerAuth() {
hdr, err := jwt.CreateArangodJwtAuthorizationHeader(meta.ArangoDBAuth.BearerToken, meta.serverID)
if err != nil {
return nil, fmt.Errorf("failed to create bearer token authorization header, %w", err)
}
Expand All @@ -128,48 +127,14 @@ func getNewArangoDBClient(meta *arangoDBMetadata) (driver.Client, error) {

func parseArangoDBMetadata(config *scalersconfig.ScalerConfig) (*arangoDBMetadata, error) {
// setting default metadata
meta := arangoDBMetadata{}

// parse metaData from ScaledJob config
endpoints, err := GetFromAuthOrMeta(config, "endpoints")
if err != nil {
return nil, err
}
meta.endpoints = endpoints

if val, ok := config.TriggerMetadata["collection"]; ok {
meta.collection = val
} else {
return nil, fmt.Errorf("no collection given")
}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, fmt.Errorf("no query given")
}

if val, ok := config.TriggerMetadata["queryValue"]; ok {
queryValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("failed to convert queryValue to int, %w", err)
}
meta.queryValue = queryValue
} else {
if config.AsMetricSource {
meta.queryValue = 0
} else {
return nil, fmt.Errorf("no queryValue given")
}
meta := &arangoDBMetadata{}
meta.triggerIndex = config.TriggerIndex
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing arango metadata: %w", err)
}

meta.activationQueryValue = 0
if val, ok := config.TriggerMetadata["activationQueryValue"]; ok {
activationQueryValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("failed to convert activationQueryValue to int, %w", err)
}
meta.activationQueryValue = activationQueryValue
if !config.AsMetricSource && meta.QueryValue == 0 {
return nil, fmt.Errorf("no QueryValue given")
}

dbName, err := GetFromAuthOrMeta(config, "dbName")
Expand All @@ -178,32 +143,7 @@ func parseArangoDBMetadata(config *scalersconfig.ScalerConfig) (*arangoDBMetadat
}
meta.dbName = dbName

meta.unsafeSsl = false
if val, ok := config.TriggerMetadata["unsafeSsl"]; ok && val != "" {
unsafeSslValue, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("failed to parse unsafeSsl, %w", err)
}
meta.unsafeSsl = unsafeSslValue
}

if val, ok := config.TriggerMetadata["connectionLimit"]; ok {
connectionLimit, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to convert connectionLimit to int, %w", err)
}
meta.connectionLimit = connectionLimit
}

// parse auth configs from ScalerConfig
arangoDBAuth, err := authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams)
if err != nil {
return nil, err
}
meta.arangoDBAuth = arangoDBAuth

meta.triggerIndex = config.TriggerIndex
return &meta, nil
return meta, nil
}

// Close disposes of arangoDB connections
Expand All @@ -226,18 +166,18 @@ func (s *arangoDBScaler) getQueryResult(ctx context.Context) (float64, error) {
return -1, fmt.Errorf("failed to connect to %s db, %w", s.metadata.dbName, err)
}

collectionExists, err := db.CollectionExists(ctx, s.metadata.collection)
collectionExists, err := db.CollectionExists(ctx, s.metadata.Collection)
if err != nil {
return -1, fmt.Errorf("failed to check if %s collection exists, %w", s.metadata.collection, err)
return -1, fmt.Errorf("failed to check if %s collection exists, %w", s.metadata.Collection, err)
}

if !collectionExists {
return -1, fmt.Errorf("%s collection not found in %s database", s.metadata.collection, s.metadata.dbName)
return -1, fmt.Errorf("%s collection not found in %s database", s.metadata.Collection, s.metadata.dbName)
}

ctx = driver.WithQueryCount(ctx)

cursor, err := db.Query(ctx, s.metadata.query, nil)
cursor, err := db.Query(ctx, s.metadata.Query, nil)
if err != nil {
return -1, fmt.Errorf("failed to execute the query, %w", err)
}
Expand All @@ -256,7 +196,7 @@ func (s *arangoDBScaler) getQueryResult(ctx context.Context) (float64, error) {
return result.Value, nil
}

// GetMetricsAndActivity query from arangoDB, and return to external metrics and activity
// GetMetricsAndActivity Query from arangoDB, and return to external metrics and activity
func (s *arangoDBScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
num, err := s.getQueryResult(ctx)
if err != nil {
Expand All @@ -265,7 +205,7 @@ func (s *arangoDBScaler) GetMetricsAndActivity(ctx context.Context, metricName s

metric := GenerateMetricInMili(metricName, num)

return append([]external_metrics.ExternalMetricValue{}, metric), num > s.metadata.activationQueryValue, nil
return append([]external_metrics.ExternalMetricValue{}, metric), num > s.metadata.ActivationQueryValue, nil
}

// GetMetricSpecForScaling get the query value for scaling
Expand All @@ -274,7 +214,7 @@ func (s *arangoDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpe
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, "arangodb"),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.queryValue),
Target: GetMetricTargetMili(s.metricType, s.metadata.QueryValue),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,
Expand Down
6 changes: 3 additions & 3 deletions pkg/scalers/arangodb_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ var arangoDBMetricIdentifiers = []arangoDBMetricIdentifier{
}

func TestParseArangoDBMetadata(t *testing.T) {
for _, testData := range testArangoDBMetadata {
for idx, testData := range testArangoDBMetadata {
_, err := parseArangoDBMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
if err != nil && !testData.raisesError {
t.Error("Expected success but got error:", err)
t.Errorf("Test %v: expected success but got error: %s", idx, err)
}
if err == nil && testData.raisesError {
t.Error("Expected error but got success")
Expand All @@ -103,7 +103,7 @@ func TestArangoDBScalerAuthParams(t *testing.T) {
}

if err == nil {
if meta.arangoDBAuth.EnableBasicAuth && !strings.Contains(testData.metadata["authModes"], "basic") {
if meta.ArangoDBAuth.EnabledBasicAuth() && !strings.Contains(testData.metadata["authModes"], "basic") {
t.Error("wrong auth mode detected")
}
}
Expand Down

0 comments on commit 14e166e

Please sign in to comment.