Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-v0.36.x] de-dupe order and resource dependencies #5479

Merged
merged 1 commit into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 15 additions & 28 deletions pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,51 +469,38 @@ func (pt PipelineTask) Validate(ctx context.Context) (errs *apis.FieldError) {

// Deps returns all other PipelineTask dependencies of this PipelineTask, based on resource usage or ordering
func (pt PipelineTask) Deps() []string {
deps := []string{}
// hold the list of dependencies in a set to avoid duplicates
deps := sets.NewString()

deps = append(deps, pt.resourceDeps()...)
deps = append(deps, pt.orderingDeps()...)

uniqueDeps := sets.NewString()
for _, w := range deps {
if uniqueDeps.Has(w) {
continue
}
uniqueDeps.Insert(w)
}

return uniqueDeps.List()
}

func (pt PipelineTask) resourceDeps() []string {
resourceDeps := []string{}
// add any new dependents from a resource/workspace
if pt.Resources != nil {
for _, rd := range pt.Resources.Inputs {
resourceDeps = append(resourceDeps, rd.From...)
for _, f := range rd.From {
deps.Insert(f)
}
}
}

// Add any dependents from conditional resources.
for _, cond := range pt.Conditions {
for _, rd := range cond.Resources {
resourceDeps = append(resourceDeps, rd.From...)
for _, f := range rd.From {
deps.Insert(f)
}
}
}

// Add any dependents from result references.
// add any new dependents from result references - resource dependency
for _, ref := range PipelineTaskResultRefs(&pt) {
resourceDeps = append(resourceDeps, ref.PipelineTask)
deps.Insert(ref.PipelineTask)
}

return resourceDeps
}

func (pt PipelineTask) orderingDeps() []string {
orderingDeps := []string{}
// add any new dependents from runAfter - order dependency
for _, runAfter := range pt.RunAfter {
orderingDeps = append(orderingDeps, runAfter)
deps.Insert(runAfter)
}
return orderingDeps

return deps.List()
}

// PipelineTaskList is a list of PipelineTasks
Expand Down
46 changes: 46 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,12 +590,58 @@ func TestPipelineTaskList_Deps(t *testing.T) {
Operator: "in",
Values: []string{"foo"},
}},
Matrix: []Param{{
Value: ArrayOrString{
Type: ParamTypeArray,
ArrayVal: []string{
"$(tasks.task-2.results.result)",
"$(tasks.task-5.results.result)",
},
}},
},
}, {
Name: "task-6",
Resources: &PipelineTaskResources{
Inputs: []PipelineTaskInputResource{{
From: []string{"task-1", "task-1"},
}},
},
}, {
Name: "task-7",
WhenExpressions: WhenExpressions{{
Input: "$(tasks.task-3.results.result1)",
Operator: "in",
Values: []string{"foo"},
}, {
Input: "$(tasks.task-3.results.result2)",
Operator: "in",
Values: []string{"foo"},
}},
}, {
Name: "task-8",
Params: []Param{{
Value: ArrayOrString{
Type: "string",
StringVal: "$(tasks.task-4.results.result1)",
}}, {
Value: ArrayOrString{
Type: "string",
StringVal: "$(tasks.task-4.results.result2)",
}},
},
}, {
Name: "task-9",
RunAfter: []string{"task-1", "task-1", "task-1", "task-1"},
}},
expectedDeps: map[string][]string{
"task-2": {"task-1"},
"task-3": {"task-1", "task-2"},
"task-4": {"task-1", "task-2", "task-3"},
"task-5": {"task-1", "task-2", "task-3", "task-4"},
"task-6": {"task-1"},
"task-7": {"task-3"},
"task-8": {"task-4"},
"task-9": {"task-1"},
},
}}
for _, tc := range pipelines {
Expand Down
80 changes: 78 additions & 2 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ func TestDAGExecutionQueueSequentialRuns(t *testing.T) {
}

func TestPipelineRunState_CompletedOrSkippedDAGTasks(t *testing.T) {
largePipelineState := buildPipelineStateWithLargeDepencyGraph(t)
largePipelineState := buildPipelineStateWithLargeDependencyGraph(t)
tcs := []struct {
name string
state PipelineRunState
Expand Down Expand Up @@ -1036,6 +1036,14 @@ func TestPipelineRunState_CompletedOrSkippedDAGTasks(t *testing.T) {
name: "large deps, not started",
state: largePipelineState,
expectedNames: []string{},
}, {
name: "large deps through params, not started",
state: buildPipelineStateWithMultipleTaskResults(t, false),
expectedNames: []string{},
}, {
name: "large deps through params and when expressions, not started",
state: buildPipelineStateWithMultipleTaskResults(t, true),
expectedNames: []string{},
}, {
name: "one-run-started",
state: oneRunStartedState,
Expand Down Expand Up @@ -1069,7 +1077,7 @@ func TestPipelineRunState_CompletedOrSkippedDAGTasks(t *testing.T) {
}
}

func buildPipelineStateWithLargeDepencyGraph(t *testing.T) PipelineRunState {
func buildPipelineStateWithLargeDependencyGraph(t *testing.T) PipelineRunState {
t.Helper()
var task = &v1beta1.Task{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -1128,6 +1136,74 @@ func buildPipelineStateWithLargeDepencyGraph(t *testing.T) PipelineRunState {
return pipelineRunState
}

func buildPipelineStateWithMultipleTaskResults(t *testing.T, includeWhen bool) PipelineRunState {
t.Helper()
var task = &v1beta1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: "task",
},
Spec: v1beta1.TaskSpec{
Steps: []v1beta1.Step{{
Name: "step1",
}},
},
}
var pipelineRunState PipelineRunState
pipelineRunState = []*ResolvedPipelineRunTask{{
PipelineTask: &v1beta1.PipelineTask{
Name: "t1",
TaskRef: &v1beta1.TaskRef{Name: "task"},
},
TaskRun: nil,
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}}
for i := 2; i < 400; i++ {
var params []v1beta1.Param
whenExpressions := v1beta1.WhenExpressions{}
var alpha byte
// the task has a reference to multiple task results (a through j) from each parent task - causing a redundant references
// the task dependents on all predecessors in a graph through params and/or whenExpressions
for j := 1; j < i; j++ {
for alpha = 'a'; alpha <= 'j'; alpha++ {
// include param with task results
params = append(params, v1beta1.Param{
Name: fmt.Sprintf("%c", alpha),
Value: v1beta1.ArrayOrString{
Type: v1beta1.ParamTypeString,
StringVal: fmt.Sprintf("$(tasks.t%d.results.%c)", j, alpha),
},
})
}
if includeWhen {
for alpha = 'a'; alpha <= 'j'; alpha++ {
// include when expressions with task results
whenExpressions = append(whenExpressions, v1beta1.WhenExpression{
Input: fmt.Sprintf("$(tasks.t%d.results.%c)", j, alpha),
Operator: selection.In,
Values: []string{"true"},
})
}
}
}
pipelineRunState = append(pipelineRunState, &ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Name: fmt.Sprintf("t%d", i),
Params: params,
TaskRef: &v1beta1.TaskRef{Name: "task"},
WhenExpressions: whenExpressions,
},
TaskRun: nil,
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
},
)
}
return pipelineRunState
}

func TestPipelineRunState_GetFinalTasks(t *testing.T) {
tcs := []struct {
name string
Expand Down