Skip to content

Commit

Permalink
k8s: Add migration best effort option to Redpanda reconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
RafalKorepta committed Nov 8, 2023
1 parent 44c8cca commit cd61079
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/go/k8s/api/redpanda/v1alpha1/redpanda_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type RedpandaSpec struct {
ChartRef ChartRef `json:"chartRef,omitempty"`
// ClusterSpec defines the values to use in the cluster
ClusterSpec *RedpandaClusterSpec `json:"clusterSpec,omitempty"`
// Migration flag that adjust Kubernetes core resources with annotation and labels, so
// flux controller can import resources.
// Doc: https://docs.redpanda.com/current/upgrade/migrate/kubernetes/operator/
Migration bool `json:"migration"`
}

// RedpandaStatus defines the observed state of Redpanda
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2994,6 +2994,13 @@ spec:
type: string
type: object
type: object
migration:
description: 'Migration flag that adjust Kubernetes core resources
with annotation and labels, so flux controller can import resources.
Doc: https://docs.redpanda.com/current/upgrade/migrate/kubernetes/operator/'
type: boolean
required:
- migration
type: object
status:
description: RedpandaStatus defines the observed state of Redpanda
Expand Down
95 changes: 95 additions & 0 deletions src/go/k8s/internal/controller/redpanda/redpanda_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"reflect"
"time"

v1 "k8s.io/api/core/v1"

helmv2beta1 "github.com/fluxcd/helm-controller/api/v2beta1"
"github.com/fluxcd/pkg/apis/meta"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
Expand Down Expand Up @@ -145,6 +147,11 @@ func (r *RedpandaReconciler) Reconcile(c context.Context, req ctrl.Request) (ctr
}
}

if rp.Spec.Migration {
err := r.tryMigration(ctx, rp)
log.Error(err, "migration")
}

rp, result, err := r.reconcile(ctx, rp)

// Update status after reconciliation.
Expand All @@ -163,6 +170,94 @@ func (r *RedpandaReconciler) Reconcile(c context.Context, req ctrl.Request) (ctr
return result, err
}

func (r *RedpandaReconciler) tryMigration(ctx context.Context, rp *v1alpha1.Redpanda) error {
var pl v1.PodList
err := r.List(ctx, &pl, client.MatchingFields{"app.kubernetes.io/instance": rp.Name})
if err != nil {
return fmt.Errorf("listing pods: %w", err)
}

for i := range pl.Items {
newPod := pl.Items[i].DeepCopy()
if newPod.Labels == nil {
newPod.Labels = make(map[string]string)
}
newPod.Labels["app.kubernetes.io/component"] = "redpanda-statefulset"
err = r.Update(ctx, newPod)
if err != nil {
return fmt.Errorf("updating Pod (%s): %w", newPod.Name, err)
}
}

var svc v1.Service
err = r.Get(ctx, types.NamespacedName{
Namespace: rp.Namespace,
Name: rp.Name,
}, &svc)
if err != nil {
return fmt.Errorf("get internal service: %w", err)
}

internalService := svc.DeepCopy()
setHelmLabelsAndResources(internalService, rp)

internalService.Spec.Selector = make(map[string]string)
internalService.Spec.Selector["app.kubernetes.io/instance"] = rp.Name
internalService.Spec.Selector["app.kubernetes.io/name"] = "redpanda"

err = r.Update(ctx, internalService)
if err != nil {
return fmt.Errorf("updating internal service (%s): %w", internalService.Name, err)
}

err = r.Get(ctx, types.NamespacedName{
Namespace: rp.Namespace,
Name: fmt.Sprintf("%s-external", rp.Name),
}, &svc)
if err != nil {
return fmt.Errorf("get external service: %w", err)
}

externalService := svc.DeepCopy()
setHelmLabelsAndResources(externalService, rp)

err = r.Update(ctx, externalService)
if err != nil {
return fmt.Errorf("updating external service (%s): %w", externalService.Name, err)
}

var sa v1.ServiceAccount
err = r.Get(ctx, types.NamespacedName{
Namespace: rp.Namespace,
Name: rp.Name,
}, &sa)
if err != nil {
return fmt.Errorf("get service account: %w", err)
}

annotatedSA := sa.DeepCopy()
setHelmLabelsAndResources(annotatedSA, rp)

err = r.Update(ctx, annotatedSA)
if err != nil {
return fmt.Errorf("updating service account (%s): %w", annotatedSA.Name, err)
}

return nil
}

func setHelmLabelsAndResources(object client.Object, rp *v1alpha1.Redpanda) {
const helm = "Helm"
labels := make(map[string]string)
labels["app.kubernetes.io/managed-by"] = helm
object.SetLabels(labels)

annotations := make(map[string]string)
annotations["meta.helm.sh/release-name"] = rp.Name
annotations["meta.helm.sh/release-namespace"] = rp.Namespace
object.SetAnnotations(annotations)
}

func (r *RedpandaReconciler) reconcile(ctx context.Context, rp *v1alpha1.Redpanda) (*v1alpha1.Redpanda, ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
log.WithName("RedpandaReconciler.reconcile")
Expand Down

0 comments on commit cd61079

Please sign in to comment.