Skip to content

Commit

Permalink
Add a new config Precedence
Browse files Browse the repository at this point in the history
Add a new config Precedence to allow user to configure
which Node IP should be used as tunnel endpoint IP.

Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone committed Apr 6, 2022
1 parent 67749de commit b047dbb
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ import (
config "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
)

// Precedence defines the precedence of Node IP type.
type Precedence string

const (
PrecedencePrivate = "private"
PrecedencePublic = "public"
)

//+kubebuilder:object:root=true

// MultiClusterConfig is the Schema for the multiclusterconfigs API
Expand All @@ -30,6 +38,9 @@ type MultiClusterConfig struct {
config.ControllerManagerConfigurationSpec `json:",inline"`
// ServiceCIDR allows user to set the Cluster IP range of the cluster manually.
ServiceCIDR string `json:"serviceCIDR,omitempty"`
// The precedence about which IP (private or public one) of Node is preferred to
// be used as tunnel endpoint. If it's empty, private IP will be chose.
Precedence Precedence `json:"precedence,omitempty"`
}

func init() {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion multicluster/cmd/multicluster-controller/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ func runMember(o *Options) error {
mgr.GetScheme(),
env.GetPodNamespace(),
&clusterSetReconciler.RemoteCommonAreaManager,
opts.ServiceCIDR)
opts.ServiceCIDR,
opts.Precedence)
if err = gwNodeReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error creating Gateway Node controller: %v", err)
}
Expand Down
4 changes: 4 additions & 0 deletions multicluster/cmd/multicluster-controller/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type Options struct {
options ctrl.Options
// The Service ClusterIP range used in the member cluster.
ServiceCIDR string
// The precedence about which IP (private or public one) of Node is preferred to
// be used as tunnel endpoint. If it's empty, private IP will be chose.
Precedence mcsv1alpha1.Precedence
}

func newOptions() *Options {
Expand All @@ -54,6 +57,7 @@ func (o *Options) complete(args []string) error {
}
o.options = options
o.ServiceCIDR = ctrlConfig.ServiceCIDR
o.Precedence = ctrlConfig.Precedence
klog.InfoS("Using config from file", "config", o.options)
} else {
klog.InfoS("Using default config", "config", o.options)
Expand Down
14 changes: 10 additions & 4 deletions multicluster/controllers/multicluster/gateway_node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type (
localClusterID string
installedTE cache.Indexer
serviceCIDR string
precedence mcsv1alpha1.Precedence
}
)

Expand All @@ -70,7 +71,8 @@ func NewGatewayNodeReconciler(
Scheme *runtime.Scheme,
namespace string,
remoteCommonAreaManager *commonarea.RemoteCommonAreaManager,
serviceCIDR string) *GatewayNodeReconciler {
serviceCIDR string,
precedence mcsv1alpha1.Precedence) *GatewayNodeReconciler {
reconciler := &GatewayNodeReconciler{
Client: Client,
Scheme: Scheme,
Expand All @@ -80,6 +82,7 @@ func NewGatewayNodeReconciler(
teIndexerBySubnets: tunnelEndpointIndexerBySubnetsFunc,
}),
serviceCIDR: serviceCIDR,
precedence: precedence,
}
return reconciler
}
Expand Down Expand Up @@ -109,6 +112,9 @@ func (r *GatewayNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request)
klog.InfoS("localClusterID is not initialized, skip reconcile")
return ctrl.Result{Requeue: true}, nil
}
if string(r.precedence) == "" {
r.precedence = mcsv1alpha1.PrecedencePrivate
}

podCIDRs, svcCIDR, err := r.getCIDRs(ctx)
if err != nil {
Expand Down Expand Up @@ -243,10 +249,10 @@ func (r *GatewayNodeReconciler) deleteTunnelEndpoint(ctx context.Context, te *mc

func (r *GatewayNodeReconciler) updateTunnelEndpointIP(te *mcsv1alpha1.TunnelEndpoint, node corev1.Node) error {
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeExternalIP {
if r.precedence == mcsv1alpha1.PrecedencePublic && addr.Type == corev1.NodeExternalIP {
te.Spec.PublicIP = addr.Address
}
if addr.Type == corev1.NodeInternalIP {
if r.precedence == mcsv1alpha1.PrecedencePrivate && addr.Type == corev1.NodeInternalIP {
te.Spec.PrivateIP = addr.Address
}
}
Expand All @@ -273,7 +279,7 @@ func (r *GatewayNodeReconciler) updateTunnelEndpoint(ctx context.Context,
if err := r.Client.Update(ctx, te, &client.UpdateOptions{}); err != nil {
return err
}
klog.InfoS("The TunnelEndpoint is updated for a Node", "tunneledpoint", klog.KObj(te), "node", node.Name)
klog.InfoS("The TunnelEndpoint is updated for a Node", "tunnelendpoint", klog.KObj(te), "node", node.Name)
r.installedTE.Update(te)
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestGatewayNodeReconciler_handleGatewayNodeEvents(t *testing.T) {
obj = append(obj, tt.existingTE)
}
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(obj...).Build()
r := NewGatewayNodeReconciler(fakeClient, scheme, "default", &remoteMgr, "10.10.1.0/16")
r := NewGatewayNodeReconciler(fakeClient, scheme, "default", &remoteMgr, "10.10.1.0/16", mcsv1alpha1.PrecedencePublic)
if tt.existingTE != nil {
r.installedTE.Add(tt.existingTE)
}
Expand Down
134 changes: 93 additions & 41 deletions multicluster/controllers/multicluster/stale_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -85,12 +84,29 @@ func (c *StaleController) cleanup() error {
if err := c.cleanupStaleServiceResources(remoteCluster, localClusterID, resImpList); err != nil {
return err
}

if err := c.cleanupACNPResources(resImpList); err != nil {
return err
}
if err := c.cleanupTunnelEndpointImport(resImpList); err != nil {
return err
}

// Clean up stale ResourceExport in the leader cluster.
resExpList := &mcsv1alpha1.ResourceExportList{}
if err := remoteCluster.List(ctx, resExpList, &client.ListOptions{Namespace: remoteCluster.GetNamespace()}); err != nil {
return err
}

if len(resExpList.Items) == 0 {
return nil
}
if err := c.cleanupServiceResourceExport(remoteCluster, resExpList, localClusterID); err != nil {
return err
}
if err := c.cleanupTunnelEndpointResourceExport(remoteCluster, resExpList, localClusterID); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -146,43 +162,6 @@ func (c *StaleController) cleanupStaleServiceResources(remoteCluster commonarea.
}
}
}

// Clean up any ResourceExport if no corresponding ServiceExport locally
resExpList := &mcsv1alpha1.ResourceExportList{}
labelSelector := metav1.LabelSelector{
MatchLabels: map[string]string{
common.SourceClusterID: localClusterID,
},
}
selector, _ := metav1.LabelSelectorAsSelector(&labelSelector)
if err := remoteCluster.List(ctx, resExpList, &client.ListOptions{Namespace: remoteCluster.GetNamespace(), LabelSelector: selector}); err != nil {
return err
}
svcExpList := &k8smcsv1alpha1.ServiceExportList{}
if err := c.List(ctx, svcExpList, &client.ListOptions{}); err != nil {
return err
}
resExpItems := resExpList.Items
svcExpItems := svcExpList.Items

for k, re := range resExpItems {
for _, se := range svcExpItems {
if re.Spec.Name == se.Name && re.Spec.Namespace == se.Namespace {
// Set the valid ResourceExport item as empty ResourceExport, then all left non-empty items should be removed.
resExpItems[k] = mcsv1alpha1.ResourceExport{}
}
}
}

for _, r := range resExpItems {
re := r
if re.Name != "" {
klog.InfoS("Cleaning up ResourceExport", "ResourceExport", klog.KObj(&re))
if err := remoteCluster.Delete(ctx, &re, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return err
}
}
}
return nil
}

Expand All @@ -200,9 +179,7 @@ func (c *StaleController) cleanupACNPResources(resImpList *mcsv1alpha1.ResourceI
for _, resImp := range resImpList.Items {
if resImp.Spec.Kind == common.AntreaClusterNetworkPolicyKind {
acnpNameFromResImp := common.AntreaMCSPrefix + resImp.Spec.Name
if _, ok := staleMCACNPItems[acnpNameFromResImp]; ok {
delete(staleMCACNPItems, acnpNameFromResImp)
}
delete(staleMCACNPItems, acnpNameFromResImp)
}
}
for _, stalePolicy := range staleMCACNPItems {
Expand Down Expand Up @@ -241,6 +218,81 @@ func (c *StaleController) cleanupTunnelEndpointImport(resImpList *mcsv1alpha1.Re
return nil
}

// cleanupTunnelEndpointResourceExport remove any Service/Endpoint kind of ResourceExport when it has no
// corresponding Service locally.
func (c *StaleController) cleanupServiceResourceExport(remoteCluster commonarea.RemoteCommonArea,
resExpList *mcsv1alpha1.ResourceExportList, localClusterID string) error {
svcExpList := &k8smcsv1alpha1.ServiceExportList{}
if err := c.List(ctx, svcExpList, &client.ListOptions{}); err != nil {
return err
}
allResExpItems := resExpList.Items
svcExpItems := svcExpList.Items
var staleResExpItems []mcsv1alpha1.ResourceExport

for i, resExp := range allResExpItems {
if (resExp.Spec.Kind == common.ServiceKind || resExp.Spec.Kind == common.EndpointsKind) && resExp.Labels[common.SourceClusterID] == localClusterID {
staleResExpItems = append(staleResExpItems, allResExpItems[i])
}
}

for k, re := range staleResExpItems {
for _, se := range svcExpItems {
if re.Spec.Name == se.Name && re.Spec.Namespace == se.Namespace {
// Set the valid ResourceExport item as empty ResourceExport, then all left non-empty items should be removed.
staleResExpItems[k] = mcsv1alpha1.ResourceExport{}
}
}
}

for _, r := range staleResExpItems {
re := r
if re.Name != "" {
klog.InfoS("Cleaning up ResourceExport", "ResourceExport", klog.KObj(&re))
if err := remoteCluster.Delete(ctx, &re, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return err
}
}
}
return nil
}

// cleanupTunnelEndpointResourceExport remove any TunnelEndpoint kind of ResourceExport when it has no
// corresponding TunnelEndpoint locally.
func (c *StaleController) cleanupTunnelEndpointResourceExport(remoteCluster commonarea.RemoteCommonArea,
resExpList *mcsv1alpha1.ResourceExportList, localClusterID string) error {
resExpItems := resExpList.Items
var teExpItems []mcsv1alpha1.ResourceExport
for _, res := range resExpItems {
if res.Spec.Kind == common.TunnelEndpointKind && res.Spec.ClusterID == localClusterID {
teExpItems = append(teExpItems, res)
}
}

var teList mcsv1alpha1.TunnelEndpointList
if err := c.Client.List(ctx, &teList, &client.ListOptions{}); err != nil {
return err
}

for i, res := range teExpItems {
for _, te := range teList.Items {
if res.Spec.Name == te.Name {
teExpItems[i] = mcsv1alpha1.ResourceExport{}
}
}
}

for i, res := range teExpItems {
if res.Name != "" {
teRes := teExpItems[i]
if err := remoteCluster.Delete(ctx, &teExpItems[i], &client.DeleteOptions{}); err != nil {
klog.V(2).InfoS("Cleaning up stale TunnelEndpoint kind of ResourceExport", "resourceexport", klog.KObj(&teRes))
}
}
}
return nil
}

// Enqueue will be called after StaleController is initialized.
func (c *StaleController) Enqueue() {
// The key can be anything as we only have single item.
Expand Down
40 changes: 36 additions & 4 deletions multicluster/controllers/multicluster/stale_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,35 +220,64 @@ func TestStaleController_CleanupResourceExport(t *testing.T) {
svcExpNginx := k8smcsv1alpha1.ServiceExport{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "nginx",
Name: "keep-nginx",
},
}
toDeleteSvcResExport := mcsv1alpha1.ResourceExport{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "cluster-a-default-tobedeleted-service",
Name: "cluster-a-default-nginx-service",
Labels: map[string]string{
common.SourceClusterID: "cluster-a",
},
},
Spec: mcsv1alpha1.ResourceExportSpec{
Name: "nginx",
Namespace: "default",
Kind: common.ServiceKind,
},
}
toDeleteEPResExport := mcsv1alpha1.ResourceExport{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "cluster-a-default-nginx-endpoint",
Labels: map[string]string{
common.SourceClusterID: "cluster-a",
},
},
Spec: mcsv1alpha1.ResourceExportSpec{
Name: "nginx",
Namespace: "default",
Kind: common.EndpointsKind,
},
}
toDeleteTEResExport := mcsv1alpha1.ResourceExport{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "cluster-a-default-tobedeleted-tunnelendpoint",
},
Spec: mcsv1alpha1.ResourceExportSpec{
Name: "tobedeleted",
Namespace: "default",
ClusterID: "cluster-a",
Kind: common.TunnelEndpointKind,
},
}
toKeepSvcResExport := mcsv1alpha1.ResourceExport{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "cluster-a-default-nginx-service",
Name: "cluster-a-default-keep-nginx-service",
Labels: map[string]string{
common.SourceClusterID: "cluster-a",
},
},
Spec: mcsv1alpha1.ResourceExportSpec{
Name: "nginx",
Name: "keep-nginx",
Namespace: "default",
Kind: common.ServiceKind,
},
}

svcResExportFromOther := mcsv1alpha1.ResourceExport{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Expand All @@ -260,6 +289,7 @@ func TestStaleController_CleanupResourceExport(t *testing.T) {
Spec: mcsv1alpha1.ResourceExportSpec{
Name: "nginx",
Namespace: "default",
Kind: common.ServiceKind,
},
}
tests := []struct {
Expand All @@ -279,6 +309,8 @@ func TestStaleController_CleanupResourceExport(t *testing.T) {
existResExpList: &mcsv1alpha1.ResourceExportList{
Items: []mcsv1alpha1.ResourceExport{
toDeleteSvcResExport,
toDeleteEPResExport,
toDeleteTEResExport,
toKeepSvcResExport,
svcResExportFromOther,
},
Expand Down

0 comments on commit b047dbb

Please sign in to comment.