Skip to content

Commit

Permalink
Change resource key logic for k8s (#4916)
Browse files Browse the repository at this point in the history
* Refactor not to use loadRunningManifests

Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>

* determine namespace when loading manifests from git repo

Details:
- [feat] get the resource info from the actual cluster in the deployExecutor.Execute
- [faet] implement the loader.refineNamespace to infer the namespace from the manifests and app.pipecd.yaml stored in the git repo
- [refactor] fix loader.NewLoader to pass isNamespacedResource
  - deployExecutor
  - rollbackExecutor
- [refactor] fix to pass isNamespacedResource on detector
  - [memo] detector checks the diff by 1 minute. It might think about the amount of the traffic to the k8s cluster.
- [refactor] fix to pass isNamespacedResource on planner
- [refactor] fix to pass isNamespacedResource on planpreview
- [refactor] remove the logic to fix the namespace when craeting the resource key on MakeResourceKey
  - maybe this is the refactoring for livestatestore
- [refactor] use the actual resource key, not the annotation's one.

Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>

* Define func to get resource info from cluster

Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>

* Refactor refineManifests

Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>

* Fix for failing test and warning

Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>

* Change method name to determineNamespace

Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>

* Refactor to use Infof

Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>

* Refactor to change the order of the import path

Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>

* Refactor to remve detector.isNamespacedResources

Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>

---------

Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>
  • Loading branch information
ffjlabo committed Jun 11, 2024
1 parent b2dca47 commit 8129078
Show file tree
Hide file tree
Showing 16 changed files with 434 additions and 99 deletions.
1 change: 1 addition & 0 deletions pkg/app/piped/controller/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (p *planner) Run(ctx context.Context) error {
in := pln.Input{
ApplicationID: p.deployment.ApplicationId,
ApplicationName: p.deployment.ApplicationName,
PlatformProviderName: p.deployment.PlatformProvider,
GitPath: *p.deployment.GitPath,
Trigger: *p.deployment.Trigger,
MostRecentSuccessfulCommitHash: p.lastSuccessfulCommitHash,
Expand Down
17 changes: 12 additions & 5 deletions pkg/app/piped/driftdetector/kubernetes/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/pipe-cd/pipecd/pkg/app/piped/livestatestore/kubernetes"
provider "github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider/kubernetes"
Expand Down Expand Up @@ -121,6 +122,12 @@ func (d *detector) Run(ctx context.Context) error {
}

func (d *detector) check(ctx context.Context) {
isNamespacedResources, err := provider.GetIsNamespacedResources(d.provider.KubernetesConfig)
if err != nil {
d.logger.Error("failed to get isNamespacedResources", zap.Error(err))
return
}

appsByRepo := d.listGroupedApplication()

for repoID, apps := range appsByRepo {
Expand Down Expand Up @@ -166,16 +173,16 @@ func (d *detector) check(ctx context.Context) {

// Start checking all applications in this repository.
for _, app := range apps {
if err := d.checkApplication(ctx, app, gitRepo, headCommit); err != nil {
if err := d.checkApplication(ctx, app, gitRepo, headCommit, isNamespacedResources); err != nil {
d.logger.Error(fmt.Sprintf("failed to check application: %s", app.Id), zap.Error(err))
}
}
}
}

func (d *detector) checkApplication(ctx context.Context, app *model.Application, repo git.Repo, headCommit git.Commit) error {
func (d *detector) checkApplication(ctx context.Context, app *model.Application, repo git.Repo, headCommit git.Commit, isNamespacedResources map[schema.GroupVersionKind]bool) error {
watchingResourceKinds := d.stateGetter.GetWatchingResourceKinds()
headManifests, err := d.loadHeadManifests(ctx, app, repo, headCommit, watchingResourceKinds)
headManifests, err := d.loadHeadManifests(ctx, app, repo, headCommit, watchingResourceKinds, isNamespacedResources)
if err != nil {
return err
}
Expand Down Expand Up @@ -219,7 +226,7 @@ func (d *detector) checkApplication(ctx context.Context, app *model.Application,
return d.reporter.ReportApplicationSyncState(ctx, app.Id, state)
}

func (d *detector) loadHeadManifests(ctx context.Context, app *model.Application, repo git.Repo, headCommit git.Commit, watchingResourceKinds []provider.APIVersionKind) ([]provider.Manifest, error) {
func (d *detector) loadHeadManifests(ctx context.Context, app *model.Application, repo git.Repo, headCommit git.Commit, watchingResourceKinds []provider.APIVersionKind, isNamespacedResources map[schema.GroupVersionKind]bool) ([]provider.Manifest, error) {
var (
manifestCache = provider.AppManifestsCache{
AppID: app.Id,
Expand Down Expand Up @@ -281,7 +288,7 @@ func (d *detector) loadHeadManifests(ctx context.Context, app *model.Application
}
}

loader := provider.NewLoader(app.Name, appDir, repoDir, app.GitPath.ConfigFilename, cfg.KubernetesApplicationSpec.Input, d.gitClient, d.logger)
loader := provider.NewLoader(app.Name, appDir, repoDir, app.GitPath.ConfigFilename, cfg.KubernetesApplicationSpec.Input, isNamespacedResources, d.gitClient, d.logger)
manifests, err = loader.LoadManifests(ctx)
if err != nil {
err = fmt.Errorf("failed to load new manifests: %w", err)
Expand Down
19 changes: 18 additions & 1 deletion pkg/app/piped/executor/kubernetes/baseline.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,24 @@ func (e *deployExecutor) ensureBaselineRollout(ctx context.Context) model.StageS

// Load running manifests at the most successful deployed commit.
e.LogPersister.Infof("Loading running manifests at commit %s for handling", runningCommit)
manifests, err := e.loadRunningManifests(ctx)
ds, err := e.RunningDSP.Get(ctx, e.LogPersister)
if err != nil {
e.LogPersister.Errorf("Failed to prepare running deploy source (%v)", err)
return model.StageStatus_STAGE_FAILURE
}

loader := provider.NewLoader(
e.Deployment.ApplicationName,
ds.AppDir,
ds.RepoDir,
e.Deployment.GitPath.ConfigFilename,
e.appCfg.Input,
e.isNamespacedResources,
e.GitClient,
e.Logger,
)

manifests, err := loadManifests(ctx, e.Deployment.ApplicationId, runningCommit, e.AppManifestsCache, loader, e.Logger)
if err != nil {
e.LogPersister.Errorf("Failed while loading running manifests (%v)", err)
return model.StageStatus_STAGE_FAILURE
Expand Down
58 changes: 18 additions & 40 deletions pkg/app/piped/executor/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/pipe-cd/pipecd/pkg/app/piped/executor"
provider "github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider/kubernetes"
Expand All @@ -36,8 +37,9 @@ import (
type deployExecutor struct {
executor.Input

commit string
appCfg *config.KubernetesApplicationSpec
commit string
appCfg *config.KubernetesApplicationSpec
isNamespacedResources map[schema.GroupVersionKind]bool

loader provider.Loader
applierGetter applierGetter
Expand Down Expand Up @@ -75,6 +77,18 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
ctx := sig.Context()
e.commit = e.Deployment.Trigger.Commit.Hash

cp, ok := e.PipedConfig.FindPlatformProvider(e.Deployment.PlatformProvider, model.ApplicationKind_KUBERNETES)
if !ok {
e.LogPersister.Errorf("provider %s was not found", e.Deployment.PlatformProvider)
return model.StageStatus_STAGE_FAILURE
}
isNamespacedResources, err := provider.GetIsNamespacedResources(cp.KubernetesConfig)
if err != nil {
e.LogPersister.Errorf("failed to get isNamespacedResources %v", zap.Error(err))
return model.StageStatus_STAGE_FAILURE
}
e.isNamespacedResources = isNamespacedResources

ds, err := e.TargetDSP.Get(ctx, e.LogPersister)
if err != nil {
e.LogPersister.Errorf("Failed to prepare target deploy source data (%v)", err)
Expand Down Expand Up @@ -110,6 +124,7 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
ds.RepoDir,
e.Deployment.GitPath.ConfigFilename,
e.appCfg.Input,
e.isNamespacedResources,
e.GitClient,
e.Logger,
)
Expand Down Expand Up @@ -154,44 +169,7 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
return executor.DetermineStageStatus(sig.Signal(), originalStatus, status)
}

func (e *deployExecutor) loadRunningManifests(ctx context.Context) (manifests []provider.Manifest, err error) {
commit := e.Deployment.RunningCommitHash
if commit == "" {
return nil, fmt.Errorf("unable to determine running commit")
}

loader := &manifestsLoadFunc{
loadFunc: func(ctx context.Context) ([]provider.Manifest, error) {
ds, err := e.RunningDSP.Get(ctx, e.LogPersister)
if err != nil {
e.LogPersister.Errorf("Failed to prepare running deploy source (%v)", err)
return nil, err
}

loader := provider.NewLoader(
e.Deployment.ApplicationName,
ds.AppDir,
ds.RepoDir,
e.Deployment.GitPath.ConfigFilename,
e.appCfg.Input,
e.GitClient,
e.Logger,
)
return loader.LoadManifests(ctx)
},
}

return loadManifests(ctx, e.Deployment.ApplicationId, commit, e.AppManifestsCache, loader, e.Logger)
}

type manifestsLoadFunc struct {
loadFunc func(context.Context) ([]provider.Manifest, error)
}

func (l *manifestsLoadFunc) LoadManifests(ctx context.Context) ([]provider.Manifest, error) {
return l.loadFunc(ctx)
}

// loadManifests loads the manifest using the given loader. It caches the loaded manifests for the given commit.
func loadManifests(ctx context.Context, appID, commit string, manifestsCache cache.Cache, loader provider.Loader, logger *zap.Logger) (manifests []provider.Manifest, err error) {
cache := provider.AppManifestsCache{
AppID: appID,
Expand Down
21 changes: 20 additions & 1 deletion pkg/app/piped/executor/kubernetes/primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,26 @@ func (e *deployExecutor) ensurePrimaryRollout(ctx context.Context) model.StageSt

// Find the running resources that are not defined in Git.
e.LogPersister.Info("Start finding all running PRIMARY resources but no longer defined in Git")
runningManifests, err := e.loadRunningManifests(ctx)
// Load running manifests at the most successful deployed commit.
e.LogPersister.Infof("Loading running manifests at commit %s for handling", e.Deployment.RunningCommitHash)
ds, err := e.RunningDSP.Get(ctx, e.LogPersister)
if err != nil {
e.LogPersister.Errorf("Failed to prepare running deploy source (%v)", err)
return model.StageStatus_STAGE_FAILURE
}

loader := provider.NewLoader(
e.Deployment.ApplicationName,
ds.AppDir,
ds.RepoDir,
e.Deployment.GitPath.ConfigFilename,
e.appCfg.Input,
e.isNamespacedResources,
e.GitClient,
e.Logger,
)

runningManifests, err := loadManifests(ctx, e.Deployment.ApplicationId, e.Deployment.RunningCommitHash, e.AppManifestsCache, loader, e.Logger)
if err != nil {
e.LogPersister.Errorf("Failed while loading running manifests (%v)", err)
return model.StageStatus_STAGE_FAILURE
Expand Down
41 changes: 39 additions & 2 deletions pkg/app/piped/executor/kubernetes/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"strings"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/clientcmd"

"github.com/pipe-cd/pipecd/pkg/app/piped/executor"
provider "github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider/kubernetes"
Expand All @@ -31,7 +34,8 @@ import (
type rollbackExecutor struct {
executor.Input

appDir string
appDir string
isNamespacedResources map[schema.GroupVersionKind]bool
}

func (e *rollbackExecutor) Execute(sig executor.StopSignal) model.StageStatus {
Expand All @@ -41,6 +45,39 @@ func (e *rollbackExecutor) Execute(sig executor.StopSignal) model.StageStatus {
status model.StageStatus
)

// Use discovery to discover APIs supported by the Kubernetes API server.
// This should be run periodically with a low rate because the APIs are not added frequently.
// https://godoc.org/k8s.io/client-go/discovery
cp, ok := e.PipedConfig.FindPlatformProvider(e.Deployment.PlatformProvider, model.ApplicationKind_KUBERNETES)
if !ok {
e.LogPersister.Errorf("provider %s was not found", e.Deployment.PlatformProvider)
return model.StageStatus_STAGE_FAILURE
}
kubeConfig, err := clientcmd.BuildConfigFromFlags(cp.KubernetesConfig.MasterURL, cp.KubernetesConfig.KubeConfigPath)
if err != nil {
e.LogPersister.Errorf("failed to build kube config", zap.Error(err))
return model.StageStatus_STAGE_FAILURE
}
discoveryClient, err := discovery.NewDiscoveryClientForConfig(kubeConfig)
if err != nil {
e.LogPersister.Errorf("failed to create discovery client: %v", zap.Error(err))
return model.StageStatus_STAGE_FAILURE
}
groupResources, err := discoveryClient.ServerPreferredResources()
if err != nil {
e.LogPersister.Errorf("failed to fetch preferred resources: %v", zap.Error(err))
return model.StageStatus_STAGE_FAILURE
}
e.LogPersister.Infof("successfully preferred resources that contains for %d groups", len(groupResources))

e.isNamespacedResources = make(map[schema.GroupVersionKind]bool)
for _, gr := range groupResources {
for _, resource := range gr.APIResources {
gvk := schema.FromAPIVersionAndKind(gr.GroupVersion, resource.Kind)
e.isNamespacedResources[gvk] = resource.Namespaced
}
}

switch model.Stage(e.Stage.Name) {
case model.StageRollback:
status = e.ensureRollback(ctx)
Expand Down Expand Up @@ -82,7 +119,7 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus

e.appDir = ds.AppDir

loader := provider.NewLoader(e.Deployment.ApplicationName, ds.AppDir, ds.RepoDir, e.Deployment.GitPath.ConfigFilename, appCfg.Input, e.GitClient, e.Logger)
loader := provider.NewLoader(e.Deployment.ApplicationName, ds.AppDir, ds.RepoDir, e.Deployment.GitPath.ConfigFilename, appCfg.Input, e.isNamespacedResources, e.GitClient, e.Logger)
e.Logger.Info("start executing kubernetes stage",
zap.String("stage-name", e.Stage.Name),
zap.String("app-dir", ds.AppDir),
Expand Down
22 changes: 19 additions & 3 deletions pkg/app/piped/planner/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/pipe-cd/pipecd/pkg/app/piped/deploysource"
"github.com/pipe-cd/pipecd/pkg/app/piped/planner"
Expand All @@ -40,6 +41,7 @@ const (

// Planner plans the deployment pipeline for kubernetes application.
type Planner struct {
isNamespacedResources map[schema.GroupVersionKind]bool
}

type registerer interface {
Expand All @@ -48,11 +50,25 @@ type registerer interface {

// Register registers this planner into the given registerer.
func Register(r registerer) {
r.Register(model.ApplicationKind_KUBERNETES, &Planner{})
r.Register(model.ApplicationKind_KUBERNETES, &Planner{
isNamespacedResources: make(map[schema.GroupVersionKind]bool),
})
}

// Plan decides which pipeline should be used for the given input.
func (p *Planner) Plan(ctx context.Context, in planner.Input) (out planner.Output, err error) {
cp, ok := in.PipedConfig.FindPlatformProvider(in.PlatformProviderName, model.ApplicationKind_KUBERNETES)
if !ok {
err = fmt.Errorf("provider %s was not found", in.PlatformProviderName)
return
}
isNamespacedResources, err := provider.GetIsNamespacedResources(cp.KubernetesConfig)
if err != nil {
err = fmt.Errorf("failed to get isNamespacedResources: %v", err)
return
}
p.isNamespacedResources = isNamespacedResources

ds, err := in.TargetDSP.Get(ctx, io.Discard)
if err != nil {
err = fmt.Errorf("error while preparing deploy source data (%v)", err)
Expand Down Expand Up @@ -81,7 +97,7 @@ func (p *Planner) Plan(ctx context.Context, in planner.Input) (out planner.Outpu
newManifests, ok := manifestCache.Get(in.Trigger.Commit.Hash)
if !ok {
// When the manifests were not in the cache we have to load them.
loader := provider.NewLoader(in.ApplicationName, ds.AppDir, ds.RepoDir, in.GitPath.ConfigFilename, cfg.Input, in.GitClient, in.Logger)
loader := provider.NewLoader(in.ApplicationName, ds.AppDir, ds.RepoDir, in.GitPath.ConfigFilename, cfg.Input, p.isNamespacedResources, in.GitClient, in.Logger)
newManifests, err = loader.LoadManifests(ctx)
if err != nil {
return
Expand Down Expand Up @@ -205,7 +221,7 @@ func (p *Planner) Plan(ctx context.Context, in planner.Input) (out planner.Outpu
err = fmt.Errorf("unable to find the running configuration (%v)", err)
return
}
loader := provider.NewLoader(in.ApplicationName, runningDs.AppDir, runningDs.RepoDir, in.GitPath.ConfigFilename, runningCfg.Input, in.GitClient, in.Logger)
loader := provider.NewLoader(in.ApplicationName, runningDs.AppDir, runningDs.RepoDir, in.GitPath.ConfigFilename, runningCfg.Input, p.isNamespacedResources, in.GitClient, in.Logger)
oldManifests, err = loader.LoadManifests(ctx)
if err != nil {
err = fmt.Errorf("failed to load previously deployed manifests: %w", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/app/piped/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type gitClient interface {
type Input struct {
ApplicationID string
ApplicationName string
PlatformProviderName string
GitPath model.ApplicationGitPath
Trigger model.DeploymentTrigger
MostRecentSuccessfulCommitHash string
Expand Down
7 changes: 4 additions & 3 deletions pkg/app/piped/planpreview/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,10 @@ func (b *builder) plan(ctx context.Context, app *model.Application, targetDSP de
}

in := planner.Input{
ApplicationID: app.Id,
ApplicationName: app.Name,
GitPath: *app.GitPath,
ApplicationID: app.Id,
ApplicationName: app.Name,
PlatformProviderName: app.PlatformProvider,
GitPath: *app.GitPath,
Trigger: model.DeploymentTrigger{
Commit: &model.Commit{
Branch: b.repoCfg.Branch,
Expand Down
Loading

0 comments on commit 8129078

Please sign in to comment.