Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose do_not_convert decorator #5263

Merged
merged 29 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dali/python/nvidia/dali/_autograph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
from nvidia.dali._autograph.impl.api import convert
from nvidia.dali._autograph.impl.api import converted_call
from nvidia.dali._autograph.impl.api import do_not_convert
from nvidia.dali._autograph.impl.api import autograph_artifact
from nvidia.dali._autograph.impl.api import is_autograph_artifact

# from nvidia.dali._autograph.impl.api import StackTraceMapper
from nvidia.dali._autograph.impl.api import to_code
Expand Down
9 changes: 4 additions & 5 deletions dali/python/nvidia/dali/_autograph/impl/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,11 @@ def _fall_back_unconverted(f, args, kwargs, options, exc):
"""Falls back to calling the function unconverted, in case of error."""
# TODO(mdan): Consider adding an internal metric.
warning_template = (
"AutoGraph could not transform %s and will run it as-is.\n" "%s" "Cause: %s\n"
"AutoGraph could not transform %s and will run it as-is.\n"
"%s"
"Cause: %s\n"
"To silence this warning, decorate the function with @nvidia.dali.pipeline.do_not_convert"
)
# TODO(klecki): Expose the do_not_convert in DALI
# 'To silence this warning, decorate the function with'
# ' @tf.autograph.experimental.do_not_convert')
if isinstance(exc, errors.InaccessibleSourceCodeError):
if ag_ctx.INSPECT_SOURCE_SUPPORTED:
logging.warning(warning_template, f, "", exc)
Expand Down Expand Up @@ -598,7 +598,6 @@ def _log_callargs(f, args, kwargs):
#


@export_symbol("autograph.experimental.do_not_convert")
def do_not_convert(func=None):
"""Decorator that suppresses the conversion of a function.

Expand Down
38 changes: 37 additions & 1 deletion dali/python/nvidia/dali/external_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from nvidia.dali import backend as _b
from nvidia.dali import tensors as _tensors
from nvidia.dali import types as _types
from nvidia.dali import _autograph
from nvidia.dali._multiproc.messages import TaskArgs as _TaskArgs, SampleRange as _SampleRange
import nvidia.dali.types
from nvidia.dali._utils.external_source_impl import (
Expand Down Expand Up @@ -521,6 +522,22 @@ class ExternalSource:
Note, that due to prefetching, the callback may be invoked with a few iterations past
the end of dataset - make sure it consistently raises a ``StopIteration`` in that case.

|
.. warning::
The ``source``, when :ref:`conditional mode <conditional_execution>` is enabled, must not
be transformed by the AutoGraph. There are two conditions to prevent that:

1. The ``source`` or a factory creating the ``source`` must be defined at a global scope
(i.e. outside of ``pipeline_def`` scope).

2. If the ``source`` is created by a factory function that is invoked within pipeline
definition, the factory function must be decorated with
:meth:`@do_not_convert <nvidia.dali.pipeline.do_not_convert>`. Otherwise it will be
recursively converted when the pipeline definition is traced.

More details can be found in :meth:`@do_not_convert <nvidia.dali.pipeline.do_not_convert>`
documentation.

|
.. note::
Callable ``source`` can be run in parallel by multiple workers.
Expand Down Expand Up @@ -780,6 +797,23 @@ def __call__(
"(specify `batch=True` in the external source definition and make sure "
"your source returns batches)".format(what)
)
if _autograph.is_autograph_artifact(source_desc.source):
raise ValueError(
"The `source` parameter that was passed to external source was created "
"in a scope that was converted with AutoGraph. To allow the `source` to be "
"correctly used with parallel external source, it must remain unconverted. "
"To prevent conversion, two steps may need to be taken:\n"
"1. The `source` or a factory creating the `source` must be defined at a "
"global scope (i.e. outside of `pipeline_def` scope).\n"
"2. If the `source` is created by a factory function that is invoked within "
"pipeline definition, the factory function must be decorated with "
"`@nvidia.dali.pipeline.do_not_convert`. Otherwise it will be recursively "
"converted when the pipeline definition is traced.\n"
"You can read more details and see examples in the "
"`@nvidia.dali.pipeline.do_not_convert` decorator documentation. The AutoGraph "
"conversion is part of conditional execution in DALI."
)

else:
for kwarg_value, kwarg_name in (
(prefetch_queue_depth, "prefetch_queue_depth"),
Expand Down Expand Up @@ -936,7 +970,9 @@ def external_source(
repeat_last=False,
**kwargs,
):
"""Creates a data node which is populated with data from a Python source.
"""
Creates a data node which is populated with data from a Python source.

The data can be provided by the ``source`` function or iterable, or it can be provided by
``pipeline.feed_input(name, data, layout, cuda_stream)`` inside ``pipeline.iter_setup``.

Expand Down
112 changes: 108 additions & 4 deletions dali/python/nvidia/dali/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -13,7 +13,7 @@
# limitations under the License.

# pylint: disable=no-member
from typing import Any, List, Tuple, Callable, Optional, Union, overload
from typing import Any, List, Tuple, Callable, Optional, Union, TypeVar, overload
from collections import deque
from nvidia.dali import backend as b
from nvidia.dali import types
Expand All @@ -26,12 +26,12 @@
from threading import local as tls
from . import data_node as _data_node
import atexit
import ctypes
import functools
import inspect
import sys
import warnings
import weakref
import ctypes
import sys
from .data_node import DataNode

pipeline_tls = tls()
Expand Down Expand Up @@ -1865,6 +1865,9 @@ def my_pipe():
"""

def actual_decorator(func):
if _conditionals._autograph.is_autograph_artifact(func):
raise ValueError("Pipeline definition cannot be marked with @do_not_convert.")

@functools.wraps(func)
def create_pipeline(*args, **kwargs):
conditionals_on = kwargs.get("enable_conditionals", enable_conditionals)
Expand All @@ -1884,6 +1887,107 @@ def create_pipeline(*args, **kwargs):
return actual_decorator(fn) if fn else actual_decorator


# Callable preserving a signature
_F = TypeVar("_F", bound=Callable[..., Any])


def do_not_convert(func: _F = None) -> _F:
"""Decorator that suppresses the conversion of a function by AutoGraph.

In conditional mode, DALI uses a fork of
`TensorFlow's AutoGraph <https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/autograph/g3doc/reference/index.md>`_
to transform the code, enabling us to rewrite and detect the ``if`` statements, so they can be
used in processing the DALI pipeline.

The AutoGraph conversion is applied to any top-level function or method called within the
pipeline definition (as well as the pipeline definition itself).
When a function is converted, all functions defined within its syntactical scope are also
converted. The rewriting, among other effects, makes these functions non-serializable.

To stop a function from being converted, its top-level encompassing function must be marked
with this decorator. This may sometimes require refactoring the function to outer scope.

Parallel mode of :meth:`external source <nvidia.dali.fn.external_source>` (``parallel=True``),
requires that its ``source`` parameter is serializable. To prevent the rewriting of the
``source``, the functions that are used to create the ``source``,
should be decorated with :meth:`@do_not_convert <nvidia.dali.pipeline.do_not_convert>`.

.. note::
Only functions that do not process :class:`DataNode` (so do not use DALI operators)
should be marked with this decorator.

For example::

from nvidia.dali import pipeline_def, fn

@pipeline_def(enable_conditionals=True)
def pipe():

def source_factory(size):
def source_fun(sample_info):
return np.full(size, sample_info.iter_idx)
return source_fun
klecki marked this conversation as resolved.
Show resolved Hide resolved

source = source_factory(size=(2, 1))
return fn.external_source(source=source, parallel=True, batch=False)

Should be converted into::

from nvidia.dali import pipeline_def, fn
from nvidia.dali.pipeline import do_not_convert

@do_not_convert
def source_factory(size):
def source_fun(sample_info):
return np.full(size, sample_info.iter_idx)
return source_fun

@pipeline_def(enable_conditionals=True)
def pipe():
source = source_factory(size=(2, 1))
return fn.external_source(source=source, parallel=True, batch=False)

The ``source_factory`` must be factored out, otherwise it would be converted as a part of
pipeline definition. As we are interested in preventing the AutoGraph conversion of
``source_fun`` we need to decorate its top-level encompassing function.

.. note::
If a function is declared outside of the pipeline definition, and is passed as a parameter,
but not directly invoked within the pipeline definition, it will not be converted.
In such case, a callback passed to
:meth:`external source <nvidia.dali.fn.external_source>` operator,
:meth:`python function <nvidia.dali.fn.python_function>` operator family or
:meth:`Numba function <nvidia.dali.plugin.numba.fn.experimental.numba_function>` operator
is not considered as being directly invoked in pipeline definition. Such callback is
executed when the pipeline is run, so after the pipeline is defined and built.

For example::

from nvidia.dali import pipeline_def, fn

def source_fun(sample_info):
return np.full((2, 2), sample_info.iter_idx)

@pipeline_def(enable_conditionals=True)
def pipe():
return fn.external_source(source=source_fun, batch=False)

The ``source_fun`` won't be converted, as it is defined outside of pipeline definition and
it is only passed via name to external source.
""" # noqa(E501)

if func is None:
return do_not_convert

if getattr(func, "_is_pipeline_def", False):
raise ValueError("Pipeline definition cannot be marked with @do_not_convert.")

# Marking a function as autograph_artifact will prevent it from being converted without
# adding any intermediate functions or adjusting the code. This is more lightweight solution
# that should keep numba happy.
return _conditionals._autograph.autograph_artifact(func)


def _collect_ops(output_nodes):
"""
Traverses the pipeline graph starting from the outputs to collect all reachable operators.
Expand Down
142 changes: 142 additions & 0 deletions dali/test/python/conditionals/test_external_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np

from nose_utils import raises

from nvidia.dali import pipeline_def, fn
from nvidia.dali.pipeline import do_not_convert
from nvidia.dali._autograph import is_autograph_artifact


def assert_autograph_artifact(artifact_name, expected):
if is_autograph_artifact(artifact_name) != expected:
raise AssertionError(f"Expected {artifact_name} to be {expected}")


def es_with_local_source(parallel=False):
"""Runs a pipeline that has a locally defined source function for external source"""

@pipeline_def(
batch_size=4,
num_threads=1,
device_id=None,
enable_conditionals=True,
py_start_method="spawn",
)
def pipe_with_local():
def source_local(si):
assert_autograph_artifact(source_local, True)
return np.full((2,), 1)

return fn.external_source(source=source_local, parallel=parallel, batch=False)

p = pipe_with_local()
p.build()
(out,) = p.run()
assert np.array_equal(np.array(out.as_tensor()), np.full((4, 2), 1))


def es_with_nonlocal_converted_source(parallel=False):
"""Runs a pipeline that has a source function created by a factory function, defined
out of scope of the pipeline, it is not marked with @do_not_convert - so due to the
converted_call being made, the source_in_factory is also converted by AutoGraph.
"""

def source_factory(size):
def source_in_factory(si):
assert_autograph_artifact(source_in_factory, True)
return np.full(size, 10)

return source_in_factory

@pipeline_def(
batch_size=4,
num_threads=1,
device_id=None,
enable_conditionals=True,
py_start_method="spawn",
)
def pipe_with_converted_factory():
source = source_factory((3,))
assert_autograph_artifact(source, True)
return fn.external_source(source=source, parallel=parallel, batch=False)

p = pipe_with_converted_factory()
p.build()
(out,) = p.run()
assert np.array_equal(np.array(out.as_tensor()), np.full((4, 3), 10))


def es_with_nonlocal_not_converted_source(parallel=False):
"""Runs a pipeline that has a source function created by a factory function, defined
out of scope of the pipeline, marked with @do_not_convert - so the source_in_factory is run
without AutoGraph.
"""

@do_not_convert
def source_factory(size):
def source_in_factory(si):
assert_autograph_artifact(source_in_factory, False)
return np.full(size, 10)

return source_in_factory

@pipeline_def(
batch_size=4,
num_threads=1,
device_id=None,
enable_conditionals=True,
py_start_method="spawn",
)
def pipe_with_converted_factory():
source = source_factory((3,))
assert_autograph_artifact(source, False)
return fn.external_source(source=source, parallel=parallel, batch=False)

p = pipe_with_converted_factory()
p.build()
(out,) = p.run()
assert np.array_equal(np.array(out.as_tensor()), np.full((4, 3), 10))


# Sanity test: check if indeed those source callbacks are converted to AutoGraph artifacts
def test_es_with_converted_sources():
es_with_local_source()
es_with_nonlocal_converted_source()


@raises(
ValueError,
"To allow the `source` to be correctly used*, it must remain unconverted",
)
def test_parallel_es_with_local_source():
es_with_local_source(parallel=True)


@raises(
ValueError,
"To allow the `source` to be correctly used*, it must remain unconverted",
)
def test_parallel_es_with_nonlocal_source():
es_with_nonlocal_converted_source(parallel=True)


def test_es_with_not_converted_source():
es_with_nonlocal_not_converted_source()


def test_parallel_es_with_not_converted_callback():
es_with_nonlocal_not_converted_source(True)
Loading