Skip to content

Commit

Permalink
Merge pull request #5482 from BenPope/rpk-listener-authn
Browse files Browse the repository at this point in the history
rpk: Support listener authN
  • Loading branch information
BenPope committed Jul 20, 2022
2 parents b08ac47 + ff0ca93 commit 5b0b0eb
Show file tree
Hide file tree
Showing 11 changed files with 355 additions and 68 deletions.
6 changes: 3 additions & 3 deletions src/go/k8s/pkg/resources/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,15 @@ func (r *ConfigMapResource) CreateConfiguration(
cr := &cfg.NodeConfiguration.Redpanda

internalListener := r.pandaCluster.InternalListener()
cr.KafkaAPI = []config.NamedSocketAddress{} // we don't want to inherit default kafka port
cr.KafkaAPI = append(cr.KafkaAPI, config.NamedSocketAddress{
cr.KafkaAPI = []config.NamedAuthNSocketAddress{} // we don't want to inherit default kafka port
cr.KafkaAPI = append(cr.KafkaAPI, config.NamedAuthNSocketAddress{
Address: "0.0.0.0",
Port: internalListener.Port,
Name: InternalListenerName,
})

if r.pandaCluster.ExternalListener() != nil {
cr.KafkaAPI = append(cr.KafkaAPI, config.NamedSocketAddress{
cr.KafkaAPI = append(cr.KafkaAPI, config.NamedAuthNSocketAddress{
Address: "0.0.0.0",
Port: calculateExternalPort(internalListener.Port, r.pandaCluster.ExternalListener().Port),
Name: ExternalListenerName,
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/cli/cmd/redpanda/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func bootstrap(fs afero.Fs) *cobra.Command {

cfg.Redpanda.ID = id
cfg.Redpanda.RPCServer.Address = ownIP.String()
cfg.Redpanda.KafkaAPI = []config.NamedSocketAddress{{
cfg.Redpanda.KafkaAPI = []config.NamedAuthNSocketAddress{{
Address: ownIP.String(),
Port: config.DefaultKafkaPort,
}}
Expand Down
55 changes: 54 additions & 1 deletion src/go/rpk/pkg/cli/cmd/redpanda/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func NewStartCommand(fs afero.Fs, launcher rp.Launcher) *cobra.Command {
",",
),
)
kafkaAPI, err := parseNamedAddresses(
kafkaAPI, err := parseNamedAuthNAddresses(
kafkaAddr,
config.DefaultKafkaPort,
)
Expand Down Expand Up @@ -939,6 +939,59 @@ func parseNamedAddress(
}, nil
}

func parseNamedAuthNAddresses(
addrs []string, defaultPort int,
) ([]config.NamedAuthNSocketAddress, error) {
as := make([]config.NamedAuthNSocketAddress, 0, len(addrs))
for _, addr := range addrs {
a, err := parseNamedAuthNAddress(addr, defaultPort)
if err != nil {
return nil, err
}
if a != nil {
as = append(as, *a)
}
}
return as, nil
}

func parseNamedAuthNAddress(
addrAuthn string, defaultPort int,
) (*config.NamedAuthNSocketAddress, error) {
if addrAuthn == "" {
return nil, nil
}
addr, authn, err := splitAddressAuthN(addrAuthn)
if err != nil {
return nil, err
}
scheme, hostport, err := net.ParseHostMaybeScheme(addr)
if err != nil {
return nil, err
}
host, port := net.SplitHostPortDefault(hostport, defaultPort)

return &config.NamedAuthNSocketAddress{
Address: host,
Port: port,
Name: scheme,
AuthN: authn,
}, nil
}

func splitAddressAuthN(str string) (addr string, authn *string, err error) {
bits := strings.Split(str, "|")
if len(bits) > 2 {
err = fmt.Errorf(`invalid format for listener, at most one "|" can be present: %q`, str)
return
}
addr = bits[0]
if len(bits) == 2 {
authn = &bits[1]
}
return
}

func sendEnv(fs afero.Fs, env api.EnvironmentPayload, conf *config.Config, skipChecks bool, err error) {
if err != nil {
env.ErrorMsg = err.Error()
Expand Down
76 changes: 65 additions & 11 deletions src/go/rpk/pkg/cli/cmd/redpanda/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,54 @@ func TestMergeFlags(t *testing.T) {
}
}

func TestParseNamedAuthNAddress(t *testing.T) {
authNSasl := "sasl"
tests := []struct {
name string
arg string
expected config.NamedAuthNSocketAddress
expectedErrMsg string
}{
{
name: "it should parse host:port",
arg: "host:9092",
expected: config.NamedAuthNSocketAddress{Address: "host", Port: 9092, Name: ""},
},
{
name: "it should parse scheme://host:port",
arg: "scheme://host:9092",
expected: config.NamedAuthNSocketAddress{Address: "host", Port: 9092, Name: "scheme"},
},
{
name: "it should parse host:port|authn",
arg: "host:9092|sasl",
expected: config.NamedAuthNSocketAddress{Address: "host", Port: 9092, Name: "", AuthN: &authNSasl},
},
{
name: "it should parse scheme://host:port|authn",
arg: "scheme://host:9092|sasl",
expected: config.NamedAuthNSocketAddress{Address: "host", Port: 9092, Name: "scheme", AuthN: &authNSasl},
},
{
name: "it should fail for multiple |",
arg: "host|sasl|ignore",
expected: config.NamedAuthNSocketAddress{},
expectedErrMsg: `invalid format for listener, at most one "|" can be present: "host|sasl|ignore"`,
},
}

for _, tt := range tests {
t.Run(tt.name, func(st *testing.T) {
res, err := parseNamedAuthNAddress(tt.arg, 19092)
if tt.expectedErrMsg != "" {
require.EqualError(st, err, tt.expectedErrMsg)
return
}
require.Exactly(st, tt.expected, *res)
})
}
}

func TestParseSeeds(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -152,6 +200,7 @@ func TestParseSeeds(t *testing.T) {
}

func TestStartCommand(t *testing.T) {
authNSasl := "sasl"
tests := []struct {
name string
launcher redpanda.Launcher
Expand Down Expand Up @@ -262,7 +311,7 @@ func TestStartCommand(t *testing.T) {
Address: "192.168.54.2",
Port: 9643,
}}
expectedKafkaAPI := []config.NamedSocketAddress{{
expectedKafkaAPI := []config.NamedAuthNSocketAddress{{
Name: "external",
Address: "192.168.73.45",
Port: 9092,
Expand Down Expand Up @@ -327,7 +376,7 @@ func TestStartCommand(t *testing.T) {
Address: "192.168.54.2",
Port: 9643,
}}
expectedKafkaAPI := []config.NamedSocketAddress{{
expectedKafkaAPI := []config.NamedAuthNSocketAddress{{
Name: "external",
Address: "192.168.73.45",
Port: 9092,
Expand Down Expand Up @@ -378,7 +427,7 @@ func TestStartCommand(t *testing.T) {
require.NoError(st, err)
// The value set through the --kafka-addr flag should
// have been picked.
expectedKafkaAPI := []config.NamedSocketAddress{{
expectedKafkaAPI := []config.NamedAuthNSocketAddress{{
Name: "flag",
Address: "192.168.34.3",
Port: 9093,
Expand Down Expand Up @@ -825,7 +874,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Address: "192.168.34.32",
Port: 33145,
}}
Expand All @@ -845,7 +894,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Address: "192.168.34.32",
Port: 9092,
}}
Expand All @@ -865,7 +914,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Name: "nondefaultname",
Address: "192.168.34.32",
Port: 9092,
Expand All @@ -881,18 +930,23 @@ func TestStartCommand(t *testing.T) {
name: "it should parse the --kafka-addr and persist it (list)",
args: []string{
"--install-dir", "/var/lib/redpanda",
"--kafka-addr", "nondefaultname://192.168.34.32,host:9092",
"--kafka-addr", "nondefaultname://192.168.34.32,host:9092,authn://host:9093|sasl",
},
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Name: "nondefaultname",
Address: "192.168.34.32",
Port: 9092,
}, {
Address: "host",
Port: 9092,
}, {
Name: "authn",
Address: "host",
Port: 9093,
AuthN: &authNSasl,
}}
// Check that the generated config is as expected.
require.Exactly(
Expand Down Expand Up @@ -923,7 +977,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Address: "host",
Port: 3123,
}}
Expand All @@ -941,7 +995,7 @@ func TestStartCommand(t *testing.T) {
},
before: func(fs afero.Fs) error {
conf := config.Default()
conf.Redpanda.KafkaAPI = []config.NamedSocketAddress{{
conf.Redpanda.KafkaAPI = []config.NamedAuthNSocketAddress{{
Address: "192.168.33.33",
Port: 9892,
}}
Expand All @@ -950,7 +1004,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Address: "192.168.33.33",
Port: 9892,
}}
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func Default() *Config {
Address: "0.0.0.0",
Port: 33145,
},
KafkaAPI: []NamedSocketAddress{{
KafkaAPI: []NamedAuthNSocketAddress{{
Address: "0.0.0.0",
Port: 9092,
}},
Expand Down
32 changes: 29 additions & 3 deletions src/go/rpk/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func getValidConfig() *Config {
}

func TestSet(t *testing.T) {
authNSasl := "sasl"
tests := []struct {
name string
key string
Expand Down Expand Up @@ -214,7 +215,7 @@ tune_cpu: true`,
port: 9092
`,
check: func(st *testing.T, c *Config) {
expected := []NamedSocketAddress{{
expected := []NamedAuthNSocketAddress{{
Name: "external",
Address: "192.168.73.45",
Port: 9092,
Expand All @@ -226,6 +227,31 @@ tune_cpu: true`,
require.Exactly(st, expected, c.Redpanda.KafkaAPI)
},
},
{
name: "extract kafka_api[].authentication_method",
key: "redpanda.kafka_api",
value: `- name: external
address: 192.168.73.45
port: 9092
authentication_method: sasl
- name: internal
address: 10.21.34.58
port: 9092
`,
check: func(st *testing.T, c *Config) {
expected := []NamedAuthNSocketAddress{{
Name: "external",
Address: "192.168.73.45",
Port: 9092,
AuthN: &authNSasl,
}, {
Name: "internal",
Address: "10.21.34.58",
Port: 9092,
}}
require.Exactly(st, expected, c.Redpanda.KafkaAPI)
},
},
{
name: "partially set map fields (json)",
key: "redpanda.kafka_api",
Expand All @@ -235,7 +261,7 @@ tune_cpu: true`,
}]`,
format: "json",
check: func(st *testing.T, c *Config) {
expected := []NamedSocketAddress{{
expected := []NamedAuthNSocketAddress{{
Port: 9092,
Address: "192.168.54.2",
}}
Expand Down Expand Up @@ -423,7 +449,7 @@ func TestDefault(t *testing.T) {
Redpanda: RedpandaConfig{
Directory: "/var/lib/redpanda/data",
RPCServer: SocketAddress{"0.0.0.0", 33145},
KafkaAPI: []NamedSocketAddress{{
KafkaAPI: []NamedAuthNSocketAddress{{
Address: "0.0.0.0",
Port: 9092,
}},
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/config/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (p *Params) Load(fs afero.Fs) (*Config, error) {
Address: "0.0.0.0",
Port: 33145,
},
KafkaAPI: []NamedSocketAddress{{
KafkaAPI: []NamedAuthNSocketAddress{{
Address: "0.0.0.0",
Port: 9092,
}},
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/config/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestRedpandaSampleFile(t *testing.T) {
Address: "0.0.0.0",
Port: 33145,
},
KafkaAPI: []NamedSocketAddress{{
KafkaAPI: []NamedAuthNSocketAddress{{
Address: "0.0.0.0",
Port: 9092,
}},
Expand Down
Loading

0 comments on commit 5b0b0eb

Please sign in to comment.