Skip to content

Commit

Permalink
Improve DAG validation for pipelines with hundreds of tasks
Browse files Browse the repository at this point in the history
DAG validation rewritten using Kahn's algorithm to find cycles in task dependencies.

Original implementation, as pointed at tektoncd#5420 is a root cause of poor validation webhook performance, which fails on default timeout (10s).
  • Loading branch information
rafalbigaj committed Sep 2, 2022
1 parent 2704d7f commit 18a6a2d
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 26 deletions.
14 changes: 11 additions & 3 deletions pkg/apis/pipeline/v1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,11 @@ func TestPipelineSpec_Validate_Failure_CycleDAG(t *testing.T) {
name := "invalid pipeline spec with DAG having cyclic dependency"
ps := &PipelineSpec{
Tasks: []PipelineTask{{
Name: "foo", TaskRef: &TaskRef{Name: "foo-task"}, RunAfter: []string{"bar"},
Name: "foo", TaskRef: &TaskRef{Name: "foo-task"}, RunAfter: []string{"baz"},
}, {
Name: "bar", TaskRef: &TaskRef{Name: "bar-task"}, RunAfter: []string{"foo"},
}, {
Name: "baz", TaskRef: &TaskRef{Name: "baz-task"}, RunAfter: []string{"bar"},
}},
}
ctx := config.SkipValidationDueToPropagatedParametersAndWorkspaces(context.Background(), false)
Expand Down Expand Up @@ -679,9 +681,15 @@ func TestValidateGraph_Failure(t *testing.T) {
}, {
Name: "bar", TaskRef: &TaskRef{Name: "bar-task"}, RunAfter: []string{"foo"},
}}
if err := validateGraph(tasks); err == nil {
expectedError := apis.FieldError{
Message: `invalid value: cycle detected; task "foo" depends on "bar"`,
Paths: []string{"tasks"},
}
err := validateGraph(tasks)
if err == nil {
t.Error("Pipeline.validateGraph() did not return error for invalid DAG of pipeline tasks:", desc)

} else if d := cmp.Diff(expectedError.Error(), err.Error(), cmpopts.IgnoreUnexported(apis.FieldError{})); d != "" {
t.Errorf("Pipeline.validateGraph() errors diff %s", diff.PrintWantGot(d))
}
}

Expand Down
73 changes: 50 additions & 23 deletions pkg/reconciler/pipeline/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ func Build(tasks Tasks, deps map[string][]string) (*Graph, error) {
}
}

// Ensure no cycles in the graph
if err := findCyclesInDependencies(deps); err != nil {
return nil, fmt.Errorf("cycle detected; %w", err)
}

// Process all from and runAfter constraints to add task dependency
for pt, taskDeps := range deps {
for _, previousTask := range taskDeps {
Expand Down Expand Up @@ -121,40 +126,62 @@ func GetCandidateTasks(g *Graph, doneTasks ...string) (sets.String, error) {
}

func linkPipelineTasks(prev *Node, next *Node) error {
// Check for self cycle
if prev.Task.HashKey() == next.Task.HashKey() {
return fmt.Errorf("cycle detected; task %q depends on itself", next.Task.HashKey())
}
// Check if we are adding cycles.
path := []string{next.Task.HashKey(), prev.Task.HashKey()}
if err := lookForNode(prev.Prev, path, next.Task.HashKey()); err != nil {
return fmt.Errorf("cycle detected: %w", err)
}
next.Prev = append(next.Prev, prev)
prev.Next = append(prev.Next, next)
return nil
}

func lookForNode(nodes []*Node, path []string, next string) error {
for _, n := range nodes {
path = append(path, n.Task.HashKey())
if n.Task.HashKey() == next {
return errors.New(getVisitedPath(path))
// use Kahn's algorithm to find cycles in dependencies
func findCyclesInDependencies(deps map[string][]string) error {
independentTasks := sets.NewString()
dag := make(map[string]sets.String, len(deps))
childMap := make(map[string]sets.String, len(deps))
for task, taskDeps := range deps {
dag[task] = sets.NewString(taskDeps...)
for _, dep := range taskDeps {
if len(deps[dep]) == 0 {
independentTasks.Insert(dep)
}
if children, ok := childMap[dep]; ok {
children.Insert(task)
} else {
childMap[dep] = sets.NewString(task)
}
}
if err := lookForNode(n.Prev, path, next); err != nil {
return err
}

for {
parent, ok := independentTasks.PopAny()
if !ok {
break
}
children := childMap[parent]
for {
child, ok := children.PopAny()
if !ok {
break
}
dag[child].Delete(parent)
if dag[child].Len() == 0 {
independentTasks.Insert(child)
delete(dag, child)
}
}
}
return nil

return getInterdependencyError(dag)
}

func getVisitedPath(path []string) string {
// Reverse the path since we traversed the Graph using prev pointers.
for i := len(path)/2 - 1; i >= 0; i-- {
opp := len(path) - 1 - i
path[i], path[opp] = path[opp], path[i]
func getInterdependencyError(dag map[string]sets.String) error {
for parent, children := range dag {
// report first interdependency
var childNames []string
for child := range children {
childNames = append(childNames, fmt.Sprintf("%q", child))
}
return fmt.Errorf("task %q depends on %s", parent, strings.Join(childNames, ", "))
}
return strings.Join(path, " -> ")
return nil
}

func addLink(pt string, previousTask string, nodes map[string]*Node) error {
Expand Down
73 changes: 73 additions & 0 deletions pkg/reconciler/pipeline/dag/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package dag_test

import (
"fmt"
"strings"
"testing"

Expand Down Expand Up @@ -549,6 +550,78 @@ func TestBuild_InvalidDAG(t *testing.T) {
}
}

func TestBuildGraphWithHundredsOfTasks_Success(t *testing.T) {
var tasks []v1beta1.PipelineTask
// separate branches with sequential tasks and redundant links (each task explicitly depends on all predecessors)
// b00 - 000 - 001 - ... - 100
// b01 - 000 - 001 - ... - 100
// ..
// b04 - 000 - 001 - ... - 100
nBranches, nTasks := 5, 100
for branchIdx := 0; branchIdx < nBranches; branchIdx++ {
var taskDeps []string
firstTaskName := fmt.Sprintf("b%02d", branchIdx)
firstTask := v1beta1.PipelineTask{
Name: firstTaskName,
TaskRef: &v1beta1.TaskRef{Name: firstTaskName + "-task"},
RunAfter: taskDeps,
}
tasks = append(tasks, firstTask)
taskDeps = append(taskDeps, firstTaskName)
for taskIdx := 0; taskIdx < nTasks; taskIdx++ {
taskName := fmt.Sprintf("%s-%03d", firstTaskName, taskIdx)
task := v1beta1.PipelineTask{
Name: taskName,
TaskRef: &v1beta1.TaskRef{Name: taskName + "-task"},
RunAfter: taskDeps,
}
tasks = append(tasks, task)
taskDeps = append(taskDeps, taskName)
}
}

_, err := dag.Build(v1beta1.PipelineTaskList(tasks), v1beta1.PipelineTaskList(tasks).Deps())
if err != nil {
t.Error(err)
}
}

func TestBuildGraphWithHundredsOfTasks_InvalidDAG(t *testing.T) {
var tasks []v1beta1.PipelineTask
// branches with circular interdependencies
nBranches, nTasks := 5, 100
for branchIdx := 0; branchIdx < nBranches; branchIdx++ {
depBranchIdx := branchIdx + 1
if depBranchIdx == nBranches {
depBranchIdx = 0
}
taskDeps := []string{fmt.Sprintf("b%02d", depBranchIdx)}
firstTaskName := fmt.Sprintf("b%02d", branchIdx)
firstTask := v1beta1.PipelineTask{
Name: firstTaskName,
TaskRef: &v1beta1.TaskRef{Name: firstTaskName + "-task"},
RunAfter: taskDeps,
}
tasks = append(tasks, firstTask)
taskDeps = append(taskDeps, firstTaskName)
for taskIdx := 0; taskIdx < nTasks; taskIdx++ {
taskName := fmt.Sprintf("%s-%03d", firstTaskName, taskIdx)
task := v1beta1.PipelineTask{
Name: taskName,
TaskRef: &v1beta1.TaskRef{Name: taskName + "-task"},
RunAfter: taskDeps,
}
tasks = append(tasks, task)
taskDeps = append(taskDeps, taskName)
}
}

_, err := dag.Build(v1beta1.PipelineTaskList(tasks), v1beta1.PipelineTaskList(tasks).Deps())
if err == nil {
t.Errorf("Pipeline.Validate() did not return error for invalid pipeline with cycles")
}
}

func testGraph(t *testing.T) *dag.Graph {
// b a
// | / \
Expand Down

0 comments on commit 18a6a2d

Please sign in to comment.