Skip to content

Commit

Permalink
Refactor elasticsearch scaler config
Browse files Browse the repository at this point in the history
Signed-off-by: Rick Brouwer <rickbrouwer@gmail.com>
  • Loading branch information
rickbrouwer committed Aug 25, 2024
1 parent f4261e3 commit c8d137f
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 289 deletions.
254 changes: 58 additions & 196 deletions pkg/scalers/elasticsearch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
Expand All @@ -22,28 +21,45 @@ import (

type elasticsearchScaler struct {
metricType v2.MetricTargetType
metadata *elasticsearchMetadata
metadata elasticsearchMetadata
esClient *elasticsearch.Client
logger logr.Logger
}

type elasticsearchMetadata struct {
addresses []string
unsafeSsl bool
username string
password string
cloudID string
apiKey string
indexes []string
searchTemplateName string
parameters []string
valueLocation string
targetValue float64
activationTargetValue float64
metricName string
Addresses []string `keda:"name=addresses, order=authParams;triggerMetadata, optional"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
Username string `keda:"name=username, order=authParams;triggerMetadata, optional"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata, optional"`
CloudID string `keda:"name=cloudID, order=authParams;triggerMetadata, optional"`
APIKey string `keda:"name=apiKey, order=authParams;triggerMetadata, optional"`
Index []string `keda:"name=index, order=authParams;triggerMetadata"`
SearchTemplateName string `keda:"name=searchTemplateName, order=authParams;triggerMetadata"`
Parameters []string `keda:"name=parameters, order=triggerMetadata, optional"`
ValueLocation string `keda:"name=valueLocation, order=authParams;triggerMetadata"`
TargetValue float64 `keda:"name=targetValue, order=authParams;triggerMetadata"`
ActivationTargetValue float64 `keda:"name=activationTargetValue, order=triggerMetadata, default=0"`
MetricName string `keda:"name=metricName, order=triggerMetadata, optional"`

TriggerIndex int
}

func (m *elasticsearchMetadata) Validate() error {
if (m.CloudID != "" || m.APIKey != "") && (len(m.Addresses) > 0 || m.Username != "" || m.Password != "") {
return fmt.Errorf("can't provide both cloud config and endpoint addresses")
}
if (m.CloudID == "" && m.APIKey == "") && (len(m.Addresses) == 0 && m.Username == "" && m.Password == "") {
return fmt.Errorf("must provide either cloud config or endpoint addresses")
}
if (m.CloudID != "" && m.APIKey == "") || (m.CloudID == "" && m.APIKey != "") {
return fmt.Errorf("both cloudID and apiKey must be provided when cloudID or apiKey is used")
}
if len(m.Addresses) > 0 && (m.Username == "" || m.Password == "") {
return fmt.Errorf("both username and password must be provided when addresses is used")
}
return nil
}

// NewElasticsearchScaler creates a new elasticsearch scaler
func NewElasticsearchScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
Expand All @@ -69,184 +85,37 @@ func NewElasticsearchScaler(config *scalersconfig.ScalerConfig) (Scaler, error)
}, nil
}

const defaultUnsafeSsl = false

func hasCloudConfig(meta *elasticsearchMetadata) bool {
if meta.cloudID != "" {
return true
}
if meta.apiKey != "" {
return true
}
return false
}

func hasEndpointsConfig(meta *elasticsearchMetadata) bool {
if len(meta.addresses) > 0 {
return true
}
if meta.username != "" {
return true
}
if meta.password != "" {
return true
}
return false
}

func extractEndpointsConfig(config *scalersconfig.ScalerConfig, meta *elasticsearchMetadata) error {
addresses, err := GetFromAuthOrMeta(config, "addresses")
if err != nil {
return err
}

meta.addresses = splitAndTrimBySep(addresses, ",")
if val, ok := config.AuthParams["username"]; ok {
meta.username = val
} else if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
}

if config.AuthParams["password"] != "" {
meta.password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}

return nil
}

func extractCloudConfig(config *scalersconfig.ScalerConfig, meta *elasticsearchMetadata) error {
cloudID, err := GetFromAuthOrMeta(config, "cloudID")
if err != nil {
return err
}
meta.cloudID = cloudID

apiKey, err := GetFromAuthOrMeta(config, "apiKey")
if err != nil {
return err
}
meta.apiKey = apiKey
return nil
}

var (
// ErrElasticsearchMissingAddressesOrCloudConfig is returned when endpoint addresses or cloud config is missing.
ErrElasticsearchMissingAddressesOrCloudConfig = errors.New("must provide either endpoint addresses or cloud config")

// ErrElasticsearchConfigConflict is returned when both endpoint addresses and cloud config are provided.
ErrElasticsearchConfigConflict = errors.New("can't provide endpoint addresses and cloud config at the same time")
)

func parseElasticsearchMetadata(config *scalersconfig.ScalerConfig) (*elasticsearchMetadata, error) {
func parseElasticsearchMetadata(config *scalersconfig.ScalerConfig) (elasticsearchMetadata, error) {
meta := elasticsearchMetadata{}
err := config.TypedConfig(&meta)

var err error
addresses, err := GetFromAuthOrMeta(config, "addresses")
cloudID, errCloudConfig := GetFromAuthOrMeta(config, "cloudID")
if err != nil && errCloudConfig != nil {
return nil, ErrElasticsearchMissingAddressesOrCloudConfig
}

if err == nil && addresses != "" {
err = extractEndpointsConfig(config, &meta)
if err != nil {
return nil, err
}
}
if errCloudConfig == nil && cloudID != "" {
err = extractCloudConfig(config, &meta)
if err != nil {
return nil, err
}
}

if hasEndpointsConfig(&meta) && hasCloudConfig(&meta) {
return nil, ErrElasticsearchConfigConflict
}

if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
unsafeSsl, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsl: %w", err)
}
meta.unsafeSsl = unsafeSsl
} else {
meta.unsafeSsl = defaultUnsafeSsl
}

index, err := GetFromAuthOrMeta(config, "index")
if err != nil {
return nil, err
}
meta.indexes = splitAndTrimBySep(index, ";")

searchTemplateName, err := GetFromAuthOrMeta(config, "searchTemplateName")
if err != nil {
return nil, err
}
meta.searchTemplateName = searchTemplateName

if val, ok := config.TriggerMetadata["parameters"]; ok {
meta.parameters = splitAndTrimBySep(val, ";")
}

valueLocation, err := GetFromAuthOrMeta(config, "valueLocation")
if err != nil {
return nil, err
}
meta.valueLocation = valueLocation

targetValueString, err := GetFromAuthOrMeta(config, "targetValue")
if err != nil {
if config.AsMetricSource {
targetValueString = "0"
} else {
return nil, err
}
}
targetValue, err := strconv.ParseFloat(targetValueString, 64)
if err != nil {
return nil, fmt.Errorf("targetValue parsing error: %w", err)
return meta, err
}
meta.targetValue = targetValue

meta.activationTargetValue = 0
if val, ok := config.TriggerMetadata["activationTargetValue"]; ok {
activationTargetValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationTargetValue parsing error: %w", err)
}
meta.activationTargetValue = activationTargetValue
}
meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.SearchTemplateName)))
meta.TriggerIndex = config.TriggerIndex

meta.metricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.searchTemplateName)))
return &meta, nil
return meta, nil
}

// newElasticsearchClient creates elasticsearch db connection
func newElasticsearchClient(meta *elasticsearchMetadata, logger logr.Logger) (*elasticsearch.Client, error) {
func newElasticsearchClient(meta elasticsearchMetadata, logger logr.Logger) (*elasticsearch.Client, error) {
var config elasticsearch.Config

if hasCloudConfig(meta) {
if meta.CloudID != "" {
config = elasticsearch.Config{
CloudID: meta.cloudID,
APIKey: meta.apiKey,
CloudID: meta.CloudID,
APIKey: meta.APIKey,
}
} else {
config = elasticsearch.Config{
Addresses: meta.addresses,
}
if meta.username != "" {
config.Username = meta.username
}
if meta.password != "" {
config.Password = meta.password
Addresses: meta.Addresses,
Username: meta.Username,
Password: meta.Password,
}
}

config.Transport = util.CreateHTTPTransport(meta.unsafeSsl)
config.Transport = util.CreateHTTPTransport(meta.UnsafeSsl)
esClient, err := elasticsearch.NewClient(config)
if err != nil {
logger.Error(err, fmt.Sprintf("Found error when creating client: %s", err))
Expand All @@ -269,14 +138,14 @@ func (s *elasticsearchScaler) Close(_ context.Context) error {
func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (float64, error) {
// Build the request body.
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(buildQuery(s.metadata)); err != nil {
if err := json.NewEncoder(&body).Encode(buildQuery(&s.metadata)); err != nil {
s.logger.Error(err, "Error encoding query: %s", err)
}

// Run the templated search
res, err := s.esClient.SearchTemplate(
&body,
s.esClient.SearchTemplate.WithIndex(s.metadata.indexes...),
s.esClient.SearchTemplate.WithIndex(s.metadata.Index...),
s.esClient.SearchTemplate.WithContext(ctx),
)
if err != nil {
Expand All @@ -289,7 +158,7 @@ func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (float64, erro
if err != nil {
return 0, err
}
v, err := getValueFromSearch(b, s.metadata.valueLocation)
v, err := getValueFromSearch(b, s.metadata.ValueLocation)
if err != nil {
return 0, err
}
Expand All @@ -298,14 +167,16 @@ func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (float64, erro

func buildQuery(metadata *elasticsearchMetadata) map[string]interface{} {
parameters := map[string]interface{}{}
for _, p := range metadata.parameters {
for _, p := range metadata.Parameters {
if p != "" {
kv := splitAndTrimBySep(p, ":")
parameters[kv[0]] = kv[1]
kv := strings.Split(p, ":")
key := strings.TrimSpace(kv[0])
value := strings.TrimSpace(kv[1])
parameters[key] = value
}
}
query := map[string]interface{}{
"id": metadata.searchTemplateName,
"id": metadata.SearchTemplateName,
}
if len(parameters) > 0 {
query["params"] = parameters
Expand Down Expand Up @@ -333,9 +204,9 @@ func getValueFromSearch(body []byte, valueLocation string) (float64, error) {
func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: s.metadata.metricName,
Name: s.metadata.MetricName,
},
Target: GetMetricTargetMili(s.metricType, s.metadata.targetValue),
Target: GetMetricTargetMili(s.metricType, s.metadata.TargetValue),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,
Expand All @@ -352,14 +223,5 @@ func (s *elasticsearchScaler) GetMetricsAndActivity(ctx context.Context, metricN

metric := GenerateMetricInMili(metricName, num)

return []external_metrics.ExternalMetricValue{metric}, num > s.metadata.activationTargetValue, nil
}

// Splits a string separated by a specified separator and trims space from all the elements.
func splitAndTrimBySep(s string, sep string) []string {
x := strings.Split(s, sep)
for i := range x {
x[i] = strings.Trim(x[i], " ")
}
return x
return []external_metrics.ExternalMetricValue{metric}, num > s.metadata.ActivationTargetValue, nil
}
Loading

0 comments on commit c8d137f

Please sign in to comment.