Merge pull request #434 from mahmoudhas/add-replay-parent-main-opt-out
Add per-actor parent-main inheritance opt-out
commit
3a9f4ea383
|
|
@ -0,0 +1,5 @@
|
|||
import os
|
||||
|
||||
|
||||
async def child_fn() -> str:
    '''
    Minimal RPC target kept in a separately-importable module so
    a child actor can run it without replaying the parent's
    ``__main__``; returns a string embedding this process' pid.

    '''
    return f"child OK pid={os.getpid()}"
|
||||
|
|
@ -0,0 +1,50 @@
|
|||
"""
|
||||
Integration test: spawning tractor actors from an MPI process.
|
||||
|
||||
When a parent is launched via ``mpirun``, Open MPI sets ``OMPI_*`` env
|
||||
vars that bind ``MPI_Init`` to the ``orted`` daemon. Tractor children
|
||||
inherit those env vars, so if ``inherit_parent_main=True`` (the default)
|
||||
the child re-executes ``__main__``, re-imports ``mpi4py``, and
|
||||
``MPI_Init_thread`` fails because the child was never spawned by
|
||||
``orted``::
|
||||
|
||||
getting local rank failed
|
||||
--> Returned value No permission (-17) instead of ORTE_SUCCESS
|
||||
|
||||
Passing ``inherit_parent_main=False`` and placing RPC functions in a
|
||||
separate importable module (``_child``) avoids the re-import entirely.
|
||||
|
||||
Usage::
|
||||
|
||||
mpirun --allow-run-as-root -np 1 python -m \
|
||||
examples.integration.mpi4py.inherit_parent_main
|
||||
"""
|
||||
|
||||
from mpi4py import MPI
|
||||
|
||||
import os
|
||||
import trio
|
||||
import tractor
|
||||
|
||||
from ._child import child_fn
|
||||
|
||||
|
||||
async def main() -> None:
    '''
    Parent-side entry point: spawn one `tractor` child actor with
    parent-``__main__`` inheritance disabled and invoke ``child_fn``
    in it over RPC.

    '''
    # `mpi4py` is imported at module scope (see top of file), which
    # per the module docstring triggers MPI init in this
    # mpirun-launched parent.
    rank = MPI.COMM_WORLD.Get_rank()
    print(f"[parent] rank={rank} pid={os.getpid()}", flush=True)

    async with tractor.open_nursery(start_method='trio') as an:
        portal = await an.start_actor(
            'mpi-child',
            # expose the module holding the RPC target so the child
            # can plain-import it rather than re-executing __main__.
            enable_modules=[child_fn.__module__],
            # Without this the child replays __main__, which
            # re-imports mpi4py and crashes on MPI_Init.
            inherit_parent_main=False,
        )
        result = await portal.run(child_fn)
        print(f"[parent] got: {result}", flush=True)
        # gracefully tear down the (daemon) child actor.
        await portal.cancel_actor()


if __name__ == "__main__":
    trio.run(main)
|
||||
|
|
@ -1,5 +1,12 @@
|
|||
"""
|
||||
Spawning basics
|
||||
Spawning basics including audit of,
|
||||
|
||||
- subproc bootstrap, such as subactor runtime-data/config inheritance,
|
||||
- basic (and mostly legacy) `ActorNursery` subactor starting and
|
||||
cancel APIs.
|
||||
|
||||
Simple (and generally legacy) examples from the original
|
||||
API design.
|
||||
|
||||
"""
|
||||
from functools import partial
|
||||
|
|
@ -98,7 +105,9 @@ async def movie_theatre_question():
|
|||
|
||||
|
||||
@tractor_test
|
||||
async def test_movie_theatre_convo(start_method):
|
||||
async def test_movie_theatre_convo(
|
||||
start_method: str,
|
||||
):
|
||||
'''
|
||||
The main ``tractor`` routine.
|
||||
|
||||
|
|
@ -151,13 +160,16 @@ async def test_most_beautiful_word(
|
|||
name='some_linguist',
|
||||
)
|
||||
|
||||
print(await portal.result())
|
||||
res: Any = await portal.wait_for_result()
|
||||
assert res == return_value
|
||||
# The ``async with`` will unblock here since the 'some_linguist'
|
||||
# actor has completed its main task ``cellar_door``.
|
||||
|
||||
# this should pull the cached final result already captured during
|
||||
# the nursery block exit.
|
||||
print(await portal.result())
|
||||
res: Any = await portal.wait_for_result()
|
||||
assert res == return_value
|
||||
print(res)
|
||||
|
||||
|
||||
async def check_loglevel(level):
|
||||
|
|
@ -168,16 +180,24 @@ async def check_loglevel(level):
|
|||
log.critical('yoyoyo')
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'level', [
|
||||
'debug',
|
||||
'cancel',
|
||||
'critical'
|
||||
],
|
||||
ids='loglevel={}'.format,
|
||||
)
|
||||
def test_loglevel_propagated_to_subactor(
|
||||
start_method,
|
||||
capfd,
|
||||
reg_addr,
|
||||
capfd: pytest.CaptureFixture,
|
||||
start_method: str,
|
||||
reg_addr: tuple,
|
||||
level: str,
|
||||
):
|
||||
if start_method == 'mp_forkserver':
|
||||
pytest.skip(
|
||||
"a bug with `capfd` seems to make forkserver capture not work?")
|
||||
|
||||
level = 'critical'
|
||||
"a bug with `capfd` seems to make forkserver capture not work?"
|
||||
)
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
|
|
@ -197,3 +217,121 @@ def test_loglevel_propagated_to_subactor(
|
|||
# ensure subactor spits log message on stderr
|
||||
captured = capfd.readouterr()
|
||||
assert 'yoyoyo' in captured.err
|
||||
|
||||
|
||||
async def check_parent_main_inheritance(
    expect_inherited: bool,
) -> bool:
    '''
    Assert that the child actor's ``_parent_main_data`` matches the
    ``inherit_parent_main`` flag it was spawned with.

    With the trio spawn backend the parent's ``__main__`` bootstrap
    data is captured and forwarded to each child so it can replay
    the parent's ``__main__`` as ``__mp_main__``, mirroring the
    stdlib ``multiprocessing`` bootstrap:
    https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods

    When ``inherit_parent_main=False`` the data dict is empty
    (``{}``) so no fixup ever runs and the child keeps its own
    ``__main__`` untouched.

    NOTE: under `pytest` the parent ``__main__`` is
    ``pytest.__main__`` whose ``_fixup_main_from_name()`` is a no-op
    (the name ends with ``.__main__``), so we cannot observe
    a difference in ``sys.modules['__main__'].__name__`` between the
    two modes. Checking ``_parent_main_data`` directly is the most
    reliable verification that the flag is threaded through
    correctly; a ``RemoteActorError[AssertionError]`` propagates on
    mismatch.

    '''
    # NOTE: no function-local `import tractor` needed; this module
    # imports `tractor` at top level and, when this fn runs inside
    # a spawned child actor, the module is (re-)imported there via
    # `enable_modules` anyway.
    actor: tractor.Actor = tractor.current_actor()
    has_data: bool = bool(actor._parent_main_data)
    assert has_data == expect_inherited, (
        f'Expected _parent_main_data to be '
        f'{"non-empty" if expect_inherited else "empty"}, '
        f'got: {actor._parent_main_data!r}'
    )
    return has_data
|
||||
|
||||
|
||||
def test_run_in_actor_can_skip_parent_main_inheritance(
    start_method: str,  # <- only support on `trio` backend rn.
):
    '''
    Verify ``inherit_parent_main=False`` on ``run_in_actor()``
    prevents parent ``__main__`` data from reaching the child.

    '''
    if start_method != 'trio':
        pytest.skip(
            'parent main-inheritance opt-out only affects the trio backend'
        )

    async def main():
        async with tractor.open_nursery(start_method='trio') as an:

            # spawn one child with the default (inheriting) config
            # and one with the opt-out; the checker fn asserts the
            # child-side state matches in each case.
            for child_name, inherits in (
                ('replaying-parent-main', True),
                ('isolated-parent-main', False),
            ):
                spawn_kwargs: dict = (
                    {} if inherits
                    else {'inherit_parent_main': False}
                )
                portal = await an.run_in_actor(
                    check_parent_main_inheritance,
                    name=child_name,
                    expect_inherited=inherits,
                    **spawn_kwargs,
                )
                await portal.result()

    trio.run(main)
|
||||
|
||||
|
||||
def test_start_actor_can_skip_parent_main_inheritance(
    start_method: str,  # <- only support on `trio` backend rn.
):
    '''
    Verify ``inherit_parent_main=False`` on ``start_actor()``
    prevents parent ``__main__`` data from reaching the child.

    '''
    if start_method != 'trio':
        pytest.skip(
            'parent main-inheritance opt-out only affects the trio backend'
        )

    async def main():
        async with tractor.open_nursery(start_method='trio') as an:

            # one child with default inheritance, one opted out;
            # the RPC-ed checker returns whether any parent
            # __main__ data was present child-side.
            for child_name, inherits in (
                ('replaying-parent-main', True),
                ('isolated-parent-main', False),
            ):
                spawn_kwargs: dict = (
                    {} if inherits
                    else {'inherit_parent_main': False}
                )
                portal = await an.start_actor(
                    child_name,
                    enable_modules=[__name__],
                    **spawn_kwargs,
                )
                got_data = await portal.run(
                    check_parent_main_inheritance,
                    expect_inherited=inherits,
                )
                assert got_data is inherits
                await portal.cancel_actor()

    trio.run(main)
|
||||
|
|
|
|||
|
|
@ -119,6 +119,7 @@ from ..discovery._discovery import get_registry
|
|||
from ._portal import Portal
|
||||
from . import _state
|
||||
from ..spawn import _mp_fixup_main
|
||||
from ..spawn._mp_fixup_main import ParentMainData
|
||||
from . import _rpc
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
|
@ -218,7 +219,7 @@ class Actor:
|
|||
return self._ipc_server
|
||||
|
||||
# Information about `__main__` from parent
|
||||
_parent_main_data: dict[str, str]
|
||||
_parent_main_data: ParentMainData
|
||||
_parent_chan_cs: CancelScope|None = None
|
||||
_spawn_spec: msgtypes.SpawnSpec|None = None
|
||||
|
||||
|
|
@ -240,10 +241,11 @@ class Actor:
|
|||
name: str,
|
||||
uuid: str,
|
||||
*,
|
||||
enable_modules: list[str] = [],
|
||||
enable_modules: list[str] | None = None,
|
||||
loglevel: str|None = None,
|
||||
registry_addrs: list[Address]|None = None,
|
||||
spawn_method: str|None = None,
|
||||
inherit_parent_main: bool = True,
|
||||
|
||||
arbiter_addr: UnwrappedAddress|None = None,
|
||||
|
||||
|
|
@ -265,12 +267,15 @@ class Actor:
|
|||
self._cancel_called_by: tuple[str, tuple]|None = None
|
||||
self._cancel_called: bool = False
|
||||
|
||||
# retreive and store parent `__main__` data which
|
||||
# retrieve and store parent `__main__` data which
|
||||
# will be passed to children
|
||||
self._parent_main_data = _mp_fixup_main._mp_figure_out_main()
|
||||
self._parent_main_data: ParentMainData = _mp_fixup_main._mp_figure_out_main(
|
||||
inherit_parent_main=inherit_parent_main,
|
||||
)
|
||||
|
||||
# TODO? only add this when `is_debug_mode() == True` no?
|
||||
# always include debugging tools module
|
||||
enable_modules = list(enable_modules or [])
|
||||
if _state.is_root_process():
|
||||
enable_modules.append('tractor.devx.debug._tty_lock')
|
||||
|
||||
|
|
@ -547,11 +552,15 @@ class Actor:
|
|||
|
||||
'''
|
||||
try:
|
||||
if self._spawn_method == 'trio':
|
||||
parent_data = self._parent_main_data
|
||||
if (
|
||||
self._spawn_method == 'trio'
|
||||
and
|
||||
(parent_data := self._parent_main_data)
|
||||
):
|
||||
if 'init_main_from_name' in parent_data:
|
||||
_mp_fixup_main._fixup_main_from_name(
|
||||
parent_data['init_main_from_name'])
|
||||
|
||||
elif 'init_main_from_path' in parent_data:
|
||||
_mp_fixup_main._fixup_main_from_path(
|
||||
parent_data['init_main_from_path'])
|
||||
|
|
|
|||
|
|
@ -194,18 +194,26 @@ class ActorNursery:
|
|||
loglevel: str|None = None, # set log level per subactor
|
||||
debug_mode: bool|None = None,
|
||||
infect_asyncio: bool = False,
|
||||
inherit_parent_main: bool = True,
|
||||
|
||||
# TODO: ideally we can rm this once we no longer have
|
||||
# a `._ria_nursery` since the dependent APIs have been
|
||||
# removed!
|
||||
nursery: trio.Nursery|None = None,
|
||||
proc_kwargs: dict[str, any] = {}
|
||||
proc_kwargs: dict[str, typing.Any] | None = None,
|
||||
|
||||
) -> Portal:
|
||||
'''
|
||||
Start a (daemon) actor: an process that has no designated
|
||||
"main task" besides the runtime.
|
||||
|
||||
Pass ``inherit_parent_main=False`` to keep this child on its
|
||||
own bootstrap module for the trio spawn backend instead of
|
||||
applying the parent ``__main__`` re-exec fixup during startup.
|
||||
This does not affect ``multiprocessing`` ``spawn`` or
|
||||
``forkserver`` which reconstruct the parent's ``__main__`` as
|
||||
part of their normal stdlib bootstrap.
|
||||
|
||||
'''
|
||||
__runtimeframe__: int = 1 # noqa
|
||||
loglevel: str = (
|
||||
|
|
@ -224,7 +232,8 @@ class ActorNursery:
|
|||
_rtv['_debug_mode'] = debug_mode
|
||||
self._at_least_one_child_in_debug = True
|
||||
|
||||
enable_modules = enable_modules or []
|
||||
enable_modules = list(enable_modules or [])
|
||||
proc_kwargs = dict(proc_kwargs or {})
|
||||
|
||||
if rpc_module_paths:
|
||||
warnings.warn(
|
||||
|
|
@ -242,6 +251,7 @@ class ActorNursery:
|
|||
# modules allowed to invoked funcs from
|
||||
enable_modules=enable_modules,
|
||||
loglevel=loglevel,
|
||||
inherit_parent_main=inherit_parent_main,
|
||||
|
||||
# verbatim relay this actor's registrar addresses
|
||||
registry_addrs=current_actor().registry_addrs,
|
||||
|
|
@ -289,7 +299,8 @@ class ActorNursery:
|
|||
enable_modules: list[str] | None = None,
|
||||
loglevel: str | None = None, # set log level per subactor
|
||||
infect_asyncio: bool = False,
|
||||
proc_kwargs: dict[str, any] = {},
|
||||
inherit_parent_main: bool = True,
|
||||
proc_kwargs: dict[str, typing.Any] | None = None,
|
||||
|
||||
**kwargs, # explicit args to ``fn``
|
||||
|
||||
|
|
@ -310,6 +321,7 @@ class ActorNursery:
|
|||
# use the explicit function name if not provided
|
||||
name = fn.__name__
|
||||
|
||||
proc_kwargs = dict(proc_kwargs or {})
|
||||
portal: Portal = await self.start_actor(
|
||||
name,
|
||||
enable_modules=[mod_path] + (
|
||||
|
|
@ -320,6 +332,7 @@ class ActorNursery:
|
|||
# use the run_in_actor nursery
|
||||
nursery=self._ria_nursery,
|
||||
infect_asyncio=infect_asyncio,
|
||||
inherit_parent_main=inherit_parent_main,
|
||||
proc_kwargs=proc_kwargs
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -14,103 +14,72 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
Helpers pulled mostly verbatim from ``multiprocessing.spawn``
|
||||
'''
|
||||
(Originally) Helpers pulled verbatim from ``multiprocessing.spawn``
|
||||
to aid with "fixing up" the ``__main__`` module in subprocesses.
|
||||
|
||||
These helpers are needed for any spawing backend that doesn't already
|
||||
handle this. For example when using ``trio_run_in_process`` it is needed
|
||||
but obviously not when we're already using ``multiprocessing``.
|
||||
Now just delegates directly to appropriate `mp.spawn` fns.
|
||||
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import platform
|
||||
import types
|
||||
import runpy
|
||||
Note
|
||||
----
|
||||
These helpers are needed for any spawning backend that doesn't already
|
||||
handle this. For example it's needed when using our
|
||||
`start_method='trio'` backend but not when we're already using
|
||||
a ``multiprocessing`` backend such as 'mp_spawn', 'mp_forkserver'.
|
||||
|
||||
?TODO?
|
||||
- what will be required for an eventual subint backend?
|
||||
|
||||
The helpers imported from `mp.spawn` provide the stdlib's
|
||||
spawn/forkserver bootstrap that rebuilds the parent's `__main__` in
|
||||
a fresh child interpreter. In particular, we capture enough info to
|
||||
later replay the parent's main module as `__mp_main__` (or by path)
|
||||
in the child process.
|
||||
|
||||
See:
|
||||
https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods
|
||||
|
||||
'''
|
||||
import multiprocessing as mp
|
||||
from multiprocessing.spawn import (
|
||||
_fixup_main_from_name as _fixup_main_from_name,
|
||||
_fixup_main_from_path as _fixup_main_from_path,
|
||||
get_preparation_data,
|
||||
)
|
||||
from typing import NotRequired
|
||||
from typing import TypedDict
|
||||
|
||||
|
||||
ORIGINAL_DIR = os.path.abspath(os.getcwd())
|
||||
class ParentMainData(TypedDict):
    # Parent-process `__main__` bootstrap info forwarded to spawned
    # children; both keys are optional and the dict is empty when
    # parent-main inheritance is opted out.
    init_main_from_name: NotRequired[str]  # module-spec name to replay
    init_main_from_path: NotRequired[str]  # script file path to replay
|
||||
|
||||
|
||||
def _mp_figure_out_main() -> dict[str, str]:
|
||||
"""Taken from ``multiprocessing.spawn.get_preparation_data()``.
|
||||
def _mp_figure_out_main(
|
||||
inherit_parent_main: bool = True,
|
||||
) -> ParentMainData:
|
||||
'''
|
||||
Delegate to `multiprocessing.spawn.get_preparation_data()`
|
||||
when `inherit_parent_main=True`.
|
||||
|
||||
Retrieve parent actor `__main__` module data.
|
||||
"""
|
||||
d = {}
|
||||
# Figure out whether to initialise main in the subprocess as a module
|
||||
# or through direct execution (or to leave it alone entirely)
|
||||
main_module = sys.modules['__main__']
|
||||
main_mod_name = getattr(main_module.__spec__, "name", None)
|
||||
if main_mod_name is not None:
|
||||
d['init_main_from_name'] = main_mod_name
|
||||
# elif sys.platform != 'win32' or (not WINEXE and not WINSERVICE):
|
||||
elif platform.system() != 'Windows':
|
||||
main_path = getattr(main_module, '__file__', None)
|
||||
if main_path is not None:
|
||||
if (
|
||||
not os.path.isabs(main_path) and (
|
||||
ORIGINAL_DIR is not None)
|
||||
):
|
||||
# process.ORIGINAL_DIR is not None):
|
||||
# main_path = os.path.join(process.ORIGINAL_DIR, main_path)
|
||||
main_path = os.path.join(ORIGINAL_DIR, main_path)
|
||||
d['init_main_from_path'] = os.path.normpath(main_path)
|
||||
Retrieve parent (actor) proc's `__main__` module data.
|
||||
|
||||
'''
|
||||
if not inherit_parent_main:
|
||||
return {}
|
||||
|
||||
proc: mp.Process = mp.current_process()
|
||||
prep_data: dict = get_preparation_data(
|
||||
name=proc.name,
|
||||
)
|
||||
# XXX, unserializable (and unneeded by us) by default
|
||||
# see `mp.spawn.get_preparation_data()` impl details.
|
||||
prep_data.pop('authkey', None)
|
||||
|
||||
d: ParentMainData = {}
|
||||
if 'init_main_from_name' in prep_data:
|
||||
d['init_main_from_name'] = prep_data['init_main_from_name']
|
||||
if 'init_main_from_path' in prep_data:
|
||||
d['init_main_from_path'] = prep_data['init_main_from_path']
|
||||
|
||||
return d
|
||||
|
||||
|
||||
# Multiprocessing module helpers to fix up the main module in
|
||||
# spawned subprocesses
|
||||
def _fixup_main_from_name(mod_name: str) -> None:
|
||||
# __main__.py files for packages, directories, zip archives, etc, run
|
||||
# their "main only" code unconditionally, so we don't even try to
|
||||
# populate anything in __main__, nor do we make any changes to
|
||||
# __main__ attributes
|
||||
current_main = sys.modules['__main__']
|
||||
if mod_name == "__main__" or mod_name.endswith(".__main__"):
|
||||
return
|
||||
|
||||
# If this process was forked, __main__ may already be populated
|
||||
if getattr(current_main.__spec__, "name", None) == mod_name:
|
||||
return
|
||||
|
||||
# Otherwise, __main__ may contain some non-main code where we need to
|
||||
# support unpickling it properly. We rerun it as __mp_main__ and make
|
||||
# the normal __main__ an alias to that
|
||||
# old_main_modules.append(current_main)
|
||||
main_module = types.ModuleType("__mp_main__")
|
||||
main_content = runpy.run_module(mod_name,
|
||||
run_name="__mp_main__",
|
||||
alter_sys=True) # type: ignore
|
||||
main_module.__dict__.update(main_content)
|
||||
sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
|
||||
|
||||
|
||||
def _fixup_main_from_path(main_path: str) -> None:
|
||||
# If this process was forked, __main__ may already be populated
|
||||
current_main = sys.modules['__main__']
|
||||
|
||||
# Unfortunately, the main ipython launch script historically had no
|
||||
# "if __name__ == '__main__'" guard, so we work around that
|
||||
# by treating it like a __main__.py file
|
||||
# See https://github.com/ipython/ipython/issues/4698
|
||||
main_name = os.path.splitext(os.path.basename(main_path))[0]
|
||||
if main_name == 'ipython':
|
||||
return
|
||||
|
||||
# Otherwise, if __file__ already has the setting we expect,
|
||||
# there's nothing more to do
|
||||
if getattr(current_main, '__file__', None) == main_path:
|
||||
return
|
||||
|
||||
# If the parent process has sent a path through rather than a module
|
||||
# name we assume it is an executable script that may contain
|
||||
# non-main code that needs to be executed
|
||||
# old_main_modules.append(current_main)
|
||||
main_module = types.ModuleType("__mp_main__")
|
||||
main_content = runpy.run_path(main_path,
|
||||
run_name="__mp_main__") # type: ignore
|
||||
main_module.__dict__.update(main_content)
|
||||
sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
|
||||
|
|
|
|||
Loading…
Reference in New Issue