From d2fe86e3363f082ccb4ef92737a462e2bfee9489 Mon Sep 17 00:00:00 2001 From: pritidesai Date: Tue, 6 Sep 2022 17:18:30 -0700 Subject: [PATCH] de-dup order and resource dependencies In case of a pipeline with a huge list of redundant dependencies, the list is growing exponentially. This way of calculating the list of dependencies is causing extra delay in the validation cycle. This delay sometimes hit the webhook timeout during validation. This is one of the changes being proposed to make the validation cycle efficient and avoid unneccesary delay. --- pkg/apis/pipeline/v1/pipeline_types.go | 37 +++------ pkg/apis/pipeline/v1/pipeline_types_test.go | 29 +++++++ pkg/apis/pipeline/v1beta1/pipeline_types.go | 39 +++------ .../pipeline/v1beta1/pipeline_types_test.go | 47 +++++++++-- .../resources/pipelinerunstate_test.go | 80 ++++++++++++++++++- 5 files changed, 170 insertions(+), 62 deletions(-) diff --git a/pkg/apis/pipeline/v1/pipeline_types.go b/pkg/apis/pipeline/v1/pipeline_types.go index bfa88bd71ff..7700af48fdf 100644 --- a/pkg/apis/pipeline/v1/pipeline_types.go +++ b/pkg/apis/pipeline/v1/pipeline_types.go @@ -468,41 +468,22 @@ func (pt PipelineTask) Validate(ctx context.Context) (errs *apis.FieldError) { return } -// Deps returns all other PipelineTask dependencies of this PipelineTask, based on ordering +// Deps returns all other PipelineTask dependencies of this PipelineTask, based on resource usage or ordering func (pt PipelineTask) Deps() []string { - deps := []string{} + // hold the list of dependencies in a set to avoid duplicates + deps := sets.NewString() - deps = append(deps, pt.resultDeps()...) - deps = append(deps, pt.orderingDeps()...) - - uniqueDeps := sets.NewString() - for _, w := range deps { - if uniqueDeps.Has(w) { - continue - } - uniqueDeps.Insert(w) - } - - return uniqueDeps.List() -} - -func (pt PipelineTask) resultDeps() []string { - resultDeps := []string{} - - // Add any dependents from result references. + // add any new dependents from result references - resource dependency for _, ref := range PipelineTaskResultRefs(&pt) { - resultDeps = append(resultDeps, ref.PipelineTask) + deps.Insert(ref.PipelineTask) } - return resultDeps -} - -func (pt PipelineTask) orderingDeps() []string { - orderingDeps := []string{} + // add any new dependents from runAfter - order dependency for _, runAfter := range pt.RunAfter { - orderingDeps = append(orderingDeps, runAfter) + deps.Insert(runAfter) } - return orderingDeps + + return deps.List() } // PipelineTaskList is a list of PipelineTasks diff --git a/pkg/apis/pipeline/v1/pipeline_types_test.go b/pkg/apis/pipeline/v1/pipeline_types_test.go index 55220487d88..2f4199aaa17 100644 --- a/pkg/apis/pipeline/v1/pipeline_types_test.go +++ b/pkg/apis/pipeline/v1/pipeline_types_test.go @@ -528,6 +528,32 @@ func TestPipelineTaskList_Deps(t *testing.T) { }, }}, }, + }, { + Name: "task-7", + When: WhenExpressions{{ + Input: "$(tasks.task-3.results.result1)", + Operator: "in", + Values: []string{"foo"}, + }, { + Input: "$(tasks.task-3.results.result2)", + Operator: "in", + Values: []string{"foo"}, + }}, + }, { + Name: "task-8", + Params: []Param{{ + Value: ParamValue{ + Type: "string", + StringVal: "$(tasks.task-4.results.result1)", + }}, { + Value: ParamValue{ + Type: "string", + StringVal: "$(tasks.task-4.results.result2)", + }}, + }, + }, { + Name: "task-9", + RunAfter: []string{"task-1", "task-1", "task-1", "task-1"}, }}, expectedDeps: map[string][]string{ "task-2": {"task-1"}, @@ -535,6 +561,9 @@ func TestPipelineTaskList_Deps(t *testing.T) { "task-4": {"task-1", "task-2", "task-3"}, "task-5": {"task-1", "task-2", "task-3", "task-4"}, "task-6": {"task-1", "task-2", "task-3", "task-4", "task-5"}, + "task-7": {"task-3"}, + "task-8": {"task-4"}, + "task-9": {"task-1"}, }, }} for _, tc := range pipelines { diff --git a/pkg/apis/pipeline/v1beta1/pipeline_types.go b/pkg/apis/pipeline/v1beta1/pipeline_types.go index 680e113a3c4..b3e6c525f22 100644 --- a/pkg/apis/pipeline/v1beta1/pipeline_types.go +++ b/pkg/apis/pipeline/v1beta1/pipeline_types.go @@ -512,44 +512,29 @@ func (pt PipelineTask) Validate(ctx context.Context) (errs *apis.FieldError) { // Deps returns all other PipelineTask dependencies of this PipelineTask, based on resource usage or ordering func (pt PipelineTask) Deps() []string { - deps := []string{} + // hold the list of dependencies in a set to avoid duplicates + deps := sets.NewString() - deps = append(deps, pt.resourceDeps()...) - deps = append(deps, pt.orderingDeps()...) - - uniqueDeps := sets.NewString() - for _, w := range deps { - if uniqueDeps.Has(w) { - continue - } - uniqueDeps.Insert(w) - } - - return uniqueDeps.List() -} - -func (pt PipelineTask) resourceDeps() []string { - resourceDeps := []string{} + // add any new dependents from a resource/workspace if pt.Resources != nil { for _, rd := range pt.Resources.Inputs { - resourceDeps = append(resourceDeps, rd.From...) + for _, f := range rd.From { + deps.Insert(f) + } } } - // Add any dependents from result references. + // add any new dependents from result references - resource dependency for _, ref := range PipelineTaskResultRefs(&pt) { - resourceDeps = append(resourceDeps, ref.PipelineTask) + deps.Insert(ref.PipelineTask) } - return resourceDeps -} - -func (pt PipelineTask) orderingDeps() []string { - orderingDeps := []string{} + // add any new dependents from runAfter - order dependency for _, runAfter := range pt.RunAfter { - orderingDeps = append(orderingDeps, runAfter) + deps.Insert(runAfter) } - return orderingDeps + + return deps.List() } // PipelineTaskList is a list of PipelineTasks diff --git a/pkg/apis/pipeline/v1beta1/pipeline_types_test.go b/pkg/apis/pipeline/v1beta1/pipeline_types_test.go index 8651f86fa7d..848e3cb56e7 100644 --- a/pkg/apis/pipeline/v1beta1/pipeline_types_test.go +++ b/pkg/apis/pipeline/v1beta1/pipeline_types_test.go @@ -650,13 +650,50 @@ func TestPipelineTaskList_Deps(t *testing.T) { }, }}, }, + }, { + Name: "task-7", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{ + From: []string{"task-1", "task-1"}, + }}, + }, + }, { + Name: "task-8", + WhenExpressions: WhenExpressions{{ + Input: "$(tasks.task-3.results.result1)", + Operator: "in", + Values: []string{"foo"}, + }, { + Input: "$(tasks.task-3.results.result2)", + Operator: "in", + Values: []string{"foo"}, + }}, + }, { + Name: "task-9", + Params: []Param{{ + Value: ParamValue{ + Type: "string", + StringVal: "$(tasks.task-4.results.result1)", + }}, { + Value: ParamValue{ + Type: "string", + StringVal: "$(tasks.task-4.results.result2)", + }}, + }, + }, { + Name: "task-10", + RunAfter: []string{"task-1", "task-1", "task-1", "task-1"}, }}, expectedDeps: map[string][]string{ - "task-2": {"task-1"}, - "task-3": {"task-1", "task-2"}, - "task-4": {"task-1", "task-2", "task-3"}, - "task-5": {"task-1", "task-2", "task-3", "task-4"}, - "task-6": {"task-1", "task-2", "task-3", "task-4", "task-5"}, + "task-2": {"task-1"}, + "task-3": {"task-1", "task-2"}, + "task-4": {"task-1", "task-2", "task-3"}, + "task-5": {"task-1", "task-2", "task-3", "task-4"}, + "task-6": {"task-1", "task-2", "task-3", "task-4", "task-5"}, + "task-7": {"task-1"}, + "task-8": {"task-3"}, + "task-9": {"task-4"}, + "task-10": {"task-1"}, }, }} for _, tc := range pipelines { diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go index c5890a57e33..3c2a8c70f3d 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go @@ -1320,7 +1320,7 @@ func TestDAGExecutionQueueSequentialRuns(t *testing.T) { } func TestPipelineRunState_CompletedOrSkippedDAGTasks(t *testing.T) { - largePipelineState := buildPipelineStateWithLargeDepencyGraph(t) + largePipelineState := buildPipelineStateWithLargeDependencyGraph(t) tcs := []struct { name string state PipelineRunState @@ -1365,6 +1365,14 @@ func TestPipelineRunState_CompletedOrSkippedDAGTasks(t *testing.T) { name: "large deps, not started", state: largePipelineState, expectedNames: []string{}, + }, { + name: "large deps through params, not started", + state: buildPipelineStateWithMultipleTaskResults(t, false), + expectedNames: []string{}, + }, { + name: "large deps through params and when expressions, not started", + state: buildPipelineStateWithMultipleTaskResults(t, true), + expectedNames: []string{}, }, { name: "one-run-started", state: oneRunStartedState, @@ -1401,7 +1409,7 @@ func TestPipelineRunState_CompletedOrSkippedDAGTasks(t *testing.T) { } } -func buildPipelineStateWithLargeDepencyGraph(t *testing.T) PipelineRunState { +func buildPipelineStateWithLargeDependencyGraph(t *testing.T) PipelineRunState { t.Helper() var task = &v1beta1.Task{ ObjectMeta: metav1.ObjectMeta{ @@ -1460,6 +1468,74 @@ func buildPipelineStateWithLargeDepencyGraph(t *testing.T) PipelineRunState { return pipelineRunState } +func buildPipelineStateWithMultipleTaskResults(t *testing.T, includeWhen bool) PipelineRunState { + t.Helper() + var task = &v1beta1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "task", + }, + Spec: v1beta1.TaskSpec{ + Steps: []v1beta1.Step{{ + Name: "step1", + }}, + }, + } + var pipelineRunState PipelineRunState + pipelineRunState = []*ResolvedPipelineTask{{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "t1", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + TaskRun: nil, + ResolvedTaskResources: &resources.ResolvedTaskResources{ + TaskSpec: &task.Spec, + }, + }} + for i := 2; i < 400; i++ { + var params []v1beta1.Param + whenExpressions := v1beta1.WhenExpressions{} + var alpha byte + // the task has a reference to multiple task results (a through j) from each parent task - causing a redundant references + // the task dependents on all predecessors in a graph through params and/or whenExpressions + for j := 1; j < i; j++ { + for alpha = 'a'; alpha <= 'j'; alpha++ { + // include param with task results + params = append(params, v1beta1.Param{ + Name: fmt.Sprintf("%c", alpha), + Value: v1beta1.ParamValue{ + Type: v1beta1.ParamTypeString, + StringVal: fmt.Sprintf("$(tasks.t%d.results.%c)", j, alpha), + }, + }) + } + if includeWhen { + for alpha = 'a'; alpha <= 'j'; alpha++ { + // include when expressions with task results + whenExpressions = append(whenExpressions, v1beta1.WhenExpression{ + Input: fmt.Sprintf("$(tasks.t%d.results.%c)", j, alpha), + Operator: selection.In, + Values: []string{"true"}, + }) + } + } + } + pipelineRunState = append(pipelineRunState, &ResolvedPipelineTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: fmt.Sprintf("t%d", i), + Params: params, + TaskRef: &v1beta1.TaskRef{Name: "task"}, + WhenExpressions: whenExpressions, + }, + TaskRun: nil, + ResolvedTaskResources: &resources.ResolvedTaskResources{ + TaskSpec: &task.Spec, + }, + }, + ) + } + return pipelineRunState +} + func TestPipelineRunState_GetFinalTasksAndNames(t *testing.T) { tcs := []struct { name string