Skip to content

Commit

Permalink
rpk: Support listener authN
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Jul 15, 2022
1 parent 0b8e0c1 commit c05a3eb
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 59 deletions.
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/cli/cmd/redpanda/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func bootstrap(fs afero.Fs) *cobra.Command {

cfg.Redpanda.ID = id
cfg.Redpanda.RPCServer.Address = ownIP.String()
cfg.Redpanda.KafkaAPI = []config.NamedSocketAddress{{
cfg.Redpanda.KafkaAPI = []config.NamedAuthNSocketAddress{{
Address: ownIP.String(),
Port: config.DefaultKafkaPort,
}}
Expand Down
38 changes: 37 additions & 1 deletion src/go/rpk/pkg/cli/cmd/redpanda/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func NewStartCommand(fs afero.Fs, launcher rp.Launcher) *cobra.Command {
",",
),
)
kafkaAPI, err := parseNamedAddresses(
kafkaAPI, err := parseNamedAuthNAddresses(
kafkaAddr,
config.DefaultKafkaPort,
)
Expand Down Expand Up @@ -939,6 +939,42 @@ func parseNamedAddress(
}, nil
}

func parseNamedAuthNAddresses(
addrs []string, defaultPort int,
) ([]config.NamedAuthNSocketAddress, error) {
as := make([]config.NamedAuthNSocketAddress, 0, len(addrs))
for _, addr := range addrs {
a, err := parseNamedAuthNAddress(addr, defaultPort, nil)
if err != nil {
return nil, err
}
if a != nil {
as = append(as, *a)
}
}
return as, nil
}

func parseNamedAuthNAddress(
addr string, defaultPort int, authN *string,
) (*config.NamedAuthNSocketAddress, error) {
if addr == "" {
return nil, nil
}
scheme, hostport, err := net.ParseHostMaybeScheme(addr)
if err != nil {
return nil, err
}
host, port := net.SplitHostPortDefault(hostport, defaultPort)

return &config.NamedAuthNSocketAddress{
Address: host,
Port: port,
Name: scheme,
AuthN: authN,
}, nil
}

func sendEnv(fs afero.Fs, env api.EnvironmentPayload, conf *config.Config, skipChecks bool, err error) {
if err != nil {
env.ErrorMsg = err.Error()
Expand Down
20 changes: 10 additions & 10 deletions src/go/rpk/pkg/cli/cmd/redpanda/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func TestStartCommand(t *testing.T) {
Address: "192.168.54.2",
Port: 9643,
}}
expectedKafkaAPI := []config.NamedSocketAddress{{
expectedKafkaAPI := []config.NamedAuthNSocketAddress{{
Name: "external",
Address: "192.168.73.45",
Port: 9092,
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestStartCommand(t *testing.T) {
Address: "192.168.54.2",
Port: 9643,
}}
expectedKafkaAPI := []config.NamedSocketAddress{{
expectedKafkaAPI := []config.NamedAuthNSocketAddress{{
Name: "external",
Address: "192.168.73.45",
Port: 9092,
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestStartCommand(t *testing.T) {
require.NoError(st, err)
// The value set through the --kafka-addr flag should
// have been picked.
expectedKafkaAPI := []config.NamedSocketAddress{{
expectedKafkaAPI := []config.NamedAuthNSocketAddress{{
Name: "flag",
Address: "192.168.34.3",
Port: 9093,
Expand Down Expand Up @@ -825,7 +825,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Address: "192.168.34.32",
Port: 33145,
}}
Expand All @@ -845,7 +845,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Address: "192.168.34.32",
Port: 9092,
}}
Expand All @@ -865,7 +865,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Name: "nondefaultname",
Address: "192.168.34.32",
Port: 9092,
Expand All @@ -886,7 +886,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Name: "nondefaultname",
Address: "192.168.34.32",
Port: 9092,
Expand Down Expand Up @@ -923,7 +923,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Address: "host",
Port: 3123,
}}
Expand All @@ -941,7 +941,7 @@ func TestStartCommand(t *testing.T) {
},
before: func(fs afero.Fs) error {
conf := config.Default()
conf.Redpanda.KafkaAPI = []config.NamedSocketAddress{{
conf.Redpanda.KafkaAPI = []config.NamedAuthNSocketAddress{{
Address: "192.168.33.33",
Port: 9892,
}}
Expand All @@ -950,7 +950,7 @@ func TestStartCommand(t *testing.T) {
postCheck: func(fs afero.Fs, _ *redpanda.RedpandaArgs, st *testing.T) {
conf, err := new(config.Params).Load(fs)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
expectedAddr := []config.NamedAuthNSocketAddress{{
Address: "192.168.33.33",
Port: 9892,
}}
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func Default() *Config {
Address: "0.0.0.0",
Port: 33145,
},
KafkaAPI: []NamedSocketAddress{{
KafkaAPI: []NamedAuthNSocketAddress{{
Address: "0.0.0.0",
Port: 9092,
}},
Expand Down
6 changes: 3 additions & 3 deletions src/go/rpk/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ tune_cpu: true`,
port: 9092
`,
check: func(st *testing.T, c *Config) {
expected := []NamedSocketAddress{{
expected := []NamedAuthNSocketAddress{{
Name: "external",
Address: "192.168.73.45",
Port: 9092,
Expand All @@ -235,7 +235,7 @@ tune_cpu: true`,
}]`,
format: "json",
check: func(st *testing.T, c *Config) {
expected := []NamedSocketAddress{{
expected := []NamedAuthNSocketAddress{{
Port: 9092,
Address: "192.168.54.2",
}}
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestDefault(t *testing.T) {
Redpanda: RedpandaConfig{
Directory: "/var/lib/redpanda/data",
RPCServer: SocketAddress{"0.0.0.0", 33145},
KafkaAPI: []NamedSocketAddress{{
KafkaAPI: []NamedAuthNSocketAddress{{
Address: "0.0.0.0",
Port: 9092,
}},
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/config/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (p *Params) Load(fs afero.Fs) (*Config, error) {
Address: "0.0.0.0",
Port: 33145,
},
KafkaAPI: []NamedSocketAddress{{
KafkaAPI: []NamedAuthNSocketAddress{{
Address: "0.0.0.0",
Port: 9092,
}},
Expand Down
43 changes: 25 additions & 18 deletions src/go/rpk/pkg/config/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,24 @@ func (c *Config) File() *Config {
}

type RedpandaConfig struct {
Directory string `yaml:"data_directory,omitempty" json:"data_directory"`
ID int `yaml:"node_id" json:"node_id"`
Rack string `yaml:"rack,omitempty" json:"rack"`
SeedServers []SeedServer `yaml:"seed_servers" json:"seed_servers"`
RPCServer SocketAddress `yaml:"rpc_server" json:"rpc_server"`
RPCServerTLS []ServerTLS `yaml:"rpc_server_tls,omitempty" json:"rpc_server_tls"`
KafkaAPI []NamedSocketAddress `yaml:"kafka_api" json:"kafka_api"`
KafkaAPITLS []ServerTLS `yaml:"kafka_api_tls,omitempty" json:"kafka_api_tls"`
AdminAPI []NamedSocketAddress `yaml:"admin" json:"admin"`
AdminAPITLS []ServerTLS `yaml:"admin_api_tls,omitempty" json:"admin_api_tls"`
CoprocSupervisorServer SocketAddress `yaml:"coproc_supervisor_server,omitempty" json:"coproc_supervisor_server"`
AdminAPIDocDir string `yaml:"admin_api_doc_dir,omitempty" json:"admin_api_doc_dir"`
DashboardDir string `yaml:"dashboard_dir,omitempty" json:"dashboard_dir"`
CloudStorageCacheDirectory string `yaml:"cloud_storage_cache_directory,omitempty" json:"cloud_storage_cache_directory"`
AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api,omitempty" json:"advertised_rpc_api,omitempty"`
AdvertisedKafkaAPI []NamedSocketAddress `yaml:"advertised_kafka_api,omitempty" json:"advertised_kafka_api,omitempty"`
DeveloperMode bool `yaml:"developer_mode" json:"developer_mode"`
Other map[string]interface{} `yaml:",inline"`
Directory string `yaml:"data_directory,omitempty" json:"data_directory"`
ID int `yaml:"node_id" json:"node_id"`
Rack string `yaml:"rack,omitempty" json:"rack"`
SeedServers []SeedServer `yaml:"seed_servers" json:"seed_servers"`
RPCServer SocketAddress `yaml:"rpc_server" json:"rpc_server"`
RPCServerTLS []ServerTLS `yaml:"rpc_server_tls,omitempty" json:"rpc_server_tls"`
KafkaAPI []NamedAuthNSocketAddress `yaml:"kafka_api" json:"kafka_api"`
KafkaAPITLS []ServerTLS `yaml:"kafka_api_tls,omitempty" json:"kafka_api_tls"`
AdminAPI []NamedSocketAddress `yaml:"admin" json:"admin"`
AdminAPITLS []ServerTLS `yaml:"admin_api_tls,omitempty" json:"admin_api_tls"`
CoprocSupervisorServer SocketAddress `yaml:"coproc_supervisor_server,omitempty" json:"coproc_supervisor_server"`
AdminAPIDocDir string `yaml:"admin_api_doc_dir,omitempty" json:"admin_api_doc_dir"`
DashboardDir string `yaml:"dashboard_dir,omitempty" json:"dashboard_dir"`
CloudStorageCacheDirectory string `yaml:"cloud_storage_cache_directory,omitempty" json:"cloud_storage_cache_directory"`
AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api,omitempty" json:"advertised_rpc_api,omitempty"`
AdvertisedKafkaAPI []NamedSocketAddress `yaml:"advertised_kafka_api,omitempty" json:"advertised_kafka_api,omitempty"`
DeveloperMode bool `yaml:"developer_mode" json:"developer_mode"`
Other map[string]interface{} `yaml:",inline"`
}

type Pandaproxy struct {
Expand Down Expand Up @@ -101,6 +101,13 @@ type NamedSocketAddress struct {
Name string `yaml:"name,omitempty" json:"name,omitempty"`
}

type NamedAuthNSocketAddress struct {
Address string `yaml:"address" json:"address"`
Port int `yaml:"port" json:"port"`
Name string `yaml:"name,omitempty" json:"name,omitempty"`
AuthN *string `yaml:"authentication_method,omitempty" json:"authentication_method,omitempty"`
}

type TLS struct {
KeyFile string `yaml:"key_file,omitempty" json:"key_file"`
CertFile string `yaml:"cert_file,omitempty" json:"cert_file"`
Expand Down
79 changes: 61 additions & 18 deletions src/go/rpk/pkg/config/weak.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,30 @@ func (nsa *namedSocketAddresses) UnmarshalYAML(n *yaml.Node) error {
return nil
}

// namedAuthNSocketAddresses is an intermediary one_or_many type to be used
// during our transition to strictly typed configuration parameters.
// This type will:
// - parse an array of NamedAuthNSocketAddress
// - parse a single NamedAuthNSocketAddress to an array.
type namedAuthNSocketAddresses []NamedAuthNSocketAddress

func (nsa *namedAuthNSocketAddresses) UnmarshalYAML(n *yaml.Node) error {
var multi []NamedAuthNSocketAddress
err := n.Decode(&multi)
if err == nil {
*nsa = multi
return nil
}

var single NamedAuthNSocketAddress
err = n.Decode(&single)
if err != nil {
return err
}
*nsa = []NamedAuthNSocketAddress{single}
return nil
}

// serverTLSArray is an intermediary one_or_many type to be used during our
// transition to strictly typed configuration parameters. This type will:
// - parse an array of ServerTLS
Expand Down Expand Up @@ -305,24 +329,24 @@ func (c *Config) UnmarshalYAML(n *yaml.Node) error {

func (rpc *RedpandaConfig) UnmarshalYAML(n *yaml.Node) error {
var internal struct {
Directory weakString `yaml:"data_directory"`
ID weakInt `yaml:"node_id" `
Rack weakString `yaml:"rack"`
SeedServers seedServers `yaml:"seed_servers"`
RPCServer SocketAddress `yaml:"rpc_server"`
RPCServerTLS serverTLSArray `yaml:"rpc_server_tls"`
KafkaAPI namedSocketAddresses `yaml:"kafka_api"`
KafkaAPITLS serverTLSArray `yaml:"kafka_api_tls"`
AdminAPI namedSocketAddresses `yaml:"admin"`
AdminAPITLS serverTLSArray `yaml:"admin_api_tls"`
CoprocSupervisorServer SocketAddress `yaml:"coproc_supervisor_server"`
AdminAPIDocDir weakString `yaml:"admin_api_doc_dir"`
DashboardDir weakString `yaml:"dashboard_dir"`
CloudStorageCacheDirectory weakString `yaml:"cloud_storage_cache_directory"`
AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api"`
AdvertisedKafkaAPI namedSocketAddresses `yaml:"advertised_kafka_api"`
DeveloperMode weakBool `yaml:"developer_mode"`
Other map[string]interface{} `yaml:",inline"`
Directory weakString `yaml:"data_directory"`
ID weakInt `yaml:"node_id" `
Rack weakString `yaml:"rack"`
SeedServers seedServers `yaml:"seed_servers"`
RPCServer SocketAddress `yaml:"rpc_server"`
RPCServerTLS serverTLSArray `yaml:"rpc_server_tls"`
KafkaAPI namedAuthNSocketAddresses `yaml:"kafka_api"`
KafkaAPITLS serverTLSArray `yaml:"kafka_api_tls"`
AdminAPI namedSocketAddresses `yaml:"admin"`
AdminAPITLS serverTLSArray `yaml:"admin_api_tls"`
CoprocSupervisorServer SocketAddress `yaml:"coproc_supervisor_server"`
AdminAPIDocDir weakString `yaml:"admin_api_doc_dir"`
DashboardDir weakString `yaml:"dashboard_dir"`
CloudStorageCacheDirectory weakString `yaml:"cloud_storage_cache_directory"`
AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api"`
AdvertisedKafkaAPI namedSocketAddresses `yaml:"advertised_kafka_api"`
DeveloperMode weakBool `yaml:"developer_mode"`
Other map[string]interface{} `yaml:",inline"`
}

if err := n.Decode(&internal); err != nil {
Expand Down Expand Up @@ -588,6 +612,25 @@ func (nsa *NamedSocketAddress) UnmarshalYAML(n *yaml.Node) error {
return nil
}

func (nsa *NamedAuthNSocketAddress) UnmarshalYAML(n *yaml.Node) error {
var internal struct {
Name weakString `yaml:"name"`
Address weakString `yaml:"address" mapstructure:"address"`
Port weakInt `yaml:"port" mapstructure:"port"`
AuthN *weakString `yaml:"authentication_method" mapstructure:"authentication_method"`
}

if err := n.Decode(&internal); err != nil {
return err
}

nsa.Name = string(internal.Name)
nsa.Address = string(internal.Address)
nsa.Port = int(internal.Port)
nsa.AuthN = (*string)(internal.AuthN)
return nil
}

func (t *TLS) UnmarshalYAML(n *yaml.Node) error {
var internal struct {
KeyFile weakString `yaml:"key_file"`
Expand Down
12 changes: 6 additions & 6 deletions src/go/rpk/pkg/config/weak_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,9 +883,9 @@ rpk:
{RequireClientAuth: false, TruststoreFile: "certs/tls-ca.pem"},
},
AdvertisedRPCAPI: &SocketAddress{"0.0.0.0", 33145},
KafkaAPI: []NamedSocketAddress{
{"0.0.0.0", 9092, "internal"},
{"0.0.0.0", 9093, "external"},
KafkaAPI: []NamedAuthNSocketAddress{
{"0.0.0.0", 9092, "internal", nil},
{"0.0.0.0", 9093, "external", nil},
},
KafkaAPITLS: []ServerTLS{
{Name: "external", KeyFile: "certs/tls-key.pem"},
Expand Down Expand Up @@ -1117,9 +1117,9 @@ rpk:
{RequireClientAuth: false, TruststoreFile: "certs/tls-ca.pem"},
},
AdvertisedRPCAPI: &SocketAddress{"0.0.0.0", 33145},
KafkaAPI: []NamedSocketAddress{
{"0.0.0.0", 9092, "internal"},
{"0.0.0.0", 9093, "external"},
KafkaAPI: []NamedAuthNSocketAddress{
{"0.0.0.0", 9092, "internal", nil},
{"0.0.0.0", 9093, "external", nil},
},
KafkaAPITLS: []ServerTLS{
{Name: "external", KeyFile: "certs/tls-key.pem"},
Expand Down

0 comments on commit c05a3eb

Please sign in to comment.