Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an elasticsearch scaler based on search template #2304

Closed
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ repos:
entry: "(?i)(black|white)[_-]?(list|List)"
pass_filenames: true
- id: sort-scalers
name: Check if scalers are sorted in scaler_handler.go
name: Check if scalers are sorted in scale_handler.go
language: system
entry: "bash tools/sort_scalers.sh"
files: .*scale_handler\.go$
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181))
- Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225))
- Add ScalersCache to reuse scalers unless they need changing ([#2187](https://github.com/kedacore/keda/pull/2187))
- Add an elasticsearch scaler based on search template ([#2304](https://github.com/kedacore/keda/pull/2304))

### Improvements

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/Shopify/sarama v1.30.0
github.com/aws/aws-sdk-go v1.42.3
github.com/denisenkom/go-mssqldb v0.11.0
github.com/elastic/go-elasticsearch/v7 v7.15.1
github.com/go-logr/logr v0.4.0
github.com/go-playground/assert/v2 v2.0.1
github.com/go-redis/redis/v8 v8.11.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/go-elasticsearch/v7 v7.15.1 h1:Wd8RLHb5D8xPBU8vGlnLXyflkso9G+rCmsXjqH8LLQQ=
github.com/elastic/go-elasticsearch/v7 v7.15.1/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
Expand Down
276 changes: 276 additions & 0 deletions pkg/scalers/elasticsearch_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
package scalers

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"

"github.com/elastic/go-elasticsearch/v7"
kedautil "github.com/kedacore/keda/v2/pkg/util"
"github.com/tidwall/gjson"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

type elasticsearchScaler struct {
metadata *elasticsearchMetadata
esClient *elasticsearch.Client
}

type elasticsearchMetadata struct {
addresses []string
unsafeSsl bool
username string
password string
indexes []string
searchTemplateName string
parameters []string
valueLocation string
targetValue int
metricName string
}

var elasticsearchLog = logf.Log.WithName("elasticsearch_scaler")

// NewElasticsearchScaler creates a new elasticsearch scaler
func NewElasticsearchScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseElasticsearchMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing elasticsearch metadata: %s", err)
}

esClient, err := newElasticsearchClient(meta)
if err != nil {
return nil, fmt.Errorf("error getting elasticsearch client: %s", err)
}
return &elasticsearchScaler{
metadata: meta,
esClient: esClient,
}, nil
}

const defaultUnsafeSsl = false

func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, error) {
meta := elasticsearchMetadata{}

var err error
addresses, err := GetFromAuthOrMeta(config, "addresses")
if err != nil {
return nil, err
}
meta.addresses = splitAndTrimBySep(addresses, ",")

if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
meta.unsafeSsl, err = strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsl: %s", err)
}
} else {
meta.unsafeSsl = defaultUnsafeSsl
}

if val, ok := config.AuthParams["username"]; ok {
meta.username = val
} else if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
}

if config.AuthParams["password"] != "" {
meta.password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}

index, err := GetFromAuthOrMeta(config, "index")
if err != nil {
return nil, err
}
meta.indexes = splitAndTrimBySep(index, ";")

meta.searchTemplateName, err = GetFromAuthOrMeta(config, "searchTemplateName")
if err != nil {
return nil, err
}

if val, ok := config.TriggerMetadata["parameters"]; ok {
meta.parameters = splitAndTrimBySep(val, ";")
}

meta.valueLocation, err = GetFromAuthOrMeta(config, "valueLocation")
if err != nil {
return nil, err
}

targetValue, err := GetFromAuthOrMeta(config, "targetValue")
if err != nil {
return nil, err
}
meta.targetValue, err = strconv.Atoi(targetValue)
if err != nil {
return nil, fmt.Errorf("targetValue parsing error %s", err.Error())
}

meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, kedautil.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.searchTemplateName)))
return &meta, nil
}

// newElasticsearchClient creates elasticsearch db connection
func newElasticsearchClient(meta *elasticsearchMetadata) (*elasticsearch.Client, error) {
config := elasticsearch.Config{Addresses: meta.addresses}
if meta.username != "" {
config.Username = meta.username
}
if meta.password != "" {
config.Password = meta.password
}

transport := http.DefaultTransport.(*http.Transport)
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: meta.unsafeSsl}
config.Transport = transport

esClient, err := elasticsearch.NewClient(config)
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Found error when creating client: %s", err))
return nil, err
}

_, err = esClient.Info()
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Found error when pinging search engine: %s", err))
return nil, err
}
return esClient, nil
}

func (s *elasticsearchScaler) Close(ctx context.Context) error {
return nil
}

// IsActive returns true if there are pending messages to be processed
func (s *elasticsearchScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.getQueryResult(ctx)
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Error inspecting elasticsearch: %s", err))
return false, err
}
return messages > 0, nil
}

// getQueryResult returns result of the scaler query
func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (int, error) {
// Build the request body.
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(buildQuery(s.metadata)); err != nil {
elasticsearchLog.Error(err, "Error encoding query: %s", err)
}

// Run the templated search
res, err := s.esClient.SearchTemplate(
&body,
s.esClient.SearchTemplate.WithIndex(s.metadata.indexes...),
Copy link
Member

@JorTurFer JorTurFer Nov 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should pass the context until here because you could add .WithContext. With this change, the request context can be managed from outside the scaler.

s.esClient.SearchTemplate.WithContext(ctx),
)
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err))
return 0, err
}

defer res.Body.Close()
b, err := ioutil.ReadAll(res.Body)
if err != nil {
return 0, err
}
v, err := getValueFromSearch(b, s.metadata.valueLocation)
if err != nil {
return 0, err
}
return v, nil
}

func buildQuery(metadata *elasticsearchMetadata) map[string]interface{} {
parameters := map[string]interface{}{}
for _, p := range metadata.parameters {
if p != "" {
kv := splitAndTrimBySep(p, ":")
parameters[kv[0]] = kv[1]
}
}
query := map[string]interface{}{
"id": metadata.searchTemplateName,
}
if len(parameters) > 0 {
query["params"] = parameters
}
return query
}

func getValueFromSearch(body []byte, valueLocation string) (int, error) {
r := gjson.GetBytes(body, valueLocation)
errorMsg := "valueLocation must point to value of type number but got: '%s'"
if r.Type == gjson.String {
q, err := strconv.Atoi(r.String())
if err != nil {
return 0, fmt.Errorf(errorMsg, r.String())
}
return q, nil
}
if r.Type != gjson.Number {
return 0, fmt.Errorf(errorMsg, r.Type.String())
}
return int(r.Num), nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
targetValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI)

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.targetValue, s.metadata.metricName),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should call only once to GenerateMetricNameWithIndex. This index is not related with any inside the scaler, it's related with the trigger index inside the ScaledObject to avoid conflicts with duplicated names in the same ScaledObject.
You could call to the function here or in the constructor like you are doing but don't do it both, because the final metric name with current code will be like s{s.metadata.targetValue}-s{config.ScalerIndex}-elasticsearch-%s. It should be like s{config.ScalerIndex}-elasticsearch-%s

},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *elasticsearchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getQueryResult(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting elasticsearch: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// Splits a string separated by a specified separator and trims space from all the elements.
func splitAndTrimBySep(s string, sep string) []string {
x := strings.Split(s, sep)
for i := range x {
x[i] = strings.Trim(x[i], " ")
}
return x
}
Loading