diff --git a/examples/integration/mpi4py/__init__.py b/examples/integration/mpi4py/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/integration/mpi4py/_child.py b/examples/integration/mpi4py/_child.py new file mode 100644 index 00000000..e9d7186a --- /dev/null +++ b/examples/integration/mpi4py/_child.py @@ -0,0 +1,5 @@ +import os + + +async def child_fn() -> str: + return f"child OK pid={os.getpid()}" diff --git a/examples/integration/mpi4py/inherit_parent_main.py b/examples/integration/mpi4py/inherit_parent_main.py new file mode 100644 index 00000000..60e30a95 --- /dev/null +++ b/examples/integration/mpi4py/inherit_parent_main.py @@ -0,0 +1,50 @@ +""" +Integration test: spawning tractor actors from an MPI process. + +When a parent is launched via ``mpirun``, Open MPI sets ``OMPI_*`` env +vars that bind ``MPI_Init`` to the ``orted`` daemon. Tractor children +inherit those env vars, so if ``inherit_parent_main=True`` (the default) +the child re-executes ``__main__``, re-imports ``mpi4py``, and +``MPI_Init_thread`` fails because the child was never spawned by +``orted``:: + + getting local rank failed + --> Returned value No permission (-17) instead of ORTE_SUCCESS + +Passing ``inherit_parent_main=False`` and placing RPC functions in a +separate importable module (``_child``) avoids the re-import entirely. + +Usage:: + + mpirun --allow-run-as-root -np 1 python -m \ + examples.integration.mpi4py.inherit_parent_main +""" + +from mpi4py import MPI + +import os +import trio +import tractor + +from ._child import child_fn + + +async def main() -> None: + rank = MPI.COMM_WORLD.Get_rank() + print(f"[parent] rank={rank} pid={os.getpid()}", flush=True) + + async with tractor.open_nursery(start_method='trio') as an: + portal = await an.start_actor( + 'mpi-child', + enable_modules=[child_fn.__module__], + # Without this the child replays __main__, which + # re-imports mpi4py and crashes on MPI_Init. + inherit_parent_main=False, + ) + result = await portal.run(child_fn) + print(f"[parent] got: {result}", flush=True) + await portal.cancel_actor() + + +if __name__ == "__main__": + trio.run(main) diff --git a/tests/test_spawning.py b/tests/test_spawning.py index 283d1785..7e230085 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -1,5 +1,12 @@ """ -Spawning basics +Spawning basics including audit of, + +- subproc bootstrap, such as subactor runtime-data/config inheritance, +- basic (and mostly legacy) `ActorNursery` subactor starting and + cancel APIs. + +Simple (and generally legacy) examples from the original +API design. """ from functools import partial @@ -98,7 +105,9 @@ async def movie_theatre_question(): @tractor_test -async def test_movie_theatre_convo(start_method): +async def test_movie_theatre_convo( + start_method: str, +): ''' The main ``tractor`` routine. @@ -151,13 +160,16 @@ async def test_most_beautiful_word( name='some_linguist', ) - print(await portal.result()) + res: Any = await portal.wait_for_result() + assert res == return_value # The ``async with`` will unblock here since the 'some_linguist' # actor has completed its main task ``cellar_door``. # this should pull the cached final result already captured during # the nursery block exit. - print(await portal.result()) + res: Any = await portal.wait_for_result() + assert res == return_value + print(res) async def check_loglevel(level): @@ -168,16 +180,24 @@ async def check_loglevel(level): log.critical('yoyoyo') +@pytest.mark.parametrize( + 'level', [ + 'debug', + 'cancel', + 'critical' + ], + ids='loglevel={}'.format, +) def test_loglevel_propagated_to_subactor( - start_method, - capfd, - reg_addr, + capfd: pytest.CaptureFixture, + start_method: str, + reg_addr: tuple, + level: str, ): if start_method == 'mp_forkserver': pytest.skip( - "a bug with `capfd` seems to make forkserver capture not work?") - - level = 'critical' + "a bug with `capfd` seems to make forkserver capture not work?" + ) async def main(): async with tractor.open_nursery( @@ -197,3 +217,121 @@ def test_loglevel_propagated_to_subactor( # ensure subactor spits log message on stderr captured = capfd.readouterr() assert 'yoyoyo' in captured.err + + +async def check_parent_main_inheritance( + expect_inherited: bool, +) -> bool: + ''' + Assert that the child actor's ``_parent_main_data`` matches the + ``inherit_parent_main`` flag it was spawned with. + + With the trio spawn backend the parent's ``__main__`` bootstrap + data is captured and forwarded to each child so it can replay + the parent's ``__main__`` as ``__mp_main__``, mirroring the + stdlib ``multiprocessing`` bootstrap: + https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods + + When ``inherit_parent_main=False`` the data dict is empty + (``{}``) so no fixup ever runs and the child keeps its own + ``__main__`` untouched. + + NOTE: under `pytest` the parent ``__main__`` is + ``pytest.__main__`` whose ``_fixup_main_from_name()`` is a no-op + (the name ends with ``.__main__``), so we cannot observe + a difference in ``sys.modules['__main__'].__name__`` between the + two modes. Checking ``_parent_main_data`` directly is the most + reliable verification that the flag is threaded through + correctly; a ``RemoteActorError[AssertionError]`` propagates on + mismatch. + + ''' + import tractor + actor: tractor.Actor = tractor.current_actor() + has_data: bool = bool(actor._parent_main_data) + assert has_data == expect_inherited, ( + f'Expected _parent_main_data to be ' + f'{"non-empty" if expect_inherited else "empty"}, ' + f'got: {actor._parent_main_data!r}' + ) + return has_data + + +def test_run_in_actor_can_skip_parent_main_inheritance( + start_method: str, # <- only support on `trio` backend rn. +): + ''' + Verify ``inherit_parent_main=False`` on ``run_in_actor()`` + prevents parent ``__main__`` data from reaching the child. + + ''' + if start_method != 'trio': + pytest.skip( + 'parent main-inheritance opt-out only affects the trio backend' + ) + + async def main(): + async with tractor.open_nursery(start_method='trio') as an: + + # Default: child receives parent __main__ bootstrap data + replaying = await an.run_in_actor( + check_parent_main_inheritance, + name='replaying-parent-main', + expect_inherited=True, + ) + await replaying.result() + + # Opt-out: child gets no parent __main__ data + isolated = await an.run_in_actor( + check_parent_main_inheritance, + name='isolated-parent-main', + inherit_parent_main=False, + expect_inherited=False, + ) + await isolated.result() + + trio.run(main) + + +def test_start_actor_can_skip_parent_main_inheritance( + start_method: str, # <- only support on `trio` backend rn. +): + ''' + Verify ``inherit_parent_main=False`` on ``start_actor()`` + prevents parent ``__main__`` data from reaching the child. + + ''' + if start_method != 'trio': + pytest.skip( + 'parent main-inheritance opt-out only affects the trio backend' + ) + + async def main(): + async with tractor.open_nursery(start_method='trio') as an: + + # Default: child receives parent __main__ bootstrap data + replaying = await an.start_actor( + 'replaying-parent-main', + enable_modules=[__name__], + ) + result = await replaying.run( + check_parent_main_inheritance, + expect_inherited=True, + ) + assert result is True + await replaying.cancel_actor() + + # Opt-out: child gets no parent __main__ data + isolated = await an.start_actor( + 'isolated-parent-main', + enable_modules=[__name__], + inherit_parent_main=False, + ) + result = await isolated.run( + check_parent_main_inheritance, + expect_inherited=False, + ) + assert result is False + await isolated.cancel_actor() + + trio.run(main) diff --git a/tractor/runtime/_runtime.py b/tractor/runtime/_runtime.py index 0ffc6112..477d8f9b 100644 --- a/tractor/runtime/_runtime.py +++ b/tractor/runtime/_runtime.py @@ -119,6 +119,7 @@ from ..discovery._discovery import get_registry from ._portal import Portal from . import _state from ..spawn import _mp_fixup_main +from ..spawn._mp_fixup_main import ParentMainData from . import _rpc if TYPE_CHECKING: @@ -218,7 +219,7 @@ class Actor: return self._ipc_server # Information about `__main__` from parent - _parent_main_data: dict[str, str] + _parent_main_data: ParentMainData _parent_chan_cs: CancelScope|None = None _spawn_spec: msgtypes.SpawnSpec|None = None @@ -240,10 +241,11 @@ class Actor: name: str, uuid: str, *, - enable_modules: list[str] = [], + enable_modules: list[str] | None = None, loglevel: str|None = None, registry_addrs: list[Address]|None = None, spawn_method: str|None = None, + inherit_parent_main: bool = True, arbiter_addr: UnwrappedAddress|None = None, @@ -265,12 +267,15 @@ class Actor: self._cancel_called_by: tuple[str, tuple]|None = None self._cancel_called: bool = False - # retreive and store parent `__main__` data which + # retrieve and store parent `__main__` data which # will be passed to children - self._parent_main_data = _mp_fixup_main._mp_figure_out_main() + self._parent_main_data: ParentMainData = _mp_fixup_main._mp_figure_out_main( + inherit_parent_main=inherit_parent_main, + ) # TODO? only add this when `is_debug_mode() == True` no? # always include debugging tools module + enable_modules = list(enable_modules or []) if _state.is_root_process(): enable_modules.append('tractor.devx.debug._tty_lock') @@ -547,11 +552,15 @@ class Actor: ''' try: - if self._spawn_method == 'trio': - parent_data = self._parent_main_data + if ( + self._spawn_method == 'trio' + and + (parent_data := self._parent_main_data) + ): if 'init_main_from_name' in parent_data: _mp_fixup_main._fixup_main_from_name( parent_data['init_main_from_name']) + elif 'init_main_from_path' in parent_data: _mp_fixup_main._fixup_main_from_path( parent_data['init_main_from_path']) diff --git a/tractor/runtime/_supervise.py b/tractor/runtime/_supervise.py index 3cd7d4c7..6d2d573f 100644 --- a/tractor/runtime/_supervise.py +++ b/tractor/runtime/_supervise.py @@ -194,18 +194,26 @@ class ActorNursery: loglevel: str|None = None, # set log level per subactor debug_mode: bool|None = None, infect_asyncio: bool = False, + inherit_parent_main: bool = True, # TODO: ideally we can rm this once we no longer have # a `._ria_nursery` since the dependent APIs have been # removed! nursery: trio.Nursery|None = None, - proc_kwargs: dict[str, any] = {} + proc_kwargs: dict[str, typing.Any] | None = None, ) -> Portal: ''' Start a (daemon) actor: an process that has no designated "main task" besides the runtime. + Pass ``inherit_parent_main=False`` to keep this child on its + own bootstrap module for the trio spawn backend instead of + applying the parent ``__main__`` re-exec fixup during startup. + This does not affect ``multiprocessing`` ``spawn`` or + ``forkserver`` which reconstruct the parent's ``__main__`` as + part of their normal stdlib bootstrap. + ''' __runtimeframe__: int = 1 # noqa loglevel: str = ( @@ -224,7 +232,8 @@ class ActorNursery: _rtv['_debug_mode'] = debug_mode self._at_least_one_child_in_debug = True - enable_modules = enable_modules or [] + enable_modules = list(enable_modules or []) + proc_kwargs = dict(proc_kwargs or {}) if rpc_module_paths: warnings.warn( @@ -242,6 +251,7 @@ class ActorNursery: # modules allowed to invoked funcs from enable_modules=enable_modules, loglevel=loglevel, + inherit_parent_main=inherit_parent_main, # verbatim relay this actor's registrar addresses registry_addrs=current_actor().registry_addrs, @@ -289,7 +299,8 @@ class ActorNursery: enable_modules: list[str] | None = None, loglevel: str | None = None, # set log level per subactor infect_asyncio: bool = False, - proc_kwargs: dict[str, any] = {}, + inherit_parent_main: bool = True, + proc_kwargs: dict[str, typing.Any] | None = None, **kwargs, # explicit args to ``fn`` @@ -310,6 +321,7 @@ class ActorNursery: # use the explicit function name if not provided name = fn.__name__ + proc_kwargs = dict(proc_kwargs or {}) portal: Portal = await self.start_actor( name, enable_modules=[mod_path] + ( @@ -320,6 +332,7 @@ class ActorNursery: # use the run_in_actor nursery nursery=self._ria_nursery, infect_asyncio=infect_asyncio, + inherit_parent_main=inherit_parent_main, proc_kwargs=proc_kwargs ) diff --git a/tractor/spawn/_mp_fixup_main.py b/tractor/spawn/_mp_fixup_main.py index 11d5f1c6..2bd19d1b 100644 --- a/tractor/spawn/_mp_fixup_main.py +++ b/tractor/spawn/_mp_fixup_main.py @@ -14,103 +14,72 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" -Helpers pulled mostly verbatim from ``multiprocessing.spawn`` +''' +(Originally) Helpers pulled verbatim from ``multiprocessing.spawn`` to aid with "fixing up" the ``__main__`` module in subprocesses. -These helpers are needed for any spawing backend that doesn't already -handle this. For example when using ``trio_run_in_process`` it is needed -but obviously not when we're already using ``multiprocessing``. +Now just delegates directly to appropriate `mp.spawn` fns. -""" -import os -import sys -import platform -import types -import runpy +Note +---- +These helpers are needed for any spawning backend that doesn't already +handle this. For example it's needed when using our +`start_method='trio'` backend but not when we're already using +a ``multiprocessing`` backend such as 'mp_spawn', 'mp_forkserver'. + +?TODO? +- what will be required for an eventual subint backend? + +The helpers imported from `mp.spawn` provide the stdlib's +spawn/forkserver bootstrap that rebuilds the parent's `__main__` in +a fresh child interpreter. In particular, we capture enough info to +later replay the parent's main module as `__mp_main__` (or by path) +in the child process. + +See: +https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods + +''' +import multiprocessing as mp +from multiprocessing.spawn import ( + _fixup_main_from_name as _fixup_main_from_name, + _fixup_main_from_path as _fixup_main_from_path, + get_preparation_data, +) +from typing import NotRequired +from typing import TypedDict -ORIGINAL_DIR = os.path.abspath(os.getcwd()) +class ParentMainData(TypedDict): + init_main_from_name: NotRequired[str] + init_main_from_path: NotRequired[str] -def _mp_figure_out_main() -> dict[str, str]: - """Taken from ``multiprocessing.spawn.get_preparation_data()``. +def _mp_figure_out_main( + inherit_parent_main: bool = True, +) -> ParentMainData: + ''' + Delegate to `multiprocessing.spawn.get_preparation_data()` + when `inherit_parent_main=True`. - Retrieve parent actor `__main__` module data. - """ - d = {} - # Figure out whether to initialise main in the subprocess as a module - # or through direct execution (or to leave it alone entirely) - main_module = sys.modules['__main__'] - main_mod_name = getattr(main_module.__spec__, "name", None) - if main_mod_name is not None: - d['init_main_from_name'] = main_mod_name - # elif sys.platform != 'win32' or (not WINEXE and not WINSERVICE): - elif platform.system() != 'Windows': - main_path = getattr(main_module, '__file__', None) - if main_path is not None: - if ( - not os.path.isabs(main_path) and ( - ORIGINAL_DIR is not None) - ): - # process.ORIGINAL_DIR is not None): - # main_path = os.path.join(process.ORIGINAL_DIR, main_path) - main_path = os.path.join(ORIGINAL_DIR, main_path) - d['init_main_from_path'] = os.path.normpath(main_path) + Retrieve parent (actor) proc's `__main__` module data. + + ''' + if not inherit_parent_main: + return {} + + proc: mp.Process = mp.current_process() + prep_data: dict = get_preparation_data( + name=proc.name, + ) + # XXX, unserializable (and unneeded by us) by default + # see `mp.spawn.get_preparation_data()` impl details. + prep_data.pop('authkey', None) + + d: ParentMainData = {} + if 'init_main_from_name' in prep_data: + d['init_main_from_name'] = prep_data['init_main_from_name'] + if 'init_main_from_path' in prep_data: + d['init_main_from_path'] = prep_data['init_main_from_path'] return d - - -# Multiprocessing module helpers to fix up the main module in -# spawned subprocesses -def _fixup_main_from_name(mod_name: str) -> None: - # __main__.py files for packages, directories, zip archives, etc, run - # their "main only" code unconditionally, so we don't even try to - # populate anything in __main__, nor do we make any changes to - # __main__ attributes - current_main = sys.modules['__main__'] - if mod_name == "__main__" or mod_name.endswith(".__main__"): - return - - # If this process was forked, __main__ may already be populated - if getattr(current_main.__spec__, "name", None) == mod_name: - return - - # Otherwise, __main__ may contain some non-main code where we need to - # support unpickling it properly. We rerun it as __mp_main__ and make - # the normal __main__ an alias to that - # old_main_modules.append(current_main) - main_module = types.ModuleType("__mp_main__") - main_content = runpy.run_module(mod_name, - run_name="__mp_main__", - alter_sys=True) # type: ignore - main_module.__dict__.update(main_content) - sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module - - -def _fixup_main_from_path(main_path: str) -> None: - # If this process was forked, __main__ may already be populated - current_main = sys.modules['__main__'] - - # Unfortunately, the main ipython launch script historically had no - # "if __name__ == '__main__'" guard, so we work around that - # by treating it like a __main__.py file - # See https://github.com/ipython/ipython/issues/4698 - main_name = os.path.splitext(os.path.basename(main_path))[0] - if main_name == 'ipython': - return - - # Otherwise, if __file__ already has the setting we expect, - # there's nothing more to do - if getattr(current_main, '__file__', None) == main_path: - return - - # If the parent process has sent a path through rather than a module - # name we assume it is an executable script that may contain - # non-main code that needs to be executed - # old_main_modules.append(current_main) - main_module = types.ModuleType("__mp_main__") - main_content = runpy.run_path(main_path, - run_name="__mp_main__") # type: ignore - main_module.__dict__.update(main_content) - sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module