Skip to content

Commit

Permalink
de-dup order and resource dependencies
Browse files Browse the repository at this point in the history
For a pipeline with a huge list of redundant dependencies, the dependency list
grows exponentially. Calculating the list of dependencies this way causes
extra delay in the validation cycle, and this delay sometimes hits the
webhook timeout during validation. This is one of the changes being proposed to
make the validation cycle efficient and avoid unnecessary delay.
  • Loading branch information
pritidesai committed Sep 8, 2022
1 parent 9275421 commit d2fe86e
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 62 deletions.
37 changes: 9 additions & 28 deletions pkg/apis/pipeline/v1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,41 +468,22 @@ func (pt PipelineTask) Validate(ctx context.Context) (errs *apis.FieldError) {
return
}

// Deps returns all other PipelineTask dependencies of this PipelineTask, based on ordering
// Deps returns all other PipelineTask dependencies of this PipelineTask, based on resource usage or ordering
func (pt PipelineTask) Deps() []string {
deps := []string{}
// hold the list of dependencies in a set to avoid duplicates
deps := sets.NewString()

deps = append(deps, pt.resultDeps()...)
deps = append(deps, pt.orderingDeps()...)

uniqueDeps := sets.NewString()
for _, w := range deps {
if uniqueDeps.Has(w) {
continue
}
uniqueDeps.Insert(w)
}

return uniqueDeps.List()
}

func (pt PipelineTask) resultDeps() []string {
resultDeps := []string{}

// Add any dependents from result references.
// add any new dependents from result references - resource dependency
for _, ref := range PipelineTaskResultRefs(&pt) {
resultDeps = append(resultDeps, ref.PipelineTask)
deps.Insert(ref.PipelineTask)
}

return resultDeps
}

func (pt PipelineTask) orderingDeps() []string {
orderingDeps := []string{}
// add any new dependents from runAfter - order dependency
for _, runAfter := range pt.RunAfter {
orderingDeps = append(orderingDeps, runAfter)
deps.Insert(runAfter)
}
return orderingDeps

return deps.List()
}

// PipelineTaskList is a list of PipelineTasks
Expand Down
29 changes: 29 additions & 0 deletions pkg/apis/pipeline/v1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,13 +528,42 @@ func TestPipelineTaskList_Deps(t *testing.T) {
},
}},
},
}, {
Name: "task-7",
When: WhenExpressions{{
Input: "$(tasks.task-3.results.result1)",
Operator: "in",
Values: []string{"foo"},
}, {
Input: "$(tasks.task-3.results.result2)",
Operator: "in",
Values: []string{"foo"},
}},
}, {
Name: "task-8",
Params: []Param{{
Value: ParamValue{
Type: "string",
StringVal: "$(tasks.task-4.results.result1)",
}}, {
Value: ParamValue{
Type: "string",
StringVal: "$(tasks.task-4.results.result2)",
}},
},
}, {
Name: "task-9",
RunAfter: []string{"task-1", "task-1", "task-1", "task-1"},
}},
expectedDeps: map[string][]string{
"task-2": {"task-1"},
"task-3": {"task-1"},
"task-4": {"task-1", "task-2", "task-3"},
"task-5": {"task-1", "task-2", "task-3", "task-4"},
"task-6": {"task-1", "task-2", "task-3", "task-4", "task-5"},
"task-7": {"task-3"},
"task-8": {"task-4"},
"task-9": {"task-1"},
},
}}
for _, tc := range pipelines {
Expand Down
39 changes: 12 additions & 27 deletions pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,44 +512,29 @@ func (pt PipelineTask) Validate(ctx context.Context) (errs *apis.FieldError) {

// Deps returns all other PipelineTask dependencies of this PipelineTask, based on resource usage or ordering
func (pt PipelineTask) Deps() []string {
deps := []string{}
// hold the list of dependencies in a set to avoid duplicates
deps := sets.NewString()

deps = append(deps, pt.resourceDeps()...)
deps = append(deps, pt.orderingDeps()...)

uniqueDeps := sets.NewString()
for _, w := range deps {
if uniqueDeps.Has(w) {
continue
}
uniqueDeps.Insert(w)
}

return uniqueDeps.List()
}

func (pt PipelineTask) resourceDeps() []string {
resourceDeps := []string{}
// add any new dependents from a resource/workspace
if pt.Resources != nil {
for _, rd := range pt.Resources.Inputs {
resourceDeps = append(resourceDeps, rd.From...)
for _, f := range rd.From {
deps.Insert(f)
}
}
}

// Add any dependents from result references.
// add any new dependents from result references - resource dependency
for _, ref := range PipelineTaskResultRefs(&pt) {
resourceDeps = append(resourceDeps, ref.PipelineTask)
deps.Insert(ref.PipelineTask)
}

return resourceDeps
}

func (pt PipelineTask) orderingDeps() []string {
orderingDeps := []string{}
// add any new dependents from runAfter - order dependency
for _, runAfter := range pt.RunAfter {
orderingDeps = append(orderingDeps, runAfter)
deps.Insert(runAfter)
}
return orderingDeps

return deps.List()
}

// PipelineTaskList is a list of PipelineTasks
Expand Down
47 changes: 42 additions & 5 deletions pkg/apis/pipeline/v1beta1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,13 +650,50 @@ func TestPipelineTaskList_Deps(t *testing.T) {
},
}},
},
}, {
Name: "task-7",
Resources: &PipelineTaskResources{
Inputs: []PipelineTaskInputResource{{
From: []string{"task-1", "task-1"},
}},
},
}, {
Name: "task-8",
WhenExpressions: WhenExpressions{{
Input: "$(tasks.task-3.results.result1)",
Operator: "in",
Values: []string{"foo"},
}, {
Input: "$(tasks.task-3.results.result2)",
Operator: "in",
Values: []string{"foo"},
}},
}, {
Name: "task-9",
Params: []Param{{
Value: ParamValue{
Type: "string",
StringVal: "$(tasks.task-4.results.result1)",
}}, {
Value: ParamValue{
Type: "string",
StringVal: "$(tasks.task-4.results.result2)",
}},
},
}, {
Name: "task-10",
RunAfter: []string{"task-1", "task-1", "task-1", "task-1"},
}},
expectedDeps: map[string][]string{
"task-2": {"task-1"},
"task-3": {"task-1", "task-2"},
"task-4": {"task-1", "task-2", "task-3"},
"task-5": {"task-1", "task-2", "task-3", "task-4"},
"task-6": {"task-1", "task-2", "task-3", "task-4", "task-5"},
"task-2": {"task-1"},
"task-3": {"task-1", "task-2"},
"task-4": {"task-1", "task-2", "task-3"},
"task-5": {"task-1", "task-2", "task-3", "task-4"},
"task-6": {"task-1", "task-2", "task-3", "task-4", "task-5"},
"task-7": {"task-1"},
"task-8": {"task-3"},
"task-9": {"task-4"},
"task-10": {"task-1"},
},
}}
for _, tc := range pipelines {
Expand Down
80 changes: 78 additions & 2 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1320,7 +1320,7 @@ func TestDAGExecutionQueueSequentialRuns(t *testing.T) {
}

func TestPipelineRunState_CompletedOrSkippedDAGTasks(t *testing.T) {
largePipelineState := buildPipelineStateWithLargeDepencyGraph(t)
largePipelineState := buildPipelineStateWithLargeDependencyGraph(t)
tcs := []struct {
name string
state PipelineRunState
Expand Down Expand Up @@ -1365,6 +1365,14 @@ func TestPipelineRunState_CompletedOrSkippedDAGTasks(t *testing.T) {
name: "large deps, not started",
state: largePipelineState,
expectedNames: []string{},
}, {
name: "large deps through params, not started",
state: buildPipelineStateWithMultipleTaskResults(t, false),
expectedNames: []string{},
}, {
name: "large deps through params and when expressions, not started",
state: buildPipelineStateWithMultipleTaskResults(t, true),
expectedNames: []string{},
}, {
name: "one-run-started",
state: oneRunStartedState,
Expand Down Expand Up @@ -1401,7 +1409,7 @@ func TestPipelineRunState_CompletedOrSkippedDAGTasks(t *testing.T) {
}
}

func buildPipelineStateWithLargeDepencyGraph(t *testing.T) PipelineRunState {
func buildPipelineStateWithLargeDependencyGraph(t *testing.T) PipelineRunState {
t.Helper()
var task = &v1beta1.Task{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -1460,6 +1468,74 @@ func buildPipelineStateWithLargeDepencyGraph(t *testing.T) PipelineRunState {
return pipelineRunState
}

// buildPipelineStateWithMultipleTaskResults builds a PipelineRunState holding
// tasks t1 through t399, none of which has a TaskRun. Every task after t1
// consumes ten results (a through j) from each of its predecessors via params,
// which yields many redundant references to the same parent task. When
// includeWhen is true, the same result references are repeated as when
// expression inputs as well.
func buildPipelineStateWithMultipleTaskResults(t *testing.T, includeWhen bool) PipelineRunState {
	t.Helper()

	stubTask := &v1beta1.Task{
		ObjectMeta: metav1.ObjectMeta{Name: "task"},
		Spec: v1beta1.TaskSpec{
			Steps: []v1beta1.Step{{Name: "step1"}},
		},
	}

	// unstarted wraps a pipeline task into a resolved task that shares the stub
	// task spec; TaskRun is left at its nil zero value.
	unstarted := func(pt *v1beta1.PipelineTask) *ResolvedPipelineTask {
		return &ResolvedPipelineTask{
			PipelineTask: pt,
			ResolvedTaskResources: &resources.ResolvedTaskResources{
				TaskSpec: &stubTask.Spec,
			},
		}
	}

	// t1 is the root of the graph and references no other task.
	state := PipelineRunState{
		unstarted(&v1beta1.PipelineTask{
			Name:    "t1",
			TaskRef: &v1beta1.TaskRef{Name: "task"},
		}),
	}

	for n := 2; n < 400; n++ {
		var params []v1beta1.Param
		whens := v1beta1.WhenExpressions{}

		// Task tN depends on every predecessor t1..t(N-1): each predecessor
		// contributes ten result references, all pointing at the same parent.
		for parent := 1; parent < n; parent++ {
			for letter := byte('a'); letter <= 'j'; letter++ {
				resultRef := fmt.Sprintf("$(tasks.t%d.results.%c)", parent, letter)

				// param carrying the task result
				params = append(params, v1beta1.Param{
					Name: fmt.Sprintf("%c", letter),
					Value: v1beta1.ParamValue{
						Type:      v1beta1.ParamTypeString,
						StringVal: resultRef,
					},
				})

				// optionally, a when expression on the same task result
				if includeWhen {
					whens = append(whens, v1beta1.WhenExpression{
						Input:    resultRef,
						Operator: selection.In,
						Values:   []string{"true"},
					})
				}
			}
		}

		state = append(state, unstarted(&v1beta1.PipelineTask{
			Name:            fmt.Sprintf("t%d", n),
			Params:          params,
			TaskRef:         &v1beta1.TaskRef{Name: "task"},
			WhenExpressions: whens,
		}))
	}

	return state
}

func TestPipelineRunState_GetFinalTasksAndNames(t *testing.T) {
tcs := []struct {
name string
Expand Down

0 comments on commit d2fe86e

Please sign in to comment.