From c05a3eb774601e3d899e4025cc61c103d64c5271 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Fri, 15 Jul 2022 16:03:20 +0100 Subject: [PATCH] rpk: Support listener authN Signed-off-by: Ben Pope --- src/go/rpk/pkg/cli/cmd/redpanda/config.go | 2 +- src/go/rpk/pkg/cli/cmd/redpanda/start.go | 38 ++++++++- src/go/rpk/pkg/cli/cmd/redpanda/start_test.go | 20 ++--- src/go/rpk/pkg/config/config.go | 2 +- src/go/rpk/pkg/config/config_test.go | 6 +- src/go/rpk/pkg/config/params.go | 2 +- src/go/rpk/pkg/config/schema.go | 43 +++++----- src/go/rpk/pkg/config/weak.go | 79 ++++++++++++++----- src/go/rpk/pkg/config/weak_test.go | 12 +-- 9 files changed, 145 insertions(+), 59 deletions(-) diff --git a/src/go/rpk/pkg/cli/cmd/redpanda/config.go b/src/go/rpk/pkg/cli/cmd/redpanda/config.go index c5be52368df2b..497ea5c27d73f 100644 --- a/src/go/rpk/pkg/cli/cmd/redpanda/config.go +++ b/src/go/rpk/pkg/cli/cmd/redpanda/config.go @@ -115,7 +115,7 @@ func bootstrap(fs afero.Fs) *cobra.Command { cfg.Redpanda.ID = id cfg.Redpanda.RPCServer.Address = ownIP.String() - cfg.Redpanda.KafkaAPI = []config.NamedSocketAddress{{ + cfg.Redpanda.KafkaAPI = []config.NamedAuthNSocketAddress{{ Address: ownIP.String(), Port: config.DefaultKafkaPort, }} diff --git a/src/go/rpk/pkg/cli/cmd/redpanda/start.go b/src/go/rpk/pkg/cli/cmd/redpanda/start.go index 29bd6805541ec..464b7652904c8 100644 --- a/src/go/rpk/pkg/cli/cmd/redpanda/start.go +++ b/src/go/rpk/pkg/cli/cmd/redpanda/start.go @@ -196,7 +196,7 @@ func NewStartCommand(fs afero.Fs, launcher rp.Launcher) *cobra.Command { ",", ), ) - kafkaAPI, err := parseNamedAddresses( + kafkaAPI, err := parseNamedAuthNAddresses( kafkaAddr, config.DefaultKafkaPort, ) @@ -939,6 +939,42 @@ func parseNamedAddress( }, nil } +func parseNamedAuthNAddresses( + addrs []string, defaultPort int, +) ([]config.NamedAuthNSocketAddress, error) { + as := make([]config.NamedAuthNSocketAddress, 0, len(addrs)) + for _, addr := range addrs { + a, err := parseNamedAuthNAddress(addr, defaultPort, nil) + if err != nil { + return nil, err + } + if a != nil { + as = append(as, *a) + } + } + return as, nil +} + +func parseNamedAuthNAddress( + addr string, defaultPort int, authN *string, +) (*config.NamedAuthNSocketAddress, error) { + if addr == "" { + return nil, nil + } + scheme, hostport, err := net.ParseHostMaybeScheme(addr) + if err != nil { + return nil, err + } + host, port := net.SplitHostPortDefault(hostport, defaultPort) + + return &config.NamedAuthNSocketAddress{ + Address: host, + Port: port, + Name: scheme, + AuthN: authN, + }, nil +} + func sendEnv(fs afero.Fs, env api.EnvironmentPayload, conf *config.Config, skipChecks bool, err error) { if err != nil { env.ErrorMsg = err.Error() diff --git a/src/go/rpk/pkg/cli/cmd/redpanda/start_test.go b/src/go/rpk/pkg/cli/cmd/redpanda/start_test.go index cfcfde0a3556d..e77239e404718 100644 --- a/src/go/rpk/pkg/cli/cmd/redpanda/start_test.go +++ b/src/go/rpk/pkg/cli/cmd/redpanda/start_test.go @@ -262,7 +262,7 @@ func TestStartCommand(t *testing.T) { Address: "192.168.54.2", Port: 9643, }} - expectedKafkaAPI := []config.NamedSocketAddress{{ + expectedKafkaAPI := []config.NamedAuthNSocketAddress{{ Name: "external", Address: "192.168.73.45", Port: 9092, @@ -327,7 +327,7 @@ func TestStartCommand(t *testing.T) { Address: "192.168.54.2", Port: 9643, }} - expectedKafkaAPI := []config.NamedSocketAddress{{ + expectedKafkaAPI := []config.NamedAuthNSocketAddress{{ Name: "external", Address: "192.168.73.45", Port: 9092, @@ -378,7 +378,7 @@ func TestStartCommand(t *testing.T) { require.NoError(st, err) // The value set through the --kafka-addr flag should // have been picked. - expectedKafkaAPI := []config.NamedSocketAddress{{ + expectedKafkaAPI := []config.NamedAuthNSocketAddress{{ Name: "flag", Address: "192.168.34.3", Port: 9093, @@ -825,7 +825,7 @@ func TestStartCommand(t *testing.T) { postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) { conf, err := new(config.Params).Load(fs) require.NoError(st, err) - expectedAddr := []config.NamedSocketAddress{{ + expectedAddr := []config.NamedAuthNSocketAddress{{ Address: "192.168.34.32", Port: 33145, }} @@ -845,7 +845,7 @@ func TestStartCommand(t *testing.T) { postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) { conf, err := new(config.Params).Load(fs) require.NoError(st, err) - expectedAddr := []config.NamedSocketAddress{{ + expectedAddr := []config.NamedAuthNSocketAddress{{ Address: "192.168.34.32", Port: 9092, }} @@ -865,7 +865,7 @@ func TestStartCommand(t *testing.T) { postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) { conf, err := new(config.Params).Load(fs) require.NoError(st, err) - expectedAddr := []config.NamedSocketAddress{{ + expectedAddr := []config.NamedAuthNSocketAddress{{ Name: "nondefaultname", Address: "192.168.34.32", Port: 9092, @@ -886,7 +886,7 @@ func TestStartCommand(t *testing.T) { postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) { conf, err := new(config.Params).Load(fs) require.NoError(st, err) - expectedAddr := []config.NamedSocketAddress{{ + expectedAddr := []config.NamedAuthNSocketAddress{{ Name: "nondefaultname", Address: "192.168.34.32", Port: 9092, @@ -923,7 +923,7 @@ func TestStartCommand(t *testing.T) { postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) { conf, err := new(config.Params).Load(fs) require.NoError(st, err) - expectedAddr := []config.NamedSocketAddress{{ + expectedAddr := []config.NamedAuthNSocketAddress{{ Address: "host", Port: 3123, }} @@ -941,7 +941,7 @@ func TestStartCommand(t *testing.T) { }, before: func(fs afero.Fs) error { conf := config.Default() - conf.Redpanda.KafkaAPI = []config.NamedSocketAddress{{ + conf.Redpanda.KafkaAPI = []config.NamedAuthNSocketAddress{{ Address: "192.168.33.33", Port: 9892, }} @@ -950,7 +950,7 @@ func TestStartCommand(t *testing.T) { postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) { conf, err := new(config.Params).Load(fs) require.NoError(st, err) - expectedAddr := []config.NamedSocketAddress{{ + expectedAddr := []config.NamedAuthNSocketAddress{{ Address: "192.168.33.33", Port: 9892, }} diff --git a/src/go/rpk/pkg/config/config.go b/src/go/rpk/pkg/config/config.go index 06d439df26caa..76d99548cc43d 100644 --- a/src/go/rpk/pkg/config/config.go +++ b/src/go/rpk/pkg/config/config.go @@ -36,7 +36,7 @@ func Default() *Config { Address: "0.0.0.0", Port: 33145, }, - KafkaAPI: []NamedSocketAddress{{ + KafkaAPI: []NamedAuthNSocketAddress{{ Address: "0.0.0.0", Port: 9092, }}, diff --git a/src/go/rpk/pkg/config/config_test.go b/src/go/rpk/pkg/config/config_test.go index 1551a5450c9d1..c99f140cc6427 100644 --- a/src/go/rpk/pkg/config/config_test.go +++ b/src/go/rpk/pkg/config/config_test.go @@ -214,7 +214,7 @@ tune_cpu: true`, port: 9092 `, check: func(st *testing.T, c *Config) { - expected := []NamedSocketAddress{{ + expected := []NamedAuthNSocketAddress{{ Name: "external", Address: "192.168.73.45", Port: 9092, @@ -235,7 +235,7 @@ tune_cpu: true`, }]`, format: "json", check: func(st *testing.T, c *Config) { - expected := []NamedSocketAddress{{ + expected := []NamedAuthNSocketAddress{{ Port: 9092, Address: "192.168.54.2", }} @@ -347,7 +347,7 @@ func TestDefault(t *testing.T) { Redpanda: RedpandaConfig{ Directory: "/var/lib/redpanda/data", RPCServer: SocketAddress{"0.0.0.0", 33145}, - KafkaAPI: []NamedSocketAddress{{ + KafkaAPI: []NamedAuthNSocketAddress{{ Address: "0.0.0.0", Port: 9092, }}, diff --git a/src/go/rpk/pkg/config/params.go b/src/go/rpk/pkg/config/params.go index 3ed84d00ea7df..7539fbde307ad 100644 --- a/src/go/rpk/pkg/config/params.go +++ b/src/go/rpk/pkg/config/params.go @@ -259,7 +259,7 @@ func (p *Params) Load(fs afero.Fs) (*Config, error) { Address: "0.0.0.0", Port: 33145, }, - KafkaAPI: []NamedSocketAddress{{ + KafkaAPI: []NamedAuthNSocketAddress{{ Address: "0.0.0.0", Port: 9092, }}, diff --git a/src/go/rpk/pkg/config/schema.go b/src/go/rpk/pkg/config/schema.go index f3e7ebfdd5be4..b870b31ab6644 100644 --- a/src/go/rpk/pkg/config/schema.go +++ b/src/go/rpk/pkg/config/schema.go @@ -44,24 +44,24 @@ func (c *Config) File() *Config { } type RedpandaConfig struct { - Directory string `yaml:"data_directory,omitempty" json:"data_directory"` - ID int `yaml:"node_id" json:"node_id"` - Rack string `yaml:"rack,omitempty" json:"rack"` - SeedServers []SeedServer `yaml:"seed_servers" json:"seed_servers"` - RPCServer SocketAddress `yaml:"rpc_server" json:"rpc_server"` - RPCServerTLS []ServerTLS `yaml:"rpc_server_tls,omitempty" json:"rpc_server_tls"` - KafkaAPI []NamedSocketAddress `yaml:"kafka_api" json:"kafka_api"` - KafkaAPITLS []ServerTLS `yaml:"kafka_api_tls,omitempty" json:"kafka_api_tls"` - AdminAPI []NamedSocketAddress `yaml:"admin" json:"admin"` - AdminAPITLS []ServerTLS `yaml:"admin_api_tls,omitempty" json:"admin_api_tls"` - CoprocSupervisorServer SocketAddress `yaml:"coproc_supervisor_server,omitempty" json:"coproc_supervisor_server"` - AdminAPIDocDir string `yaml:"admin_api_doc_dir,omitempty" json:"admin_api_doc_dir"` - DashboardDir string `yaml:"dashboard_dir,omitempty" json:"dashboard_dir"` - CloudStorageCacheDirectory string `yaml:"cloud_storage_cache_directory,omitempty" json:"cloud_storage_cache_directory"` - AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api,omitempty" json:"advertised_rpc_api,omitempty"` - AdvertisedKafkaAPI []NamedSocketAddress `yaml:"advertised_kafka_api,omitempty" json:"advertised_kafka_api,omitempty"` - DeveloperMode bool `yaml:"developer_mode" json:"developer_mode"` - Other map[string]interface{} `yaml:",inline"` + Directory string `yaml:"data_directory,omitempty" json:"data_directory"` + ID int `yaml:"node_id" json:"node_id"` + Rack string `yaml:"rack,omitempty" json:"rack"` + SeedServers []SeedServer `yaml:"seed_servers" json:"seed_servers"` + RPCServer SocketAddress `yaml:"rpc_server" json:"rpc_server"` + RPCServerTLS []ServerTLS `yaml:"rpc_server_tls,omitempty" json:"rpc_server_tls"` + KafkaAPI []NamedAuthNSocketAddress `yaml:"kafka_api" json:"kafka_api"` + KafkaAPITLS []ServerTLS `yaml:"kafka_api_tls,omitempty" json:"kafka_api_tls"` + AdminAPI []NamedSocketAddress `yaml:"admin" json:"admin"` + AdminAPITLS []ServerTLS `yaml:"admin_api_tls,omitempty" json:"admin_api_tls"` + CoprocSupervisorServer SocketAddress `yaml:"coproc_supervisor_server,omitempty" json:"coproc_supervisor_server"` + AdminAPIDocDir string `yaml:"admin_api_doc_dir,omitempty" json:"admin_api_doc_dir"` + DashboardDir string `yaml:"dashboard_dir,omitempty" json:"dashboard_dir"` + CloudStorageCacheDirectory string `yaml:"cloud_storage_cache_directory,omitempty" json:"cloud_storage_cache_directory"` + AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api,omitempty" json:"advertised_rpc_api,omitempty"` + AdvertisedKafkaAPI []NamedSocketAddress `yaml:"advertised_kafka_api,omitempty" json:"advertised_kafka_api,omitempty"` + DeveloperMode bool `yaml:"developer_mode" json:"developer_mode"` + Other map[string]interface{} `yaml:",inline"` } type Pandaproxy struct { @@ -101,6 +101,13 @@ type NamedSocketAddress struct { Name string `yaml:"name,omitempty" json:"name,omitempty"` } +type NamedAuthNSocketAddress struct { + Address string `yaml:"address" json:"address"` + Port int `yaml:"port" json:"port"` + Name string `yaml:"name,omitempty" json:"name,omitempty"` + AuthN *string `yaml:"authentication_method,omitempty" json:"authentication_method,omitempty"` +} + type TLS struct { KeyFile string `yaml:"key_file,omitempty" json:"key_file"` CertFile string `yaml:"cert_file,omitempty" json:"cert_file"` diff --git a/src/go/rpk/pkg/config/weak.go b/src/go/rpk/pkg/config/weak.go index 0c9b6a2c4cca5..ef4769550f7f9 100644 --- a/src/go/rpk/pkg/config/weak.go +++ b/src/go/rpk/pkg/config/weak.go @@ -219,6 +219,30 @@ func (nsa *namedSocketAddresses) UnmarshalYAML(n *yaml.Node) error { return nil } +// namedAuthNSocketAddresses is an intermediary one_or_many type to be used +// during our transition to strictly typed configuration parameters. +// This type will: +// - parse an array of NamedAuthNSocketAddress +// - parse a single NamedAuthNSocketAddress to an array. +type namedAuthNSocketAddresses []NamedAuthNSocketAddress + +func (nsa *namedAuthNSocketAddresses) UnmarshalYAML(n *yaml.Node) error { + var multi []NamedAuthNSocketAddress + err := n.Decode(&multi) + if err == nil { + *nsa = multi + return nil + } + + var single NamedAuthNSocketAddress + err = n.Decode(&single) + if err != nil { + return err + } + *nsa = []NamedAuthNSocketAddress{single} + return nil +} + // serverTLSArray is an intermediary one_or_many type to be used during our // transition to strictly typed configuration parameters. This type will: // - parse an array of ServerTLS @@ -305,24 +329,24 @@ func (c *Config) UnmarshalYAML(n *yaml.Node) error { func (rpc *RedpandaConfig) UnmarshalYAML(n *yaml.Node) error { var internal struct { - Directory weakString `yaml:"data_directory"` - ID weakInt `yaml:"node_id" ` - Rack weakString `yaml:"rack"` - SeedServers seedServers `yaml:"seed_servers"` - RPCServer SocketAddress `yaml:"rpc_server"` - RPCServerTLS serverTLSArray `yaml:"rpc_server_tls"` - KafkaAPI namedSocketAddresses `yaml:"kafka_api"` - KafkaAPITLS serverTLSArray `yaml:"kafka_api_tls"` - AdminAPI namedSocketAddresses `yaml:"admin"` - AdminAPITLS serverTLSArray `yaml:"admin_api_tls"` - CoprocSupervisorServer SocketAddress `yaml:"coproc_supervisor_server"` - AdminAPIDocDir weakString `yaml:"admin_api_doc_dir"` - DashboardDir weakString `yaml:"dashboard_dir"` - CloudStorageCacheDirectory weakString `yaml:"cloud_storage_cache_directory"` - AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api"` - AdvertisedKafkaAPI namedSocketAddresses `yaml:"advertised_kafka_api"` - DeveloperMode weakBool `yaml:"developer_mode"` - Other map[string]interface{} `yaml:",inline"` + Directory weakString `yaml:"data_directory"` + ID weakInt `yaml:"node_id" ` + Rack weakString `yaml:"rack"` + SeedServers seedServers `yaml:"seed_servers"` + RPCServer SocketAddress `yaml:"rpc_server"` + RPCServerTLS serverTLSArray `yaml:"rpc_server_tls"` + KafkaAPI namedAuthNSocketAddresses `yaml:"kafka_api"` + KafkaAPITLS serverTLSArray `yaml:"kafka_api_tls"` + AdminAPI namedSocketAddresses `yaml:"admin"` + AdminAPITLS serverTLSArray `yaml:"admin_api_tls"` + CoprocSupervisorServer SocketAddress `yaml:"coproc_supervisor_server"` + AdminAPIDocDir weakString `yaml:"admin_api_doc_dir"` + DashboardDir weakString `yaml:"dashboard_dir"` + CloudStorageCacheDirectory weakString `yaml:"cloud_storage_cache_directory"` + AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api"` + AdvertisedKafkaAPI namedSocketAddresses `yaml:"advertised_kafka_api"` + DeveloperMode weakBool `yaml:"developer_mode"` + Other map[string]interface{} `yaml:",inline"` } if err := n.Decode(&internal); err != nil { @@ -588,6 +612,25 @@ func (nsa *NamedSocketAddress) UnmarshalYAML(n *yaml.Node) error { return nil } +func (nsa *NamedAuthNSocketAddress) UnmarshalYAML(n *yaml.Node) error { + var internal struct { + Name weakString `yaml:"name"` + Address weakString `yaml:"address" mapstructure:"address"` + Port weakInt `yaml:"port" mapstructure:"port"` + AuthN *weakString `yaml:"authentication_method" mapstructure:"authentication_method"` + } + + if err := n.Decode(&internal); err != nil { + return err + } + + nsa.Name = string(internal.Name) + nsa.Address = string(internal.Address) + nsa.Port = int(internal.Port) + nsa.AuthN = (*string)(internal.AuthN) + return nil +} + func (t *TLS) UnmarshalYAML(n *yaml.Node) error { var internal struct { KeyFile weakString `yaml:"key_file"` diff --git a/src/go/rpk/pkg/config/weak_test.go b/src/go/rpk/pkg/config/weak_test.go index 00f4b3cb4f273..49b12bb6e0e55 100644 --- a/src/go/rpk/pkg/config/weak_test.go +++ b/src/go/rpk/pkg/config/weak_test.go @@ -883,9 +883,9 @@ rpk: {RequireClientAuth: false, TruststoreFile: "certs/tls-ca.pem"}, }, AdvertisedRPCAPI: &SocketAddress{"0.0.0.0", 33145}, - KafkaAPI: []NamedSocketAddress{ - {"0.0.0.0", 9092, "internal"}, - {"0.0.0.0", 9093, "external"}, + KafkaAPI: []NamedAuthNSocketAddress{ + {"0.0.0.0", 9092, "internal", nil}, + {"0.0.0.0", 9093, "external", nil}, }, KafkaAPITLS: []ServerTLS{ {Name: "external", KeyFile: "certs/tls-key.pem"}, @@ -1117,9 +1117,9 @@ rpk: {RequireClientAuth: false, TruststoreFile: "certs/tls-ca.pem"}, }, AdvertisedRPCAPI: &SocketAddress{"0.0.0.0", 33145}, - KafkaAPI: []NamedSocketAddress{ - {"0.0.0.0", 9092, "internal"}, - {"0.0.0.0", 9093, "external"}, + KafkaAPI: []NamedAuthNSocketAddress{ + {"0.0.0.0", 9092, "internal", nil}, + {"0.0.0.0", 9093, "external", nil}, }, KafkaAPITLS: []ServerTLS{ {Name: "external", KeyFile: "certs/tls-key.pem"},