Mv `trio_proc`/`mp_proc` to per-backend submods

Split the monolithic `spawn._spawn` into a slim
"core" + per-backend submodules so a future
`._subint` backend (per issue #379) can drop in
without piling more onto `_spawn.py`.

`._spawn` retains the cross-backend supervisor
machinery: `SpawnMethodKey`, `_methods` registry,
`_spawn_method`/`_ctx` state, `try_set_start_method()`,
the `new_proc()` dispatcher, and the shared helpers
`exhaust_portal()`, `cancel_on_completion()`,
`hard_kill()`, `soft_kill()`, `proc_waiter()`.

Deats,
- mv `trio_proc()` → new `spawn._trio`
- mv `mp_proc()` → new `spawn._mp`, reads `_ctx` and
  `_spawn_method` via `from . import _spawn` for
  late binding bc both get mutated by
  `try_set_start_method()`
- `_methods` wires up the new submods via late
  bottom-of-module imports to side-step circular
  dep (both backend mods pull shared helpers from
  `._spawn`)
- prune now-unused imports from `_spawn.py` — `sys`,
  `is_root_process`, `current_actor`,
  `is_main_process`, `_mp_main`, `ActorFailure`,
  `pretty_struct`, `_pformat`

Also,
- `_testing.pytest.pytest_generate_tests()` now
  drives the valid-backend set from
  `typing.get_args(SpawnMethodKey)` so adding a
  new backend (e.g. `'subint'`) doesn't require
  touching the harness
- refresh `spawn/__init__.py` docstring for the
  new layout

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
subint_spawner_backend
Gud Boi 2026-04-17 01:58:05 -04:00
parent b5b0504918
commit d7ca68cf61
5 changed files with 565 additions and 418 deletions

View File

@ -27,6 +27,7 @@ import inspect
import platform import platform
from typing import ( from typing import (
Callable, Callable,
get_args,
) )
import pytest import pytest
@ -341,12 +342,11 @@ def pytest_generate_tests(
# XXX some weird windows bug with `pytest`? # XXX some weird windows bug with `pytest`?
spawn_backend = 'trio' spawn_backend = 'trio'
# TODO: maybe just use the literal `._spawn.SpawnMethodKey`? # drive the valid-backend set from the canonical `Literal` so
assert spawn_backend in ( # adding a new spawn backend (e.g. `'subint'`) doesn't require
'mp_spawn', # touching the harness.
'mp_forkserver', from tractor.spawn._spawn import SpawnMethodKey
'trio', assert spawn_backend in get_args(SpawnMethodKey)
)
# NOTE: used-to-be-used-to dyanmically parametrize tests for when # NOTE: used-to-be-used-to dyanmically parametrize tests for when
# you just passed --spawn-backend=`mp` on the cli, but now we expect # you just passed --spawn-backend=`mp` on the cli, but now we expect

View File

@ -15,12 +15,31 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
Actor process spawning machinery using Actor process spawning machinery using multiple backends.
multiple backends (trio, multiprocessing).
NOTE: to avoid circular imports, this ``__init__`` Layout
does NOT eagerly import submodules. Use direct ------
module paths like ``tractor.spawn._spawn`` or - `._spawn`: the "core" supervisor machinery spawn-method
``tractor.spawn._entry`` instead. registry (`SpawnMethodKey`, `_methods`, `_spawn_method`,
`_ctx`, `try_set_start_method`), the `new_proc` dispatcher,
and the cross-backend helpers `exhaust_portal`,
`cancel_on_completion`, `hard_kill`, `soft_kill`,
`proc_waiter`.
Per-backend submodules (each exposes a single `*_proc()`
coroutine registered in `_spawn._methods`):
- `._trio`: the `trio`-native subprocess backend (default,
all platforms), spawns via `trio.lowlevel.open_process()`.
- `._mp`: the stdlib `multiprocessing` backends
`'mp_spawn'` and `'mp_forkserver'` variants driven by
the `mp.context` bound to `_spawn._ctx`.
Entry-point helpers live in `._entry`/`._mp_fixup_main`/
`._forkserver_override`.
NOTE: to avoid circular imports, this ``__init__`` does NOT
eagerly import submodules. Use direct module paths like
``tractor.spawn._spawn`` or ``tractor.spawn._trio`` instead.
''' '''

View File

@ -0,0 +1,235 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
The `multiprocessing` subprocess spawning backends (`spawn`
and `forkserver` variants).
Driven by the stdlib `multiprocessing` context selected via
`try_set_start_method()` in the `_spawn` core module, which
sets the module-global `_ctx` and `_spawn_method` read here.
'''
from __future__ import annotations
import multiprocessing as mp
from typing import (
Any,
TYPE_CHECKING,
)
import trio
from trio import TaskStatus
from tractor.runtime._state import (
current_actor,
is_main_process,
)
from tractor.log import get_logger
from tractor.discovery._addr import UnwrappedAddress
from tractor.runtime._portal import Portal
from tractor.runtime._runtime import Actor
from tractor._exceptions import ActorFailure
from ._entry import _mp_main
# NOTE: module-import (not from-import) so we dynamically see
# the *current* `_ctx` / `_spawn_method` values, which are mutated
# by `try_set_start_method()` after module load time.
from . import _spawn
from ._spawn import (
cancel_on_completion,
proc_waiter,
soft_kill,
)
if TYPE_CHECKING:
from tractor.ipc import (
_server,
Channel,
)
from tractor.runtime._supervise import ActorNursery
log = get_logger('tractor')
async def mp_proc(
name: str,
actor_nursery: ActorNursery, # type: ignore # noqa
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addrs: list[UnwrappedAddress],
parent_addr: UnwrappedAddress,
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
proc_kwargs: dict[str, any] = {}
) -> None:
# uggh zone
try:
from multiprocessing import semaphore_tracker # type: ignore
resource_tracker = semaphore_tracker
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker # noqa
except ImportError:
# 3.8 introduces a more general version that also tracks shared mems
from multiprocessing import resource_tracker # type: ignore
assert _spawn._ctx
start_method = _spawn._ctx.get_start_method()
if start_method == 'forkserver':
from multiprocessing import forkserver # type: ignore
# XXX do our hackery on the stdlib to avoid multiple
# forkservers (one at each subproc layer).
fs = forkserver._forkserver
curr_actor = current_actor()
if is_main_process() and not curr_actor._forkserver_info:
# if we're the "main" process start the forkserver
# only once and pass its ipc info to downstream
# children
# forkserver.set_forkserver_preload(enable_modules)
forkserver.ensure_running()
fs_info = (
fs._forkserver_address, # type: ignore # noqa
fs._forkserver_alive_fd, # type: ignore # noqa
getattr(fs, '_forkserver_pid', None),
getattr(
resource_tracker._resource_tracker, '_pid', None),
resource_tracker._resource_tracker._fd,
)
else: # request to forkerserver to fork a new child
assert curr_actor._forkserver_info
fs_info = (
fs._forkserver_address, # type: ignore # noqa
fs._forkserver_alive_fd, # type: ignore # noqa
fs._forkserver_pid, # type: ignore # noqa
resource_tracker._resource_tracker._pid,
resource_tracker._resource_tracker._fd,
) = curr_actor._forkserver_info
else:
# spawn method
fs_info = (None, None, None, None, None)
proc: mp.Process = _spawn._ctx.Process( # type: ignore
target=_mp_main,
args=(
subactor,
bind_addrs,
fs_info,
_spawn._spawn_method,
parent_addr,
infect_asyncio,
),
# daemon=True,
name=name,
)
# `multiprocessing` only (since no async interface):
# register the process before start in case we get a cancel
# request before the actor has fully spawned - then we can wait
# for it to fully come up before sending a cancel request
actor_nursery._children[subactor.aid.uid] = (subactor, proc, None)
proc.start()
if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?")
log.runtime(f"Started {proc}")
ipc_server: _server.Server = actor_nursery._actor.ipc_server
try:
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await ipc_server.wait_for_peer(
subactor.aid.uid,
)
# XXX: monkey patch poll API to match the ``subprocess`` API..
# not sure why they don't expose this but kk.
proc.poll = lambda: proc.exitcode # type: ignore
# except:
# TODO: in the case we were cancelled before the sub-proc
# registered itself back we must be sure to try and clean
# any process we may have started.
portal = Portal(chan)
actor_nursery._children[subactor.aid.uid] = (subactor, proc, portal)
# unblock parent task
task_status.started(portal)
# wait for ``ActorNursery`` block to signal that
# subprocesses can be waited upon.
# This is required to ensure synchronization
# with user code that may want to manually await results
# from nursery spawned sub-actors. We don't want the
# containing nurseries here to collect results or error
# while user code is still doing it's thing. Only after the
# nursery block closes do we allow subactor results to be
# awaited and reported upwards to the supervisor.
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
async with trio.open_nursery() as nursery:
if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon(
cancel_on_completion,
portal,
subactor,
errors
)
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_kill(
proc,
proc_waiter,
portal
)
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.aid.uid}")
nursery.cancel_scope.cancel()
finally:
# hard reap sequence
if proc.is_alive():
log.cancel(f"Attempting to hard kill {proc}")
with trio.move_on_after(0.1) as cs:
cs.shield = True
await proc_waiter(proc)
if cs.cancelled_caught:
proc.terminate()
proc.join()
log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing subactor
actor_nursery._children.pop(subactor.aid.uid)
# TODO: prolly report to ``mypy`` how this causes all sorts of
# false errors..
# subactor, proc, portal = actor_nursery._children.pop(subactor.uid)

View File

@ -20,7 +20,6 @@ Machinery for actor process spawning using multiple backends.
""" """
from __future__ import annotations from __future__ import annotations
import multiprocessing as mp import multiprocessing as mp
import sys
import platform import platform
from typing import ( from typing import (
Any, Any,
@ -34,14 +33,8 @@ from typing import (
import trio import trio
from trio import TaskStatus from trio import TaskStatus
from ..devx import ( from ..devx import debug
debug,
pformat as _pformat
)
from tractor.runtime._state import ( from tractor.runtime._state import (
current_actor,
is_main_process,
is_root_process,
debug_mode, debug_mode,
_runtime_vars, _runtime_vars,
) )
@ -49,12 +42,7 @@ from tractor.log import get_logger
from tractor.discovery._addr import UnwrappedAddress from tractor.discovery._addr import UnwrappedAddress
from tractor.runtime._portal import Portal from tractor.runtime._portal import Portal
from tractor.runtime._runtime import Actor from tractor.runtime._runtime import Actor
from ._entry import _mp_main from tractor.msg import types as msgtypes
from tractor._exceptions import ActorFailure
from tractor.msg import (
types as msgtypes,
pretty_struct,
)
if TYPE_CHECKING: if TYPE_CHECKING:
@ -445,398 +433,11 @@ async def new_proc(
) )
async def trio_proc( # NOTE: bottom-of-module to avoid a circular import since the
name: str, # backend submodules pull `cancel_on_completion`/`soft_kill`/
actor_nursery: ActorNursery, # `hard_kill`/`proc_waiter` from this module.
subactor: Actor, from ._trio import trio_proc
errors: dict[tuple[str, str], Exception], from ._mp import mp_proc
# passed through to actor main
bind_addrs: list[UnwrappedAddress],
parent_addr: UnwrappedAddress,
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
proc_kwargs: dict[str, any] = {}
) -> None:
'''
Create a new ``Process`` using a "spawn method" as (configured using
``try_set_start_method()``).
This routine should be started in a actor runtime task and the logic
here is to be considered the core supervision strategy.
'''
spawn_cmd = [
sys.executable,
"-m",
# Hardcode this (instead of using ``_child.__name__`` to avoid a
# double import warning: https://stackoverflow.com/a/45070583
"tractor._child",
# We provide the child's unique identifier on this exec/spawn
# line for debugging purposes when viewing the process tree from
# the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy).
"--uid",
# TODO, how to pass this over "wire" encodings like
# cmdline args?
# -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ?
str(subactor.aid.uid),
# Address the child must connect to on startup
"--parent_addr",
str(parent_addr)
]
if subactor.loglevel:
spawn_cmd += [
"--loglevel",
subactor.loglevel
]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
cancelled_during_spawn: bool = False
proc: trio.Process|None = None
ipc_server: _server.Server = actor_nursery._actor.ipc_server
try:
try:
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
log.runtime(
f'Started new child subproc\n'
f'(>\n'
f' |_{proc}\n'
)
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await ipc_server.wait_for_peer(
subactor.aid.uid
)
except trio.Cancelled:
cancelled_during_spawn = True
# we may cancel before the child connects back in which
# case avoid clobbering the pdb tty.
if debug_mode():
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if is_root_process():
await debug.maybe_wait_for_debugger()
elif proc is not None:
async with debug.acquire_debug_lock(
subactor_uid=subactor.aid.uid
):
# soft wait on the proc to terminate
with trio.move_on_after(0.5):
await proc.wait()
raise
# a sub-proc ref **must** exist now
assert proc
portal = Portal(chan)
actor_nursery._children[subactor.aid.uid] = (
subactor,
proc,
portal,
)
# send a "spawning specification" which configures the
# initial runtime state of the child.
sspec = msgtypes.SpawnSpec(
_parent_main_data=subactor._parent_main_data,
enable_modules=subactor.enable_modules,
reg_addrs=subactor.reg_addrs,
bind_addrs=bind_addrs,
_runtime_vars=_runtime_vars,
)
log.runtime(
f'Sending spawn spec to child\n'
f'{{}}=> {chan.aid.reprol()!r}\n'
f'\n'
f'{pretty_struct.pformat(sspec)}\n'
)
await chan.send(sspec)
# track subactor in current nursery
curr_actor: Actor = current_actor()
curr_actor._actoruid2nursery[subactor.aid.uid] = actor_nursery
# resume caller at next checkpoint now that child is up
task_status.started(portal)
# wait for ActorNursery.wait() to be called
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
async with trio.open_nursery() as nursery:
if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon(
cancel_on_completion,
portal,
subactor,
errors
)
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_kill(
proc,
trio.Process.wait, # XXX, uses `pidfd_open()` below.
portal
)
# cancel result waiter that may have been spawned in
# tandem if not done already
log.cancel(
'Cancelling portal result reaper task\n'
f'c)> {subactor.aid.reprol()!r}\n'
)
nursery.cancel_scope.cancel()
finally:
# XXX NOTE XXX: The "hard" reap since no actor zombies are
# allowed! Do this **after** cancellation/teardown to avoid
# killing the process too early.
if proc:
reap_repr: str = _pformat.nest_from_op(
input_op='>x)',
text=subactor.pformat(),
)
log.cancel(
f'Hard reap sequence starting for subactor\n'
f'{reap_repr}'
)
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if cancelled_during_spawn:
# Try again to avoid TTY clobbering.
async with debug.acquire_debug_lock(
subactor_uid=subactor.aid.uid
):
with trio.move_on_after(0.5):
await proc.wait()
await debug.maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False
),
header_msg=(
'Delaying subproc reaper while debugger locked..\n'
),
# TODO: need a diff value then default?
# poll_steps=9999999,
)
# TODO: solve the following issue where we need
# to do a similar wait like this but in an
# "intermediary" parent actor that itself isn't
# in debug but has a child that is, and we need
# to hold off on relaying SIGINT until that child
# is complete.
# https://github.com/goodboy/tractor/issues/320
# -[ ] we need to handle non-root parent-actors specially
# by somehow determining if a child is in debug and then
# avoiding cancel/kill of said child by this
# (intermediary) parent until such a time as the root says
# the pdb lock is released and we are good to tear down
# (our children)..
#
# -[ ] so maybe something like this where we try to
# acquire the lock and get notified of who has it,
# check that uid against our known children?
# this_uid: tuple[str, str] = current_actor().uid
# await debug.acquire_debug_lock(this_uid)
if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}")
await hard_kill(proc)
log.debug(f"Joined {proc}")
else:
log.warning('Nursery cancelled before sub-proc started')
if not cancelled_during_spawn:
# pop child entry to indicate we no longer managing this
# subactor
actor_nursery._children.pop(subactor.aid.uid)
async def mp_proc(
name: str,
actor_nursery: ActorNursery, # type: ignore # noqa
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addrs: list[UnwrappedAddress],
parent_addr: UnwrappedAddress,
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
proc_kwargs: dict[str, any] = {}
) -> None:
# uggh zone
try:
from multiprocessing import semaphore_tracker # type: ignore
resource_tracker = semaphore_tracker
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker # noqa
except ImportError:
# 3.8 introduces a more general version that also tracks shared mems
from multiprocessing import resource_tracker # type: ignore
assert _ctx
start_method = _ctx.get_start_method()
if start_method == 'forkserver':
from multiprocessing import forkserver # type: ignore
# XXX do our hackery on the stdlib to avoid multiple
# forkservers (one at each subproc layer).
fs = forkserver._forkserver
curr_actor = current_actor()
if is_main_process() and not curr_actor._forkserver_info:
# if we're the "main" process start the forkserver
# only once and pass its ipc info to downstream
# children
# forkserver.set_forkserver_preload(enable_modules)
forkserver.ensure_running()
fs_info = (
fs._forkserver_address, # type: ignore # noqa
fs._forkserver_alive_fd, # type: ignore # noqa
getattr(fs, '_forkserver_pid', None),
getattr(
resource_tracker._resource_tracker, '_pid', None),
resource_tracker._resource_tracker._fd,
)
else: # request to forkerserver to fork a new child
assert curr_actor._forkserver_info
fs_info = (
fs._forkserver_address, # type: ignore # noqa
fs._forkserver_alive_fd, # type: ignore # noqa
fs._forkserver_pid, # type: ignore # noqa
resource_tracker._resource_tracker._pid,
resource_tracker._resource_tracker._fd,
) = curr_actor._forkserver_info
else:
# spawn method
fs_info = (None, None, None, None, None)
proc: mp.Process = _ctx.Process( # type: ignore
target=_mp_main,
args=(
subactor,
bind_addrs,
fs_info,
_spawn_method,
parent_addr,
infect_asyncio,
),
# daemon=True,
name=name,
)
# `multiprocessing` only (since no async interface):
# register the process before start in case we get a cancel
# request before the actor has fully spawned - then we can wait
# for it to fully come up before sending a cancel request
actor_nursery._children[subactor.aid.uid] = (subactor, proc, None)
proc.start()
if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?")
log.runtime(f"Started {proc}")
ipc_server: _server.Server = actor_nursery._actor.ipc_server
try:
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await ipc_server.wait_for_peer(
subactor.aid.uid,
)
# XXX: monkey patch poll API to match the ``subprocess`` API..
# not sure why they don't expose this but kk.
proc.poll = lambda: proc.exitcode # type: ignore
# except:
# TODO: in the case we were cancelled before the sub-proc
# registered itself back we must be sure to try and clean
# any process we may have started.
portal = Portal(chan)
actor_nursery._children[subactor.aid.uid] = (subactor, proc, portal)
# unblock parent task
task_status.started(portal)
# wait for ``ActorNursery`` block to signal that
# subprocesses can be waited upon.
# This is required to ensure synchronization
# with user code that may want to manually await results
# from nursery spawned sub-actors. We don't want the
# containing nurseries here to collect results or error
# while user code is still doing it's thing. Only after the
# nursery block closes do we allow subactor results to be
# awaited and reported upwards to the supervisor.
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
async with trio.open_nursery() as nursery:
if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon(
cancel_on_completion,
portal,
subactor,
errors
)
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_kill(
proc,
proc_waiter,
portal
)
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.aid.uid}")
nursery.cancel_scope.cancel()
finally:
# hard reap sequence
if proc.is_alive():
log.cancel(f"Attempting to hard kill {proc}")
with trio.move_on_after(0.1) as cs:
cs.shield = True
await proc_waiter(proc)
if cs.cancelled_caught:
proc.terminate()
proc.join()
log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing subactor
actor_nursery._children.pop(subactor.aid.uid)
# TODO: prolly report to ``mypy`` how this causes all sorts of
# false errors..
# subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
# proc spawning backend target map # proc spawning backend target map

View File

@ -0,0 +1,292 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
The `trio` subprocess spawning backend.
Spawns sub-actors as fresh OS processes driven by
`trio.lowlevel.open_process()` our default, cross-platform
spawn method.
'''
from __future__ import annotations
import sys
from typing import (
Any,
TYPE_CHECKING,
)
import trio
from trio import TaskStatus
from ..devx import (
debug,
pformat as _pformat,
)
from tractor.runtime._state import (
current_actor,
is_root_process,
debug_mode,
_runtime_vars,
)
from tractor.log import get_logger
from tractor.discovery._addr import UnwrappedAddress
from tractor.runtime._portal import Portal
from tractor.runtime._runtime import Actor
from tractor.msg import (
types as msgtypes,
pretty_struct,
)
from ._spawn import (
cancel_on_completion,
hard_kill,
soft_kill,
)
if TYPE_CHECKING:
from tractor.ipc import (
_server,
Channel,
)
from tractor.runtime._supervise import ActorNursery
log = get_logger('tractor')
async def trio_proc(
name: str,
actor_nursery: ActorNursery,
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addrs: list[UnwrappedAddress],
parent_addr: UnwrappedAddress,
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
proc_kwargs: dict[str, any] = {}
) -> None:
'''
Create a new ``Process`` using a "spawn method" as (configured using
``try_set_start_method()``).
This routine should be started in a actor runtime task and the logic
here is to be considered the core supervision strategy.
'''
spawn_cmd = [
sys.executable,
"-m",
# Hardcode this (instead of using ``_child.__name__`` to avoid a
# double import warning: https://stackoverflow.com/a/45070583
"tractor._child",
# We provide the child's unique identifier on this exec/spawn
# line for debugging purposes when viewing the process tree from
# the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy).
"--uid",
# TODO, how to pass this over "wire" encodings like
# cmdline args?
# -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ?
str(subactor.aid.uid),
# Address the child must connect to on startup
"--parent_addr",
str(parent_addr)
]
if subactor.loglevel:
spawn_cmd += [
"--loglevel",
subactor.loglevel
]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
cancelled_during_spawn: bool = False
proc: trio.Process|None = None
ipc_server: _server.Server = actor_nursery._actor.ipc_server
try:
try:
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
log.runtime(
f'Started new child subproc\n'
f'(>\n'
f' |_{proc}\n'
)
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await ipc_server.wait_for_peer(
subactor.aid.uid
)
except trio.Cancelled:
cancelled_during_spawn = True
# we may cancel before the child connects back in which
# case avoid clobbering the pdb tty.
if debug_mode():
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if is_root_process():
await debug.maybe_wait_for_debugger()
elif proc is not None:
async with debug.acquire_debug_lock(
subactor_uid=subactor.aid.uid
):
# soft wait on the proc to terminate
with trio.move_on_after(0.5):
await proc.wait()
raise
# a sub-proc ref **must** exist now
assert proc
portal = Portal(chan)
actor_nursery._children[subactor.aid.uid] = (
subactor,
proc,
portal,
)
# send a "spawning specification" which configures the
# initial runtime state of the child.
sspec = msgtypes.SpawnSpec(
_parent_main_data=subactor._parent_main_data,
enable_modules=subactor.enable_modules,
reg_addrs=subactor.reg_addrs,
bind_addrs=bind_addrs,
_runtime_vars=_runtime_vars,
)
log.runtime(
f'Sending spawn spec to child\n'
f'{{}}=> {chan.aid.reprol()!r}\n'
f'\n'
f'{pretty_struct.pformat(sspec)}\n'
)
await chan.send(sspec)
# track subactor in current nursery
curr_actor: Actor = current_actor()
curr_actor._actoruid2nursery[subactor.aid.uid] = actor_nursery
# resume caller at next checkpoint now that child is up
task_status.started(portal)
# wait for ActorNursery.wait() to be called
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
async with trio.open_nursery() as nursery:
if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon(
cancel_on_completion,
portal,
subactor,
errors
)
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_kill(
proc,
trio.Process.wait, # XXX, uses `pidfd_open()` below.
portal
)
# cancel result waiter that may have been spawned in
# tandem if not done already
log.cancel(
'Cancelling portal result reaper task\n'
f'c)> {subactor.aid.reprol()!r}\n'
)
nursery.cancel_scope.cancel()
finally:
# XXX NOTE XXX: The "hard" reap since no actor zombies are
# allowed! Do this **after** cancellation/teardown to avoid
# killing the process too early.
if proc:
reap_repr: str = _pformat.nest_from_op(
input_op='>x)',
text=subactor.pformat(),
)
log.cancel(
f'Hard reap sequence starting for subactor\n'
f'{reap_repr}'
)
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if cancelled_during_spawn:
# Try again to avoid TTY clobbering.
async with debug.acquire_debug_lock(
subactor_uid=subactor.aid.uid
):
with trio.move_on_after(0.5):
await proc.wait()
await debug.maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False
),
header_msg=(
'Delaying subproc reaper while debugger locked..\n'
),
# TODO: need a diff value then default?
# poll_steps=9999999,
)
# TODO: solve the following issue where we need
# to do a similar wait like this but in an
# "intermediary" parent actor that itself isn't
# in debug but has a child that is, and we need
# to hold off on relaying SIGINT until that child
# is complete.
# https://github.com/goodboy/tractor/issues/320
# -[ ] we need to handle non-root parent-actors specially
# by somehow determining if a child is in debug and then
# avoiding cancel/kill of said child by this
# (intermediary) parent until such a time as the root says
# the pdb lock is released and we are good to tear down
# (our children)..
#
# -[ ] so maybe something like this where we try to
# acquire the lock and get notified of who has it,
# check that uid against our known children?
# this_uid: tuple[str, str] = current_actor().uid
# await debug.acquire_debug_lock(this_uid)
if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}")
await hard_kill(proc)
log.debug(f"Joined {proc}")
else:
log.warning('Nursery cancelled before sub-proc started')
if not cancelled_during_spawn:
# pop child entry to indicate we no longer managing this
# subactor
actor_nursery._children.pop(subactor.aid.uid)