Skip to content

Commit

Permalink
Merge pull request #8850 from killianmuldoon/pr-stop-caching-pods
Browse files Browse the repository at this point in the history
🐛 ClusterCacheTracker: Stop pod caching when checking workload cluster
  • Loading branch information
k8s-ci-robot committed Jun 14, 2023
2 parents 06d5b86 + ed31880 commit 27195c0
Showing 1 changed file with 25 additions and 14 deletions.
39 changes: 25 additions & 14 deletions controllers/remote/cluster_cache_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,14 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
}

// Create a client and a cache for the cluster.
c, cache, err := t.createClient(ctx, config, cluster, indexes)
c, uncachedClient, cache, err := t.createClient(ctx, config, cluster, indexes)
if err != nil {
return nil, err
}

// Detect if the controller is running on the workload cluster.
runningOnCluster, err := t.runningOnWorkloadCluster(ctx, c, cluster)
// This function uses an uncached client to ensure pods aren't cached by the long-lived client.
runningOnCluster, err := t.runningOnWorkloadCluster(ctx, uncachedClient, cluster)
if err != nil {
return nil, err
}
Expand All @@ -303,7 +304,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
config.Host = inClusterConfig.Host

// Create a new client and overwrite the previously created client.
c, cache, err = t.createClient(ctx, config, cluster, indexes)
c, _, cache, err = t.createClient(ctx, config, cluster, indexes)
if err != nil {
return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
}
Expand Down Expand Up @@ -355,26 +356,26 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl
return t.controllerPodMetadata.UID == pod.UID, nil
}

// createClient creates a client and a mapper based on a rest.Config.
func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, *stoppableCache, error) {
// createClient creates a cached client, and uncached client and a mapper based on a rest.Config.
func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, client.Client, *stoppableCache, error) {
// Create a http client for the cluster.
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
}

// Create a mapper for it
mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
}

// Verify if we can get a rest mapping from the workload cluster apiserver.
// Note: This also checks if the apiserver is up in general. We do this already here
// to avoid further effort creating a cache and a client and to produce a clearer error message.
_, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
}

// Create the cache for the remote cluster
Expand All @@ -385,7 +386,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
}
remoteCache, err := cache.New(config, cacheOptions)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String())
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String())
}

cacheCtx, cacheCtxCancel := context.WithCancel(ctx)
Expand All @@ -398,12 +399,12 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con

for _, index := range indexes {
if err := cache.IndexField(ctx, index.Object, index.Field, index.ExtractValue); err != nil {
return nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String())
return nil, nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String())
}
}

// Create the client for the remote cluster
c, err := client.New(config, client.Options{
cachedClient, err := client.New(config, client.Options{
Scheme: t.scheme,
Mapper: mapper,
HTTPClient: httpClient,
Expand All @@ -414,9 +415,19 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
},
})
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
}

// Create an uncached client. This is used in `runningOnWorkloadCluster` to ensure we don't continuously cache
// pods in the client.
uncachedClient, err := client.New(config, client.Options{
Scheme: t.scheme,
Mapper: mapper,
HTTPClient: httpClient,
})
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String())
}
// Start the cache!!!
go cache.Start(cacheCtx) //nolint:errcheck

Expand All @@ -425,7 +436,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
defer cacheSyncCtxCancel()
if !cache.WaitForCacheSync(cacheSyncCtx) {
cache.Stop()
return nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
return nil, nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
}

// Start cluster healthcheck!!!
Expand All @@ -435,7 +446,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
httpClient: httpClient,
})

return c, cache, nil
return cachedClient, uncachedClient, cache, nil
}

// deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
Expand Down

0 comments on commit 27195c0

Please sign in to comment.