diff --git a/src/go/k8s/pkg/resources/configmap.go b/src/go/k8s/pkg/resources/configmap.go index 8a24e9977264..f2b77b9096f0 100644 --- a/src/go/k8s/pkg/resources/configmap.go +++ b/src/go/k8s/pkg/resources/configmap.go @@ -223,15 +223,15 @@ func (r *ConfigMapResource) CreateConfiguration( cr := &cfg.NodeConfiguration.Redpanda internalListener := r.pandaCluster.InternalListener() - cr.KafkaAPI = []config.NamedSocketAddress{} // we don't want to inherit default kafka port - cr.KafkaAPI = append(cr.KafkaAPI, config.NamedSocketAddress{ + cr.KafkaAPI = []config.NamedAuthNSocketAddress{} // we don't want to inherit default kafka port + cr.KafkaAPI = append(cr.KafkaAPI, config.NamedAuthNSocketAddress{ Address: "0.0.0.0", Port: internalListener.Port, Name: InternalListenerName, }) if r.pandaCluster.ExternalListener() != nil { - cr.KafkaAPI = append(cr.KafkaAPI, config.NamedSocketAddress{ + cr.KafkaAPI = append(cr.KafkaAPI, config.NamedAuthNSocketAddress{ Address: "0.0.0.0", Port: calculateExternalPort(internalListener.Port, r.pandaCluster.ExternalListener().Port), Name: ExternalListenerName, diff --git a/src/go/rpk/pkg/cli/cmd/redpanda/config.go b/src/go/rpk/pkg/cli/cmd/redpanda/config.go index c5be52368df2..497ea5c27d73 100644 --- a/src/go/rpk/pkg/cli/cmd/redpanda/config.go +++ b/src/go/rpk/pkg/cli/cmd/redpanda/config.go @@ -115,7 +115,7 @@ func bootstrap(fs afero.Fs) *cobra.Command { cfg.Redpanda.ID = id cfg.Redpanda.RPCServer.Address = ownIP.String() - cfg.Redpanda.KafkaAPI = []config.NamedSocketAddress{{ + cfg.Redpanda.KafkaAPI = []config.NamedAuthNSocketAddress{{ Address: ownIP.String(), Port: config.DefaultKafkaPort, }} diff --git a/src/go/rpk/pkg/cli/cmd/redpanda/start.go b/src/go/rpk/pkg/cli/cmd/redpanda/start.go index c306d27236b2..53038075ad20 100644 --- a/src/go/rpk/pkg/cli/cmd/redpanda/start.go +++ b/src/go/rpk/pkg/cli/cmd/redpanda/start.go @@ -196,7 +196,7 @@ func NewStartCommand(fs afero.Fs, launcher rp.Launcher) *cobra.Command { ",", ), ) - kafkaAPI, err := parseNamedAddresses( + kafkaAPI, err := parseNamedAuthNAddresses( kafkaAddr, config.DefaultKafkaPort, ) @@ -939,6 +939,59 @@ func parseNamedAddress( }, nil } +func parseNamedAuthNAddresses( + addrs []string, defaultPort int, +) ([]config.NamedAuthNSocketAddress, error) { + as := make([]config.NamedAuthNSocketAddress, 0, len(addrs)) + for _, addr := range addrs { + a, err := parseNamedAuthNAddress(addr, defaultPort) + if err != nil { + return nil, err + } + if a != nil { + as = append(as, *a) + } + } + return as, nil +} + +func parseNamedAuthNAddress( + addrAuthn string, defaultPort int, +) (*config.NamedAuthNSocketAddress, error) { + if addrAuthn == "" { + return nil, nil + } + addr, authn, err := splitAddressAuthN(addrAuthn) + if err != nil { + return nil, err + } + scheme, hostport, err := net.ParseHostMaybeScheme(addr) + if err != nil { + return nil, err + } + host, port := net.SplitHostPortDefault(hostport, defaultPort) + + return &config.NamedAuthNSocketAddress{ + Address: host, + Port: port, + Name: scheme, + AuthN: authn, + }, nil +} + +func splitAddressAuthN(str string) (addr string, authn *string, err error) { + bits := strings.Split(str, "|") + if len(bits) > 2 { + err = fmt.Errorf(`invalid format for listener, at most one "|" can be present: %q`, str) + return + } + addr = bits[0] + if len(bits) == 2 { + authn = &bits[1] + } + return +} + func sendEnv(fs afero.Fs, env api.EnvironmentPayload, conf *config.Config, skipChecks bool, err error) { if err != nil { env.ErrorMsg = err.Error() diff --git a/src/go/rpk/pkg/cli/cmd/redpanda/start_test.go b/src/go/rpk/pkg/cli/cmd/redpanda/start_test.go index cfcfde0a3556..eeaa3738a9ce 100644 --- a/src/go/rpk/pkg/cli/cmd/redpanda/start_test.go +++ b/src/go/rpk/pkg/cli/cmd/redpanda/start_test.go @@ -96,6 +96,54 @@ func TestMergeFlags(t *testing.T) { } } +func TestParseNamedAuthNAddress(t *testing.T) { + authNSasl := "sasl" + tests := []struct { + name string + arg string + expected config.NamedAuthNSocketAddress + expectedErrMsg string + }{ + { + name: "it should parse host:port", + arg: "host:9092", + expected: config.NamedAuthNSocketAddress{Address: "host", Port: 9092, Name: ""}, + }, + { + name: "it should parse scheme://host:port", + arg: "scheme://host:9092", + expected: config.NamedAuthNSocketAddress{Address: "host", Port: 9092, Name: "scheme"}, + }, + { + name: "it should parse host:port|authn", + arg: "host:9092|sasl", + expected: config.NamedAuthNSocketAddress{Address: "host", Port: 9092, Name: "", AuthN: &authNSasl}, + }, + { + name: "it should parse scheme://host:port|authn", + arg: "scheme://host:9092|sasl", + expected: config.NamedAuthNSocketAddress{Address: "host", Port: 9092, Name: "scheme", AuthN: &authNSasl}, + }, + { + name: "it should fail for multiple |", + arg: "host|sasl|ignore", + expected: config.NamedAuthNSocketAddress{}, + expectedErrMsg: `invalid format for listener, at most one "|" can be present: "host|sasl|ignore"`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(st *testing.T) { + res, err := parseNamedAuthNAddress(tt.arg, 19092) + if tt.expectedErrMsg != "" { + require.EqualError(st, err, tt.expectedErrMsg) + return + } + require.Exactly(st, tt.expected, *res) + }) + } +} + func TestParseSeeds(t *testing.T) { tests := []struct { name string @@ -152,6 +200,7 @@ func TestParseSeeds(t *testing.T) { } func TestStartCommand(t *testing.T) { + authNSasl := "sasl" tests := []struct { name string launcher redpanda.Launcher @@ -262,7 +311,7 @@ func TestStartCommand(t *testing.T) { Address: "192.168.54.2", Port: 9643, }} - expectedKafkaAPI := []config.NamedSocketAddress{{ + expectedKafkaAPI := []config.NamedAuthNSocketAddress{{ Name: "external", Address: "192.168.73.45", Port: 9092, @@ -327,7 +376,7 @@ func TestStartCommand(t *testing.T) { Address: "192.168.54.2", Port: 9643, }} - expectedKafkaAPI := []config.NamedSocketAddress{{ + expectedKafkaAPI := []config.NamedAuthNSocketAddress{{ Name: "external", Address: "192.168.73.45", Port: 9092, @@ -378,7 +427,7 @@ func TestStartCommand(t *testing.T) { require.NoError(st, err) // The value set through the --kafka-addr flag should // have been picked. - expectedKafkaAPI := []config.NamedSocketAddress{{ + expectedKafkaAPI := []config.NamedAuthNSocketAddress{{ Name: "flag", Address: "192.168.34.3", Port: 9093, @@ -825,7 +874,7 @@ func TestStartCommand(t *testing.T) { postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) { conf, err := new(config.Params).Load(fs) require.NoError(st, err) - expectedAddr := []config.NamedSocketAddress{{ + expectedAddr := []config.NamedAuthNSocketAddress{{ Address: "192.168.34.32", Port: 33145, }} @@ -845,7 +894,7 @@ func TestStartCommand(t *testing.T) { postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) { conf, err := new(config.Params).Load(fs) require.NoError(st, err) - expectedAddr := []config.NamedSocketAddress{{ + expectedAddr := []config.NamedAuthNSocketAddress{{ Address: "192.168.34.32", Port: 9092, }} @@ -865,7 +914,7 @@ func TestStartCommand(t *testing.T) { postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) { conf, err := new(config.Params).Load(fs) require.NoError(st, err) - expectedAddr := []config.NamedSocketAddress{{ + expectedAddr := []config.NamedAuthNSocketAddress{{ Name: "nondefaultname", Address: "192.168.34.32", Port: 9092, @@ -881,18 +930,23 @@ func TestStartCommand(t *testing.T) { name: "it should parse the --kafka-addr and persist it (list)", args: []string{ "--install-dir", "/var/lib/redpanda", - "--kafka-addr", "nondefaultname://192.168.34.32,host:9092", + "--kafka-addr", "nondefaultname://192.168.34.32,host:9092,authn://host:9093|sasl", }, postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) { conf, err := new(config.Params).Load(fs) require.NoError(st, err) - expectedAddr := []config.NamedSocketAddress{{ + expectedAddr := []config.NamedAuthNSocketAddress{{ Name: "nondefaultname", Address: "192.168.34.32", Port: 9092, }, { Address: "host", Port: 9092, + }, { + Name: "authn", + Address: "host", + Port: 9093, + AuthN: &authNSasl, }} // Check that the generated config is as expected. require.Exactly( @@ -923,7 +977,7 @@ func TestStartCommand(t *testing.T) { postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) { conf, err := new(config.Params).Load(fs) require.NoError(st, err) - expectedAddr := []config.NamedSocketAddress{{ + expectedAddr := []config.NamedAuthNSocketAddress{{ Address: "host", Port: 3123, }} @@ -941,7 +995,7 @@ func TestStartCommand(t *testing.T) { }, before: func(fs afero.Fs) error { conf := config.Default() - conf.Redpanda.KafkaAPI = []config.NamedSocketAddress{{ + conf.Redpanda.KafkaAPI = []config.NamedAuthNSocketAddress{{ Address: "192.168.33.33", Port: 9892, }} @@ -950,7 +1004,7 @@ func TestStartCommand(t *testing.T) { postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) { conf, err := new(config.Params).Load(fs) require.NoError(st, err) - expectedAddr := []config.NamedSocketAddress{{ + expectedAddr := []config.NamedAuthNSocketAddress{{ Address: "192.168.33.33", Port: 9892, }} diff --git a/src/go/rpk/pkg/config/config.go b/src/go/rpk/pkg/config/config.go index 06d439df26ca..76d99548cc43 100644 --- a/src/go/rpk/pkg/config/config.go +++ b/src/go/rpk/pkg/config/config.go @@ -36,7 +36,7 @@ func Default() *Config { Address: "0.0.0.0", Port: 33145, }, - KafkaAPI: []NamedSocketAddress{{ + KafkaAPI: []NamedAuthNSocketAddress{{ Address: "0.0.0.0", Port: 9092, }}, diff --git a/src/go/rpk/pkg/config/config_test.go b/src/go/rpk/pkg/config/config_test.go index 1551a5450c9d..9e5768985369 100644 --- a/src/go/rpk/pkg/config/config_test.go +++ b/src/go/rpk/pkg/config/config_test.go @@ -47,6 +47,7 @@ func getValidConfig() *Config { } func TestSet(t *testing.T) { + authNSasl := "sasl" tests := []struct { name string key string @@ -214,7 +215,7 @@ tune_cpu: true`, port: 9092 `, check: func(st *testing.T, c *Config) { - expected := []NamedSocketAddress{{ + expected := []NamedAuthNSocketAddress{{ Name: "external", Address: "192.168.73.45", Port: 9092, @@ -226,6 +227,31 @@ tune_cpu: true`, require.Exactly(st, expected, c.Redpanda.KafkaAPI) }, }, + { + name: "extract kafka_api[].authentication_method", + key: "redpanda.kafka_api", + value: `- name: external + address: 192.168.73.45 + port: 9092 + authentication_method: sasl +- name: internal + address: 10.21.34.58 + port: 9092 +`, + check: func(st *testing.T, c *Config) { + expected := []NamedAuthNSocketAddress{{ + Name: "external", + Address: "192.168.73.45", + Port: 9092, + AuthN: &authNSasl, + }, { + Name: "internal", + Address: "10.21.34.58", + Port: 9092, + }} + require.Exactly(st, expected, c.Redpanda.KafkaAPI) + }, + }, { name: "partially set map fields (json)", key: "redpanda.kafka_api", @@ -235,7 +261,7 @@ tune_cpu: true`, }]`, format: "json", check: func(st *testing.T, c *Config) { - expected := []NamedSocketAddress{{ + expected := []NamedAuthNSocketAddress{{ Port: 9092, Address: "192.168.54.2", }} @@ -347,7 +373,7 @@ func TestDefault(t *testing.T) { Redpanda: RedpandaConfig{ Directory: "/var/lib/redpanda/data", RPCServer: SocketAddress{"0.0.0.0", 33145}, - KafkaAPI: []NamedSocketAddress{{ + KafkaAPI: []NamedAuthNSocketAddress{{ Address: "0.0.0.0", Port: 9092, }}, diff --git a/src/go/rpk/pkg/config/params.go b/src/go/rpk/pkg/config/params.go index 3ed84d00ea7d..7539fbde307a 100644 --- a/src/go/rpk/pkg/config/params.go +++ b/src/go/rpk/pkg/config/params.go @@ -259,7 +259,7 @@ func (p *Params) Load(fs afero.Fs) (*Config, error) { Address: "0.0.0.0", Port: 33145, }, - KafkaAPI: []NamedSocketAddress{{ + KafkaAPI: []NamedAuthNSocketAddress{{ Address: "0.0.0.0", Port: 9092, }}, diff --git a/src/go/rpk/pkg/config/params_test.go b/src/go/rpk/pkg/config/params_test.go index e25f0e4a0349..91077a800a60 100644 --- a/src/go/rpk/pkg/config/params_test.go +++ b/src/go/rpk/pkg/config/params_test.go @@ -154,7 +154,7 @@ func TestRedpandaSampleFile(t *testing.T) { Address: "0.0.0.0", Port: 33145, }, - KafkaAPI: []NamedSocketAddress{{ + KafkaAPI: []NamedAuthNSocketAddress{{ Address: "0.0.0.0", Port: 9092, }}, diff --git a/src/go/rpk/pkg/config/schema.go b/src/go/rpk/pkg/config/schema.go index 98f42b0fbae2..aa96c9edd2ea 100644 --- a/src/go/rpk/pkg/config/schema.go +++ b/src/go/rpk/pkg/config/schema.go @@ -44,26 +44,26 @@ func (c *Config) File() *Config { } type RedpandaConfig struct { - Directory string `yaml:"data_directory,omitempty" json:"data_directory"` - ID int `yaml:"node_id" json:"node_id"` - Rack string `yaml:"rack,omitempty" json:"rack"` - SeedServers []SeedServer `yaml:"seed_servers" json:"seed_servers"` - RPCServer SocketAddress `yaml:"rpc_server" json:"rpc_server"` - RPCServerTLS []ServerTLS `yaml:"rpc_server_tls,omitempty" json:"rpc_server_tls"` - KafkaAPI []NamedSocketAddress `yaml:"kafka_api" json:"kafka_api"` - KafkaAPITLS []ServerTLS `yaml:"kafka_api_tls,omitempty" json:"kafka_api_tls"` - AdminAPI []NamedSocketAddress `yaml:"admin" json:"admin"` - AdminAPITLS []ServerTLS `yaml:"admin_api_tls,omitempty" json:"admin_api_tls"` - CoprocSupervisorServer SocketAddress `yaml:"coproc_supervisor_server,omitempty" json:"coproc_supervisor_server"` - AdminAPIDocDir string `yaml:"admin_api_doc_dir,omitempty" json:"admin_api_doc_dir"` - DashboardDir string `yaml:"dashboard_dir,omitempty" json:"dashboard_dir"` - CloudStorageCacheDirectory string `yaml:"cloud_storage_cache_directory,omitempty" json:"cloud_storage_cache_directory"` - AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api,omitempty" json:"advertised_rpc_api,omitempty"` - AdvertisedKafkaAPI []NamedSocketAddress `yaml:"advertised_kafka_api,omitempty" json:"advertised_kafka_api,omitempty"` - DeveloperMode bool `yaml:"developer_mode" json:"developer_mode"` - AggregateMetrics bool `yaml:"aggregate_metrics,omitempty" json:"aggregate_metrics,omitempty"` - DisablePublicMetrics bool `yaml:"disable_public_metrics,omitempty" json:"disable_public_metrics,omitempty"` - Other map[string]interface{} `yaml:",inline"` + Directory string `yaml:"data_directory,omitempty" json:"data_directory"` + ID int `yaml:"node_id" json:"node_id"` + Rack string `yaml:"rack,omitempty" json:"rack"` + SeedServers []SeedServer `yaml:"seed_servers" json:"seed_servers"` + RPCServer SocketAddress `yaml:"rpc_server" json:"rpc_server"` + RPCServerTLS []ServerTLS `yaml:"rpc_server_tls,omitempty" json:"rpc_server_tls"` + KafkaAPI []NamedAuthNSocketAddress `yaml:"kafka_api" json:"kafka_api"` + KafkaAPITLS []ServerTLS `yaml:"kafka_api_tls,omitempty" json:"kafka_api_tls"` + AdminAPI []NamedSocketAddress `yaml:"admin" json:"admin"` + AdminAPITLS []ServerTLS `yaml:"admin_api_tls,omitempty" json:"admin_api_tls"` + CoprocSupervisorServer SocketAddress `yaml:"coproc_supervisor_server,omitempty" json:"coproc_supervisor_server"` + AdminAPIDocDir string `yaml:"admin_api_doc_dir,omitempty" json:"admin_api_doc_dir"` + DashboardDir string `yaml:"dashboard_dir,omitempty" json:"dashboard_dir"` + CloudStorageCacheDirectory string `yaml:"cloud_storage_cache_directory,omitempty" json:"cloud_storage_cache_directory"` + AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api,omitempty" json:"advertised_rpc_api,omitempty"` + AdvertisedKafkaAPI []NamedSocketAddress `yaml:"advertised_kafka_api,omitempty" json:"advertised_kafka_api,omitempty"` + DeveloperMode bool `yaml:"developer_mode" json:"developer_mode"` + AggregateMetrics bool `yaml:"aggregate_metrics,omitempty" json:"aggregate_metrics,omitempty"` + DisablePublicMetrics bool `yaml:"disable_public_metrics,omitempty" json:"disable_public_metrics,omitempty"` + Other map[string]interface{} `yaml:",inline"` } type Pandaproxy struct { @@ -103,6 +103,13 @@ type NamedSocketAddress struct { Name string `yaml:"name,omitempty" json:"name,omitempty"` } +type NamedAuthNSocketAddress struct { + Address string `yaml:"address" json:"address"` + Port int `yaml:"port" json:"port"` + Name string `yaml:"name,omitempty" json:"name,omitempty"` + AuthN *string `yaml:"authentication_method,omitempty" json:"authentication_method,omitempty"` +} + type TLS struct { KeyFile string `yaml:"key_file,omitempty" json:"key_file"` CertFile string `yaml:"cert_file,omitempty" json:"cert_file"` diff --git a/src/go/rpk/pkg/config/weak.go b/src/go/rpk/pkg/config/weak.go index 86d77c7e5a0a..db8cd39d4b67 100644 --- a/src/go/rpk/pkg/config/weak.go +++ b/src/go/rpk/pkg/config/weak.go @@ -219,6 +219,30 @@ func (nsa *namedSocketAddresses) UnmarshalYAML(n *yaml.Node) error { return nil } +// namedAuthNSocketAddresses is an intermediary one_or_many type to be used +// during our transition to strictly typed configuration parameters. +// This type will: +// - parse an array of NamedAuthNSocketAddress +// - parse a single NamedAuthNSocketAddress to an array. +type namedAuthNSocketAddresses []NamedAuthNSocketAddress + +func (nsa *namedAuthNSocketAddresses) UnmarshalYAML(n *yaml.Node) error { + var multi []NamedAuthNSocketAddress + err := n.Decode(&multi) + if err == nil { + *nsa = multi + return nil + } + + var single NamedAuthNSocketAddress + err = n.Decode(&single) + if err != nil { + return err + } + *nsa = []NamedAuthNSocketAddress{single} + return nil +} + // serverTLSArray is an intermediary one_or_many type to be used during our // transition to strictly typed configuration parameters. This type will: // - parse an array of ServerTLS @@ -305,26 +329,26 @@ func (c *Config) UnmarshalYAML(n *yaml.Node) error { func (rpc *RedpandaConfig) UnmarshalYAML(n *yaml.Node) error { var internal struct { - Directory weakString `yaml:"data_directory"` - ID weakInt `yaml:"node_id" ` - Rack weakString `yaml:"rack"` - SeedServers seedServers `yaml:"seed_servers"` - RPCServer SocketAddress `yaml:"rpc_server"` - RPCServerTLS serverTLSArray `yaml:"rpc_server_tls"` - KafkaAPI namedSocketAddresses `yaml:"kafka_api"` - KafkaAPITLS serverTLSArray `yaml:"kafka_api_tls"` - AdminAPI namedSocketAddresses `yaml:"admin"` - AdminAPITLS serverTLSArray `yaml:"admin_api_tls"` - CoprocSupervisorServer SocketAddress `yaml:"coproc_supervisor_server"` - AdminAPIDocDir weakString `yaml:"admin_api_doc_dir"` - DashboardDir weakString `yaml:"dashboard_dir"` - CloudStorageCacheDirectory weakString `yaml:"cloud_storage_cache_directory"` - AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api"` - AdvertisedKafkaAPI namedSocketAddresses `yaml:"advertised_kafka_api"` - DeveloperMode weakBool `yaml:"developer_mode"` - AggregateMetrics weakBool `yaml:"aggregate_metrics"` - DisablePublicMetrics weakBool `yaml:"disable_public_metrics"` - Other map[string]interface{} `yaml:",inline"` + Directory weakString `yaml:"data_directory"` + ID weakInt `yaml:"node_id" ` + Rack weakString `yaml:"rack"` + SeedServers seedServers `yaml:"seed_servers"` + RPCServer SocketAddress `yaml:"rpc_server"` + RPCServerTLS serverTLSArray `yaml:"rpc_server_tls"` + KafkaAPI namedAuthNSocketAddresses `yaml:"kafka_api"` + KafkaAPITLS serverTLSArray `yaml:"kafka_api_tls"` + AdminAPI namedSocketAddresses `yaml:"admin"` + AdminAPITLS serverTLSArray `yaml:"admin_api_tls"` + CoprocSupervisorServer SocketAddress `yaml:"coproc_supervisor_server"` + AdminAPIDocDir weakString `yaml:"admin_api_doc_dir"` + DashboardDir weakString `yaml:"dashboard_dir"` + CloudStorageCacheDirectory weakString `yaml:"cloud_storage_cache_directory"` + AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api"` + AdvertisedKafkaAPI namedSocketAddresses `yaml:"advertised_kafka_api"` + DeveloperMode weakBool `yaml:"developer_mode"` + AggregateMetrics weakBool `yaml:"aggregate_metrics"` + DisablePublicMetrics weakBool `yaml:"disable_public_metrics"` + Other map[string]interface{} `yaml:",inline"` } if err := n.Decode(&internal); err != nil { @@ -592,6 +616,25 @@ func (nsa *NamedSocketAddress) UnmarshalYAML(n *yaml.Node) error { return nil } +func (nsa *NamedAuthNSocketAddress) UnmarshalYAML(n *yaml.Node) error { + var internal struct { + Name weakString `yaml:"name"` + Address weakString `yaml:"address" mapstructure:"address"` + Port weakInt `yaml:"port" mapstructure:"port"` + AuthN *weakString `yaml:"authentication_method" mapstructure:"authentication_method"` + } + + if err := n.Decode(&internal); err != nil { + return err + } + + nsa.Name = string(internal.Name) + nsa.Address = string(internal.Address) + nsa.Port = int(internal.Port) + nsa.AuthN = (*string)(internal.AuthN) + return nil +} + func (t *TLS) UnmarshalYAML(n *yaml.Node) error { var internal struct { KeyFile weakString `yaml:"key_file"` diff --git a/src/go/rpk/pkg/config/weak_test.go b/src/go/rpk/pkg/config/weak_test.go index 8e558e1c8e2a..ba1440a46283 100644 --- a/src/go/rpk/pkg/config/weak_test.go +++ b/src/go/rpk/pkg/config/weak_test.go @@ -417,6 +417,110 @@ func TestNamedSocketAddressArray(t *testing.T) { } } +func TestNamedAuthNSocketAddressArray(t *testing.T) { + authNMtlsIdentity := "mtls_identity" + authNSasl := "sasl" + authNNOne := "none" + for _, test := range []struct { + name string + data string + exp []NamedAuthNSocketAddress + expErr bool + }{ + { + name: "single namedAuthNSocketAddress", + data: "test_api:\n address: 0.0.0.0\n port: 80\n name: socket\n authentication_method: mtls_identity\n", + exp: []NamedAuthNSocketAddress{ + { + Name: "socket", + Address: "0.0.0.0", + Port: 80, + AuthN: &authNMtlsIdentity, + }, + }, + }, + { + name: "list of 1 namedSocketAddress", + data: "test_api:\n - name: socket\n address: 0.0.0.0\n port: 80\n authentication_method: sasl\n", + exp: []NamedAuthNSocketAddress{ + { + Name: "socket", + Address: "0.0.0.0", + Port: 80, + AuthN: &authNSasl, + }, + }, + }, + { + name: "list of namedSocketAddress", + data: `test_api: + - name: socket + address: 0.0.0.0 + port: 80 + authentication_method: mtls_identity + - name: socket2 + address: 0.0.0.1 + port: 81 + authentication_method: sasl + - name: socket3 + address: 0.0.0.2 + port: 81 + authentication_method: none + - name: socket4 + address: 0.0.0.3 + port: 81`, + exp: []NamedAuthNSocketAddress{ + { + Name: "socket", + Address: "0.0.0.0", + Port: 80, + AuthN: &authNMtlsIdentity, + }, + { + Name: "socket2", + Address: "0.0.0.1", + Port: 81, + AuthN: &authNSasl, + }, + { + Name: "socket3", + Address: "0.0.0.2", + Port: 81, + AuthN: &authNNOne, + }, + { + Name: "socket4", + Address: "0.0.0.3", + Port: 81, + }, + }, + }, + { + name: "unsupported types", + data: "test_api:\n address: [0.0.0.0]\n port: 80\n name: socket\n", + expErr: true, + }, + } { + t.Run(test.name, func(t *testing.T) { + var ts struct { + Sockets namedAuthNSocketAddresses `yaml:"test_api"` + } + err := yaml.Unmarshal([]byte(test.data), &ts) + + gotErr := err != nil + if gotErr != test.expErr { + t.Errorf("input %q: got err? %v, exp err? %v; error: %v", + test.data, gotErr, test.expErr, err) + return + } + if test.expErr { + return + } + require.Equal(t, namedAuthNSocketAddresses(test.exp), ts.Sockets) + }) + } +} + func TestServerTLSArray(t *testing.T) { for _, test := range []struct { name string @@ -885,9 +989,9 @@ rpk: {RequireClientAuth: false, TruststoreFile: "certs/tls-ca.pem"}, }, AdvertisedRPCAPI: &SocketAddress{"0.0.0.0", 33145}, - KafkaAPI: []NamedSocketAddress{ - {"0.0.0.0", 9092, "internal"}, - {"0.0.0.0", 9093, "external"}, + KafkaAPI: []NamedAuthNSocketAddress{ + {"0.0.0.0", 9092, "internal", nil}, + {"0.0.0.0", 9093, "external", nil}, }, KafkaAPITLS: []ServerTLS{ {Name: "external", KeyFile: "certs/tls-key.pem"}, @@ -1123,9 +1227,9 @@ rpk: {RequireClientAuth: false, TruststoreFile: "certs/tls-ca.pem"}, }, AdvertisedRPCAPI: &SocketAddress{"0.0.0.0", 33145}, - KafkaAPI: []NamedSocketAddress{ - {"0.0.0.0", 9092, "internal"}, - {"0.0.0.0", 9093, "external"}, + KafkaAPI: []NamedAuthNSocketAddress{ + {"0.0.0.0", 9092, "internal", nil}, + {"0.0.0.0", 9093, "external", nil}, }, KafkaAPITLS: []ServerTLS{ {Name: "external", KeyFile: "certs/tls-key.pem"},