From 2e718c0f1d71ee74dfe8047c73c751a7a7bf19f4 Mon Sep 17 00:00:00 2001 From: Andrea Frittoli Date: Fri, 22 May 2020 18:00:35 +0100 Subject: [PATCH] Add support for more cloud events Change the event type string for taskruns from dev.tekton.event.task.* to dev.tekton.event.taskrun.*. Since cloud events are only sent by pipeline resources - which are alpha - we don't need to comply with the beta deprecation policy for this. Extend the cloudevents module to include more event types for TaskRuns and event types for PipelineRuns, with unit test coverage. This is achieved by introducing the RunsToCompletion and RunsToCompletionStatus interface which are met by TaskRun, PipelineRun and their respective status types. At this stage there is no event producer that generates these new events. This is preparing the stage for the next changes where we will start to emit cloud events in addition to the k8s events we emit today. Partially addresses #2082 Partially addresses #2684 Signed-off-by: Andrea Frittoli --- .../pipeline/v1beta1/pipelinerun_types.go | 20 ++++ pkg/apis/pipeline/v1beta1/run_interface.go | 42 +++++++ pkg/apis/pipeline/v1beta1/taskrun_types.go | 15 +++ .../events/cloudevent/cloudevent.go | 113 ++++++++++++++---- .../events/cloudevent/cloudevent_test.go | 79 +++++++++++- 5 files changed, 246 insertions(+), 23 deletions(-) create mode 100644 pkg/apis/pipeline/v1beta1/run_interface.go diff --git a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go index 16654e6c39a..6e9823ee67b 100644 --- a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go +++ b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go @@ -72,6 +72,21 @@ func (pr *PipelineRun) GetTaskRunRef() corev1.ObjectReference { } } +// GetTypeMeta returns the task run type meta +func (pr *PipelineRun) GetTypeMeta() *metav1.TypeMeta { + return &pr.TypeMeta +} + +// GetObjectMeta returns the task run type meta +func (pr *PipelineRun) GetObjectMeta() *metav1.ObjectMeta { + return &pr.ObjectMeta +} + +// GetStatus returns the task run status as a RunsToCompletionStatus +func (pr *PipelineRun) GetStatus() RunsToCompletionStatus { + return &pr.Status +} + // GetOwnerReference gets the pipeline run as owner reference for any related objects func (pr *PipelineRun) GetOwnerReference() metav1.OwnerReference { return *metav1.NewControllerRef(pr, groupVersionKind) @@ -100,6 +115,11 @@ func (pr *PipelineRun) GetRunKey() string { // IsTimedOut returns true if a pipelinerun has exceeded its spec.Timeout based on its status.Timeout func (pr *PipelineRun) IsTimedOut() bool { + return pr.HasTimedOut() +} + +// HasTimedOut returns true if a pipelinerun has exceeded its spec.Timeout based on its status.Timeout +func (pr *PipelineRun) HasTimedOut() bool { pipelineTimeout := pr.Spec.Timeout startTime := pr.Status.StartTime diff --git a/pkg/apis/pipeline/v1beta1/run_interface.go b/pkg/apis/pipeline/v1beta1/run_interface.go new file mode 100644 index 00000000000..ddaf2ffb979 --- /dev/null +++ b/pkg/apis/pipeline/v1beta1/run_interface.go @@ -0,0 +1,42 @@ +/* +Copyright 2020 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/apis" +) + +// RunsToCompletionStatus is implemented by TaskRun.Status and PipelineRun.Status +type RunsToCompletionStatus interface { + GetCondition(t apis.ConditionType) *apis.Condition + InitializeConditions() + SetCondition(newCond *apis.Condition) +} + +// RunsToCompletion is implemented by TaskRun and PipelineRun +type RunsToCompletion interface { + GetTypeMeta() *metav1.TypeMeta + GetObjectMeta() *metav1.ObjectMeta + GetOwnerReference() metav1.OwnerReference + GetStatus() RunsToCompletionStatus + IsDone() bool + HasStarted() bool + IsCancelled() bool + HasTimedOut() bool + GetRunKey() string +} diff --git a/pkg/apis/pipeline/v1beta1/taskrun_types.go b/pkg/apis/pipeline/v1beta1/taskrun_types.go index f52b4e5b2a0..3e52a29054d 100644 --- a/pkg/apis/pipeline/v1beta1/taskrun_types.go +++ b/pkg/apis/pipeline/v1beta1/taskrun_types.go @@ -180,11 +180,26 @@ type TaskRunResult struct { Value string `json:"value"` } +// GetTypeMeta returns the task run type meta +func (tr *TaskRun) GetTypeMeta() *metav1.TypeMeta { + return &tr.TypeMeta +} + +// GetObjectMeta returns the task run type meta +func (tr *TaskRun) GetObjectMeta() *metav1.ObjectMeta { + return &tr.ObjectMeta +} + // GetOwnerReference gets the task run as owner reference for any related objects func (tr *TaskRun) GetOwnerReference() metav1.OwnerReference { return *metav1.NewControllerRef(tr, taskRunGroupVersionKind) } +// GetStatus returns the task run status as a RunsToCompletionStatus +func (tr *TaskRun) GetStatus() RunsToCompletionStatus { + return &tr.Status +} + // GetCondition returns the Condition matching the given type. func (trs *TaskRunStatus) GetCondition(t apis.ConditionType) *apis.Condition { return taskRunCondSet.Manage(trs).GetCondition(t) diff --git a/pkg/reconciler/events/cloudevent/cloudevent.go b/pkg/reconciler/events/cloudevent/cloudevent.go index 25ed3aa734f..780fce0879b 100644 --- a/pkg/reconciler/events/cloudevent/cloudevent.go +++ b/pkg/reconciler/events/cloudevent/cloudevent.go @@ -35,12 +35,32 @@ import ( type TektonEventType string const ( + // TektonTaskRunStartedV1 is sent for TaskRuns with "ConditionSucceeded" "Unknown" + // the first time they are picked up by the reconciler + TektonTaskRunStartedV1 TektonEventType = "dev.tekton.event.taskrun.started.v1" + // TektonTaskRunRunningV1 is sent for TaskRuns with "ConditionSucceeded" "Unknown" + // once the TaskRun is validated and Pod created + TektonTaskRunRunningV1 TektonEventType = "dev.tekton.event.taskrun.running.v1" // TektonTaskRunUnknownV1 is sent for TaskRuns with "ConditionSucceeded" "Unknown" - TektonTaskRunUnknownV1 TektonEventType = "dev.tekton.event.task.unknown.v1" + // It can be used as a confirmation that the TaskRun is still running. + TektonTaskRunUnknownV1 TektonEventType = "dev.tekton.event.taskrun.unknown.v1" // TektonTaskRunSuccessfulV1 is sent for TaskRuns with "ConditionSucceeded" "True" - TektonTaskRunSuccessfulV1 TektonEventType = "dev.tekton.event.task.successful.v1" + TektonTaskRunSuccessfulV1 TektonEventType = "dev.tekton.event.taskrun.successful.v1" // TektonTaskRunFailedV1 is sent for TaskRuns with "ConditionSucceeded" "False" - TektonTaskRunFailedV1 TektonEventType = "dev.tekton.event.task.failed.v1" + TektonTaskRunFailedV1 TektonEventType = "dev.tekton.event.taskrun.failed.v1" + // TektonPipelineRunStartedV1 is sent for PipelineRuns with "ConditionSucceeded" "Unknown" + // the first time they are picked up by the reconciler + TektonPipelineRunStartedV1 TektonEventType = "dev.tekton.event.pipelinerun.started.v1" + // TektonPipelineRunRunningV1 is sent for PipelineRuns with "ConditionSucceeded" "Unknown" + // once the PipelineRun is validated and Pod created + TektonPipelineRunRunningV1 TektonEventType = "dev.tekton.event.pipelinerun.running.v1" + // TektonPipelineRunUnknownV1 is sent for PipelineRuns with "ConditionSucceeded" "Unknown" + // It can be used as a confirmation that the PipelineRun is still running. + TektonPipelineRunUnknownV1 TektonEventType = "dev.tekton.event.pipelinerun.unknown.v1" + // TektonPipelineRunSuccessfulV1 is sent for PipelineRuns with "ConditionSucceeded" "True" + TektonPipelineRunSuccessfulV1 TektonEventType = "dev.tekton.event.pipelinerun.successful.v1" + // TektonPipelineRunFailedV1 is sent for PipelineRuns with "ConditionSucceeded" "False" + TektonPipelineRunFailedV1 TektonEventType = "dev.tekton.event.pipelinerun.failed.v1" ) func (t TektonEventType) String() string { @@ -51,17 +71,41 @@ func (t TektonEventType) String() string { type CEClient cloudevents.Client // TektonCloudEventData type is used to marshal and unmarshal the payload of -// a Tekton cloud event. It only includes a TaskRun for now. Using a type opens -// the possibility for the future to add more data to the payload +// a Tekton cloud event. It can include a PipelineRun or a PipelineRun type TektonCloudEventData struct { - TaskRun *v1beta1.TaskRun `json:"taskRun"` + TaskRun *v1beta1.TaskRun `json:"taskRun,omitempty"` + PipelineRun *v1beta1.PipelineRun `json:"pipelineRun,omitempty"` } // NewTektonCloudEventData returns a new instance of NewTektonCloudEventData -func NewTektonCloudEventData(taskRun *v1beta1.TaskRun) TektonCloudEventData { - return TektonCloudEventData{ - TaskRun: taskRun, +func NewTektonCloudEventData(runObject v1beta1.RunsToCompletion) TektonCloudEventData { + tektonCloudEventData := TektonCloudEventData{} + switch runObject.(type) { + case *v1beta1.TaskRun: + tektonCloudEventData.TaskRun = runObject.(*v1beta1.TaskRun) + case *v1beta1.PipelineRun: + tektonCloudEventData.PipelineRun = runObject.(*v1beta1.PipelineRun) } + return tektonCloudEventData +} + +// EventForRunsToCompletion creates a new event based for a RunsToCompletion, +// or return an error if not possible. +func EventForRunsToCompletion(runObject v1beta1.RunsToCompletion) (*cloudevents.Event, error) { + event := cloudevents.NewEvent() + event.SetID(uuid.New().String()) + event.SetSubject(runObject.GetObjectMeta().Name) + event.SetSource(runObject.GetObjectMeta().SelfLink) // TODO: SelfLink is deprecated https://github.com/tektoncd/pipeline/issues/2676 + eventType, err := getEventType(runObject) + if err != nil { + return nil, err + } + event.SetType(eventType.String()) + + if err := event.SetData(cloudevents.ApplicationJSON, NewTektonCloudEventData(runObject)); err != nil { + return nil, err + } + return &event, nil } // EventForTaskRun will create a new event based on a TaskRun, @@ -71,27 +115,52 @@ func EventForTaskRun(taskRun *v1beta1.TaskRun) (*cloudevents.Event, error) { if taskRun == nil { return nil, errors.New("Cannot send an event for an empty TaskRun") } - event := cloudevents.NewEvent() - event.SetID(uuid.New().String()) - event.SetSubject(taskRun.ObjectMeta.Name) - event.SetSource(taskRun.ObjectMeta.SelfLink) // TODO: SelfLink is deprecated + return EventForRunsToCompletion(taskRun) +} - c := taskRun.Status.GetCondition(apis.ConditionSucceeded) +// EventForPipelineRun will create a new event based on a TaskRun, +// or return an error if not possible. +func EventForPipelineRun(pipelineRun *v1beta1.PipelineRun) (*cloudevents.Event, error) { + // Check if the TaskRun is defined + if pipelineRun == nil { + return nil, errors.New("Cannot send an event for an empty PipelineRun") + } + return EventForRunsToCompletion(pipelineRun) +} + +func getEventType(runObject v1beta1.RunsToCompletion) (*TektonEventType, error) { + c := runObject.GetStatus().GetCondition(apis.ConditionSucceeded) + t := runObject.GetTypeMeta() + var eventType TektonEventType switch { case c.IsUnknown(): - event.SetType(TektonTaskRunUnknownV1.String()) + // TBD We should have different event types here, e.g. started, running + // That requires having either knowledge about the previous condition or + // TaskRun and PipelineRun using dedicated "Reasons" or "Conditions" + switch t.Kind { + case "TaskRun": + eventType = TektonTaskRunUnknownV1 + case "PipelineRun": + eventType = TektonPipelineRunUnknownV1 + } case c.IsFalse(): - event.SetType(TektonTaskRunFailedV1.String()) + switch t.Kind { + case "TaskRun": + eventType = TektonTaskRunFailedV1 + case "PipelineRun": + eventType = TektonPipelineRunFailedV1 + } case c.IsTrue(): - event.SetType(TektonTaskRunSuccessfulV1.String()) + switch t.Kind { + case "TaskRun": + eventType = TektonTaskRunSuccessfulV1 + case "PipelineRun": + eventType = TektonPipelineRunSuccessfulV1 + } default: return nil, fmt.Errorf("unknown condition for in TaskRun.Status %s", c.Status) } - - if err := event.SetData(cloudevents.ApplicationJSON, NewTektonCloudEventData(taskRun)); err != nil { - return nil, err - } - return &event, nil + return &eventType, nil } // GetCloudEventDeliveryCompareOptions returns compare options to sort diff --git a/pkg/reconciler/events/cloudevent/cloudevent_test.go b/pkg/reconciler/events/cloudevent/cloudevent_test.go index aefaf847a78..ddeff54c82c 100644 --- a/pkg/reconciler/events/cloudevent/cloudevent_test.go +++ b/pkg/reconciler/events/cloudevent/cloudevent_test.go @@ -30,12 +30,17 @@ import ( ) const ( - defaultEventSourceURI = "/taskrun/1234" + defaultEventSourceURI = "/runtocompletion/1234" taskRunName = "faketaskrunname" + pipelineRunName = "fakepipelinerunname" ) func getTaskRunByCondition(status corev1.ConditionStatus) *v1beta1.TaskRun { return &v1beta1.TaskRun{ + TypeMeta: metav1.TypeMeta{ + Kind: "TaskRun", + APIVersion: "v1beta1", + }, ObjectMeta: metav1.ObjectMeta{ Name: taskRunName, Namespace: "marshmallow", @@ -53,6 +58,29 @@ func getTaskRunByCondition(status corev1.ConditionStatus) *v1beta1.TaskRun { } } +func getPipelineRunByCondition(status corev1.ConditionStatus) *v1beta1.PipelineRun { + return &v1beta1.PipelineRun{ + TypeMeta: metav1.TypeMeta{ + Kind: "PipelineRun", + APIVersion: "v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: pipelineRunName, + Namespace: "marshmallow", + SelfLink: defaultEventSourceURI, + }, + Spec: v1beta1.PipelineRunSpec{}, + Status: v1beta1.PipelineRunStatus{ + Status: duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: apis.ConditionSucceeded, + Status: status, + }}, + }, + }, + } +} + func TestEventForTaskRun(t *testing.T) { for _, c := range []struct { desc string @@ -101,3 +129,52 @@ func TestEventForTaskRun(t *testing.T) { }) } } + +func TestEventForPipelineRun(t *testing.T) { + for _, c := range []struct { + desc string + pipelineRun *v1beta1.PipelineRun + wantEventType TektonEventType + }{{ + desc: "send a cloud event with unknown status taskrun", + pipelineRun: getPipelineRunByCondition(corev1.ConditionUnknown), + wantEventType: TektonPipelineRunUnknownV1, + }, { + desc: "send a cloud event with successful status taskrun", + pipelineRun: getPipelineRunByCondition(corev1.ConditionTrue), + wantEventType: TektonPipelineRunSuccessfulV1, + }, { + desc: "send a cloud event with unknown status taskrun", + pipelineRun: getPipelineRunByCondition(corev1.ConditionFalse), + wantEventType: TektonPipelineRunFailedV1, + }} { + t.Run(c.desc, func(t *testing.T) { + names.TestingSeed() + + got, err := EventForPipelineRun(c.pipelineRun) + if err != nil { + t.Fatalf("I did not expect an error but I got %s", err) + } else { + wantSubject := pipelineRunName + if d := cmp.Diff(wantSubject, got.Subject()); d != "" { + t.Errorf("Wrong Event ID %s", diff.PrintWantGot(d)) + } + if d := cmp.Diff(string(c.wantEventType), got.Type()); d != "" { + t.Errorf("Wrong Event Type %s", diff.PrintWantGot(d)) + } + wantData := NewTektonCloudEventData(c.pipelineRun) + gotData := TektonCloudEventData{} + if err := got.DataAs(&gotData); err != nil { + t.Errorf("Unexpected error from DataAsl; %s", err) + } + if d := cmp.Diff(wantData, gotData); d != "" { + t.Errorf("Wrong Event data %s", diff.PrintWantGot(d)) + } + + if err := got.Validate(); err != nil { + t.Errorf("Expected event to be valid; %s", err) + } + } + }) + } +}