diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index f6687352742..4d66169a98c 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -524,6 +524,9 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get return err } + // Reset the skipped status to trigger recalculation + pipelineRunFacts.ResetSkippedCache() + after := pipelineRunFacts.GetPipelineConditionStatus(pr, logger) switch after.Status { case corev1.ConditionTrue: @@ -565,12 +568,15 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip } resources.ApplyTaskResults(nextRprts, resolvedResultRefs) + // After we apply Task Results, we may be able to evaluate more + // when expressions, so reset the skipped cache + pipelineRunFacts.ResetSkippedCache() // GetFinalTasks only returns tasks when a DAG is complete nextRprts = append(nextRprts, pipelineRunFacts.GetFinalTasks()...) for _, rprt := range nextRprts { - if rprt == nil || rprt.Skip(pipelineRunFacts) { + if rprt == nil || pipelineRunFacts.IsTaskSkipped(rprt.PipelineTask.Name) { continue } if rprt.ResolvedConditionChecks == nil || rprt.ResolvedConditionChecks.IsSuccess() { diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 1a0ce877505..598fa91f848 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -2287,7 +2287,7 @@ func TestReconcileWithWhenExpressionsWithTaskResults(t *testing.T) { t.Fatalf("Failure to list TaskRun's %s", err) } if len(actualSkippedTask.Items) != 0 { - t.Fatalf("Expected 0 TaskRuns got %d", len(actualSkippedTask.Items)) + t.Fatalf("When listing %s, expected 0 TaskRuns got %d", skippedTask, len(actualSkippedTask.Items)) } } } diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go index 787a89a1ef8..e9b43961c09 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go @@ -71,7 +71,7 @@ type ResolvedPipelineRunTask struct { // IsDone returns true only if the task is skipped, succeeded or failed func (t ResolvedPipelineRunTask) IsDone(facts *PipelineRunFacts) bool { - return t.Skip(facts) || t.IsSuccessful() || t.IsFailure() + return facts.IsTaskSkipped(t.PipelineTask.Name) || t.IsSuccessful() || t.IsFailure() } // IsSuccessful returns true only if the taskrun itself has completed successfully @@ -122,35 +122,6 @@ func (t ResolvedPipelineRunTask) IsStarted() bool { return true } -func (t *ResolvedPipelineRunTask) checkParentsDone(facts *PipelineRunFacts) bool { - stateMap := facts.State.ToMap() - node := facts.TasksGraph.Nodes[t.PipelineTask.Name] - for _, p := range node.Prev { - if !stateMap[p.Task.HashKey()].IsDone(facts) { - return false - } - } - return true -} - -// Skip returns true if a PipelineTask will not be run because -// (1) its When Expressions evaluated to false -// (2) its Condition Checks failed -// (3) its parent task was skipped -// (4) Pipeline is in stopping state (one of the PipelineTasks failed) -// Note that this means Skip returns false if a conditionCheck is in progress -func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool { - if facts.isFinalTask(t.PipelineTask.Name) || t.IsStarted() { - return false - } - - if t.conditionsSkip() || t.whenExpressionsSkip(facts) || t.parentTasksSkip(facts) || facts.IsStopping() { - return true - } - - return false -} - func (t *ResolvedPipelineRunTask) conditionsSkip() bool { if len(t.ResolvedConditionChecks) > 0 { if t.ResolvedConditionChecks.IsDone() && !t.ResolvedConditionChecks.IsSuccess() { @@ -160,30 +131,17 @@ func (t *ResolvedPipelineRunTask) conditionsSkip() bool { return false } -func (t *ResolvedPipelineRunTask) whenExpressionsSkip(facts *PipelineRunFacts) bool { - if t.checkParentsDone(facts) { - if len(t.PipelineTask.WhenExpressions) > 0 { - if !t.PipelineTask.WhenExpressions.HaveVariables() { - if !t.PipelineTask.WhenExpressions.AllowsExecution() { - return true - } +func (t *ResolvedPipelineRunTask) whenExpressionsSkip() bool { + if len(t.PipelineTask.WhenExpressions) > 0 { + if !t.PipelineTask.WhenExpressions.HaveVariables() { + if !t.PipelineTask.WhenExpressions.AllowsExecution() { + return true } } } return false } -func (t *ResolvedPipelineRunTask) parentTasksSkip(facts *PipelineRunFacts) bool { - stateMap := facts.State.ToMap() - node := facts.TasksGraph.Nodes[t.PipelineTask.Name] - for _, p := range node.Prev { - if stateMap[p.Task.HashKey()].Skip(facts) { - return true - } - } - return false -} - // GetTaskRun is a function that will retrieve the TaskRun name. type GetTaskRun func(name string) (*v1beta1.TaskRun, error) diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go index 3c2471af97a..5fa6b65207c 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go @@ -535,7 +535,11 @@ func DagFromState(state PipelineRunState) (*dag.Graph, error) { return dag.Build(v1beta1.PipelineTaskList(pts)) } -func TestIsSkipped(t *testing.T) { +func TestSkipMap(t *testing.T) { + // NOTE(afrittoli) This test now logically belongs to + // pipelinerunstate_test.go, but I left it here for now as it + // makes the review much easier. I will move this to the other + // module in a follow-up patch. tcs := []struct { name string @@ -611,6 +615,7 @@ func TestIsSkipped(t *testing.T) { PipelineTask: &pts[6], }}, expected: map[string]bool{ + "mytask6": false, "mytask7": false, }, }, { @@ -627,6 +632,7 @@ func TestIsSkipped(t *testing.T) { PipelineTask: &pts[6], }}, expected: map[string]bool{ + "mytask6": true, "mytask7": true, }, }, { @@ -647,6 +653,7 @@ func TestIsSkipped(t *testing.T) { PipelineTask: &pts[6], }}, expected: map[string]bool{ + "mytask6": false, "mytask7": false, }, }, { @@ -685,6 +692,7 @@ func TestIsSkipped(t *testing.T) { }, }}, expected: map[string]bool{ + "mytask6": false, "mytask7": true, }, }, { @@ -705,6 +713,7 @@ func TestIsSkipped(t *testing.T) { }, }}, expected: map[string]bool{ + "mytask6": false, "mytask7": true, }, }, { @@ -736,6 +745,8 @@ func TestIsSkipped(t *testing.T) { }, }}, expected: map[string]bool{ + "mytask6": false, + "mytask7": true, "mytask10": true, }, }, { @@ -763,6 +774,8 @@ func TestIsSkipped(t *testing.T) { }, }}, expected: map[string]bool{ + "mytask6": false, + "mytask1": false, "mytask8": true, }, }, { @@ -790,6 +803,8 @@ func TestIsSkipped(t *testing.T) { }, }}, expected: map[string]bool{ + "mytask1": false, + "mytask6": false, "mytask7": true, }, }, { @@ -834,6 +849,7 @@ func TestIsSkipped(t *testing.T) { }, }}, expected: map[string]bool{ + "mytask1": false, "mytask12": false, }, }} @@ -844,19 +860,15 @@ func TestIsSkipped(t *testing.T) { if err != nil { t.Fatalf("Could not get a dag from the TC state %#v: %v", tc.state, err) } - stateMap := tc.state.ToMap() facts := PipelineRunFacts{ State: tc.state, TasksGraph: d, FinalTasksGraph: &dag.Graph{}, } - for taskName, isSkipped := range tc.expected { - rprt := stateMap[taskName] - if rprt == nil { - t.Fatalf("Could not get task %s from the state: %v", taskName, tc.state) - } - if d := cmp.Diff(isSkipped, rprt.Skip(&facts)); d != "" { - t.Errorf("Didn't get expected isSkipped %s", diff.PrintWantGot(d)) + actual := facts.SkipMap() + for taskname, expected := range tc.expected { + if d := cmp.Diff(actual[taskname], expected); d != "" { + t.Errorf("Skipped status did not match for a task %s", diff.PrintWantGot(d)) } } }) diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go index f9a5e965d70..763f9974907 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go @@ -34,9 +34,23 @@ type PipelineRunState []*ResolvedPipelineRunTask // PipelineRunFacts is a collection of list of ResolvedPipelineTask, graph of DAG tasks, and graph of finally tasks type PipelineRunFacts struct { - State PipelineRunState - TasksGraph *dag.Graph + // State represents the state of all pipeline tasks in the pipeline + State PipelineRunState + + // TasksGraph is the graph of all the pipeline tasks in the main pipeline + TasksGraph *dag.Graph + + // FinalTasksGraph is the graph of all the final tasks executed once the main pipeline is complete FinalTasksGraph *dag.Graph + + // WontRun is a hash of all PipelineTask names that stores whether a task will be + // executed or not, because it's either not reachable via the DAG due to the pipeline + // state, or because it has failed conditions. + // We store this list in along the state, because it requires traversing the whole + // graph; this way so we can build it once, when the reconcile starts, and update + // at the end of reconcile, instead of having to traverse the whole graph every time + // we want to know if a specific task will be skipped. + WontRun map[string]bool } // pipelineRunStatusCount holds the count of successful, failed, cancelled, skipped, and incomplete tasks @@ -185,6 +199,84 @@ func (facts *PipelineRunFacts) IsStopping() bool { return false } +// IsTaskSkipped returns true if the PipelineRun won't be scheduling the name task +func (facts *PipelineRunFacts) IsTaskSkipped(pipelineTaskName string) bool { + if facts.WontRun == nil { + // We do lazy initialisation of this map + facts.WontRun = facts.SkipMap() + } + wontRun, _ := facts.WontRun[pipelineTaskName] + return wontRun +} + +// ResetSkippedCache resets the skipped cache in the facts map +func (facts *PipelineRunFacts) ResetSkippedCache() { + facts.WontRun = nil +} + +// SkipMap returns map pipeline task -> bool that tells whether a +// PipelineTask will not be run because: +// (1) its When Expressions evaluated to false (and all parents were done) +// (2) its Condition Checks failed +// (3) any of it parent tasks were skipped +// (4) Pipeline is in stopping state (one of the PipelineTasks failed) +// Note that this means Skip returns false if a conditionCheck is in progress +func (facts *PipelineRunFacts) SkipMap() map[string]bool { + skipMapResult := make(map[string]bool) + pipelineTaskMap := facts.State.ToMap() + isStopping := facts.IsStopping() + skipMap(pipelineTaskMap, isStopping, skipMapResult, facts.TasksGraph) + return skipMapResult +} + +func skipMap(pipelineTaskMap map[string]*ResolvedPipelineRunTask, isStopping bool, partialSkipMap map[string]bool, g *dag.Graph, doneTasks ...string) { + // We can safely ignore errors here because we're artificially + // traversing the graph to discover skip tasks + nextTasks, _ := dag.GetSchedulable(g, doneTasks...) + // If there are tasks left, we need to continue to traverse the graph + if len(nextTasks) > 0 { + TASKLOOP: + for task := range nextTasks { + t, _ := pipelineTaskMap[task] + // Default to false, in case no check matches + // Regular task waiting for its turn to run + partialSkipMap[task] = false + // Checks that do not depend on parents + if t.IsStarted() { + partialSkipMap[task] = false + continue + } else if isStopping || t.conditionsSkip() { + partialSkipMap[task] = true + continue + } + // Check that depend on parents + node := g.Nodes[t.PipelineTask.Name] + parentsDone := true + for _, p := range node.Prev { + parent := pipelineTaskMap[p.Task.HashKey()] + // Parents must be tracked in the partial skipped map + skipped, _ := partialSkipMap[parent.PipelineTask.Name] + // One parent is enough to mark this as skipped + if skipped { + partialSkipMap[task] = true + continue TASKLOOP + } + done := parent.IsSuccessful() || parent.IsFailure() + // One parent not done is enough to mark parents as not done + if !done { + parentsDone = false + break + } + } + if parentsDone && t.whenExpressionsSkip() { + partialSkipMap[task] = true + } + } + newDoneTasks := append(doneTasks, nextTasks.List()...) + skipMap(pipelineTaskMap, isStopping, partialSkipMap, g, newDoneTasks...) + } +} + // DAGExecutionQueue returns a list of DAG tasks which needs to be scheduled next func (facts *PipelineRunFacts) DAGExecutionQueue() (PipelineRunState, error) { tasks := PipelineRunState{} @@ -296,9 +388,13 @@ func (facts *PipelineRunFacts) GetPipelineConditionStatus(pr *v1beta1.PipelineRu // GetSkippedTasks constructs a list of SkippedTask struct to be included in the PipelineRun Status func (facts *PipelineRunFacts) GetSkippedTasks() []v1beta1.SkippedTask { + // GetSkippedTasks is invoked at the end of the reconcile cycle + // to update the pipelinerun message with the number of skipped tasks + // `facts.IsTaskSkipped` reflects the status at the beginning of the + // reconcile, so we need to force it to recompute the correct skipped list var skipped []v1beta1.SkippedTask for _, rprt := range facts.State { - if rprt.Skip(facts) { + if facts.IsTaskSkipped(rprt.PipelineTask.Name) { skippedTask := v1beta1.SkippedTask{ Name: rprt.PipelineTask.Name, WhenExpressions: rprt.PipelineTask.WhenExpressions, @@ -315,7 +411,7 @@ func (facts *PipelineRunFacts) successfulOrSkippedDAGTasks() []string { tasks := []string{} for _, t := range facts.State { if facts.isDAGTask(t.PipelineTask.Name) { - if t.IsSuccessful() || t.Skip(facts) { + if t.IsSuccessful() || facts.IsTaskSkipped(t.PipelineTask.Name) { tasks = append(tasks, t.PipelineTask.Name) } } @@ -367,7 +463,7 @@ func (facts *PipelineRunFacts) getPipelineTasksCount() pipelineRunStatusCount { case t.IsCancelled(): s.Cancelled++ // increment skip counter since the task is skipped - case t.Skip(facts): + case facts.IsTaskSkipped(t.PipelineTask.Name): s.Skipped++ // increment incomplete counter since the task is pending and not executed yet default: diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go index 0407c89f19e..a2d331feda3 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go @@ -17,6 +17,7 @@ limitations under the License. package resources import ( + "fmt" "testing" "time" @@ -408,6 +409,7 @@ func TestGetNextTaskWithRetries(t *testing.T) { } func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) { + largePipelineState := buildPipelineStateWithLargeDepencyGraph(t) tcs := []struct { name string state PipelineRunState @@ -454,6 +456,10 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) { "not skipped since it failed", state: conditionCheckFailedWithOthersFailedState, expectedNames: []string{pts[5].Name}, + }, { + name: "large deps, not started", + state: largePipelineState, + expectedNames: []string{}, }} for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { @@ -474,6 +480,65 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) { } } +func buildPipelineStateWithLargeDepencyGraph(t *testing.T) PipelineRunState { + t.Helper() + var task = &v1beta1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "task", + }, + Spec: v1beta1.TaskSpec{ + Steps: []v1beta1.Step{{Container: corev1.Container{ + Name: "step1", + }}}, + }, + } + var pipelineRunState PipelineRunState + pipelineRunState = []*ResolvedPipelineRunTask{{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "t1", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + TaskRun: nil, + ResolvedTaskResources: &resources.ResolvedTaskResources{ + TaskSpec: &task.Spec, + }, + }} + for i := 2; i < 80; i++ { + dependFrom := 1 + if i > 10 { + if i%10 == 0 { + dependFrom = i - 10 + } else { + dependFrom = i - (i % 10) + } + } + params := []v1beta1.Param{} + var alpha byte + for alpha = 'a'; alpha <= 'j'; alpha++ { + params = append(params, v1beta1.Param{ + Name: fmt.Sprintf("%c", alpha), + Value: v1beta1.ArrayOrString{ + Type: v1beta1.ParamTypeString, + StringVal: fmt.Sprintf("$(tasks.t%d.results.%c)", dependFrom, alpha), + }, + }) + } + pipelineRunState = append(pipelineRunState, &ResolvedPipelineRunTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: fmt.Sprintf("t%d", i), + Params: params, + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + TaskRun: nil, + ResolvedTaskResources: &resources.ResolvedTaskResources{ + TaskSpec: &task.Spec, + }, + }, + ) + } + return pipelineRunState +} + func TestPipelineRunState_GetFinalTasks(t *testing.T) { tcs := []struct { name string