Skip to content

Commit

Permalink
Allow delay before job completeness checks in remote workflow.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Mar 2, 2023
1 parent c232192 commit b18d4ce
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 1 deletion.
3 changes: 3 additions & 0 deletions law/contrib/arc/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ def arc_job_config(self, config, job_num, branches):
def arc_check_job_completeness(self):
return False

def arc_check_job_completeness_delay(self):
return 0.0

def arc_use_local_scheduler(self):
return True

Expand Down
3 changes: 3 additions & 0 deletions law/contrib/glite/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ def glite_job_config(self, config, job_num, branches):
def glite_check_job_completeness(self):
return False

def glite_check_job_completeness_delay(self):
return 0.0

def glite_use_local_scheduler(self):
return True

Expand Down
3 changes: 3 additions & 0 deletions law/contrib/htcondor/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ def htcondor_job_config(self, config, job_num, branches):
def htcondor_check_job_completeness(self):
return False

def htcondor_check_job_completeness_delay(self):
return 0.0

def htcondor_use_local_scheduler(self):
return False

Expand Down
3 changes: 3 additions & 0 deletions law/contrib/lsf/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ def lsf_job_config(self, config, job_num, branches):
def lsf_check_job_completeness(self):
return False

def lsf_check_job_completeness_delay(self):
return 0.0

def lsf_use_local_scheduler(self):
return True

Expand Down
3 changes: 3 additions & 0 deletions law/contrib/slurm/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ def slurm_job_config(self, config, job_num, branches):
def slurm_check_job_completeness(self):
return False

def slurm_check_job_completeness_delay(self):
return 0.0

def slurm_use_local_scheduler(self):
return False

Expand Down
7 changes: 6 additions & 1 deletion law/workflow/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,12 @@ def poll(self):
del unknown_jobs
del states_by_id

# get settings from the task for triggering post-finished status checks
check_completeness = self._get_task_attribute("check_job_completeness")()
check_completeness_delay = self._get_task_attribute("check_job_completeness_delay")()
if check_completeness_delay:
time.sleep(check_completeness_delay)

# store jobs per status and take further actions depending on the status
pending_jobs = OrderedDict()
running_jobs = OrderedDict()
Expand All @@ -974,7 +980,6 @@ def poll(self):

if data["status"] == self.job_manager.FINISHED:
# additionally check if the outputs really exist
check_completeness = self._get_task_attribute("check_job_completeness")()
if not check_completeness or all(
self.task.as_branch(b).complete()
for b in sub_data["branches"]
Expand Down

0 comments on commit b18d4ce

Please sign in to comment.