Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External Scaler: Add tls options in TriggerAuth metadata #4407

Merged
merged 16 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### Improvements

- **Azure Data Exporer Scaler**: Use azidentity SDK ([#4489](https://github.com/kedacore/keda/issues/4489))
- **External Scaler**: Add tls options in TriggerAuth metadata. ([#3565](https://github.com/kedacore/keda/issues/3565))
- **GCP PubSub Scaler**: Make it more flexible for metrics ([#4243](https://github.com/kedacore/keda/issues/4243))

### Fixes
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/azure_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func parseAzureQueueMetadata(config *ScalerConfig, logger logr.Logger) (*azureQu

// before triggerAuthentication CRD, pod identity was configured using this property
if val, ok := config.TriggerMetadata["useAAdPodIdentity"]; ok && config.PodIdentity.Provider == "" {
if val == "true" {
if val == stringTrue {
config.PodIdentity = kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzure}
}
}
Expand Down
48 changes: 43 additions & 5 deletions pkg/scalers/external_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scalers
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -17,6 +18,7 @@ import (
"k8s.io/metrics/pkg/apis/external_metrics"

pb "github.com/kedacore/keda/v2/pkg/scalers/externalscaler"
"github.com/kedacore/keda/v2/pkg/util"
)

type externalScaler struct {
Expand All @@ -35,6 +37,10 @@ type externalScalerMetadata struct {
tlsCertFile string
originalMetadata map[string]string
scalerIndex int
caCert string
tlsClientCert string
tlsClientKey string
unsafeSsl bool
}

type connectionGroup struct {
Expand Down Expand Up @@ -112,7 +118,26 @@ func parseExternalScalerMetadata(config *ScalerConfig) (externalScalerMetadata,
}

meta.originalMetadata = make(map[string]string)
if val, ok := config.AuthParams["caCert"]; ok {
meta.caCert = val
}

if val, ok := config.AuthParams["tlsClientCert"]; ok {
meta.tlsClientCert = val
}

if val, ok := config.AuthParams["tlsClientKey"]; ok {
meta.tlsClientKey = val
}

meta.unsafeSsl = false
if val, ok := config.TriggerMetadata["unsafeSsl"]; ok && val != "" {
boolVal, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("failed to parse insecureSkipVerify value. Must be either true or false")
}
meta.unsafeSsl = boolVal
}
// Add elements to metadata
for key, value := range config.TriggerMetadata {
// Check if key is in resolved environment and resolve
Expand All @@ -136,7 +161,7 @@ func (s *externalScaler) Close(context.Context) error {
func (s *externalScaler) GetMetricSpecForScaling(ctx context.Context) []v2.MetricSpec {
var result []v2.MetricSpec

grpcClient, err := getClientForConnectionPool(s.metadata)
grpcClient, err := getClientForConnectionPool(s.metadata, s.logger)
if err != nil {
s.logger.Error(err, "error building grpc connection")
return result
Expand Down Expand Up @@ -171,7 +196,7 @@ func (s *externalScaler) GetMetricSpecForScaling(ctx context.Context) []v2.Metri
// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *externalScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
var metrics []external_metrics.ExternalMetricValue
grpcClient, err := getClientForConnectionPool(s.metadata)
grpcClient, err := getClientForConnectionPool(s.metadata, s.logger)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, err
}
Expand Down Expand Up @@ -212,7 +237,7 @@ func (s *externalPushScaler) Run(ctx context.Context, active chan<- bool) {
defer close(active)
// It's possible for the connection to get terminated anytime, we need to run this in a retry loop
runWithLog := func() {
grpcClient, err := getClientForConnectionPool(s.metadata)
grpcClient, err := getClientForConnectionPool(s.metadata, s.logger)
if err != nil {
s.logger.Error(err, "error running internalRun")
return
Expand All @@ -223,7 +248,7 @@ func (s *externalPushScaler) Run(ctx context.Context, active chan<- bool) {
}
}

// retry on error from runWithLog() starting by 2 sec backing off * 2 with a max of 1 minute
// retry on error from runWithLog() starting by 2 sec backing off * 2 with a max of 2 minute
retryDuration := time.Second * 2
// the caller of this function needs to ensure that they call Stop() on the resulting
// timer, to release background resources.
Expand Down Expand Up @@ -273,19 +298,32 @@ var connectionPoolMutex sync.Mutex

// getClientForConnectionPool returns a grpcClient and a done() Func. The done() function must be called once the client is no longer
// in use to clean up the shared grpc.ClientConn
func getClientForConnectionPool(metadata externalScalerMetadata) (pb.ExternalScalerClient, error) {
func getClientForConnectionPool(metadata externalScalerMetadata, logger logr.Logger) (pb.ExternalScalerClient, error) {
connectionPoolMutex.Lock()
defer connectionPoolMutex.Unlock()

buildGRPCConnection := func(metadata externalScalerMetadata) (*grpc.ClientConn, error) {
// FIXME: DEPRECATED to be removed in v2.13 https://github.com/kedacore/keda/issues/4549
if metadata.tlsCertFile != "" {
logger.V(1).Info("tlsCertFile in ScaleObject metadata will be deprecated in v2.12. Please use" +
"tlsClientCert, tlsClientKey and caCert in TriggerAuthentication instead.")
creds, err := credentials.NewClientTLSFromFile(metadata.tlsCertFile, "")
if err != nil {
return nil, err
}
return grpc.Dial(metadata.scalerAddress, grpc.WithTransportCredentials(creds))
}

tlsConfig, err := util.NewTLSConfig(metadata.tlsClientCert, metadata.tlsClientKey, metadata.caCert, metadata.unsafeSsl)
if err != nil {
return nil, err
}

if len(tlsConfig.Certificates) > 0 || metadata.caCert != "" {
// nosemgrep: go.grpc.ssrf.grpc-tainted-url-host.grpc-tainted-url-host
return grpc.Dial(metadata.scalerAddress, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
}

return grpc.Dial(metadata.scalerAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

Expand Down
61 changes: 53 additions & 8 deletions pkg/scalers/external_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,71 @@ import (
pb "github.com/kedacore/keda/v2/pkg/scalers/externalscaler"
)

var serverRootCA = `-----BEGIN CERTIFICATE-----
MIIDPTCCAiWgAwIBAgIUTPXiztn8CG3+NQdeEIlpA1F9Ec4wDQYJKoZIhvcNAQEL
BQAwLTEOMAwGA1UEAwwFa2VkYTExCzAJBgNVBAYTAlVTMQ4wDAYDVQQHDAVFYXJ0
aDAgFw0yMzAzMjYxMzU1MDJaGA8yMDUxMDgwMjEzNTUwMlowLTEOMAwGA1UEAwwF
a2VkYTExCzAJBgNVBAYTAlVTMQ4wDAYDVQQHDAVFYXJ0aDCCASIwDQYJKoZIhvcN
AQEBBQADggEPADCCAQoCggEBAOaQl2+EEZycNQlO1nEeFgheUZ20gTVAButKjvEK
vIZv+x4AdwNaOcKahro5b09QinoGakTJEOrpks+VUSqQpJ+zVmja5vpIb92gnmGQ
B3rl7nD9rP/bLffa5bNDhmMR7vRd88PYvopn6+hTyX3EGkvbCZD8WNs5f8jslzek
0xj4U4LgC9T1pBykNl5nZs5Fd4CdaO+vi3cywmgjaiPzDOYMbYH4pzflH7aNsEEc
IYs9fQ8SwzsocXpKUS+bTg9OmrDMAwan+mxz6m15BxvJzHQqmp/aE70BSkinBwCg
dgzgQUwg6ko/jnJixP4tkr8p8nURBL7GNvuVIS7Z2EjD240CAwEAAaNTMFEwHQYD
VR0OBBYEFN03E9o2ne0s5GZSZ7rZczisME5RMB8GA1UdIwQYMBaAFN03E9o2ne0s
5GZSZ7rZczisME5RMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEB
ADM6vkgjttrU9bYRviwdgohvGNYegDXfyKt25gwYn5+UwUtqjjztTWMr6aAKLNob
Dqjb0BDR5Ow0kD9KXyO4m0gTBzxrHzDnFeTQxE2h8Gl/VRGueJQ2sfmU8oG2/3GV
4nEWLAu1XMqXcFQWT9X+JS3Wxqc1DLrAeX8u0ZIx5Lkk4kPV3d3BP8KyX+AQGt57
p0kdhXOTNW1kUPCrtnc0uBJNHqlev3KkHebH20G7iAZFCCpul9cyLK1fftCBuVE/
jtq3TnHHw+BroQPje3zF/MZTAA8Z9RejkpALMtoHeE68ar07FPlC8wZXDlfQXzYS
PWO1PoIiMX1UsfdZ35JCOF4=
-----END CERTIFICATE-----
`
var clientCert = `-----BEGIN CERTIFICATE-----
MIIC9jCCAd4CFEGcfEWHP2ckC/kIgUEDUPkAVHHIMA0GCSqGSIb3DQEBCwUAMC0x
DjAMBgNVBAMMBWtlZGEyMQswCQYDVQQGEwJVUzEOMAwGA1UEBwwFRWFydGgwIBcN
MjMwMzI2MTM1NTAzWhgPMjA1MTA2MDcxMzU1MDNaMEAxCzAJBgNVBAYTAlVTMQsw
CQYDVQQIDAJDQTEUMBIGA1UECgwLTXlPcmcsIEluYy4xDjAMBgNVBAMMBWtlZGEy
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3k7zr+QbOHXMqhyUM6Oz
SuGGqQttIGZEs12eLSRlGY9vL+pf/G3CubkGsTtp5b5tmP4CNYcGtU8wSJMn23Bq
BbXECpDXh6cuo+56VVAMJyNqZoIeS4JfKX5mvj4WrfpVJ3e+o6lrebAICK1qqTwq
z6N6ZUkeF552aTh8RAgEiKJlylmKdHc/IF3+oLc2aA7IAl+zxYOTCrjIiUrc54OB
gYCkCSb0P4SPHE8ryB0pN8S3LdtZrgVIzewd24joKbXL7hBZ3ltaj0t2kK2CTCxb
I3te0JdIaPV2TxCnK9dxLRkFXNjz/7V45rbRLXtSPoirKseUR2zbo5kfquY0J2hv
nwIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQCiZI/N/60Gj8711V+Uhj1j9iw3s4oU
qT4r9NozotrhjIMe14rhkJ1k+1x7pyLX8QHgO0WxiAp8tKX0kcUQO/ZQfTAM6FpW
cevGTxrVk5CcilafIBzaZF5Mz6diBxbTnhFS+hZXiwavkImBK4zZY9aUcVIjJfPv
xSaEVvMdofrhaio9M0deYzQ/Bf/uMkR3Fxs4qbhsg3gkbepFm3yJoSANzXCMvDnv
mauSvQwA0SRKECr46F8dSeFE1uIbN4ZNgrisBTVkoPYZuF7pAsSsjqGM0phUKiI8
5thG2dnqJSunC+vZW8QY+x3eq4XzFEpIYcaV9YpiGHbv8N6gzJydL/GB
-----END CERTIFICATE-----
`

type parseExternalScalerMetadataTestData struct {
metadata map[string]string
isError bool
metadata map[string]string
isError bool
authParams map[string]string
}

var testExternalScalerMetadata = []parseExternalScalerMetadataTestData{
{map[string]string{}, true},
{map[string]string{}, true, map[string]string{}},
// all properly formed
{map[string]string{"scalerAddress": "myservice", "test1": "7", "test2": "SAMPLE_CREDS"}, false},
{map[string]string{"scalerAddress": "myservice", "test1": "7", "test2": "SAMPLE_CREDS", "insecureSkipVerify": "true"}, false, map[string]string{"caCert": serverRootCA, "tlsClientCert": clientCert}},
// missing scalerAddress
{map[string]string{"test1": "1", "test2": "SAMPLE_CREDS"}, true},
{map[string]string{"test1": "1", "test2": "SAMPLE_CREDS"}, true, map[string]string{}},
}

func TestExternalScalerParseMetadata(t *testing.T) {
for _, testData := range testExternalScalerMetadata {
_, err := parseExternalScalerMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: map[string]string{}})
metadata, err := parseExternalScalerMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: map[string]string{}, AuthParams: testData.authParams})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}

if testData.metadata["unsafeSsl"] == "true" && !metadata.unsafeSsl {
t.Error("Expected unsafeSsl to be true but got", metadata.unsafeSsl)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
Expand Down Expand Up @@ -104,7 +150,6 @@ type testServer struct {

func createGRPCServers(count int, t *testing.T) []testServer {
result := make([]testServer, 0, count)

for i := 0; i < count; i++ {
grpcServer := grpc.NewServer()
address := fmt.Sprintf("127.0.0.1:%d", 5050+i)
Expand Down Expand Up @@ -211,7 +256,7 @@ func TestWaitForState(t *testing.T) {
// server stop will lead to Idle.
<-waitForState(context.TODO(), grpcClient, connectivity.Idle, connectivity.Shutdown)
grpcClient.Close()
// after close the state to Shutdown.
// after close the state to shut down.
t.Log("close state:", grpcClient.GetState().String())
close(graceDone)
}()
Expand Down
39 changes: 24 additions & 15 deletions pkg/util/tls_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func TestNewTLSConfig_WithPassword(t *testing.T) {
password string
issuer string
CACert string
isError bool
}{
{
name: "rsaCert_WithCACert",
Expand All @@ -195,6 +196,7 @@ func TestNewTLSConfig_WithPassword(t *testing.T) {
password: "keypass",
issuer: "O=Internet Widgits Pty Ltd,ST=Some-State,C=AU",
CACert: randomCACert,
isError: false,
},
{
name: "Cert_WithCACert",
Expand All @@ -203,6 +205,7 @@ func TestNewTLSConfig_WithPassword(t *testing.T) {
password: "keypass",
issuer: "O=Internet Widgits Pty Ltd,ST=Some-State,C=AU",
CACert: randomCACert,
isError: false,
},
{
name: "rsaCert_WithoutCACert",
Expand All @@ -211,6 +214,7 @@ func TestNewTLSConfig_WithPassword(t *testing.T) {
password: "keypass",
issuer: "O=Internet Widgits Pty Ltd,ST=Some-State,C=AU",
CACert: "",
isError: false,
},
{
name: "Cert_WithoutCACert",
Expand All @@ -219,28 +223,33 @@ func TestNewTLSConfig_WithPassword(t *testing.T) {
password: "keypass",
issuer: "O=Internet Widgits Pty Ltd,ST=Some-State,C=AU",
CACert: "",
isError: false,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
config, err := NewTLSConfigWithPassword(test.cert, test.key, test.password, test.CACert, false)
if err != nil {
t.Errorf("Should have no error: %s", err)
}
cert, err := x509.ParseCertificate(config.Certificates[0].Certificate[0])
if err != nil {
t.Errorf("Bad certificate")
}
switch {
case err != nil && !test.isError:
t.Errorf("Expected success but got error: %s", err)
case test.isError && err == nil:
t.Errorf("Expect error but got success")
case err == nil:
cert, err := x509.ParseCertificate(config.Certificates[0].Certificate[0])
if err != nil {
t.Errorf("Bad certificate")
}

if test.CACert != "" {
caCertPool := getRootCAs()
caCertPool.AppendCertsFromPEM([]byte(randomCACert))
if !config.RootCAs.Equal(caCertPool) {
t.Errorf("TLS config return different CA cert")
if test.CACert != "" {
caCertPool := getRootCAs()
caCertPool.AppendCertsFromPEM([]byte(randomCACert))
if !config.RootCAs.Equal(caCertPool) {
t.Errorf("TLS config return different CA cert")
}
}
if cert.Issuer.String() != test.issuer {
t.Errorf("Expected Issuer %s but got %s\n", test.issuer, cert.Issuer.String())
}
}
if cert.Issuer.String() != test.issuer {
t.Errorf("Expected Issuer %s but got %s\n", test.issuer, cert.Issuer.String())
}
})
}
Expand Down