diff --git a/worker/kubernetes.go b/worker/kubernetes.go index 4370a604..46d5df68 100644 --- a/worker/kubernetes.go +++ b/worker/kubernetes.go @@ -6,14 +6,15 @@ import ( "fmt" "io" "text/template" + "time" "github.com/ohsu-comp-bio/funnel/tes" v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" + batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1" "k8s.io/client-go/rest" ) @@ -84,14 +85,7 @@ func (kcmd KubernetesCommand) Run(ctx context.Context) error { return fmt.Errorf("creating job: %v", err) } - // Wait until the job finishes - watcher, err := client.Watch(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s-%d", taskId, kcmd.JobId)}) - if err != nil { - return err - } - - defer watcher.Stop() - waitForJobFinish(ctx, watcher) + waitForJobFinish(ctx, client, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s-%d", taskId, kcmd.JobId)}) pods, err := clientset.CoreV1().Pods(kcmd.Namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s-%d", taskId, kcmd.JobId)}) if err != nil { @@ -145,23 +139,30 @@ func (kcmd KubernetesCommand) Stop() error { } // Waits until the job finishes -func waitForJobFinish(ctx context.Context, watcher watch.Interface) { +func waitForJobFinish(ctx context.Context, client batchv1.JobInterface, listOptions metav1.ListOptions) { + ticker := time.NewTicker(10 * time.Second) + for { select { - case event := <-watcher.ResultChan(): - if event.Type == watch.Error || event.Type == watch.Deleted { + case <-ctx.Done(): + return + case <-ticker.C: + jobs, err := client.List(ctx, listOptions) + + if err != nil { return - } else if event.Type == watch.Bookmark { - continue } - job := event.Object.(*v1.Job) + if len(jobs.Items) == 0 { + // Should not happen + return + } + // There should be always only one job + job := jobs.Items[0] if job.Status.Succeeded > 0 || job.Status.Failed > 0 { return } - case <-ctx.Done(): - return } } }