Skip to content

Commit

Permalink
k8s: add broker tls config to pp/sr client
Browse files Browse the repository at this point in the history
(cherry picked from commit 3d3288a)
  • Loading branch information
alenkacz authored and joejulian committed Apr 13, 2023
1 parent d4c8c60 commit a3af88b
Show file tree
Hide file tree
Showing 21 changed files with 474 additions and 85 deletions.
21 changes: 15 additions & 6 deletions src/go/k8s/controllers/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,12 @@ func (r *ClusterReconciler) Reconcile(
schemaRegistrySu = resources.NewSuperUsers(r.Client, &redpandaCluster, r.Scheme, resources.ScramSchemaRegistryUsername, resources.SchemaRegistrySuffix, log)
schemaRegistrySuKey = schemaRegistrySu.Key()
}
pki := certmanager.NewPki(r.Client, &redpandaCluster, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
pki, err := certmanager.NewPki(ctx, r.Client, &redpandaCluster, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
if err != nil {
return ctrl.Result{}, fmt.Errorf("creating pki: %w", err)
}
sa := resources.NewServiceAccount(r.Client, &redpandaCluster, r.Scheme, log)
configMapResource := resources.NewConfigMap(r.Client, &redpandaCluster, r.Scheme, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), proxySuKey, schemaRegistrySuKey, log)
configMapResource := resources.NewConfigMap(r.Client, &redpandaCluster, r.Scheme, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), proxySuKey, schemaRegistrySuKey, pki.BrokerTLSConfigProvider(), log)
secretResource := resources.PreStartStopScriptSecret(r.Client, &redpandaCluster, r.Scheme, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), proxySuKey, schemaRegistrySuKey, log)

sts := resources.NewStatefulSet(
Expand Down Expand Up @@ -225,7 +228,7 @@ func (r *ClusterReconciler) Reconcile(
}

for _, res := range toApply {
err := res.Ensure(ctx)
err = res.Ensure(ctx)

var e *resources.RequeueAfterError
if errors.As(err, &e) {
Expand Down Expand Up @@ -355,7 +358,7 @@ func validateImagePullPolicy(imagePullPolicy corev1.PullPolicy) error {
return nil
}

//nolint:funlen // refactor in the next iteration
//nolint:funlen,gocyclo // refactor in the next iteration
func (r *ClusterReconciler) handlePodFinalizer(
ctx context.Context, rp *redpandav1alpha1.Cluster, log logr.Logger,
) error {
Expand Down Expand Up @@ -414,7 +417,10 @@ func (r *ClusterReconciler) handlePodFinalizer(
headlessSvc := resources.NewHeadlessService(r.Client, rp, r.Scheme, headlessPorts, log)
clusterSvc := resources.NewClusterService(r.Client, rp, r.Scheme, clusterPorts, log)

pki := certmanager.NewPki(r.Client, rp, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
pki, err := certmanager.NewPki(ctx, r.Client, rp, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
if err != nil {
return fmt.Errorf("creating pki: %w", err)
}

adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, rp, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), pki.AdminAPIConfigProvider())
if err != nil {
Expand Down Expand Up @@ -540,7 +546,10 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1
headlessSvc := resources.NewHeadlessService(r.Client, rp, r.Scheme, headlessPorts, log)
clusterSvc := resources.NewClusterService(r.Client, rp, r.Scheme, clusterPorts, log)

pki := certmanager.NewPki(r.Client, rp, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
pki, err := certmanager.NewPki(ctx, r.Client, rp, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
if err != nil {
return -1, fmt.Errorf("creating pki: %w", err)
}

ordinal, err := strconv.ParseInt(pod.Name[len(rp.Name)+1:], 10, 0)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,11 @@ func (r *ClusterConfigurationDriftReconciler) Reconcile(
schemaRegistrySu = resources.NewSuperUsers(r.Client, &redpandaCluster, r.Scheme, resources.ScramSchemaRegistryUsername, resources.SchemaRegistrySuffix, log)
schemaRegistrySuKey = schemaRegistrySu.Key()
}
pki := certmanager.NewPki(r.Client, &redpandaCluster, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
configMapResource := resources.NewConfigMap(r.Client, &redpandaCluster, r.Scheme, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), proxySuKey, schemaRegistrySuKey, log)
pki, err := certmanager.NewPki(ctx, r.Client, &redpandaCluster, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
if err != nil {
return ctrl.Result{}, fmt.Errorf("creating pki: %w", err)
}
configMapResource := resources.NewConfigMap(r.Client, &redpandaCluster, r.Scheme, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), proxySuKey, schemaRegistrySuKey, pki.BrokerTLSConfigProvider(), log)

lastAppliedConfig, cmExists, err := configMapResource.GetLastAppliedConfigurationFromCluster(ctx)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions src/go/k8s/controllers/redpanda/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ var _ = Describe("RedPandaCluster controller", func() {
Key: "tls.crt",
Path: "tls.crt",
},
{
Key: "ca.crt",
Path: "ca.crt",
},
},
DefaultMode: &defaultMode,
},
Expand All @@ -353,6 +357,14 @@ var _ = Describe("RedPandaCluster controller", func() {
Key: "ca.crt",
Path: "ca.crt",
},
{
Key: "tls.key",
Path: "tls.key",
},
{
Key: "tls.crt",
Path: "tls.crt",
},
},
DefaultMode: &defaultMode,
},
Expand Down
6 changes: 5 additions & 1 deletion src/go/k8s/pkg/console/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@ func NewAdminAPI(
) (adminutils.AdminAPIClient, error) {
headlessSvc := resources.NewHeadlessService(cl, cluster, scheme, nil, log)
clusterSvc := resources.NewClusterService(cl, cluster, scheme, nil, log)
pki := certmanager.NewPki(
pki, err := certmanager.NewPki(
ctx,
cl,
cluster,
headlessSvc.HeadlessServiceFQDN(clusterDomain),
clusterSvc.ServiceFQDN(clusterDomain),
scheme,
log,
)
if err != nil {
return nil, fmt.Errorf("creating PKI: %w", err)
}
adminTLSConfigProvider := pki.AdminAPIConfigProvider()
adminAPIClient, err := adminAPI(
ctx,
Expand Down
16 changes: 13 additions & 3 deletions src/go/k8s/pkg/resources/certmanager/pki.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,22 @@ type PkiReconciler struct {

// NewPki creates PkiReconciler
func NewPki(
ctx context.Context,
client k8sclient.Client,
pandaCluster *redpandav1alpha1.Cluster,
fqdn string,
clusterFQDN string,
scheme *runtime.Scheme,
logger logr.Logger,
) *PkiReconciler {
) (*PkiReconciler, error) {
cc, err := NewClusterCertificates(ctx, pandaCluster, keyStoreKey(pandaCluster), client, fqdn, clusterFQDN, scheme, logger)
if err != nil {
return nil, err
}
return &PkiReconciler{
client, scheme, pandaCluster, fqdn, clusterFQDN, logger.WithValues("Reconciler", "pki"),
NewClusterCertificates(pandaCluster, keyStoreKey(pandaCluster), client, fqdn, clusterFQDN, scheme, logger),
}
cc,
}, nil
}

func keyStoreKey(pandaCluster *redpandav1alpha1.Cluster) types.NamespacedName {
Expand Down Expand Up @@ -92,3 +97,8 @@ func (r *PkiReconciler) StatefulSetVolumeProvider() resourcetypes.StatefulsetTLS
func (r *PkiReconciler) AdminAPIConfigProvider() resourcetypes.AdminTLSConfigProvider {
return r.clusterCertificates
}

// BrokerTLSConfigProvider returns provider of broker TLS
func (r *PkiReconciler) BrokerTLSConfigProvider() resourcetypes.BrokerTLSConfigProvider {
return r.clusterCertificates
}
4 changes: 3 additions & 1 deletion src/go/k8s/pkg/resources/certmanager/pki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ func TestKafkaAPIWithMultipleTLSListeners(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
c := fake.NewClientBuilder().Build()
pkiRes := certmanager.NewPki(
pkiRes, err := certmanager.NewPki(
context.TODO(),
c,
&tc.cluster,
"cluster.local1",
"cluster.local",
scheme.Scheme,
ctrl.Log.WithName("test"))
require.NoError(t, err)
require.NoError(t, pkiRes.Ensure(context.TODO()))

for _, cert := range tc.expectedCertificates {
Expand Down
Loading

0 comments on commit a3af88b

Please sign in to comment.