Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an elasticsearch scaler based on search template #2304

Closed
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ repos:
entry: "(?i)(black|white)[_-]?(list|List)"
pass_filenames: true
- id: sort-scalers
name: Check if scalers are sorted in scaler_handler.go
name: Check if scalers are sorted in scale_handler.go
language: system
entry: "bash tools/sort_scalers.sh"
files: .*scale_handler\.go$
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181))
- Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225))
- Add ScalersCache to reuse scalers unless they need changing ([#2187](https://github.com/kedacore/keda/pull/2187))
- Add an elasticsearch scaler based on search template ([#2304](https://github.com/kedacore/keda/pull/2304))

### Improvements

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/Shopify/sarama v1.30.0
github.com/aws/aws-sdk-go v1.42.3
github.com/denisenkom/go-mssqldb v0.11.0
github.com/elastic/go-elasticsearch/v7 v7.15.1
github.com/go-logr/logr v0.4.0
github.com/go-playground/assert/v2 v2.0.1
github.com/go-redis/redis/v8 v8.11.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/go-elasticsearch/v7 v7.15.1 h1:Wd8RLHb5D8xPBU8vGlnLXyflkso9G+rCmsXjqH8LLQQ=
github.com/elastic/go-elasticsearch/v7 v7.15.1/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
Expand Down
277 changes: 277 additions & 0 deletions pkg/scalers/elasticsearch_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
package scalers

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"

"github.com/elastic/go-elasticsearch/v7"
kedautil "github.com/kedacore/keda/v2/pkg/util"
"github.com/tidwall/gjson"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

type elasticsearchScaler struct {
metadata *elasticsearchMetadata
esClient *elasticsearch.Client
}

type elasticsearchMetadata struct {
addresses []string
unsafeSsl bool
username string
password string
indexes []string
searchTemplateName string
parameters []string
valueLocation string
targetValue int
}

var elasticsearchLog = logf.Log.WithName("elasticsearch_scaler")

// NewElasticsearchScaler creates a new elasticsearch scaler
func NewElasticsearchScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseElasticsearchMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing elasticsearch metadata: %s", err)
}

esClient, err := newElasticsearchClient(meta)
if err != nil {
return nil, fmt.Errorf("error getting elasticsearch client: %s", err)
}
return &elasticsearchScaler{
metadata: meta,
esClient: esClient,
}, nil
}

const defaultUnsafeSsl = false

func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, error) {
meta := elasticsearchMetadata{}

var err error
addresses, err := GetFromAuthOrMeta(config, "addresses")
if err != nil {
return nil, err
}
meta.addresses = splitAndTrimBySep(addresses, ",")

if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
meta.unsafeSsl, err = strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsL: %s", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return nil, fmt.Errorf("error parsing unsafeSsL: %s", err)
return nil, fmt.Errorf("error parsing unsafeSsl: %s", err)

Does the last L need to be in capital letter?

}
} else {
meta.unsafeSsl = defaultUnsafeSsl
}

if val, ok := config.AuthParams["username"]; ok {
meta.username = val
} else if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
}

if config.AuthParams["password"] != "" {
meta.password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}

index, err := GetFromAuthOrMeta(config, "index")
if err != nil {
return nil, err
}
meta.indexes = splitAndTrimBySep(index, ";")

meta.searchTemplateName, err = GetFromAuthOrMeta(config, "searchTemplateName")
if err != nil {
return nil, err
}

if val, ok := config.TriggerMetadata["parameters"]; ok {
meta.parameters = splitAndTrimBySep(val, ";")
}

meta.valueLocation, err = GetFromAuthOrMeta(config, "valueLocation")
if err != nil {
return nil, err
}

targetValue, err := GetFromAuthOrMeta(config, "targetValue")
if err != nil {
return nil, err
}
meta.targetValue, err = strconv.Atoi(targetValue)
if err != nil {
return nil, fmt.Errorf("targetValue parsing error %s", err.Error())
}

return &meta, nil
}

// newElasticsearchClient creates elasticsearch db connection
func newElasticsearchClient(meta *elasticsearchMetadata) (*elasticsearch.Client, error) {
config := elasticsearch.Config{Addresses: meta.addresses}
if meta.username != "" {
config.Username = meta.username
}
if meta.password != "" {
config.Password = meta.password
}

if meta.unsafeSsl {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
config.Transport = tr
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not an expert in elastic and maybe it's not possible but, could we avoid the if block using something like this?

Suggested change
if meta.unsafeSsl {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
config.Transport = tr
}
config.Transport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: meta.unsafeSsl},
}

We are using that approach inside utils package for the HTTPClient

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make senses. I dig further to check


esClient, err := elasticsearch.NewClient(config)
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Found error when creating client: %s", err))
return nil, err
}

_, err = esClient.Info()
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Found error when pinging search engine: %s", err))
return nil, err
}
return esClient, nil
}

func (s *elasticsearchScaler) Close(ctx context.Context) error {
return nil
}

// IsActive returns true if there are pending messages to be processed
func (s *elasticsearchScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.getQueryResult()
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Error inspecting elasticsearch: %s", err))
return false, err
}
return messages > 0, nil
}

// getQueryResult returns result of the scaler query
func (s *elasticsearchScaler) getQueryResult() (int, error) {
// Build the request body.
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(buildQuery(s.metadata)); err != nil {
elasticsearchLog.Error(err, "Error encoding query: %s", err)
}

// Run the templated search
res, err := s.esClient.SearchTemplate(
&body,
s.esClient.SearchTemplate.WithIndex(s.metadata.indexes...),
Copy link
Member

@JorTurFer JorTurFer Nov 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should pass the context until here because you could add .WithContext. With this change, the request context can be managed from outside the scaler.

)
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err))
return 0, err
}

defer res.Body.Close()
b, err := ioutil.ReadAll(res.Body)
if err != nil {
return 0, err
}
v, err := getValueFromSearch(b, s.metadata.valueLocation)
if err != nil {
return 0, err
}
return v, nil
}

func buildQuery(metadata *elasticsearchMetadata) map[string]interface{} {
parameters := map[string]interface{}{}
for _, p := range metadata.parameters {
if p != "" {
kv := splitAndTrimBySep(p, ":")
parameters[kv[0]] = kv[1]
}
}
query := map[string]interface{}{
"id": metadata.searchTemplateName,
}
if len(parameters) > 0 {
query["params"] = parameters
}
return query
}

func getValueFromSearch(body []byte, valueLocation string) (int, error) {
r := gjson.GetBytes(body, valueLocation)
errorMsg := "valueLocation must point to value of type number but got: '%s'"
if r.Type == gjson.String {
q, err := strconv.Atoi(r.String())
if err != nil {
return 0, fmt.Errorf(errorMsg, r.String())
}
return q, nil
}
if r.Type != gjson.Number {
return 0, fmt.Errorf(errorMsg, r.Type.String())
}
return int(r.Num), nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
targetValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI)
metricName := kedautil.NormalizeString(fmt.Sprintf("elasticsearch-%s", s.metadata.searchTemplateName))

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.targetValue, metricName),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name should be generated using the ScalerIndex and the "metricName". The ScalerIndex is a parameter that all scalers receive in the constructor and which should be propagated to here in order to avoid conflicts. Also, it'd be tested during the unit test to ensure that it's correctly propagated.
This point is explained more in depth here.
All the other scalers propagate it, so you can check any other :)

},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *elasticsearchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getQueryResult()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting elasticsearch: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// Splits a string separated by a specified separator and trims space from all the elements.
func splitAndTrimBySep(s string, sep string) []string {
x := strings.Split(s, sep)
for i := range x {
x[i] = strings.Trim(x[i], " ")
}
return x
}
Loading