Skip to content

Commit

Permalink
Add support for Kafka SASL/PLAIN authentication via SCRAM-SHA-256 or …
Browse files Browse the repository at this point in the history
…SCRAM-SHA-512 mechanism (#2724)

* add that suppot Kafka SASL/PLAIN authentication of SCRAM-SHA-256 or SCRAM-SHA-512 mechanism

Signed-off-by: WalkerWang731 <wxy1990731@hotmail.com>

* rename XDGSCRAMClient to scramClient and remove paramater that no point on factory_test.go

Signed-off-by: WalkerWang731 <wxy1990731@hotmail.com>

* add type assertion

Signed-off-by: WalkerWang731 <wxy1990731@hotmail.com>

* replacement UserName to Username

Signed-off-by: WalkerWang731 <wxy1990731@hotmail.com>
  • Loading branch information
WalkerWang731 committed Jan 14, 2021
1 parent b73b0c8 commit 8fb235a
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 28 deletions.
11 changes: 6 additions & 5 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,30 @@ func TestOptionsWithFlags(t *testing.T) {

func TestTLSFlags(t *testing.T) {
kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}
plain := auth.PlainTextConfig{Username: "", Password: "", Mechanism: "PLAIN"}
tests := []struct {
flags []string
expected auth.AuthenticationConfig
}{
{
flags: []string{},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=foo"},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=kerberos", "--kafka.consumer.tls.enabled=true"},
expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=tls"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=tls", "--kafka.consumer.tls.enabled=false"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (f Factory) CreateDefaultConfig() configmodels.Exporter {

if opts.Config.Authentication == "plaintext" {
cfg.Authentication.PlainText = &kafkaexporter.PlainTextConfig{
Username: opts.Config.PlainText.UserName,
Username: opts.Config.PlainText.Username,
Password: opts.Config.PlainText.Password,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {

if opts.Authentication == "plaintext" {
cfg.Authentication.PlainText = &kafkaexporter.PlainTextConfig{
Username: opts.PlainText.UserName,
Username: opts.PlainText.Username,
Password: opts.PlainText.Password,
}
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
github.com/uber/jaeger-lib v2.4.0+incompatible
github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.mongodb.org/mongo-driver v1.3.2 // indirect
go.uber.org/atomic v1.6.0
go.uber.org/automaxprocs v1.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,10 @@ github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5 h1:Xim2mBRFdXzXmKRO
github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5/go.mod h1:ppEjwdhyy7Y31EnHRDm1JkChoC7LXIJ7Ex0VYLWtZtQ=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
Expand Down
10 changes: 7 additions & 3 deletions pkg/kafka/auth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config
setKerberosConfiguration(&config.Kerberos, saramaConfig)
return nil
case plaintext:
setPlainTextConfiguration(&config.PlainText, saramaConfig)
err := setPlainTextConfiguration(&config.PlainText, saramaConfig)
if err != nil {
return err
}
return nil
default:
return fmt.Errorf("Unknown/Unsupported authentication method %s to kafka cluster", config.Authentication)
Expand All @@ -81,7 +84,7 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.
config.Kerberos.ServiceName = v.GetString(configPrefix + kerberosPrefix + suffixKerberosServiceName)
config.Kerberos.Realm = v.GetString(configPrefix + kerberosPrefix + suffixKerberosRealm)
config.Kerberos.UseKeyTab = v.GetBool(configPrefix + kerberosPrefix + suffixKerberosUseKeyTab)
config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUserName)
config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUsername)
config.Kerberos.Password = v.GetString(configPrefix + kerberosPrefix + suffixKerberosPassword)
config.Kerberos.ConfigPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosConfig)
config.Kerberos.KeyTabPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosKeyTab)
Expand All @@ -97,6 +100,7 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.
config.TLS.Enabled = true
}

config.PlainText.UserName = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUserName)
config.PlainText.Username = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUsername)
config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword)
config.PlainText.Mechanism = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextMechanism)
}
24 changes: 15 additions & 9 deletions pkg/kafka/auth/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
suffixKerberosServiceName = ".service-name"
suffixKerberosRealm = ".realm"
suffixKerberosUseKeyTab = ".use-keytab"
suffixKerberosUserName = ".username"
suffixKerberosUsername = ".username"
suffixKerberosPassword = ".password"
suffixKerberosConfig = ".config-file"
suffixKerberosKeyTab = ".keytab-file"
Expand All @@ -43,12 +43,14 @@ const (
defaultKerberosUsername = ""
defaultKerberosKeyTab = "/etc/security/kafka.keytab"

plainTextPrefix = ".plaintext"
suffixPlainTextUserName = ".username"
suffixPlainTextPassword = ".password"
plainTextPrefix = ".plaintext"
suffixPlainTextUsername = ".username"
suffixPlainTextPassword = ".password"
suffixPlainTextMechanism = ".mechanism"

defaultPlainTextUserName = ""
defaultPlainTextPassword = ""
defaultPlainTextUsername = ""
defaultPlainTextPassword = ""
defaultPlainTextMechanism = "PLAIN"
)

func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) {
Expand All @@ -65,7 +67,7 @@ func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) {
defaultKerberosPassword,
"The Kerberos password used for authenticate with KDC")
flagSet.String(
configPrefix+kerberosPrefix+suffixKerberosUserName,
configPrefix+kerberosPrefix+suffixKerberosUsername,
defaultKerberosUsername,
"The Kerberos username used for authenticate with KDC")
flagSet.String(
Expand All @@ -84,13 +86,17 @@ func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) {

func addPlainTextFlags(configPrefix string, flagSet *flag.FlagSet) {
flagSet.String(
configPrefix+plainTextPrefix+suffixPlainTextUserName,
defaultPlainTextUserName,
configPrefix+plainTextPrefix+suffixPlainTextUsername,
defaultPlainTextUsername,
"The plaintext Username for SASL/PLAIN authentication")
flagSet.String(
configPrefix+plainTextPrefix+suffixPlainTextPassword,
defaultPlainTextPassword,
"The plaintext Password for SASL/PLAIN authentication")
flagSet.String(
configPrefix+plainTextPrefix+suffixPlainTextMechanism,
defaultPlainTextMechanism,
"The plaintext Mechanism for SASL/PLAIN authentication, e.g. 'SCRAM-SHA-256' or 'SCRAM-SHA-512' or 'PLAIN'")
}

// AddFlags add configuration flags to a flagSet.
Expand Down
68 changes: 64 additions & 4 deletions pkg/kafka/auth/plaintext.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,77 @@
package auth

import (
"crypto/sha256"
"crypto/sha512"
"fmt"
"hash"
"strings"

"github.com/Shopify/sarama"
"github.com/xdg/scram"
)

// scramClient is the client to use when the auth mechanism is SCRAM
type scramClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

// Begin prepares the client for the SCRAM exchange
// with the server with a user name and a password
func (x *scramClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

// Step steps client through the SCRAM exchange. It is
// called repeatedly until it errors or `Done` returns true.
func (x *scramClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

// Done should return true when the SCRAM conversation
// is over.
func (x *scramClient) Done() bool {
return x.ClientConversation.Done()
}

// PlainTextConfig describes the configuration properties needed for SASL/PLAIN with kafka
type PlainTextConfig struct {
UserName string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
Mechanism string `mapstructure:"mechanism"`
}

func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Config) {
var _ sarama.SCRAMClient = (*scramClient)(nil)

func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Config) error {
saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.User = config.UserName
saramaConfig.Net.SASL.User = config.Username
saramaConfig.Net.SASL.Password = config.Password
switch strings.ToUpper(config.Mechanism) {
case "SCRAM-SHA-256":
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &scramClient{HashGeneratorFcn: func() hash.Hash { return sha256.New() }}
}
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
case "SCRAM-SHA-512":
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &scramClient{HashGeneratorFcn: func() hash.Hash { return sha512.New() }}
}
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
case "PLAIN":
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext

default:
return fmt.Errorf("config plaintext.mechanism error: %s, only support 'SCRAM-SHA-256' or 'SCRAM-SHA-512' or 'PLAIN'", config.Mechanism)

}
return nil
}
11 changes: 6 additions & 5 deletions plugin/storage/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,29 +173,30 @@ func TestRequiredAcksFailures(t *testing.T) {

func TestTLSFlags(t *testing.T) {
kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}
plain := auth.PlainTextConfig{Username: "", Password: "", Mechanism: "PLAIN"}
tests := []struct {
flags []string
expected auth.AuthenticationConfig
}{
{
flags: []string{},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, PlainText: plain},
},
{
flags: []string{"--kafka.producer.authentication=foo"},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, PlainText: plain},
},
{
flags: []string{"--kafka.producer.authentication=kerberos", "--kafka.producer.tls.enabled=true"},
expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
},
{
flags: []string{"--kafka.producer.authentication=tls"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
},
{
flags: []string{"--kafka.producer.authentication=tls", "--kafka.producer.tls.enabled=false"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
},
}

Expand Down

0 comments on commit 8fb235a

Please sign in to comment.