Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpk: Support listener authN #5482

Merged
merged 4 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/go/k8s/pkg/resources/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,15 @@ func (r *ConfigMapResource) CreateConfiguration(
cr := &cfg.NodeConfiguration.Redpanda

internalListener := r.pandaCluster.InternalListener()
cr.KafkaAPI = []config.NamedSocketAddress{} // we don't want to inherit default kafka port
cr.KafkaAPI = append(cr.KafkaAPI, config.NamedSocketAddress{
cr.KafkaAPI = []config.NamedAuthNSocketAddress{} // we don't want to inherit default kafka port
cr.KafkaAPI = append(cr.KafkaAPI, config.NamedAuthNSocketAddress{
Address: "0.0.0.0",
Port: internalListener.Port,
Name: InternalListenerName,
})

if r.pandaCluster.ExternalListener() != nil {
cr.KafkaAPI = append(cr.KafkaAPI, config.NamedSocketAddress{
cr.KafkaAPI = append(cr.KafkaAPI, config.NamedAuthNSocketAddress{
Address: "0.0.0.0",
Port: calculateExternalPort(internalListener.Port, r.pandaCluster.ExternalListener().Port),
Name: ExternalListenerName,
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/cli/cmd/redpanda/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func bootstrap(fs afero.Fs) *cobra.Command {

cfg.Redpanda.ID = id
cfg.Redpanda.RPCServer.Address = ownIP.String()
cfg.Redpanda.KafkaAPI = []config.NamedSocketAddress{{
cfg.Redpanda.KafkaAPI = []config.NamedAuthNSocketAddress{{
Address: ownIP.String(),
Port: config.DefaultKafkaPort,
}}
Expand Down
55 changes: 54 additions & 1 deletion src/go/rpk/pkg/cli/cmd/redpanda/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func NewStartCommand(fs afero.Fs, launcher rp.Launcher) *cobra.Command {
",",
),
)
kafkaAPI, err := parseNamedAddresses(
kafkaAPI, err := parseNamedAuthNAddresses(
kafkaAddr,
config.DefaultKafkaPort,
)
Expand Down Expand Up @@ -939,6 +939,59 @@ func parseNamedAddress(
}, nil
}

func parseNamedAuthNAddresses(
addrs []string, defaultPort int,
) ([]config.NamedAuthNSocketAddress, error) {
as := make([]config.NamedAuthNSocketAddress, 0, len(addrs))
for _, addr := range addrs {
a, err := parseNamedAuthNAddress(addr, defaultPort)
if err != nil {
return nil, err
}
if a != nil {
as = append(as, *a)
}
}
return as, nil
}

func parseNamedAuthNAddress(
addrAuthn string, defaultPort int,
) (*config.NamedAuthNSocketAddress, error) {
if addrAuthn == "" {
return nil, nil
}
addr, authn, err := splitAddressAuthN(addrAuthn)
if err != nil {
return nil, err
}
scheme, hostport, err := net.ParseHostMaybeScheme(addr)
if err != nil {
return nil, err
}
host, port := net.SplitHostPortDefault(hostport, defaultPort)

return &config.NamedAuthNSocketAddress{
Address: host,
Port: port,
Name: scheme,
AuthN: authn,
}, nil
}

func splitAddressAuthN(str string) (addr string, authn *string, err error) {
bits := strings.Split(str, "|")
if len(bits) > 2 {
err = fmt.Errorf(`invalid format for listener, at most one "|" can be present: %q`, str)
return
}
addr = bits[0]
if len(bits) == 2 {
authn = &bits[1]
}
return
}

func sendEnv(fs afero.Fs, env api.EnvironmentPayload, conf *config.Config, skipChecks bool, err error) {
if err != nil {
env.ErrorMsg = err.Error()
Expand Down
76 changes: 65 additions & 11 deletions src/go/rpk/pkg/cli/cmd/redpanda/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,54 @@ func TestMergeFlags(t *testing.T) {
}
}

func TestParseNamedAuthNAddress(t *testing.T) {
authNSasl := "sasl"
tests := []struct {
name string
arg string
expected config.NamedAuthNSocketAddress
expectedErrMsg string
}{
{
name: "it should parse host:port",
arg: "host:9092",
expected: config.NamedAuthNSocketAddress{Address: "host", Port: 9092, Name: ""},
},
{
name: "it should parse scheme://host:port",
arg: "scheme://host:9092",
expected: config.NamedAuthNSocketAddress{Address: "host", Port: 9092, Name: "scheme"},
},
{
name: "it should parse host:port|authn",
arg: "host:9092|sasl",
expected: config.NamedAuthNSocketAddress{Address: "host", Port: 9092, Name: "", AuthN: &authNSasl},
},
{
name: "it should parse scheme://host:port|authn",
arg: "scheme://host:9092|sasl",
expected: config.NamedAuthNSocketAddress{Address: "host", Port: 9092, Name: "scheme", AuthN: &authNSasl},
},
{
name: "it should fail for multiple |",
arg: "host|sasl|ignore",
expected: config.NamedAuthNSocketAddress{},
expectedErrMsg: `invalid format for listener, at most one "|" can be present: "host|sasl|ignore"`,
},
}

for _, tt := range tests {
t.Run(tt.name, func(st *testing.T) {
res, err := parseNamedAuthNAddress(tt.arg, 19092)
if tt.expectedErrMsg != "" {
require.EqualError(st, err, tt.expectedErrMsg)
return
}
require.Exactly(st, tt.expected, *res)
})
}
}

func TestParseSeeds(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -152,6 +200,7 @@ func TestParseSeeds(t *testing.T) {
}

func TestStartCommand(t *testing.T) {
authNSasl := "sasl"
tests := []struct {
name string
launcher redpanda.Launcher
Expand Down Expand Up @@ -262,7 +311,7 @@ func TestStartCommand(t *testing.T) {
Address: "192.168.54.2",
Port: 9643,
}}
expectedKafkaAPI := []config.NamedSocketAddress{{
expectedKafkaAPI := []config.NamedAuthNSocketAddress{{
Name: "external",
Address: "192.168.73.45",
Port: 9092,
Expand Down Expand Up @@ -327,7 +376,7 @@ func TestStartCommand(t *testing.T) {
Address: "192.168.54.2",
Port: 9643,
}}
expectedKafkaAPI := []config.NamedSocketAddress{{
expectedKafkaAPI := []config.NamedAuthNSocketAddress{{
Name: "external",
Address: "192.168.73.45",
Port: 9092,
Expand Down Expand Up @@ -378,7 +427,7 @@ func TestStartCommand(t *testing.T) {
require.NoError(st, err)
// The value set through the --kafka-addr flag should
// have been picked.
expectedKafkaAPI := []config.NamedSocketAddress{{
expectedKafkaAPI := []config.NamedAuthNSocketAddress{{
Name: "flag",
Address: "192.168.34.3",
Port: 9093,
Expand Down Expand Up @@ -825,7 +874,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Address: "192.168.34.32",
Port: 33145,
}}
Expand All @@ -845,7 +894,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Address: "192.168.34.32",
Port: 9092,
}}
Expand All @@ -865,7 +914,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Name: "nondefaultname",
Address: "192.168.34.32",
Port: 9092,
Expand All @@ -881,18 +930,23 @@ func TestStartCommand(t *testing.T) {
name: "it should parse the --kafka-addr and persist it (list)",
args: []string{
"--install-dir", "/var/lib/redpanda",
"--kafka-addr", "nondefaultname://192.168.34.32,host:9092",
"--kafka-addr", "nondefaultname://192.168.34.32,host:9092,authn://host:9093|sasl",
},
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Name: "nondefaultname",
Address: "192.168.34.32",
Port: 9092,
}, {
Address: "host",
Port: 9092,
}, {
Name: "authn",
Address: "host",
Port: 9093,
AuthN: &authNSasl,
}}
// Check that the generated config is as expected.
require.Exactly(
Expand Down Expand Up @@ -923,7 +977,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Address: "host",
Port: 3123,
}}
Expand All @@ -941,7 +995,7 @@ func TestStartCommand(t *testing.T) {
},
before: func(fs afero.Fs) error {
conf := config.Default()
conf.Redpanda.KafkaAPI = []config.NamedSocketAddress{{
conf.Redpanda.KafkaAPI = []config.NamedAuthNSocketAddress{{
Address: "192.168.33.33",
Port: 9892,
}}
Expand All @@ -950,7 +1004,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Address: "192.168.33.33",
Port: 9892,
}}
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func Default() *Config {
Address: "0.0.0.0",
Port: 33145,
},
KafkaAPI: []NamedSocketAddress{{
KafkaAPI: []NamedAuthNSocketAddress{{
Address: "0.0.0.0",
Port: 9092,
}},
Expand Down
32 changes: 29 additions & 3 deletions src/go/rpk/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func getValidConfig() *Config {
}

func TestSet(t *testing.T) {
authNSasl := "sasl"
tests := []struct {
name string
key string
Expand Down Expand Up @@ -214,7 +215,7 @@ tune_cpu: true`,
port: 9092
`,
check: func(st *testing.T, c *Config) {
expected := []NamedSocketAddress{{
expected := []NamedAuthNSocketAddress{{
Name: "external",
Address: "192.168.73.45",
Port: 9092,
Expand All @@ -226,6 +227,31 @@ tune_cpu: true`,
require.Exactly(st, expected, c.Redpanda.KafkaAPI)
},
},
{
name: "extract kafka_api[].authentication_method",
key: "redpanda.kafka_api",
value: `- name: external
address: 192.168.73.45
port: 9092
authentication_method: sasl
- name: internal
address: 10.21.34.58
port: 9092
`,
check: func(st *testing.T, c *Config) {
expected := []NamedAuthNSocketAddress{{
Name: "external",
Address: "192.168.73.45",
Port: 9092,
AuthN: &authNSasl,
}, {
Name: "internal",
Address: "10.21.34.58",
Port: 9092,
}}
require.Exactly(st, expected, c.Redpanda.KafkaAPI)
},
},
{
name: "partially set map fields (json)",
key: "redpanda.kafka_api",
Expand All @@ -235,7 +261,7 @@ tune_cpu: true`,
}]`,
format: "json",
check: func(st *testing.T, c *Config) {
expected := []NamedSocketAddress{{
expected := []NamedAuthNSocketAddress{{
Port: 9092,
Address: "192.168.54.2",
}}
Expand Down Expand Up @@ -347,7 +373,7 @@ func TestDefault(t *testing.T) {
Redpanda: RedpandaConfig{
Directory: "/var/lib/redpanda/data",
RPCServer: SocketAddress{"0.0.0.0", 33145},
KafkaAPI: []NamedSocketAddress{{
KafkaAPI: []NamedAuthNSocketAddress{{
Address: "0.0.0.0",
Port: 9092,
}},
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/config/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (p *Params) Load(fs afero.Fs) (*Config, error) {
Address: "0.0.0.0",
Port: 33145,
},
KafkaAPI: []NamedSocketAddress{{
KafkaAPI: []NamedAuthNSocketAddress{{
Address: "0.0.0.0",
Port: 9092,
}},
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/config/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestRedpandaSampleFile(t *testing.T) {
Address: "0.0.0.0",
Port: 33145,
},
KafkaAPI: []NamedSocketAddress{{
KafkaAPI: []NamedAuthNSocketAddress{{
Address: "0.0.0.0",
Port: 9092,
}},
Expand Down
Loading