Skip to content

Commit

Permalink
Fix recursion issue on Skip
Browse files Browse the repository at this point in the history
The pipelinerun state is made of resolved pipelinerun tasks (rprt),
which are build from the actual status of the associated taskruns.

It is computationaly easy to know if a taskrun started, or completed
successfully or unsuccessfully; however determining whether a taskrun
has been skipped or will be skipped in the pipeline run execution,
requires evaluating the entire pipeline run status and associated dag.

The Skip method used to apply to a single rprt, evaluate the entire
pipeline run status and dag, return whether the specific rprt was
going to be skipped, and throw away the rest.

We used to invoke the Skip method on every rprt in the pipeline state
to calculate candidate tasks for execution. To make things worse,
we also invoked the "Skip" method as part of the "isDone", defined as
a logical OR between "isSuccessful", "isFailed" and "Skip".

With this change we compute the list of tasks to be skipped once, and
we store the result in a map to the pipelinerun facts, along with
pipelinerun state and associated dags. We introdce a new method on the
pipelinerun facts called "IsTaskSkipped". This method performs the
lazy computation of the skips map the first time it is invoked.
Any following invocation is able to provide the skip status of a
specific pipeline task by looking up in the map.

This solution manages to hide some of the details of the skip logic from
the core reconciler logic. I believe further refactor could help, but
I wanted to keep this PR as little as possible. I will further pursue
this work by revining #2821

I converted the unit test for "Skip" to a unit test for "SkipMap",
to ensure that the new logic gives the same result as we used to have.
The test should be moved to a different module as "SkipMap" lives
in a different module, however leaving it in place very much helps
with the review. I will move it in a follow-up patch.

This changes adds a unit test that reproduces the issue in #3521, which
used to fail (with timeout 30s) and now succeedes for pipelines roughly
up to 120 tasks / 120 links. On my laptop, going beyond 120 tasks/links
takes longer than 30s, so I left the unit test at 80 to avoid
introducing a flaky test in CI. There is still work to do to improve
this further, some profiling / tracing work might help.

Breaking large pipelines in logical groups (branches or pipelines in
pipelines) would help reduce the complexity and computational cost for
very large pipelines.

Fixes #3521

Signed-off-by: Andrea Frittoli <andrea.frittoli@uk.ibm.com>
  • Loading branch information
afrittoli committed Nov 16, 2020
1 parent cdc171b commit 72455ba
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 64 deletions.
8 changes: 7 additions & 1 deletion pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,9 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
return err
}

// Reset the skipped status to trigger recalculation
pipelineRunFacts.ResetSkippedCache()

after := pipelineRunFacts.GetPipelineConditionStatus(pr, logger)
switch after.Status {
case corev1.ConditionTrue:
Expand Down Expand Up @@ -565,12 +568,15 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
}

resources.ApplyTaskResults(nextRprts, resolvedResultRefs)
// After we apply Task Results, we may be able to evaluate more
// when expressions, so reset the skipped cache
pipelineRunFacts.ResetSkippedCache()

// GetFinalTasks only returns tasks when a DAG is complete
nextRprts = append(nextRprts, pipelineRunFacts.GetFinalTasks()...)

for _, rprt := range nextRprts {
if rprt == nil || rprt.Skip(pipelineRunFacts) {
if rprt == nil || pipelineRunFacts.IsTaskSkipped(rprt.PipelineTask.Name) {
continue
}
if rprt.ResolvedConditionChecks == nil || rprt.ResolvedConditionChecks.IsSuccess() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2287,7 +2287,7 @@ func TestReconcileWithWhenExpressionsWithTaskResults(t *testing.T) {
t.Fatalf("Failure to list TaskRun's %s", err)
}
if len(actualSkippedTask.Items) != 0 {
t.Fatalf("Expected 0 TaskRuns got %d", len(actualSkippedTask.Items))
t.Fatalf("When listing %s, expected 0 TaskRuns got %d", skippedTask, len(actualSkippedTask.Items))
}
}
}
Expand Down
54 changes: 6 additions & 48 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type ResolvedPipelineRunTask struct {

// IsDone returns true only if the task is skipped, succeeded or failed
func (t ResolvedPipelineRunTask) IsDone(facts *PipelineRunFacts) bool {
return t.Skip(facts) || t.IsSuccessful() || t.IsFailure()
return facts.IsTaskSkipped(t.PipelineTask.Name) || t.IsSuccessful() || t.IsFailure()
}

// IsSuccessful returns true only if the taskrun itself has completed successfully
Expand Down Expand Up @@ -122,35 +122,6 @@ func (t ResolvedPipelineRunTask) IsStarted() bool {
return true
}

func (t *ResolvedPipelineRunTask) checkParentsDone(facts *PipelineRunFacts) bool {
stateMap := facts.State.ToMap()
node := facts.TasksGraph.Nodes[t.PipelineTask.Name]
for _, p := range node.Prev {
if !stateMap[p.Task.HashKey()].IsDone(facts) {
return false
}
}
return true
}

// Skip returns true if a PipelineTask will not be run because
// (1) its When Expressions evaluated to false
// (2) its Condition Checks failed
// (3) its parent task was skipped
// (4) Pipeline is in stopping state (one of the PipelineTasks failed)
// Note that this means Skip returns false if a conditionCheck is in progress
func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool {
if facts.isFinalTask(t.PipelineTask.Name) || t.IsStarted() {
return false
}

if t.conditionsSkip() || t.whenExpressionsSkip(facts) || t.parentTasksSkip(facts) || facts.IsStopping() {
return true
}

return false
}

func (t *ResolvedPipelineRunTask) conditionsSkip() bool {
if len(t.ResolvedConditionChecks) > 0 {
if t.ResolvedConditionChecks.IsDone() && !t.ResolvedConditionChecks.IsSuccess() {
Expand All @@ -160,30 +131,17 @@ func (t *ResolvedPipelineRunTask) conditionsSkip() bool {
return false
}

func (t *ResolvedPipelineRunTask) whenExpressionsSkip(facts *PipelineRunFacts) bool {
if t.checkParentsDone(facts) {
if len(t.PipelineTask.WhenExpressions) > 0 {
if !t.PipelineTask.WhenExpressions.HaveVariables() {
if !t.PipelineTask.WhenExpressions.AllowsExecution() {
return true
}
func (t *ResolvedPipelineRunTask) whenExpressionsSkip() bool {
if len(t.PipelineTask.WhenExpressions) > 0 {
if !t.PipelineTask.WhenExpressions.HaveVariables() {
if !t.PipelineTask.WhenExpressions.AllowsExecution() {
return true
}
}
}
return false
}

func (t *ResolvedPipelineRunTask) parentTasksSkip(facts *PipelineRunFacts) bool {
stateMap := facts.State.ToMap()
node := facts.TasksGraph.Nodes[t.PipelineTask.Name]
for _, p := range node.Prev {
if stateMap[p.Task.HashKey()].Skip(facts) {
return true
}
}
return false
}

// GetTaskRun is a function that will retrieve the TaskRun name.
type GetTaskRun func(name string) (*v1beta1.TaskRun, error)

Expand Down
30 changes: 21 additions & 9 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,11 @@ func DagFromState(state PipelineRunState) (*dag.Graph, error) {
return dag.Build(v1beta1.PipelineTaskList(pts))
}

func TestIsSkipped(t *testing.T) {
func TestSkipMap(t *testing.T) {
// NOTE(afrittoli) This test now logically belongs to
// pipelinerunstate_test.go, but I left it here for now as it
// makes the review much easier. I will move this to the other
// module in a follow-up patch.

tcs := []struct {
name string
Expand Down Expand Up @@ -611,6 +615,7 @@ func TestIsSkipped(t *testing.T) {
PipelineTask: &pts[6],
}},
expected: map[string]bool{
"mytask6": false,
"mytask7": false,
},
}, {
Expand All @@ -627,6 +632,7 @@ func TestIsSkipped(t *testing.T) {
PipelineTask: &pts[6],
}},
expected: map[string]bool{
"mytask6": true,
"mytask7": true,
},
}, {
Expand All @@ -647,6 +653,7 @@ func TestIsSkipped(t *testing.T) {
PipelineTask: &pts[6],
}},
expected: map[string]bool{
"mytask6": false,
"mytask7": false,
},
}, {
Expand Down Expand Up @@ -685,6 +692,7 @@ func TestIsSkipped(t *testing.T) {
},
}},
expected: map[string]bool{
"mytask6": false,
"mytask7": true,
},
}, {
Expand All @@ -705,6 +713,7 @@ func TestIsSkipped(t *testing.T) {
},
}},
expected: map[string]bool{
"mytask6": false,
"mytask7": true,
},
}, {
Expand Down Expand Up @@ -736,6 +745,8 @@ func TestIsSkipped(t *testing.T) {
},
}},
expected: map[string]bool{
"mytask6": false,
"mytask7": true,
"mytask10": true,
},
}, {
Expand Down Expand Up @@ -763,6 +774,8 @@ func TestIsSkipped(t *testing.T) {
},
}},
expected: map[string]bool{
"mytask6": false,
"mytask1": false,
"mytask8": true,
},
}, {
Expand Down Expand Up @@ -790,6 +803,8 @@ func TestIsSkipped(t *testing.T) {
},
}},
expected: map[string]bool{
"mytask1": false,
"mytask6": false,
"mytask7": true,
},
}, {
Expand Down Expand Up @@ -834,6 +849,7 @@ func TestIsSkipped(t *testing.T) {
},
}},
expected: map[string]bool{
"mytask1": false,
"mytask12": false,
},
}}
Expand All @@ -844,19 +860,15 @@ func TestIsSkipped(t *testing.T) {
if err != nil {
t.Fatalf("Could not get a dag from the TC state %#v: %v", tc.state, err)
}
stateMap := tc.state.ToMap()
facts := PipelineRunFacts{
State: tc.state,
TasksGraph: d,
FinalTasksGraph: &dag.Graph{},
}
for taskName, isSkipped := range tc.expected {
rprt := stateMap[taskName]
if rprt == nil {
t.Fatalf("Could not get task %s from the state: %v", taskName, tc.state)
}
if d := cmp.Diff(isSkipped, rprt.Skip(&facts)); d != "" {
t.Errorf("Didn't get expected isSkipped %s", diff.PrintWantGot(d))
actual := facts.SkipMap()
for taskname, expected := range tc.expected {
if d := cmp.Diff(actual[taskname], expected); d != "" {
t.Errorf("Skipped status did not match for a task %s", diff.PrintWantGot(d))
}
}
})
Expand Down
106 changes: 101 additions & 5 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,23 @@ type PipelineRunState []*ResolvedPipelineRunTask

// PipelineRunFacts is a collection of list of ResolvedPipelineTask, graph of DAG tasks, and graph of finally tasks
type PipelineRunFacts struct {
State PipelineRunState
TasksGraph *dag.Graph
// State represents the state of all pipeline tasks in the pipeline
State PipelineRunState

// TasksGraph is the graph of all the pipeline tasks in the main pipeline
TasksGraph *dag.Graph

// FinalTasksGraph is the graph of all the final tasks executed once the main pipeline is complete
FinalTasksGraph *dag.Graph

// WontRun is a hash of all PipelineTask names that stores whether a task will be
// executed or not, because it's either not reachable via the DAG due to the pipeline
// state, or because it has failed conditions.
// We store this list in along the state, because it requires traversing the whole
// graph; this way so we can build it once, when the reconcile starts, and update
// at the end of reconcile, instead of having to traverse the whole graph every time
// we want to know if a specific task will be skipped.
WontRun map[string]bool
}

// pipelineRunStatusCount holds the count of successful, failed, cancelled, skipped, and incomplete tasks
Expand Down Expand Up @@ -185,6 +199,84 @@ func (facts *PipelineRunFacts) IsStopping() bool {
return false
}

// IsTaskSkipped returns true if the PipelineRun won't be scheduling the name task
func (facts *PipelineRunFacts) IsTaskSkipped(pipelineTaskName string) bool {
if facts.WontRun == nil {
// We do lazy initialisation of this map
facts.WontRun = facts.SkipMap()
}
wontRun, _ := facts.WontRun[pipelineTaskName]
return wontRun
}

// ResetSkippedCache resets the skipped cache in the facts map
func (facts *PipelineRunFacts) ResetSkippedCache() {
facts.WontRun = nil
}

// SkipMap returns map pipeline task -> bool that tells whether a
// PipelineTask will not be run because:
// (1) its When Expressions evaluated to false (and all parents were done)
// (2) its Condition Checks failed
// (3) any of it parent tasks were skipped
// (4) Pipeline is in stopping state (one of the PipelineTasks failed)
// Note that this means Skip returns false if a conditionCheck is in progress
func (facts *PipelineRunFacts) SkipMap() map[string]bool {
skipMapResult := make(map[string]bool)
pipelineTaskMap := facts.State.ToMap()
isStopping := facts.IsStopping()
skipMap(pipelineTaskMap, isStopping, skipMapResult, facts.TasksGraph)
return skipMapResult
}

func skipMap(pipelineTaskMap map[string]*ResolvedPipelineRunTask, isStopping bool, partialSkipMap map[string]bool, g *dag.Graph, doneTasks ...string) {
// We can safely ignore errors here because we're artificially
// traversing the graph to discover skip tasks
nextTasks, _ := dag.GetSchedulable(g, doneTasks...)
// If there are tasks left, we need to continue to traverse the graph
if len(nextTasks) > 0 {
TASKLOOP:
for task := range nextTasks {
t, _ := pipelineTaskMap[task]
// Default to false, in case no check matches
// Regular task waiting for its turn to run
partialSkipMap[task] = false
// Checks that do not depend on parents
if t.IsStarted() {
partialSkipMap[task] = false
continue
} else if isStopping || t.conditionsSkip() {
partialSkipMap[task] = true
continue
}
// Check that depend on parents
node := g.Nodes[t.PipelineTask.Name]
parentsDone := true
for _, p := range node.Prev {
parent := pipelineTaskMap[p.Task.HashKey()]
// Parents must be tracked in the partial skipped map
skipped, _ := partialSkipMap[parent.PipelineTask.Name]
// One parent is enough to mark this as skipped
if skipped {
partialSkipMap[task] = true
continue TASKLOOP
}
done := parent.IsSuccessful() || parent.IsFailure()
// One parent not done is enough to mark parents as not done
if !done {
parentsDone = false
break
}
}
if parentsDone && t.whenExpressionsSkip() {
partialSkipMap[task] = true
}
}
newDoneTasks := append(doneTasks, nextTasks.List()...)
skipMap(pipelineTaskMap, isStopping, partialSkipMap, g, newDoneTasks...)
}
}

// DAGExecutionQueue returns a list of DAG tasks which needs to be scheduled next
func (facts *PipelineRunFacts) DAGExecutionQueue() (PipelineRunState, error) {
tasks := PipelineRunState{}
Expand Down Expand Up @@ -296,9 +388,13 @@ func (facts *PipelineRunFacts) GetPipelineConditionStatus(pr *v1beta1.PipelineRu

// GetSkippedTasks constructs a list of SkippedTask struct to be included in the PipelineRun Status
func (facts *PipelineRunFacts) GetSkippedTasks() []v1beta1.SkippedTask {
// GetSkippedTasks is invoked at the end of the reconcile cycle
// to update the pipelinerun message with the number of skipped tasks
// `facts.IsTaskSkipped` reflects the status at the beginning of the
// reconcile, so we need to force it to recompute the correct skipped list
var skipped []v1beta1.SkippedTask
for _, rprt := range facts.State {
if rprt.Skip(facts) {
if facts.IsTaskSkipped(rprt.PipelineTask.Name) {
skippedTask := v1beta1.SkippedTask{
Name: rprt.PipelineTask.Name,
WhenExpressions: rprt.PipelineTask.WhenExpressions,
Expand All @@ -315,7 +411,7 @@ func (facts *PipelineRunFacts) successfulOrSkippedDAGTasks() []string {
tasks := []string{}
for _, t := range facts.State {
if facts.isDAGTask(t.PipelineTask.Name) {
if t.IsSuccessful() || t.Skip(facts) {
if t.IsSuccessful() || facts.IsTaskSkipped(t.PipelineTask.Name) {
tasks = append(tasks, t.PipelineTask.Name)
}
}
Expand Down Expand Up @@ -367,7 +463,7 @@ func (facts *PipelineRunFacts) getPipelineTasksCount() pipelineRunStatusCount {
case t.IsCancelled():
s.Cancelled++
// increment skip counter since the task is skipped
case t.Skip(facts):
case facts.IsTaskSkipped(t.PipelineTask.Name):
s.Skipped++
// increment incomplete counter since the task is pending and not executed yet
default:
Expand Down
Loading

0 comments on commit 72455ba

Please sign in to comment.