Skip to content

Commit

Permalink
k8s: add broker tls config to pp/sr client
Browse files Browse the repository at this point in the history
(cherry picked from commit 3d3288a)
  • Loading branch information
alenkacz authored and joejulian committed Apr 12, 2023
1 parent 158f4bc commit 077bb64
Show file tree
Hide file tree
Showing 13 changed files with 301 additions and 72 deletions.
21 changes: 15 additions & 6 deletions src/go/k8s/controllers/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,12 @@ func (r *ClusterReconciler) Reconcile(
schemaRegistrySu = resources.NewSuperUsers(r.Client, &redpandaCluster, r.Scheme, resources.ScramSchemaRegistryUsername, resources.SchemaRegistrySuffix, log)
schemaRegistrySuKey = schemaRegistrySu.Key()
}
pki := certmanager.NewPki(r.Client, &redpandaCluster, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
pki, err := certmanager.NewPki(ctx, r.Client, &redpandaCluster, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
if err != nil {
return ctrl.Result{}, fmt.Errorf("creating pki: %w", err)
}
sa := resources.NewServiceAccount(r.Client, &redpandaCluster, r.Scheme, log)
configMapResource := resources.NewConfigMap(r.Client, &redpandaCluster, r.Scheme, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), proxySuKey, schemaRegistrySuKey, log)
configMapResource := resources.NewConfigMap(r.Client, &redpandaCluster, r.Scheme, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), proxySuKey, schemaRegistrySuKey, pki.BrokerTLSConfigProvider(), log)
secretResource := resources.PreStartStopScriptSecret(r.Client, &redpandaCluster, r.Scheme, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), proxySuKey, schemaRegistrySuKey, log)

sts := resources.NewStatefulSet(
Expand Down Expand Up @@ -225,7 +228,7 @@ func (r *ClusterReconciler) Reconcile(
}

for _, res := range toApply {
err := res.Ensure(ctx)
err = res.Ensure(ctx)

var e *resources.RequeueAfterError
if errors.As(err, &e) {
Expand Down Expand Up @@ -355,7 +358,7 @@ func validateImagePullPolicy(imagePullPolicy corev1.PullPolicy) error {
return nil
}

//nolint:funlen // refactor in the next iteration
//nolint:funlen,gocyclo // refactor in the next iteration
func (r *ClusterReconciler) handlePodFinalizer(
ctx context.Context, rp *redpandav1alpha1.Cluster, log logr.Logger,
) error {
Expand Down Expand Up @@ -414,7 +417,10 @@ func (r *ClusterReconciler) handlePodFinalizer(
headlessSvc := resources.NewHeadlessService(r.Client, rp, r.Scheme, headlessPorts, log)
clusterSvc := resources.NewClusterService(r.Client, rp, r.Scheme, clusterPorts, log)

pki := certmanager.NewPki(r.Client, rp, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
pki, err := certmanager.NewPki(ctx, r.Client, rp, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
if err != nil {
return fmt.Errorf("creating pki: %w", err)
}

adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, rp, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), pki.AdminAPIConfigProvider())
if err != nil {
Expand Down Expand Up @@ -540,7 +546,10 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1
headlessSvc := resources.NewHeadlessService(r.Client, rp, r.Scheme, headlessPorts, log)
clusterSvc := resources.NewClusterService(r.Client, rp, r.Scheme, clusterPorts, log)

pki := certmanager.NewPki(r.Client, rp, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
pki, err := certmanager.NewPki(ctx, r.Client, rp, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
if err != nil {
return -1, fmt.Errorf("creating pki: %w", err)
}

ordinal, err := strconv.ParseInt(pod.Name[len(rp.Name)+1:], 10, 0)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,11 @@ func (r *ClusterConfigurationDriftReconciler) Reconcile(
schemaRegistrySu = resources.NewSuperUsers(r.Client, &redpandaCluster, r.Scheme, resources.ScramSchemaRegistryUsername, resources.SchemaRegistrySuffix, log)
schemaRegistrySuKey = schemaRegistrySu.Key()
}
pki := certmanager.NewPki(r.Client, &redpandaCluster, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
configMapResource := resources.NewConfigMap(r.Client, &redpandaCluster, r.Scheme, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), proxySuKey, schemaRegistrySuKey, log)
pki, err := certmanager.NewPki(ctx, r.Client, &redpandaCluster, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
if err != nil {
return ctrl.Result{}, fmt.Errorf("creating pki: %w", err)
}
configMapResource := resources.NewConfigMap(r.Client, &redpandaCluster, r.Scheme, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), proxySuKey, schemaRegistrySuKey, pki.BrokerTLSConfigProvider(), log)

lastAppliedConfig, cmExists, err := configMapResource.GetLastAppliedConfigurationFromCluster(ctx)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions src/go/k8s/controllers/redpanda/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ var _ = Describe("RedPandaCluster controller", func() {
Key: "tls.crt",
Path: "tls.crt",
},
{
Key: "ca.crt",
Path: "ca.crt",
},
},
DefaultMode: &defaultMode,
},
Expand All @@ -353,6 +357,14 @@ var _ = Describe("RedPandaCluster controller", func() {
Key: "ca.crt",
Path: "ca.crt",
},
{
Key: "tls.key",
Path: "tls.key",
},
{
Key: "tls.crt",
Path: "tls.crt",
},
},
DefaultMode: &defaultMode,
},
Expand Down
6 changes: 5 additions & 1 deletion src/go/k8s/pkg/console/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@ func NewAdminAPI(
) (adminutils.AdminAPIClient, error) {
headlessSvc := resources.NewHeadlessService(cl, cluster, scheme, nil, log)
clusterSvc := resources.NewClusterService(cl, cluster, scheme, nil, log)
pki := certmanager.NewPki(
pki, err := certmanager.NewPki(
ctx,
cl,
cluster,
headlessSvc.HeadlessServiceFQDN(clusterDomain),
clusterSvc.ServiceFQDN(clusterDomain),
scheme,
log,
)
if err != nil {
return nil, fmt.Errorf("creating PKI: %w", err)
}
adminTLSConfigProvider := pki.AdminAPIConfigProvider()
adminAPIClient, err := adminAPI(
ctx,
Expand Down
16 changes: 13 additions & 3 deletions src/go/k8s/pkg/resources/certmanager/pki.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,22 @@ type PkiReconciler struct {

// NewPki creates PkiReconciler
func NewPki(
ctx context.Context,
client k8sclient.Client,
pandaCluster *redpandav1alpha1.Cluster,
fqdn string,
clusterFQDN string,
scheme *runtime.Scheme,
logger logr.Logger,
) *PkiReconciler {
) (*PkiReconciler, error) {
cc, err := NewClusterCertificates(ctx, pandaCluster, keyStoreKey(pandaCluster), client, fqdn, clusterFQDN, scheme, logger)
if err != nil {
return nil, err
}
return &PkiReconciler{
client, scheme, pandaCluster, fqdn, clusterFQDN, logger.WithValues("Reconciler", "pki"),
NewClusterCertificates(pandaCluster, keyStoreKey(pandaCluster), client, fqdn, clusterFQDN, scheme, logger),
}
cc,
}, nil
}

func keyStoreKey(pandaCluster *redpandav1alpha1.Cluster) types.NamespacedName {
Expand Down Expand Up @@ -92,3 +97,8 @@ func (r *PkiReconciler) StatefulSetVolumeProvider() resourcetypes.StatefulsetTLS
func (r *PkiReconciler) AdminAPIConfigProvider() resourcetypes.AdminTLSConfigProvider {
return r.clusterCertificates
}

// BrokerTLSConfigProvider returns provider of broker TLS
func (r *PkiReconciler) BrokerTLSConfigProvider() resourcetypes.BrokerTLSConfigProvider {
return r.clusterCertificates
}
4 changes: 3 additions & 1 deletion src/go/k8s/pkg/resources/certmanager/pki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ func TestKafkaAPIWithMultipleTLSListeners(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
c := fake.NewClientBuilder().Build()
pkiRes := certmanager.NewPki(
pkiRes, err := certmanager.NewPki(
context.TODO(),
c,
&tc.cluster,
"cluster.local1",
"cluster.local",
scheme.Scheme,
ctrl.Log.WithName("test"))
require.NoError(t, err)
require.NoError(t, pkiRes.Ensure(context.TODO()))

for _, cert := range tc.expectedCertificates {
Expand Down
100 changes: 90 additions & 10 deletions src/go/k8s/pkg/resources/certmanager/type_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"fmt"

"github.com/go-logr/logr"
cmapiv1 "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1"
cmmetav1 "github.com/jetstack/cert-manager/pkg/apis/meta/v1"
redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources"
resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -179,14 +181,15 @@ type ClusterCertificates struct {

// NewClusterCertificates creates new cluster tls certificates resources
func NewClusterCertificates(
ctx context.Context,
cluster *redpandav1alpha1.Cluster,
keystoreSecret types.NamespacedName,
k8sClient client.Client,
fqdn string,
clusterFQDN string,
scheme *runtime.Scheme,
logger logr.Logger,
) *ClusterCertificates {
) (*ClusterCertificates, error) {
cc := &ClusterCertificates{
pandaCluster: cluster,
client: k8sClient,
Expand All @@ -200,38 +203,52 @@ func NewClusterCertificates(
adminAPI: tlsDisabledAPICertificates(),
pandaProxyAPI: tlsDisabledAPICertificates(),
}
var err error
if kafkaListeners := kafkaAPIListeners(cluster); len(kafkaListeners) > 0 {
cc.kafkaAPI = cc.prepareAPI(kafkaAPI, RedpandaNodeCert, []string{OperatorClientCert, UserClientCert, AdminClientCert}, kafkaListeners, &keystoreSecret)
cc.kafkaAPI, err = cc.prepareAPI(ctx, kafkaAPI, RedpandaNodeCert, []string{OperatorClientCert, UserClientCert, AdminClientCert}, kafkaListeners, &keystoreSecret)
if err != nil {
return nil, fmt.Errorf("kafka api certificates %w", err)
}
}

if adminListeners := adminAPIListeners(cluster); len(adminListeners) > 0 {
cc.adminAPI = cc.prepareAPI(adminAPI, adminAPINodeCert, []string{adminAPIClientCert}, adminListeners, &keystoreSecret)
cc.adminAPI, err = cc.prepareAPI(ctx, adminAPI, adminAPINodeCert, []string{adminAPIClientCert}, adminListeners, &keystoreSecret)
if err != nil {
return nil, fmt.Errorf("kafka api certificates %w", err)
}
}

if pandaProxyListeners := pandaProxyAPIListeners(cluster); len(pandaProxyListeners) > 0 {
cc.pandaProxyAPI = cc.prepareAPI(pandaproxyAPI, pandaproxyAPINodeCert, []string{pandaproxyAPIClientCert}, pandaProxyListeners, &keystoreSecret)
cc.pandaProxyAPI, err = cc.prepareAPI(ctx, pandaproxyAPI, pandaproxyAPINodeCert, []string{pandaproxyAPIClientCert}, pandaProxyListeners, &keystoreSecret)
if err != nil {
return nil, fmt.Errorf("kafka api certificates %w", err)
}
}

if schemaRegistryListeners := schemaRegistryAPIListeners(cluster); len(schemaRegistryListeners) > 0 {
cc.schemaRegistryAPI = cc.prepareAPI(schemaRegistryAPI, schemaRegistryAPINodeCert, []string{schemaRegistryAPIClientCert}, schemaRegistryListeners, &keystoreSecret)
cc.schemaRegistryAPI, err = cc.prepareAPI(ctx, schemaRegistryAPI, schemaRegistryAPINodeCert, []string{schemaRegistryAPIClientCert}, schemaRegistryListeners, &keystoreSecret)
if err != nil {
return nil, fmt.Errorf("kafka api certificates %w", err)
}
}

return cc
return cc, nil
}

func (cc *ClusterCertificates) prepareAPI(
ctx context.Context,
rootCertSuffix string,
nodeCertSuffix string,
clientCerts []string,
listeners []APIListener,
keystoreSecret *types.NamespacedName,
) *apiCertificates {
) (*apiCertificates, error) {
tlsListeners := getTLSListeners(listeners)
externalTLSListener := getExternalTLSListener(listeners)
internalTLSListener := getInternalTLSListener(listeners)

if len(tlsListeners) == 0 {
return tlsDisabledAPICertificates()
return tlsDisabledAPICertificates(), nil
}
result := tlsEnabledAPICertificates(cc.pandaCluster.Namespace)

Expand All @@ -251,7 +268,15 @@ func (cc *ClusterCertificates) prepareAPI(
// every time both listeners share the same set of certificates
nodeSecretRef := tlsListeners[0].GetTLS().NodeSecretRef
result.externalNodeCertificate = nodeSecretRef
result.selfSignedNodeCertificate = tlsListeners[0].GetTLS().IssuerRef == nil && nodeSecretRef == nil
isSelfSigned, err := isSelfSigned(ctx,
nodeSecretRef,
tlsListeners[0].GetTLS().IssuerRef,
cc.pandaCluster.Namespace,
cc.client)
if err != nil {
return nil, fmt.Errorf("selfsigned check %w", err)
}
result.selfSignedNodeCertificate = isSelfSigned
if nodeSecretRef == nil || nodeSecretRef.Name == "" {
certName := NewCertName(cc.pandaCluster.Name, nodeCertSuffix)
certsKey := types.NamespacedName{Name: string(certName), Namespace: cc.pandaCluster.Namespace}
Expand Down Expand Up @@ -295,7 +320,43 @@ func (cc *ClusterCertificates) prepareAPI(
}
}

return result
return result, nil
}

// returns true if node certificate will be selfSigned
func isSelfSigned(ctx context.Context, nodeSecretRef *corev1.ObjectReference, externalIssuerRef *cmmetav1.ObjectReference, clusterNamespace string, k8sClient client.Client) (bool, error) {
if nodeSecretRef != nil {
var secret corev1.Secret
err := k8sClient.Get(ctx, types.NamespacedName{Name: nodeSecretRef.Name, Namespace: nodeSecretRef.Namespace}, &secret)
if err != nil {
return false, err
}
_, ok := secret.Data[cmmetav1.TLSCAKey]
return ok, nil // if the ca key exists, it means it's self-signed
}
if externalIssuerRef != nil {
var issuerSpec cmapiv1.IssuerSpec
switch externalIssuerRef.Kind {
case "Issuer":
var issuer cmapiv1.Issuer
err := k8sClient.Get(ctx, types.NamespacedName{Name: externalIssuerRef.Name, Namespace: clusterNamespace}, &issuer)
if err != nil {
return false, err
}
issuerSpec = issuer.Spec
case "ClusterIssuer":
var issuer cmapiv1.ClusterIssuer
err := k8sClient.Get(ctx, types.NamespacedName{Name: externalIssuerRef.Name, Namespace: clusterNamespace}, &issuer)
if err != nil {
return false, err
}
issuerSpec = issuer.Spec
default:
return false, fmt.Errorf("unknown issuer kind %s", externalIssuerRef.Kind) //nolint:goerr113 // no need for err type
}
return issuerSpec.SelfSigned != nil, nil
}
return true, nil // by default our own issuer is self-signed
}

func prepareRoot(
Expand Down Expand Up @@ -605,3 +666,22 @@ func (cc *ClusterCertificates) GetTLSConfig(

return &tlsConfig, nil
}

// KafkaClientBrokerTLS returns configuration to connect to kafka api with tls
func (cc *ClusterCertificates) KafkaClientBrokerTLS(mountPoints *resourcetypes.TLSMountPoints) *config.ServerTLS {
if !cc.kafkaAPI.tlsEnabled {
return nil
}
result := config.ServerTLS{
Enabled: true,
}
if len(cc.kafkaAPI.clientCertificates) > 0 {
result.KeyFile = fmt.Sprintf("%s/%s", mountPoints.KafkaAPI.ClientCAMountDir, corev1.TLSPrivateKeyKey)
result.CertFile = fmt.Sprintf("%s/%s", mountPoints.KafkaAPI.ClientCAMountDir, corev1.TLSCertKey)
}
if cc.kafkaAPI.selfSignedNodeCertificate {
// we need to also include the node ca since the node cert is self-signed
result.TruststoreFile = fmt.Sprintf("%s/%s", mountPoints.KafkaAPI.NodeCertMountDir, cmmetav1.TLSCAKey)
}
return &result
}
Loading

0 comments on commit 077bb64

Please sign in to comment.