From d7ca68cf6132876b064227f7fbd8dabe987ab2ce Mon Sep 17 00:00:00 2001 From: goodboy Date: Fri, 17 Apr 2026 01:58:05 -0400 Subject: [PATCH] Mv `trio_proc`/`mp_proc` to per-backend submods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split the monolithic `spawn._spawn` into a slim "core" + per-backend submodules so a future `._subint` backend (per issue #379) can drop in without piling more onto `_spawn.py`. `._spawn` retains the cross-backend supervisor machinery: `SpawnMethodKey`, `_methods` registry, `_spawn_method`/`_ctx` state, `try_set_start_method()`, the `new_proc()` dispatcher, and the shared helpers `exhaust_portal()`, `cancel_on_completion()`, `hard_kill()`, `soft_kill()`, `proc_waiter()`. Deats, - mv `trio_proc()` → new `spawn._trio` - mv `mp_proc()` → new `spawn._mp`, reads `_ctx` and `_spawn_method` via `from . import _spawn` for late binding bc both get mutated by `try_set_start_method()` - `_methods` wires up the new submods via late bottom-of-module imports to side-step circular dep (both backend mods pull shared helpers from `._spawn`) - prune now-unused imports from `_spawn.py` — `sys`, `is_root_process`, `current_actor`, `is_main_process`, `_mp_main`, `ActorFailure`, `pretty_struct`, `_pformat` Also, - `_testing.pytest.pytest_generate_tests()` now drives the valid-backend set from `typing.get_args(SpawnMethodKey)` so adding a new backend (e.g. 
`'subint'`) doesn't require touching the harness - refresh `spawn/__init__.py` docstring for the new layout (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tractor/_testing/pytest.py | 12 +- tractor/spawn/__init__.py | 31 ++- tractor/spawn/_mp.py | 235 +++++++++++++++++++++ tractor/spawn/_spawn.py | 413 +------------------------------------ tractor/spawn/_trio.py | 292 ++++++++++++++++++++++++++ 5 files changed, 565 insertions(+), 418 deletions(-) create mode 100644 tractor/spawn/_mp.py create mode 100644 tractor/spawn/_trio.py diff --git a/tractor/_testing/pytest.py b/tractor/_testing/pytest.py index f843fb4c..1d803c9e 100644 --- a/tractor/_testing/pytest.py +++ b/tractor/_testing/pytest.py @@ -27,6 +27,7 @@ import inspect import platform from typing import ( Callable, + get_args, ) import pytest @@ -341,12 +342,11 @@ def pytest_generate_tests( # XXX some weird windows bug with `pytest`? spawn_backend = 'trio' - # TODO: maybe just use the literal `._spawn.SpawnMethodKey`? - assert spawn_backend in ( - 'mp_spawn', - 'mp_forkserver', - 'trio', - ) + # drive the valid-backend set from the canonical `Literal` so + # adding a new spawn backend (e.g. `'subint'`) doesn't require + # touching the harness. + from tractor.spawn._spawn import SpawnMethodKey + assert spawn_backend in get_args(SpawnMethodKey) # NOTE: used-to-be-used-to dyanmically parametrize tests for when # you just passed --spawn-backend=`mp` on the cli, but now we expect diff --git a/tractor/spawn/__init__.py b/tractor/spawn/__init__.py index 03f2b0f8..06ba413e 100644 --- a/tractor/spawn/__init__.py +++ b/tractor/spawn/__init__.py @@ -15,12 +15,31 @@ # along with this program. If not, see . ''' -Actor process spawning machinery using -multiple backends (trio, multiprocessing). +Actor process spawning machinery using multiple backends. 
-NOTE: to avoid circular imports, this ``__init__`` -does NOT eagerly import submodules. Use direct -module paths like ``tractor.spawn._spawn`` or -``tractor.spawn._entry`` instead. +Layout +------ +- `._spawn`: the "core" supervisor machinery — spawn-method + registry (`SpawnMethodKey`, `_methods`, `_spawn_method`, + `_ctx`, `try_set_start_method`), the `new_proc` dispatcher, + and the cross-backend helpers `exhaust_portal`, + `cancel_on_completion`, `hard_kill`, `soft_kill`, + `proc_waiter`. + +Per-backend submodules (each exposes a single `*_proc()` +coroutine registered in `_spawn._methods`): + +- `._trio`: the `trio`-native subprocess backend (default, + all platforms), spawns via `trio.lowlevel.open_process()`. +- `._mp`: the stdlib `multiprocessing` backends — + `'mp_spawn'` and `'mp_forkserver'` variants — driven by + the `mp.context` bound to `_spawn._ctx`. + +Entry-point helpers live in `._entry`/`._mp_fixup_main`/ +`._forkserver_override`. + +NOTE: to avoid circular imports, this ``__init__`` does NOT +eagerly import submodules. Use direct module paths like +``tractor.spawn._spawn`` or ``tractor.spawn._trio`` instead. ''' diff --git a/tractor/spawn/_mp.py b/tractor/spawn/_mp.py new file mode 100644 index 00000000..addc8996 --- /dev/null +++ b/tractor/spawn/_mp.py @@ -0,0 +1,235 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. 
+ +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +''' +The `multiprocessing` subprocess spawning backends (`spawn` +and `forkserver` variants). + +Driven by the stdlib `multiprocessing` context selected via +`try_set_start_method()` in the `_spawn` core module, which +sets the module-global `_ctx` and `_spawn_method` read here. + +''' +from __future__ import annotations +import multiprocessing as mp +from typing import ( + Any, + TYPE_CHECKING, +) + +import trio +from trio import TaskStatus + +from tractor.runtime._state import ( + current_actor, + is_main_process, +) +from tractor.log import get_logger +from tractor.discovery._addr import UnwrappedAddress +from tractor.runtime._portal import Portal +from tractor.runtime._runtime import Actor +from tractor._exceptions import ActorFailure +from ._entry import _mp_main +# NOTE: module-import (not from-import) so we dynamically see +# the *current* `_ctx` / `_spawn_method` values, which are mutated +# by `try_set_start_method()` after module load time. +from . 
import _spawn +from ._spawn import ( + cancel_on_completion, + proc_waiter, + soft_kill, +) + + +if TYPE_CHECKING: + from tractor.ipc import ( + _server, + Channel, + ) + from tractor.runtime._supervise import ActorNursery + + +log = get_logger('tractor') + + +async def mp_proc( + name: str, + actor_nursery: ActorNursery, # type: ignore # noqa + subactor: Actor, + errors: dict[tuple[str, str], Exception], + # passed through to actor main + bind_addrs: list[UnwrappedAddress], + parent_addr: UnwrappedAddress, + _runtime_vars: dict[str, Any], # serialized and sent to _child + *, + infect_asyncio: bool = False, + task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED, + proc_kwargs: dict[str, any] = {} + +) -> None: + + # uggh zone + try: + from multiprocessing import semaphore_tracker # type: ignore + resource_tracker = semaphore_tracker + resource_tracker._resource_tracker = resource_tracker._semaphore_tracker # noqa + except ImportError: + # 3.8 introduces a more general version that also tracks shared mems + from multiprocessing import resource_tracker # type: ignore + + assert _spawn._ctx + start_method = _spawn._ctx.get_start_method() + if start_method == 'forkserver': + + from multiprocessing import forkserver # type: ignore + # XXX do our hackery on the stdlib to avoid multiple + # forkservers (one at each subproc layer). 
+ fs = forkserver._forkserver + curr_actor = current_actor() + if is_main_process() and not curr_actor._forkserver_info: + # if we're the "main" process start the forkserver + # only once and pass its ipc info to downstream + # children + # forkserver.set_forkserver_preload(enable_modules) + forkserver.ensure_running() + fs_info = ( + fs._forkserver_address, # type: ignore # noqa + fs._forkserver_alive_fd, # type: ignore # noqa + getattr(fs, '_forkserver_pid', None), + getattr( + resource_tracker._resource_tracker, '_pid', None), + resource_tracker._resource_tracker._fd, + ) + else: # request to forkserver to fork a new child + assert curr_actor._forkserver_info + fs_info = ( + fs._forkserver_address, # type: ignore # noqa + fs._forkserver_alive_fd, # type: ignore # noqa + fs._forkserver_pid, # type: ignore # noqa + resource_tracker._resource_tracker._pid, + resource_tracker._resource_tracker._fd, + ) = curr_actor._forkserver_info + else: + # spawn method + fs_info = (None, None, None, None, None) + + proc: mp.Process = _spawn._ctx.Process( # type: ignore + target=_mp_main, + args=( + subactor, + bind_addrs, + fs_info, + _spawn._spawn_method, + parent_addr, + infect_asyncio, + ), + # daemon=True, + name=name, + ) + + # `multiprocessing` only (since no async interface): + # register the process before start in case we get a cancel + # request before the actor has fully spawned - then we can wait + # for it to fully come up before sending a cancel request + actor_nursery._children[subactor.aid.uid] = (subactor, proc, None) + + proc.start() + if not proc.is_alive(): + raise ActorFailure("Couldn't start sub-actor?") + + log.runtime(f"Started {proc}") + + ipc_server: _server.Server = actor_nursery._actor.ipc_server + try: + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await ipc_server.wait_for_peer( + subactor.aid.uid, + ) + + # XXX: monkey patch poll 
API to match the ``subprocess`` API.. + # not sure why they don't expose this but kk. + proc.poll = lambda: proc.exitcode # type: ignore + + # except: + # TODO: in the case we were cancelled before the sub-proc + # registered itself back we must be sure to try and clean + # any process we may have started. + + portal = Portal(chan) + actor_nursery._children[subactor.aid.uid] = (subactor, proc, portal) + + # unblock parent task + task_status.started(portal) + + # wait for ``ActorNursery`` block to signal that + # subprocesses can be waited upon. + # This is required to ensure synchronization + # with user code that may want to manually await results + # from nursery spawned sub-actors. We don't want the + # containing nurseries here to collect results or error + # while user code is still doing its thing. Only after the + # nursery block closes do we allow subactor results to be + # awaited and reported upwards to the supervisor. + with trio.CancelScope(shield=True): + await actor_nursery._join_procs.wait() + + async with trio.open_nursery() as nursery: + if portal in actor_nursery._cancel_after_result_on_exit: + nursery.start_soon( + cancel_on_completion, + portal, + subactor, + errors + ) + + # This is a "soft" (cancellable) join/reap which + # will remote cancel the actor on a ``trio.Cancelled`` + # condition. 
+ await soft_kill( + proc, + proc_waiter, + portal + ) + + # cancel result waiter that may have been spawned in + # tandem if not done already + log.warning( + "Cancelling existing result waiter task for " + f"{subactor.aid.uid}") + nursery.cancel_scope.cancel() + + finally: + # hard reap sequence + if proc.is_alive(): + log.cancel(f"Attempting to hard kill {proc}") + with trio.move_on_after(0.1) as cs: + cs.shield = True + await proc_waiter(proc) + + if cs.cancelled_caught: + proc.terminate() + + proc.join() + log.debug(f"Joined {proc}") + + # pop child entry to indicate we are no longer managing subactor + actor_nursery._children.pop(subactor.aid.uid) + + # TODO: prolly report to ``mypy`` how this causes all sorts of + # false errors.. + # subactor, proc, portal = actor_nursery._children.pop(subactor.uid) diff --git a/tractor/spawn/_spawn.py b/tractor/spawn/_spawn.py index 9d89648c..d040813a 100644 --- a/tractor/spawn/_spawn.py +++ b/tractor/spawn/_spawn.py @@ -20,7 +20,6 @@ Machinery for actor process spawning using multiple backends. 
""" from __future__ import annotations import multiprocessing as mp -import sys import platform from typing import ( Any, @@ -34,14 +33,8 @@ from typing import ( import trio from trio import TaskStatus -from ..devx import ( - debug, - pformat as _pformat -) +from ..devx import debug from tractor.runtime._state import ( - current_actor, - is_main_process, - is_root_process, debug_mode, _runtime_vars, ) @@ -49,12 +42,7 @@ from tractor.log import get_logger from tractor.discovery._addr import UnwrappedAddress from tractor.runtime._portal import Portal from tractor.runtime._runtime import Actor -from ._entry import _mp_main -from tractor._exceptions import ActorFailure -from tractor.msg import ( - types as msgtypes, - pretty_struct, -) +from tractor.msg import types as msgtypes if TYPE_CHECKING: @@ -445,398 +433,11 @@ async def new_proc( ) -async def trio_proc( - name: str, - actor_nursery: ActorNursery, - subactor: Actor, - errors: dict[tuple[str, str], Exception], - - # passed through to actor main - bind_addrs: list[UnwrappedAddress], - parent_addr: UnwrappedAddress, - _runtime_vars: dict[str, Any], # serialized and sent to _child - *, - infect_asyncio: bool = False, - task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED, - proc_kwargs: dict[str, any] = {} - -) -> None: - ''' - Create a new ``Process`` using a "spawn method" as (configured using - ``try_set_start_method()``). - - This routine should be started in a actor runtime task and the logic - here is to be considered the core supervision strategy. - - ''' - spawn_cmd = [ - sys.executable, - "-m", - # Hardcode this (instead of using ``_child.__name__`` to avoid a - # double import warning: https://stackoverflow.com/a/45070583 - "tractor._child", - # We provide the child's unique identifier on this exec/spawn - # line for debugging purposes when viewing the process tree from - # the OS; it otherwise can be passed via the parent channel if - # we prefer in the future (for privacy). 
- "--uid", - # TODO, how to pass this over "wire" encodings like - # cmdline args? - # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ? - str(subactor.aid.uid), - # Address the child must connect to on startup - "--parent_addr", - str(parent_addr) - ] - - if subactor.loglevel: - spawn_cmd += [ - "--loglevel", - subactor.loglevel - ] - # Tell child to run in guest mode on top of ``asyncio`` loop - if infect_asyncio: - spawn_cmd.append("--asyncio") - - cancelled_during_spawn: bool = False - proc: trio.Process|None = None - ipc_server: _server.Server = actor_nursery._actor.ipc_server - try: - try: - proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs) - log.runtime( - f'Started new child subproc\n' - f'(>\n' - f' |_{proc}\n' - ) - - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it - event, chan = await ipc_server.wait_for_peer( - subactor.aid.uid - ) - - except trio.Cancelled: - cancelled_during_spawn = True - # we may cancel before the child connects back in which - # case avoid clobbering the pdb tty. - if debug_mode(): - with trio.CancelScope(shield=True): - # don't clobber an ongoing pdb - if is_root_process(): - await debug.maybe_wait_for_debugger() - - elif proc is not None: - async with debug.acquire_debug_lock( - subactor_uid=subactor.aid.uid - ): - # soft wait on the proc to terminate - with trio.move_on_after(0.5): - await proc.wait() - raise - - # a sub-proc ref **must** exist now - assert proc - - portal = Portal(chan) - actor_nursery._children[subactor.aid.uid] = ( - subactor, - proc, - portal, - ) - - # send a "spawning specification" which configures the - # initial runtime state of the child. 
- sspec = msgtypes.SpawnSpec( - _parent_main_data=subactor._parent_main_data, - enable_modules=subactor.enable_modules, - reg_addrs=subactor.reg_addrs, - bind_addrs=bind_addrs, - _runtime_vars=_runtime_vars, - ) - log.runtime( - f'Sending spawn spec to child\n' - f'{{}}=> {chan.aid.reprol()!r}\n' - f'\n' - f'{pretty_struct.pformat(sspec)}\n' - ) - await chan.send(sspec) - - # track subactor in current nursery - curr_actor: Actor = current_actor() - curr_actor._actoruid2nursery[subactor.aid.uid] = actor_nursery - - # resume caller at next checkpoint now that child is up - task_status.started(portal) - - # wait for ActorNursery.wait() to be called - with trio.CancelScope(shield=True): - await actor_nursery._join_procs.wait() - - async with trio.open_nursery() as nursery: - if portal in actor_nursery._cancel_after_result_on_exit: - nursery.start_soon( - cancel_on_completion, - portal, - subactor, - errors - ) - - # This is a "soft" (cancellable) join/reap which - # will remote cancel the actor on a ``trio.Cancelled`` - # condition. - await soft_kill( - proc, - trio.Process.wait, # XXX, uses `pidfd_open()` below. - portal - ) - - # cancel result waiter that may have been spawned in - # tandem if not done already - log.cancel( - 'Cancelling portal result reaper task\n' - f'c)> {subactor.aid.reprol()!r}\n' - ) - nursery.cancel_scope.cancel() - - finally: - # XXX NOTE XXX: The "hard" reap since no actor zombies are - # allowed! Do this **after** cancellation/teardown to avoid - # killing the process too early. - if proc: - reap_repr: str = _pformat.nest_from_op( - input_op='>x)', - text=subactor.pformat(), - ) - log.cancel( - f'Hard reap sequence starting for subactor\n' - f'{reap_repr}' - ) - - with trio.CancelScope(shield=True): - # don't clobber an ongoing pdb - if cancelled_during_spawn: - # Try again to avoid TTY clobbering. 
- async with debug.acquire_debug_lock( - subactor_uid=subactor.aid.uid - ): - with trio.move_on_after(0.5): - await proc.wait() - - await debug.maybe_wait_for_debugger( - child_in_debug=_runtime_vars.get( - '_debug_mode', False - ), - header_msg=( - 'Delaying subproc reaper while debugger locked..\n' - ), - - # TODO: need a diff value then default? - # poll_steps=9999999, - ) - # TODO: solve the following issue where we need - # to do a similar wait like this but in an - # "intermediary" parent actor that itself isn't - # in debug but has a child that is, and we need - # to hold off on relaying SIGINT until that child - # is complete. - # https://github.com/goodboy/tractor/issues/320 - # -[ ] we need to handle non-root parent-actors specially - # by somehow determining if a child is in debug and then - # avoiding cancel/kill of said child by this - # (intermediary) parent until such a time as the root says - # the pdb lock is released and we are good to tear down - # (our children).. - # - # -[ ] so maybe something like this where we try to - # acquire the lock and get notified of who has it, - # check that uid against our known children? 
- # this_uid: tuple[str, str] = current_actor().uid - # await debug.acquire_debug_lock(this_uid) - - if proc.poll() is None: - log.cancel(f"Attempting to hard kill {proc}") - await hard_kill(proc) - - log.debug(f"Joined {proc}") - else: - log.warning('Nursery cancelled before sub-proc started') - - if not cancelled_during_spawn: - # pop child entry to indicate we no longer managing this - # subactor - actor_nursery._children.pop(subactor.aid.uid) - - -async def mp_proc( - name: str, - actor_nursery: ActorNursery, # type: ignore # noqa - subactor: Actor, - errors: dict[tuple[str, str], Exception], - # passed through to actor main - bind_addrs: list[UnwrappedAddress], - parent_addr: UnwrappedAddress, - _runtime_vars: dict[str, Any], # serialized and sent to _child - *, - infect_asyncio: bool = False, - task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED, - proc_kwargs: dict[str, any] = {} - -) -> None: - - # uggh zone - try: - from multiprocessing import semaphore_tracker # type: ignore - resource_tracker = semaphore_tracker - resource_tracker._resource_tracker = resource_tracker._semaphore_tracker # noqa - except ImportError: - # 3.8 introduces a more general version that also tracks shared mems - from multiprocessing import resource_tracker # type: ignore - - assert _ctx - start_method = _ctx.get_start_method() - if start_method == 'forkserver': - - from multiprocessing import forkserver # type: ignore - # XXX do our hackery on the stdlib to avoid multiple - # forkservers (one at each subproc layer). 
- fs = forkserver._forkserver - curr_actor = current_actor() - if is_main_process() and not curr_actor._forkserver_info: - # if we're the "main" process start the forkserver - # only once and pass its ipc info to downstream - # children - # forkserver.set_forkserver_preload(enable_modules) - forkserver.ensure_running() - fs_info = ( - fs._forkserver_address, # type: ignore # noqa - fs._forkserver_alive_fd, # type: ignore # noqa - getattr(fs, '_forkserver_pid', None), - getattr( - resource_tracker._resource_tracker, '_pid', None), - resource_tracker._resource_tracker._fd, - ) - else: # request to forkerserver to fork a new child - assert curr_actor._forkserver_info - fs_info = ( - fs._forkserver_address, # type: ignore # noqa - fs._forkserver_alive_fd, # type: ignore # noqa - fs._forkserver_pid, # type: ignore # noqa - resource_tracker._resource_tracker._pid, - resource_tracker._resource_tracker._fd, - ) = curr_actor._forkserver_info - else: - # spawn method - fs_info = (None, None, None, None, None) - - proc: mp.Process = _ctx.Process( # type: ignore - target=_mp_main, - args=( - subactor, - bind_addrs, - fs_info, - _spawn_method, - parent_addr, - infect_asyncio, - ), - # daemon=True, - name=name, - ) - - # `multiprocessing` only (since no async interface): - # register the process before start in case we get a cancel - # request before the actor has fully spawned - then we can wait - # for it to fully come up before sending a cancel request - actor_nursery._children[subactor.aid.uid] = (subactor, proc, None) - - proc.start() - if not proc.is_alive(): - raise ActorFailure("Couldn't start sub-actor?") - - log.runtime(f"Started {proc}") - - ipc_server: _server.Server = actor_nursery._actor.ipc_server - try: - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it - event, chan = await ipc_server.wait_for_peer( - subactor.aid.uid, - ) - - # XXX: monkey patch poll API to match 
the ``subprocess`` API.. - # not sure why they don't expose this but kk. - proc.poll = lambda: proc.exitcode # type: ignore - - # except: - # TODO: in the case we were cancelled before the sub-proc - # registered itself back we must be sure to try and clean - # any process we may have started. - - portal = Portal(chan) - actor_nursery._children[subactor.aid.uid] = (subactor, proc, portal) - - # unblock parent task - task_status.started(portal) - - # wait for ``ActorNursery`` block to signal that - # subprocesses can be waited upon. - # This is required to ensure synchronization - # with user code that may want to manually await results - # from nursery spawned sub-actors. We don't want the - # containing nurseries here to collect results or error - # while user code is still doing it's thing. Only after the - # nursery block closes do we allow subactor results to be - # awaited and reported upwards to the supervisor. - with trio.CancelScope(shield=True): - await actor_nursery._join_procs.wait() - - async with trio.open_nursery() as nursery: - if portal in actor_nursery._cancel_after_result_on_exit: - nursery.start_soon( - cancel_on_completion, - portal, - subactor, - errors - ) - - # This is a "soft" (cancellable) join/reap which - # will remote cancel the actor on a ``trio.Cancelled`` - # condition. 
- await soft_kill( - proc, - proc_waiter, - portal - ) - - # cancel result waiter that may have been spawned in - # tandem if not done already - log.warning( - "Cancelling existing result waiter task for " - f"{subactor.aid.uid}") - nursery.cancel_scope.cancel() - - finally: - # hard reap sequence - if proc.is_alive(): - log.cancel(f"Attempting to hard kill {proc}") - with trio.move_on_after(0.1) as cs: - cs.shield = True - await proc_waiter(proc) - - if cs.cancelled_caught: - proc.terminate() - - proc.join() - log.debug(f"Joined {proc}") - - # pop child entry to indicate we are no longer managing subactor - actor_nursery._children.pop(subactor.aid.uid) - - # TODO: prolly report to ``mypy`` how this causes all sorts of - # false errors.. - # subactor, proc, portal = actor_nursery._children.pop(subactor.uid) +# NOTE: bottom-of-module to avoid a circular import since the +# backend submodules pull `cancel_on_completion`/`soft_kill`/ +# `hard_kill`/`proc_waiter` from this module. +from ._trio import trio_proc +from ._mp import mp_proc # proc spawning backend target map diff --git a/tractor/spawn/_trio.py b/tractor/spawn/_trio.py new file mode 100644 index 00000000..a79d1c8d --- /dev/null +++ b/tractor/spawn/_trio.py @@ -0,0 +1,292 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. 
+ +''' +The `trio` subprocess spawning backend. + +Spawns sub-actors as fresh OS processes driven by +`trio.lowlevel.open_process()` — our default, cross-platform +spawn method. + +''' +from __future__ import annotations +import sys +from typing import ( + Any, + TYPE_CHECKING, +) + +import trio +from trio import TaskStatus + +from ..devx import ( + debug, + pformat as _pformat, +) +from tractor.runtime._state import ( + current_actor, + is_root_process, + debug_mode, + _runtime_vars, +) +from tractor.log import get_logger +from tractor.discovery._addr import UnwrappedAddress +from tractor.runtime._portal import Portal +from tractor.runtime._runtime import Actor +from tractor.msg import ( + types as msgtypes, + pretty_struct, +) +from ._spawn import ( + cancel_on_completion, + hard_kill, + soft_kill, +) + + +if TYPE_CHECKING: + from tractor.ipc import ( + _server, + Channel, + ) + from tractor.runtime._supervise import ActorNursery + + +log = get_logger('tractor') + + +async def trio_proc( + name: str, + actor_nursery: ActorNursery, + subactor: Actor, + errors: dict[tuple[str, str], Exception], + + # passed through to actor main + bind_addrs: list[UnwrappedAddress], + parent_addr: UnwrappedAddress, + _runtime_vars: dict[str, Any], # serialized and sent to _child + *, + infect_asyncio: bool = False, + task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED, + proc_kwargs: dict[str, any] = {} + +) -> None: + ''' + Create a new ``Process`` using a "spawn method" as (configured using + ``try_set_start_method()``). + + This routine should be started in a actor runtime task and the logic + here is to be considered the core supervision strategy. 
+ + ''' + spawn_cmd = [ + sys.executable, + "-m", + # Hardcode this (instead of using ``_child.__name__`` to avoid a + # double import warning: https://stackoverflow.com/a/45070583 + "tractor._child", + # We provide the child's unique identifier on this exec/spawn + # line for debugging purposes when viewing the process tree from + # the OS; it otherwise can be passed via the parent channel if + # we prefer in the future (for privacy). + "--uid", + # TODO, how to pass this over "wire" encodings like + # cmdline args? + # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ? + str(subactor.aid.uid), + # Address the child must connect to on startup + "--parent_addr", + str(parent_addr) + ] + + if subactor.loglevel: + spawn_cmd += [ + "--loglevel", + subactor.loglevel + ] + # Tell child to run in guest mode on top of ``asyncio`` loop + if infect_asyncio: + spawn_cmd.append("--asyncio") + + cancelled_during_spawn: bool = False + proc: trio.Process|None = None + ipc_server: _server.Server = actor_nursery._actor.ipc_server + try: + try: + proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs) + log.runtime( + f'Started new child subproc\n' + f'(>\n' + f' |_{proc}\n' + ) + + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await ipc_server.wait_for_peer( + subactor.aid.uid + ) + + except trio.Cancelled: + cancelled_during_spawn = True + # we may cancel before the child connects back in which + # case avoid clobbering the pdb tty. 
+ if debug_mode(): + with trio.CancelScope(shield=True): + # don't clobber an ongoing pdb + if is_root_process(): + await debug.maybe_wait_for_debugger() + + elif proc is not None: + async with debug.acquire_debug_lock( + subactor_uid=subactor.aid.uid + ): + # soft wait on the proc to terminate + with trio.move_on_after(0.5): + await proc.wait() + raise + + # a sub-proc ref **must** exist now + assert proc + + portal = Portal(chan) + actor_nursery._children[subactor.aid.uid] = ( + subactor, + proc, + portal, + ) + + # send a "spawning specification" which configures the + # initial runtime state of the child. + sspec = msgtypes.SpawnSpec( + _parent_main_data=subactor._parent_main_data, + enable_modules=subactor.enable_modules, + reg_addrs=subactor.reg_addrs, + bind_addrs=bind_addrs, + _runtime_vars=_runtime_vars, + ) + log.runtime( + f'Sending spawn spec to child\n' + f'{{}}=> {chan.aid.reprol()!r}\n' + f'\n' + f'{pretty_struct.pformat(sspec)}\n' + ) + await chan.send(sspec) + + # track subactor in current nursery + curr_actor: Actor = current_actor() + curr_actor._actoruid2nursery[subactor.aid.uid] = actor_nursery + + # resume caller at next checkpoint now that child is up + task_status.started(portal) + + # wait for ActorNursery.wait() to be called + with trio.CancelScope(shield=True): + await actor_nursery._join_procs.wait() + + async with trio.open_nursery() as nursery: + if portal in actor_nursery._cancel_after_result_on_exit: + nursery.start_soon( + cancel_on_completion, + portal, + subactor, + errors + ) + + # This is a "soft" (cancellable) join/reap which + # will remote cancel the actor on a ``trio.Cancelled`` + # condition. + await soft_kill( + proc, + trio.Process.wait, # XXX, uses `pidfd_open()` below. 
+ portal + ) + + # cancel result waiter that may have been spawned in + # tandem if not done already + log.cancel( + 'Cancelling portal result reaper task\n' + f'c)> {subactor.aid.reprol()!r}\n' + ) + nursery.cancel_scope.cancel() + + finally: + # XXX NOTE XXX: The "hard" reap since no actor zombies are + # allowed! Do this **after** cancellation/teardown to avoid + # killing the process too early. + if proc: + reap_repr: str = _pformat.nest_from_op( + input_op='>x)', + text=subactor.pformat(), + ) + log.cancel( + f'Hard reap sequence starting for subactor\n' + f'{reap_repr}' + ) + + with trio.CancelScope(shield=True): + # don't clobber an ongoing pdb + if cancelled_during_spawn: + # Try again to avoid TTY clobbering. + async with debug.acquire_debug_lock( + subactor_uid=subactor.aid.uid + ): + with trio.move_on_after(0.5): + await proc.wait() + + await debug.maybe_wait_for_debugger( + child_in_debug=_runtime_vars.get( + '_debug_mode', False + ), + header_msg=( + 'Delaying subproc reaper while debugger locked..\n' + ), + + # TODO: need a diff value then default? + # poll_steps=9999999, + ) + # TODO: solve the following issue where we need + # to do a similar wait like this but in an + # "intermediary" parent actor that itself isn't + # in debug but has a child that is, and we need + # to hold off on relaying SIGINT until that child + # is complete. + # https://github.com/goodboy/tractor/issues/320 + # -[ ] we need to handle non-root parent-actors specially + # by somehow determining if a child is in debug and then + # avoiding cancel/kill of said child by this + # (intermediary) parent until such a time as the root says + # the pdb lock is released and we are good to tear down + # (our children).. + # + # -[ ] so maybe something like this where we try to + # acquire the lock and get notified of who has it, + # check that uid against our known children? 
+ # this_uid: tuple[str, str] = current_actor().uid + # await debug.acquire_debug_lock(this_uid) + + if proc.poll() is None: + log.cancel(f"Attempting to hard kill {proc}") + await hard_kill(proc) + + log.debug(f"Joined {proc}") + else: + log.warning('Nursery cancelled before sub-proc started') + + if not cancelled_during_spawn: + # pop child entry to indicate we no longer managing this + # subactor + actor_nursery._children.pop(subactor.aid.uid)