Skip to content

Commit

Permalink
Refactor dynamic workflow condition.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Jan 19, 2024
1 parent bda49ee commit 95bedec
Showing 1 changed file with 55 additions and 31 deletions.
86 changes: 55 additions & 31 deletions law/workflow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
Workflow and workflow proxy base class definitions.
"""

__all__ = ["BaseWorkflow", "WorkflowParameter", "workflow_property", "dynamic_workflow_condition"]
__all__ = [
"BaseWorkflow", "WorkflowParameter", "workflow_property", "dynamic_workflow_condition",
"DynamicWorkflowCondition",
]


import re
Expand Down Expand Up @@ -236,13 +239,35 @@ def serialize(self, value):
return super(WorkflowParameter, self).serialize(value)


class dynamic_workflow_condition(object):
def dynamic_workflow_condition(
condition_fn=None,
create_branch_map_fn=None,
requires_fn=None,
output_fn=None,
):
"""
Decorator for a workflow method that defines whether the branch map can be dynamically
Decorator factory that is meant to wrap a workflow methods that defines a dynamic workflow
condition, returning a :py:class:`DynamicWorkflowCondition` instance.
"""
def decorator(condition_fn):
return DynamicWorkflowCondition(
condition_fn=condition_fn,
create_branch_map_fn=create_branch_map_fn,
requires_fn=requires_fn,
output_fn=output_fn,
)

return decorator if condition_fn is None else decorator(condition_fn)


class DynamicWorkflowCondition(object):
"""
Container for a workflow method that defines whether the branch map can be dynamically
constructed or whether a placeholder should be used until the condition is met. Similar to
Python's ``property``, the decorated object provides additional attributes for decorating other
methods that usually depend on the branch map, such as branch requirements or outputs.
Python's ``property``, instances of this class provide additional attributes for decorating
other methods that usually depend on the branch map, such as branch requirements or outputs.
It is recommended to use the :py:func:`dynamic_workflow_condition` decorator (factory).
Example:
.. code-block:: python
Expand Down Expand Up @@ -293,6 +318,11 @@ def run(self):
As a consequence, the amended workflow is fully dynamic with its exact shape potentially
depending heavily on conditions that are only known at runtime.
In case the ``workflow_condition`` involves a costly computation, it is recommended to cache
evluation of the condition by setting *cache_met_condition* argument to *True* or a string
denoting the task instance attribute where the met condition is stored. In the first case,
the attribute defaults to ``_dynamic_workflow_condition_met``.
"""

_decorator_result = object()
Expand All @@ -303,6 +333,7 @@ def __init__(
create_branch_map_fn=None,
requires_fn=None,
output_fn=None,
cache_met_condition=True,
):
super().__init__()

Expand All @@ -311,25 +342,32 @@ def __init__(
self._create_branch_map_fn = create_branch_map_fn
self._requires_fn = requires_fn
self._output_fn = output_fn
self.cache_met_condition = cache_met_condition
if self.cache_met_condition and not isinstance(cache_met_condition, str):
self.cache_met_condition = "_dynamic_workflow_condition_met"

def _wrap_condition_fn(self):
if self._condition_fn is None:
return None

@functools.wraps(self._condition_fn)
def condition(inst, *args, **kwargs):
return self._condition_fn(inst.as_workflow(), *args, **kwargs)
# when caching, and the condition is already met, return the cached value
if self.cache_met_condition and getattr(inst, self.cache_met_condition, False):
return getattr(inst, self.cache_met_condition)

# evaluate the condition
is_met = self._condition_fn(inst.as_workflow(), *args, **kwargs)

# write to cache if requested
if self.cache_met_condition and is_met:
setattr(inst, self.cache_met_condition, is_met)

return is_met

return condition

def create_branch_map(self, create_branch_map_fn):
# check the decorator method name
if create_branch_map_fn.__name__ != "create_branch_map":
raise NameError(
"the method decorated by dynamic_workflow_condition.create_branch_map should be "
"named 'create_branch_map', but got '{}'".format(create_branch_map_fn.__name__),
)

# store the function
self._create_branch_map_fn = create_branch_map_fn

Expand All @@ -352,13 +390,6 @@ def create_branch_map(inst, *args, **kwargs):
return create_branch_map

def requires(self, requires_fn):
# check the decorator method name
if requires_fn.__name__ != "requires":
raise NameError(
"the method decorated by dynamic_workflow_condition.requires should be "
"named 'requires', but got '{}'".format(requires_fn.__name__),
)

# store the function
self._requires_fn = requires_fn

Expand All @@ -381,13 +412,6 @@ def requires(inst, *args, **kwargs):
return requires

def output(self, output_fn):
# check the decorator method name
if output_fn.__name__ != "output":
raise NameError(
"the method decorated by dynamic_workflow_condition.output should be "
"named 'output', but got '{}'".format(output_fn.__name__),
)

# store the function
self._output_fn = output_fn

Expand Down Expand Up @@ -445,11 +469,11 @@ def check_dynamic_workflow_conditions(metacls, name, classdict):
# check that only one condition is present in classdict
condition_attr = None
for attr, value in classdict.items():
if not isinstance(value, dynamic_workflow_condition):
if not isinstance(value, DynamicWorkflowCondition):
continue
if condition_attr:
raise Exception(
"class '{}' defined with more than one dynamic_workflow_condition, found "
"class '{}' defined with more than one DynamicWorkflowCondition, found "
"'{}' after previously registered '{}'".format(name, attr, condition_attr),
)
condition_attr = attr
Expand Down Expand Up @@ -651,14 +675,14 @@ def __new__(cls, *args, **kwargs):
condition_attr = getattr(cls, "_condition_attr", None)
if condition_attr:
condition = getattr(inst, condition_attr, None)
if isinstance(condition, dynamic_workflow_condition):
if isinstance(condition, DynamicWorkflowCondition):
# bind the condition method itself
bound_condition_fn = condition._wrap_condition_fn().__get__(inst)
setattr(inst, condition_attr, bound_condition_fn)

# bind wrapped methods that currently correspond to placeholders
for attr, wrapper in condition._iter_wrappers(bound_condition_fn):
if getattr(inst, attr, None) != dynamic_workflow_condition._decorator_result:
if getattr(inst, attr, None) != DynamicWorkflowCondition._decorator_result:
continue
setattr(inst, attr, wrapper.__get__(inst))

Expand Down

0 comments on commit 95bedec

Please sign in to comment.