Adds an initial implementation for conditionals
In this implementation, condition evaluations (aka ConditionChecks) are
backed by TaskRuns. All ConditionChecks associated with a `PipelineTask`
have to succeed before the task is executed. If a ConditionCheck fails,
the PipelineTask's associated TaskRun is marked as failed, i.e. its
`Status.ConditionSucceeded` is False; the PipelineRun itself, however,
is not marked as failed.
dibyom committed Jul 9, 2019
1 parent a52b36b commit c8311f7
Showing 11 changed files with 1,209 additions and 79 deletions.
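The gating rule described in the commit message reduces to a simple predicate. The sketch below is illustrative only — the types are stand-ins, not the commit's API — showing that a PipelineTask's TaskRun may be created only once every associated ConditionCheck has finished successfully:

package main

import "fmt"

// ConditionCheck is a stand-in for the commit's TaskRun-backed checks.
type ConditionCheck struct {
	Name string
	Done bool // the backing TaskRun finished
	OK   bool // and it succeeded
}

// taskRunnable mirrors the rule from the commit message: every check
// must have completed successfully before the PipelineTask's TaskRun
// is created.
func taskRunnable(checks []ConditionCheck) bool {
	for _, c := range checks {
		if !c.Done || !c.OK {
			return false
		}
	}
	return true
}

func main() {
	checks := []ConditionCheck{{Name: "branch-is-main", Done: true, OK: true}}
	fmt.Println(taskRunnable(checks)) // true: create the TaskRun
	checks[0].OK = false
	fmt.Println(taskRunnable(checks)) // false: mark the TaskRun failed, not the PipelineRun
}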
2 changes: 2 additions & 0 deletions cmd/controller/main.go
@@ -113,6 +113,7 @@ func main() {
 	taskRunInformer := pipelineInformerFactory.Tekton().V1alpha1().TaskRuns()
 	resourceInformer := pipelineInformerFactory.Tekton().V1alpha1().PipelineResources()
 	podInformer := kubeInformerFactory.Core().V1().Pods()
+	conditionInformer := pipelineInformerFactory.Tekton().V1alpha1().Conditions()
 
 	pipelineInformer := pipelineInformerFactory.Tekton().V1alpha1().Pipelines()
 	pipelineRunInformer := pipelineInformerFactory.Tekton().V1alpha1().PipelineRuns()
@@ -135,6 +136,7 @@ func main() {
 		clusterTaskInformer,
 		taskRunInformer,
 		resourceInformer,
+		conditionInformer,
 		timeoutHandler,
 	)
 	// Build all of our controllers, with the clients constructed above.
13 changes: 7 additions & 6 deletions pkg/apis/pipeline/register.go
@@ -18,10 +18,11 @@ package pipeline
 
 // GroupName is the Kubernetes resource group name for Pipeline types.
 const (
-	GroupName            = "tekton.dev"
-	TaskLabelKey         = "/task"
-	TaskRunLabelKey      = "/taskRun"
-	PipelineLabelKey     = "/pipeline"
-	PipelineRunLabelKey  = "/pipelineRun"
-	PipelineTaskLabelKey = "/pipelineTask"
+	GroupName                    = "tekton.dev"
+	TaskLabelKey                 = "/task"
+	TaskRunLabelKey              = "/taskRun"
+	PipelineLabelKey             = "/pipeline"
+	PipelineRunLabelKey          = "/pipelineRun"
+	PipelineTaskLabelKey         = "/pipelineTask"
+	PipelineRunConditionCheckKey = "/pipelineConditionCheck"
 )
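These constants are suffixes that get joined with GroupName whenever the reconciler builds a full label key (e.g. pipeline.GroupName+pipeline.PipelineRunConditionCheckKey later in this diff). A stand-alone illustration of the resulting key:

package main

import "fmt"

const (
	GroupName                    = "tekton.dev"
	PipelineRunConditionCheckKey = "/pipelineConditionCheck"
)

func main() {
	// Mirrors pipeline.GroupName + pipeline.PipelineRunConditionCheckKey.
	fmt.Println(GroupName + PipelineRunConditionCheckKey) // tekton.dev/pipelineConditionCheck
}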
1 change: 1 addition & 0 deletions pkg/apis/pipeline/v1alpha1/condition_validation.go
@@ -18,6 +18,7 @@ package v1alpha1
 
 import (
 	"context"
+
 	"github.com/knative/pkg/apis"
 	"k8s.io/apimachinery/pkg/api/equality"
 )
212 changes: 151 additions & 61 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
@@ -28,7 +28,7 @@ import (
 	"github.com/knative/pkg/tracker"
 	"github.com/tektoncd/pipeline/pkg/apis/pipeline"
 	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
-	artifacts "github.com/tektoncd/pipeline/pkg/artifacts"
+	"github.com/tektoncd/pipeline/pkg/artifacts"
 	informers "github.com/tektoncd/pipeline/pkg/client/informers/externalversions/pipeline/v1alpha1"
 	listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1"
 	"github.com/tektoncd/pipeline/pkg/reconciler"
@@ -58,6 +58,9 @@ const (
 	// ReasonCouldntGetResource indicates that the reason for the failure status is that the
 	// associated PipelineRun's bound PipelineResources couldn't all be retrieved
 	ReasonCouldntGetResource = "CouldntGetResource"
+	// ReasonCouldntGetCondition indicates that the reason for the failure status is that the
+	// associated Pipeline's Conditions couldn't all be retrieved
+	ReasonCouldntGetCondition = "CouldntGetCondition"
 	// ReasonFailedValidation indicates that the reason for failure status is
 	// that pipelinerun failed runtime validation
 	ReasonFailedValidation = "PipelineValidationFailed"
@@ -89,6 +92,7 @@ type Reconciler struct {
 	taskLister        listers.TaskLister
 	clusterTaskLister listers.ClusterTaskLister
 	resourceLister    listers.PipelineResourceLister
+	conditionLister   listers.ConditionLister
 	tracker           tracker.Interface
 	configStore       configStore
 	timeoutHandler    *reconciler.TimeoutSet
@@ -106,6 +110,7 @@ func NewController(
 	clusterTaskInformer informers.ClusterTaskInformer,
 	taskRunInformer informers.TaskRunInformer,
 	resourceInformer informers.PipelineResourceInformer,
+	conditionInformer informers.ConditionInformer,
 	timeoutHandler *reconciler.TimeoutSet,
 ) *controller.Impl {
 
@@ -117,6 +122,7 @@
 		clusterTaskLister: clusterTaskInformer.Lister(),
 		taskRunLister:     taskRunInformer.Lister(),
 		resourceLister:    resourceInformer.Lister(),
+		conditionLister:   conditionInformer.Lister(),
 		timeoutHandler:    timeoutHandler,
 	}
 
@@ -302,6 +308,9 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
 			return c.clusterTaskLister.Get(name)
 		},
 		c.resourceLister.PipelineResources(pr.Namespace).Get,
+		func(name string) (*v1alpha1.Condition, error) {
+			return c.conditionLister.Conditions(pr.Namespace).Get(name)
+		},
 		p.Spec.Tasks, providedResources,
 	)
 	if err != nil {
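The new argument follows the same pattern as the existing task and resource getters: resolution code takes plain functions rather than listers, so tests can substitute a map-backed fake. A minimal sketch of that injection pattern (names are illustrative, not the package's API):

package main

import (
	"errors"
	"fmt"
)

// Condition is a stand-in for v1alpha1.Condition.
type Condition struct{ Name string }

// GetCondition matches the shape of the closure passed above.
type GetCondition func(name string) (*Condition, error)

// resolve stands in for resolution code that only needs the getter,
// not a full lister or clientset.
func resolve(get GetCondition, name string) (*Condition, error) {
	return get(name)
}

func main() {
	fake := map[string]*Condition{"branch-is-main": {Name: "branch-is-main"}}
	get := func(name string) (*Condition, error) {
		if c, ok := fake[name]; ok {
			return c, nil
		}
		return nil, errors.New("condition not found: " + name)
	}
	c, err := resolve(get, "branch-is-main")
	fmt.Println(c.Name, err) // branch-is-main <nil>
}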
@@ -323,6 +332,14 @@
 			Message: fmt.Sprintf("PipelineRun %s can't be Run; it tries to bind Resources that don't exist: %s",
 				fmt.Sprintf("%s/%s", p.Namespace, pr.Name), err),
 		})
+	case *resources.ConditionNotFoundError:
+		pr.Status.SetCondition(&apis.Condition{
+			Type:   apis.ConditionSucceeded,
+			Status: corev1.ConditionFalse,
+			Reason: ReasonCouldntGetCondition,
+			Message: fmt.Sprintf("PipelineRun %s can't be Run; it contains Conditions that don't exist: %s",
+				fmt.Sprintf("%s/%s", p.Namespace, pr.Name), err),
+		})
 	default:
 		pr.Status.SetCondition(&apis.Condition{
 			Type: apis.ConditionSucceeded,
@@ -377,6 +394,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
 	if err != nil {
 		c.Logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
 	}
+
 	rprts := pipelineState.GetNextTasks(candidateTasks)
 
 	var as artifacts.ArtifactStorageInterface
@@ -387,11 +405,21 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
 
 	for _, rprt := range rprts {
 		if rprt != nil {
-			c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName)
-			rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, as.StorageBasePath(pr))
-			if err != nil {
-				c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
-				return xerrors.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %w", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
-			}
+			if rprt.ResolvedConditionChecks == nil || rprt.ResolvedConditionChecks.IsSuccess() {
+				c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName)
+				rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, as.StorageBasePath(pr))
+				if err != nil {
+					c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
+					return xerrors.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %w", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
+				}
+			} else if !rprt.ResolvedConditionChecks.HasStarted() {
+				for _, rcc := range rprt.ResolvedConditionChecks {
+					rcc.ConditionCheck, err = c.makeConditionCheckContainer(c.Logger, rprt, rcc, pr)
+					if err != nil {
+						c.Recorder.Eventf(pr, corev1.EventTypeWarning, "ConditionCheckCreationFailed", "Failed to create TaskRun %q: %v", rcc.ConditionCheckName, err)
+						return xerrors.Errorf("error creating ConditionCheck container called %s for PipelineTask %s from PipelineRun %s: %w", rcc.ConditionCheckName, rprt.PipelineTask.Name, pr.Name, err)
+					}
+				}
+			}
 		}
 	}
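The branch above encodes a small per-task state machine: no conditions (or all checks succeeded) means the real TaskRun is created; checks that have not started are launched as their own TaskRuns; anything still in flight, or already failed, is left alone until a later reconcile (failures are surfaced by the status update further down). A rough, illustrative model of that decision, assuming IsSuccess and HasStarted mean what their names suggest:

package main

import "fmt"

type checkState int

const (
	notStarted checkState = iota
	running
	failed
	succeeded
)

// nextAction is a stand-in for the branch above, not the commit's code.
func nextAction(checks []checkState) string {
	if len(checks) == 0 {
		return "create TaskRun" // no conditions behaves like all-success
	}
	allSucceeded, anyStarted := true, false
	for _, s := range checks {
		if s != succeeded {
			allSucceeded = false
		}
		if s != notStarted {
			anyStarted = true
		}
	}
	switch {
	case allSucceeded:
		return "create TaskRun"
	case !anyStarted:
		return "create condition-check TaskRuns"
	default:
		return "wait; failures are reported via status"
	}
}

func main() {
	fmt.Println(nextAction(nil))                      // create TaskRun
	fmt.Println(nextAction([]checkState{notStarted})) // create condition-check TaskRuns
	fmt.Println(nextAction([]checkState{failed}))     // wait; failures are reported via status
}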
@@ -408,21 +436,54 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
 
 func updateTaskRunsStatus(pr *v1alpha1.PipelineRun, pipelineState []*resources.ResolvedPipelineRunTask) {
 	for _, rprt := range pipelineState {
+		if rprt.TaskRun == nil && rprt.ResolvedConditionChecks == nil {
+			continue
+		}
+		var prtrs *v1alpha1.PipelineRunTaskRunStatus
 		if rprt.TaskRun != nil {
-			prtrs := pr.Status.TaskRuns[rprt.TaskRun.Name]
-			if prtrs == nil {
-				prtrs = &v1alpha1.PipelineRunTaskRunStatus{
-					PipelineTaskName: rprt.PipelineTask.Name,
-				}
-				pr.Status.TaskRuns[rprt.TaskRun.Name] = prtrs
-			}
+			prtrs = pr.Status.TaskRuns[rprt.TaskRun.Name]
+		}
+		if prtrs == nil {
+			prtrs = &v1alpha1.PipelineRunTaskRunStatus{
+				PipelineTaskName: rprt.PipelineTask.Name,
+			}
+		}
+
+		if rprt.TaskRun != nil {
 			prtrs.Status = &rprt.TaskRun.Status
 		}
+
+		if len(rprt.ResolvedConditionChecks) > 0 {
+			cStatus := make(map[string]*v1alpha1.PipelineRunConditionCheckStatus)
+			for _, c := range rprt.ResolvedConditionChecks {
+				cStatus[c.ConditionCheckName] = &v1alpha1.PipelineRunConditionCheckStatus{
+					ConditionName: c.Condition.Name,
+				}
+				if c.ConditionCheck != nil {
+					ccStatus := v1alpha1.ConditionCheckStatus(c.ConditionCheck.Status)
+					cStatus[c.ConditionCheckName].Status = &ccStatus
+				}
+			}
+			prtrs.ConditionChecks = cStatus
+			if rprt.ResolvedConditionChecks.IsComplete() && !rprt.ResolvedConditionChecks.IsSuccess() {
+				if prtrs.Status == nil {
+					prtrs.Status = &v1alpha1.TaskRunStatus{}
+				}
+				prtrs.Status.SetCondition(&apis.Condition{
+					Type:    apis.ConditionSucceeded,
+					Status:  corev1.ConditionFalse,
+					Reason:  resources.ReasonConditionCheckFailed,
+					Message: fmt.Sprintf("ConditionChecks failed for Task %s in PipelineRun %s", rprt.TaskRunName, pr.Name),
+				})
+			}
+		}
+		pr.Status.TaskRuns[rprt.TaskRunName] = prtrs
 	}
 }
 
 func (c *Reconciler) updateTaskRunsStatusDirectly(pr *v1alpha1.PipelineRun) error {
 	for taskRunName := range pr.Status.TaskRuns {
+		// TODO(dibyom): Add conditionCheck statuses here
 		prtrs := pr.Status.TaskRuns[taskRunName]
 		tr, err := c.taskRunLister.TaskRuns(pr.Namespace).Get(taskRunName)
 		if err != nil {
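The expression v1alpha1.ConditionCheckStatus(c.ConditionCheck.Status) above is a plain Go type conversion; it compiles only because ConditionCheckStatus presumably shares TaskRunStatus's underlying type, which is consistent with condition checks being backed by TaskRuns. A minimal stand-alone illustration with stand-in types:

package main

import "fmt"

// TaskRunStatus is a stand-in for the real, much larger struct.
type TaskRunStatus struct{ PodName string }

// ConditionCheckStatus is a hypothetical stand-in; the real type
// presumably wraps TaskRunStatus the same way.
type ConditionCheckStatus TaskRunStatus

func main() {
	trs := TaskRunStatus{PodName: "check-pod"}
	ccs := ConditionCheckStatus(trs) // explicit conversion; no field-copying code needed
	fmt.Println(ccs.PodName)         // check-pod
}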
@@ -434,53 +495,10 @@ func (c *Reconciler) updateTaskRunsStatusDirectly(pr *v1alpha1.PipelineRun) erro
 			prtrs.Status = &tr.Status
 		}
 	}
 
 	return nil
 }
 
 func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.ResolvedPipelineRunTask, pr *v1alpha1.PipelineRun, storageBasePath string) (*v1alpha1.TaskRun, error) {
-	var taskRunTimeout = &metav1.Duration{Duration: 0 * time.Second}
-
-	if pr.Spec.Timeout != nil {
-		pTimeoutTime := pr.Status.StartTime.Add(pr.Spec.Timeout.Duration)
-		if time.Now().After(pTimeoutTime) {
-			// Just in case something goes awry and we're creating the TaskRun after it should have already timed out,
-			// set a timeout of 0.
-			taskRunTimeout = &metav1.Duration{Duration: time.Until(pTimeoutTime)}
-			if taskRunTimeout.Duration < 0 {
-				taskRunTimeout = &metav1.Duration{Duration: 0 * time.Second}
-			}
-		} else {
-			taskRunTimeout = pr.Spec.Timeout
-		}
-	} else {
-		taskRunTimeout = nil
-	}
-
-	// If service account is configured for a given PipelineTask, override PipelineRun's seviceAccount
-	serviceAccount := pr.Spec.ServiceAccount
-	for _, sa := range pr.Spec.ServiceAccounts {
-		if sa.TaskName == rprt.PipelineTask.Name {
-			serviceAccount = sa.ServiceAccount
-		}
-	}
-
-	// Propagate labels from PipelineRun to TaskRun.
-	labels := make(map[string]string, len(pr.ObjectMeta.Labels)+1)
-	for key, val := range pr.ObjectMeta.Labels {
-		labels[key] = val
-	}
-	labels[pipeline.GroupName+pipeline.PipelineRunLabelKey] = pr.Name
-	if rprt.PipelineTask.Name != "" {
-		labels[pipeline.GroupName+pipeline.PipelineTaskLabelKey] = rprt.PipelineTask.Name
-	}
-
-	// Propagate annotations from PipelineRun to TaskRun.
-	annotations := make(map[string]string, len(pr.ObjectMeta.Annotations)+1)
-	for key, val := range pr.ObjectMeta.Annotations {
-		annotations[key] = val
-	}
-
 	tr, _ := c.taskRunLister.TaskRuns(pr.Namespace).Get(rprt.TaskRunName)
 
 	if tr != nil {
@@ -498,8 +516,8 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re
 			Name:            rprt.TaskRunName,
 			Namespace:       pr.Namespace,
 			OwnerReferences: pr.GetOwnerReference(),
-			Labels:          labels,
-			Annotations:     annotations,
+			Labels:          getTaskrunLabels(pr, rprt.PipelineTask.Name), // Propagate labels from PipelineRun to TaskRun.
+			Annotations:     getTaskrunAnnotations(pr),                    // Propagate annotations from PipelineRun to TaskRun.
 		},
 		Spec: v1alpha1.TaskRunSpec{
 			TaskRef: &v1alpha1.TaskRef{
@@ -509,19 +527,38 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re
 			Inputs: v1alpha1.TaskRunInputs{
 				Params: rprt.PipelineTask.Params,
 			},
-			ServiceAccount: serviceAccount,
-			Timeout:        taskRunTimeout,
+			ServiceAccount: getServiceAccount(pr, rprt.PipelineTask.Name),
+			Timeout:        getTaskRunTimeout(pr),
 			NodeSelector:   pr.Spec.NodeSelector,
 			Tolerations:    pr.Spec.Tolerations,
 			Affinity:       pr.Spec.Affinity,
-		},
-	}
+		}}
 
 	resources.WrapSteps(&tr.Spec, rprt.PipelineTask, rprt.ResolvedTaskResources.Inputs, rprt.ResolvedTaskResources.Outputs, storageBasePath)
 
 	return c.PipelineClientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Create(tr)
 }
 
+func getTaskrunAnnotations(pr *v1alpha1.PipelineRun) map[string]string {
+	annotations := make(map[string]string, len(pr.ObjectMeta.Annotations)+1)
+	for key, val := range pr.ObjectMeta.Annotations {
+		annotations[key] = val
+	}
+	return annotations
+}
+
+func getTaskrunLabels(pr *v1alpha1.PipelineRun, pipelineTaskName string) map[string]string {
+	labels := make(map[string]string, len(pr.ObjectMeta.Labels)+1)
+	for key, val := range pr.ObjectMeta.Labels {
+		labels[key] = val
+	}
+	labels[pipeline.GroupName+pipeline.PipelineRunLabelKey] = pr.Name
+	if pipelineTaskName != "" {
+		labels[pipeline.GroupName+pipeline.PipelineTaskLabelKey] = pipelineTaskName
+	}
+	return labels
+}
+
 func addRetryHistory(tr *v1alpha1.TaskRun) {
 	newStatus := *tr.Status.DeepCopy()
 	newStatus.RetriesStatus = nil
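Both helpers share one idiom: allocate a fresh map sized for one extra entry, copy the parent PipelineRun's metadata into it, then add the derived keys, so the parent's own maps are never mutated. A stand-alone sketch of that idiom (hypothetical names):

package main

import "fmt"

// withExtraLabel copies parent before adding key, mirroring the
// copy-then-extend pattern in getTaskrunLabels/getTaskrunAnnotations.
func withExtraLabel(parent map[string]string, key, val string) map[string]string {
	out := make(map[string]string, len(parent)+1)
	for k, v := range parent {
		out[k] = v
	}
	out[key] = val
	return out
}

func main() {
	prLabels := map[string]string{"app": "demo"}
	trLabels := withExtraLabel(prLabels, "tekton.dev/pipelineRun", "my-run")
	fmt.Println(prLabels) // unchanged: map[app:demo]
	fmt.Println(trLabels) // map[app:demo tekton.dev/pipelineRun:my-run]
}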
@@ -565,3 +602,56 @@ func (c *Reconciler) updateLabelsAndAnnotations(pr *v1alpha1.PipelineRun) (*v1al
 	}
 	return newPr, nil
 }
+
+func (c *Reconciler) makeConditionCheckContainer(logger *zap.SugaredLogger, rprt *resources.ResolvedPipelineRunTask, rcc *resources.ResolvedConditionCheck, pr *v1alpha1.PipelineRun) (*v1alpha1.ConditionCheck, error) {
+	labels := getTaskrunLabels(pr, rprt.PipelineTask.Name)
+	labels[pipeline.GroupName+pipeline.PipelineRunConditionCheckKey] = rcc.ConditionCheckName
+
+	tr := &v1alpha1.TaskRun{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            rcc.ConditionCheckName,
+			Namespace:       pr.Namespace,
+			OwnerReferences: pr.GetOwnerReference(),
+			Labels:          labels,
+			Annotations:     getTaskrunAnnotations(pr), // Propagate annotations from PipelineRun to TaskRun.
+		},
+		Spec: v1alpha1.TaskRunSpec{
+			TaskSpec:       rcc.ConditionToTaskSpec(),
+			ServiceAccount: getServiceAccount(pr, rprt.PipelineTask.Name),
+			Timeout:        getTaskRunTimeout(pr),
+			NodeSelector:   pr.Spec.NodeSelector,
+			Tolerations:    pr.Spec.Tolerations,
+			Affinity:       pr.Spec.Affinity,
+		}}
+
+	cctr, err := c.PipelineClientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Create(tr)
+	cc := v1alpha1.ConditionCheck(*cctr)
+	return &cc, err
+}
+
+func getTaskRunTimeout(pr *v1alpha1.PipelineRun) *metav1.Duration {
+	var taskRunTimeout = &metav1.Duration{Duration: 0 * time.Second}
+	pTimeoutTime := pr.Status.StartTime.Add(pr.Spec.Timeout.Duration)
+	if time.Now().After(pTimeoutTime) {
+		// Just in case something goes awry and we're creating the TaskRun after it should have already timed out,
+		// set a timeout of 0.
+		taskRunTimeout = &metav1.Duration{Duration: time.Until(pTimeoutTime)}
+		if taskRunTimeout.Duration < 0 {
+			taskRunTimeout = &metav1.Duration{Duration: 0 * time.Second}
+		}
+	} else {
+		taskRunTimeout = pr.Spec.Timeout
+	}
+	return taskRunTimeout
+}
+
+func getServiceAccount(pr *v1alpha1.PipelineRun, pipelineTaskName string) string {
+	// If a service account is configured for a given PipelineTask, override the PipelineRun's serviceAccount
+	serviceAccount := pr.Spec.ServiceAccount
+	for _, sa := range pr.Spec.ServiceAccounts {
+		if sa.TaskName == pipelineTaskName {
+			serviceAccount = sa.ServiceAccount
+		}
+	}
+	return serviceAccount
+}
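getServiceAccount treats the PipelineRun-level service account as the default and lets a per-task entry in Spec.ServiceAccounts win — the last matching entry, since the loop keeps overwriting. An illustrative stand-alone version of that rule (stand-in types, not the commit's API):

package main

import "fmt"

// saBinding stands in for the per-task service account entries.
type saBinding struct{ TaskName, ServiceAccount string }

// serviceAccountFor mirrors getServiceAccount: default first, then
// any matching per-task binding overrides it.
func serviceAccountFor(def string, bindings []saBinding, task string) string {
	sa := def
	for _, b := range bindings {
		if b.TaskName == task {
			sa = b.ServiceAccount
		}
	}
	return sa
}

func main() {
	bindings := []saBinding{{TaskName: "deploy", ServiceAccount: "deployer"}}
	fmt.Println(serviceAccountFor("default-sa", bindings, "build"))  // default-sa
	fmt.Println(serviceAccountFor("default-sa", bindings, "deploy")) // deployer
}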