From 171d89ab999a1e92c84f08f29519bbed23dd61c9 Mon Sep 17 00:00:00 2001 From: rafal-bigaj Date: Fri, 2 Sep 2022 11:13:36 +0200 Subject: [PATCH] Improve DAG validation for pipelines with hundreds of tasks DAG validation is rewritten using Kahn's algorithm to find cycles in task dependencies. The original implementation, as pointed out in https://github.com/tektoncd/pipeline/issues/5420, is a root cause of poor validation webhook performance, which makes the webhook fail on the default timeout (10s). --- .../pipeline/v1/pipeline_validation_test.go | 14 +- pkg/reconciler/pipeline/dag/dag.go | 91 ++++++++---- pkg/reconciler/pipeline/dag/dag_test.go | 139 ++++++++++++++++++ pkg/reconciler/pipeline/dag/export_test.go | 21 +++ 4 files changed, 234 insertions(+), 31 deletions(-) create mode 100644 pkg/reconciler/pipeline/dag/export_test.go diff --git a/pkg/apis/pipeline/v1/pipeline_validation_test.go b/pkg/apis/pipeline/v1/pipeline_validation_test.go index 2c640f61321..89abf08a6e4 100644 --- a/pkg/apis/pipeline/v1/pipeline_validation_test.go +++ b/pkg/apis/pipeline/v1/pipeline_validation_test.go @@ -569,9 +569,11 @@ func TestPipelineSpec_Validate_Failure_CycleDAG(t *testing.T) { name := "invalid pipeline spec with DAG having cyclic dependency" ps := &PipelineSpec{ Tasks: []PipelineTask{{ - Name: "foo", TaskRef: &TaskRef{Name: "foo-task"}, RunAfter: []string{"bar"}, + Name: "foo", TaskRef: &TaskRef{Name: "foo-task"}, RunAfter: []string{"baz"}, }, { Name: "bar", TaskRef: &TaskRef{Name: "bar-task"}, RunAfter: []string{"foo"}, + }, { + Name: "baz", TaskRef: &TaskRef{Name: "baz-task"}, RunAfter: []string{"bar"}, }}, } ctx := config.SkipValidationDueToPropagatedParametersAndWorkspaces(context.Background(), false) @@ -679,9 +681,15 @@ func TestValidateGraph_Failure(t *testing.T) { }, { Name: "bar", TaskRef: &TaskRef{Name: "bar-task"}, RunAfter: []string{"foo"}, }} - if err := validateGraph(tasks); err == nil { + expectedError := apis.FieldError{ + Message: `invalid value: cycle detected; task "foo" depends on 
"bar"`, + Paths: []string{"tasks"}, + } + err := validateGraph(tasks) + if err == nil { t.Error("Pipeline.validateGraph() did not return error for invalid DAG of pipeline tasks:", desc) - + } else if d := cmp.Diff(expectedError.Error(), err.Error(), cmpopts.IgnoreUnexported(apis.FieldError{})); d != "" { + t.Errorf("Pipeline.validateGraph() errors diff %s", diff.PrintWantGot(d)) } } diff --git a/pkg/reconciler/pipeline/dag/dag.go b/pkg/reconciler/pipeline/dag/dag.go index 52d39eeac32..f02b7e40e22 100644 --- a/pkg/reconciler/pipeline/dag/dag.go +++ b/pkg/reconciler/pipeline/dag/dag.go @@ -19,6 +19,7 @@ package dag import ( "errors" "fmt" + "sort" "strings" "github.com/tektoncd/pipeline/pkg/list" @@ -79,6 +80,11 @@ func Build(tasks Tasks, deps map[string][]string) (*Graph, error) { } } + // Ensure no cycles in the graph + if err := findCyclesInDependencies(deps); err != nil { + return nil, fmt.Errorf("cycle detected; %w", err) + } + // Process all from and runAfter constraints to add task dependency for pt, taskDeps := range deps { for _, previousTask := range taskDeps { @@ -120,41 +126,72 @@ func GetCandidateTasks(g *Graph, doneTasks ...string) (sets.String, error) { return d, nil } -func linkPipelineTasks(prev *Node, next *Node) error { - // Check for self cycle - if prev.Task.HashKey() == next.Task.HashKey() { - return fmt.Errorf("cycle detected; task %q depends on itself", next.Task.HashKey()) - } - // Check if we are adding cycles. 
- path := []string{next.Task.HashKey(), prev.Task.HashKey()} - if err := lookForNode(prev.Prev, path, next.Task.HashKey()); err != nil { - return fmt.Errorf("cycle detected: %w", err) - } +func linkPipelineTasks(prev *Node, next *Node) { next.Prev = append(next.Prev, prev) prev.Next = append(prev.Next, next) - return nil } -func lookForNode(nodes []*Node, path []string, next string) error { - for _, n := range nodes { - path = append(path, n.Task.HashKey()) - if n.Task.HashKey() == next { - return errors.New(getVisitedPath(path)) +// use Kahn's algorithm to find cycles in dependencies +func findCyclesInDependencies(deps map[string][]string) error { + independentTasks := sets.NewString() + dag := make(map[string]sets.String, len(deps)) + childMap := make(map[string]sets.String, len(deps)) + for task, taskDeps := range deps { + if len(taskDeps) == 0 { + continue } - if err := lookForNode(n.Prev, path, next); err != nil { - return err + dag[task] = sets.NewString(taskDeps...) + for _, dep := range taskDeps { + if len(deps[dep]) == 0 { + independentTasks.Insert(dep) + } + if children, ok := childMap[dep]; ok { + children.Insert(task) + } else { + childMap[dep] = sets.NewString(task) + } } } - return nil + + for { + parent, ok := independentTasks.PopAny() + if !ok { + break + } + children := childMap[parent] + for { + child, ok := children.PopAny() + if !ok { + break + } + dag[child].Delete(parent) + if dag[child].Len() == 0 { + independentTasks.Insert(child) + delete(dag, child) + } + } + } + + return getInterdependencyError(dag) } -func getVisitedPath(path []string) string { - // Reverse the path since we traversed the Graph using prev pointers. 
- for i := len(path)/2 - 1; i >= 0; i-- { - opp := len(path) - 1 - i - path[i], path[opp] = path[opp], path[i] +func getInterdependencyError(dag map[string]sets.String) error { + if len(dag) == 0 { + return nil } - return strings.Join(path, " -> ") + firstChild := "" + for task, _ := range dag { + if firstChild == "" || firstChild > task { + firstChild = task + } + } + deps := dag[firstChild].List() + depNames := make([]string, 0, len(deps)) + sort.Strings(deps) + for _, dep := range deps { + depNames = append(depNames, fmt.Sprintf("%q", dep)) + } + return fmt.Errorf("task %q depends on %s", firstChild, strings.Join(depNames, ", ")) } func addLink(pt string, previousTask string, nodes map[string]*Node) error { @@ -163,9 +200,7 @@ func addLink(pt string, previousTask string, nodes map[string]*Node) error { return fmt.Errorf("task %s depends on %s but %s wasn't present in Pipeline", pt, previousTask, previousTask) } next := nodes[pt] - if err := linkPipelineTasks(prev, next); err != nil { - return fmt.Errorf("couldn't create link from %s to %s: %w", prev.Task.HashKey(), next.Task.HashKey(), err) - } + linkPipelineTasks(prev, next) return nil } diff --git a/pkg/reconciler/pipeline/dag/dag_test.go b/pkg/reconciler/pipeline/dag/dag_test.go index dcad79f095d..9431f6a249e 100644 --- a/pkg/reconciler/pipeline/dag/dag_test.go +++ b/pkg/reconciler/pipeline/dag/dag_test.go @@ -17,6 +17,7 @@ limitations under the License. package dag_test import ( + "fmt" "strings" "testing" @@ -549,6 +550,78 @@ func TestBuild_InvalidDAG(t *testing.T) { } } +func TestBuildGraphWithHundredsOfTasks_Success(t *testing.T) { + var tasks []v1beta1.PipelineTask + // separate branches with sequential tasks and redundant links (each task explicitly depends on all predecessors) + // b00 - 000 - 001 - ... - 100 + // b01 - 000 - 001 - ... - 100 + // .. + // b04 - 000 - 001 - ... 
- 100 + nBranches, nTasks := 5, 100 + for branchIdx := 0; branchIdx < nBranches; branchIdx++ { + var taskDeps []string + firstTaskName := fmt.Sprintf("b%02d", branchIdx) + firstTask := v1beta1.PipelineTask{ + Name: firstTaskName, + TaskRef: &v1beta1.TaskRef{Name: firstTaskName + "-task"}, + RunAfter: taskDeps, + } + tasks = append(tasks, firstTask) + taskDeps = append(taskDeps, firstTaskName) + for taskIdx := 0; taskIdx < nTasks; taskIdx++ { + taskName := fmt.Sprintf("%s-%03d", firstTaskName, taskIdx) + task := v1beta1.PipelineTask{ + Name: taskName, + TaskRef: &v1beta1.TaskRef{Name: taskName + "-task"}, + RunAfter: taskDeps, + } + tasks = append(tasks, task) + taskDeps = append(taskDeps, taskName) + } + } + + _, err := dag.Build(v1beta1.PipelineTaskList(tasks), v1beta1.PipelineTaskList(tasks).Deps()) + if err != nil { + t.Error(err) + } +} + +func TestBuildGraphWithHundredsOfTasks_InvalidDAG(t *testing.T) { + var tasks []v1beta1.PipelineTask + // branches with circular interdependencies + nBranches, nTasks := 5, 100 + for branchIdx := 0; branchIdx < nBranches; branchIdx++ { + depBranchIdx := branchIdx + 1 + if depBranchIdx == nBranches { + depBranchIdx = 0 + } + taskDeps := []string{fmt.Sprintf("b%02d", depBranchIdx)} + firstTaskName := fmt.Sprintf("b%02d", branchIdx) + firstTask := v1beta1.PipelineTask{ + Name: firstTaskName, + TaskRef: &v1beta1.TaskRef{Name: firstTaskName + "-task"}, + RunAfter: taskDeps, + } + tasks = append(tasks, firstTask) + taskDeps = append(taskDeps, firstTaskName) + for taskIdx := 0; taskIdx < nTasks; taskIdx++ { + taskName := fmt.Sprintf("%s-%03d", firstTaskName, taskIdx) + task := v1beta1.PipelineTask{ + Name: taskName, + TaskRef: &v1beta1.TaskRef{Name: taskName + "-task"}, + RunAfter: taskDeps, + } + tasks = append(tasks, task) + taskDeps = append(taskDeps, taskName) + } + } + + _, err := dag.Build(v1beta1.PipelineTaskList(tasks), v1beta1.PipelineTaskList(tasks).Deps()) + if err == nil { + t.Errorf("Pipeline.Validate() did not return 
error for invalid pipeline with cycles") + } +} + func testGraph(t *testing.T) *dag.Graph { // b a // | / \ @@ -630,3 +703,69 @@ func assertSameDAG(t *testing.T, l, r *dag.Graph) { } } } + +func TestFindCyclesInDependencies(t *testing.T) { + deps := map[string][]string{ + "a": {}, + "b": {"c", "d"}, + "c": {}, + "d": {}, + } + + err := dag.FindCyclesInDependencies(deps) + if err != nil { + t.Error(err) + } + + tcs := []struct { + name string + deps map[string][]string + err string + }{{ + name: "valid-empty-deps", + deps: map[string][]string{ + "a": {}, + "b": {"c", "d"}, + "c": {}, + "d": {}, + }, + }, { + name: "self-link", + deps: map[string][]string{ + "a": {"a"}, + }, + err: `task "a" depends on "a"`, + }, { + name: "interdependent-tasks", + deps: map[string][]string{ + "a": {"b"}, + "b": {"a"}, + }, + err: `task "a" depends on "b"`, + }, { + name: "multiple-cycles", + deps: map[string][]string{ + "a": {"b", "c"}, + "b": {"a"}, + "c": {"d"}, + "d": {"a", "b"}, + }, + err: `task "a" depends on "b", "c"`, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := dag.FindCyclesInDependencies(tc.deps) + if tc.err == "" { + if err != nil { + t.Errorf("expected to see no error for valid DAG but had: %v", err) + } + } else { + if err == nil || !strings.Contains(err.Error(), tc.err) { + t.Errorf("expected to see an error: %q for invalid DAG but had: %v", tc.err, err) + } + } + }) + } + +} diff --git a/pkg/reconciler/pipeline/dag/export_test.go b/pkg/reconciler/pipeline/dag/export_test.go new file mode 100644 index 00000000000..8109ba48836 --- /dev/null +++ b/pkg/reconciler/pipeline/dag/export_test.go @@ -0,0 +1,21 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dag + +// exports for tests + +var FindCyclesInDependencies = findCyclesInDependencies