Skip to content

Commit

Permalink
Merge pull request #155 from riga/feature/passthrough_workflow_type
Browse files Browse the repository at this point in the history
Passthrough workflow type.
  • Loading branch information
riga committed May 24, 2023
2 parents 257417e + 75c8c45 commit f54534a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
2 changes: 0 additions & 2 deletions law/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,8 +713,6 @@ def complete(self):
if self.cache_requirements:
if self._cached_requirements is no_value:
self._cached_requirements = self.requires()
else:
print("WRAPPER.COMPLETE() TAKING REQS FROM CACHE BITCHES")
reqs = self._cached_requirements
else:
reqs = self.requires()
Expand Down
40 changes: 35 additions & 5 deletions law/workflow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ class BaseWorkflow(six.with_metaclass(WorkflowRegister, Task)):
description="the type of the workflow to use; uses the first workflow type in the MRO when "
"empty; default: empty",
)
effective_workflow = luigi.Parameter(
default=NO_STR,
description="do not set manually",
)
acceptance = luigi.FloatParameter(
default=1.0,
significant=False,
Expand Down Expand Up @@ -405,13 +409,17 @@ class BaseWorkflow(six.with_metaclass(WorkflowRegister, Task)):
create_branch_map_before_repr = False
workflow_run_decorators = None
cache_workflow_requirements = False
passthrough_requested_workflow = True

# accessible properties
workflow_property = None
cached_workflow_property = None

exclude_index = True

exclude_params_req = {"effective_workflow"}
exclude_params_index = {"effective_workflow"}
exclude_params_repr = {"workflow"}
exclude_params_branch = {"acceptance", "tolerance", "pilot", "branches"}
exclude_params_workflow = {"branch"}

Expand All @@ -423,6 +431,13 @@ def modify_param_values(cls, params):
if params.get("workflow") in [None, NO_STR]:
params["workflow"] = cls.find_workflow_cls().workflow_proxy_cls.workflow_type

# set the effective workflow parameter based on the actual resolution
workflow_cls = cls.find_workflow_cls(
name=params["workflow"],
fallback_to_first=cls.passthrough_requested_workflow,
)
params["effective_workflow"] = workflow_cls.workflow_proxy_cls.workflow_type

# show deprecation error when start or end branch parameters are set
if "start_branch" in params or "end_branch" in params:
raise DeprecationWarning(
Expand All @@ -433,14 +448,21 @@ def modify_param_values(cls, params):
return params

@classmethod
def find_workflow_cls(cls, name=None):
def find_workflow_cls(cls, name=None, fallback_to_first=False):
first_cls = None

for workflow_cls in cls.mro():
if not issubclass(workflow_cls, BaseWorkflow):
continue
if not workflow_cls._defined_workflow_proxy:
continue
if name in [workflow_cls.workflow_proxy_cls.workflow_type, None, NO_STR]:
return workflow_cls
if first_cls is None:
first_cls = workflow_cls

if fallback_to_first and first_cls is not None:
return first_cls

msg = " for type '{}'".format(name) if name else ""
raise ValueError("cannot determine workflow class{} in task class {}".format(msg, cls))
Expand Down Expand Up @@ -488,9 +510,11 @@ def _initialize_workflow(self, force=False):
return

if self.is_workflow():
self._workflow_cls = self.find_workflow_cls(self.workflow)
self._workflow_cls = self.find_workflow_cls(self.effective_workflow)
self._workflow_proxy = self._workflow_cls.workflow_proxy_cls(task=self)
logger.debug("created workflow proxy instance of type '{}'".format(self.workflow))
logger.debug(
"created workflow proxy instance of type '{}'".format(self.effective_workflow),
)

self._workflow_initialized = True

Expand Down Expand Up @@ -527,8 +551,14 @@ def _repr_params(self, *args, **kwargs):
params = super(BaseWorkflow, self)._repr_params(*args, **kwargs)

if self.is_workflow():
# when this is a workflow, add the workflow type
params.setdefault("workflow", self.workflow)
# when this is a workflow, add the requested or effective workflow type,
# depending on whether the requested one is to be passed through
workflow = (
self.workflow
if self.passthrough_requested_workflow
else self.effective_workflow
)
params.setdefault("workflow", workflow)
# skip branches when empty
if not params.get("branches"):
params.pop("branches", None)
Expand Down

0 comments on commit f54534a

Please sign in to comment.