diff --git a/tractor/_testing/pytest.py b/tractor/_testing/pytest.py
index f843fb4c..1d803c9e 100644
--- a/tractor/_testing/pytest.py
+++ b/tractor/_testing/pytest.py
@@ -27,6 +27,7 @@ import inspect
import platform
from typing import (
Callable,
+ get_args,
)
import pytest
@@ -341,12 +342,11 @@ def pytest_generate_tests(
# XXX some weird windows bug with `pytest`?
spawn_backend = 'trio'
- # TODO: maybe just use the literal `._spawn.SpawnMethodKey`?
- assert spawn_backend in (
- 'mp_spawn',
- 'mp_forkserver',
- 'trio',
- )
+ # drive the valid-backend set from the canonical `Literal` so
+ # adding a new spawn backend (e.g. `'subint'`) doesn't require
+ # touching the harness.
+ from tractor.spawn._spawn import SpawnMethodKey
+ assert spawn_backend in get_args(SpawnMethodKey)
# NOTE: used-to-be-used-to dyanmically parametrize tests for when
# you just passed --spawn-backend=`mp` on the cli, but now we expect
diff --git a/tractor/spawn/__init__.py b/tractor/spawn/__init__.py
index 03f2b0f8..06ba413e 100644
--- a/tractor/spawn/__init__.py
+++ b/tractor/spawn/__init__.py
@@ -15,12 +15,31 @@
# along with this program. If not, see .
'''
-Actor process spawning machinery using
-multiple backends (trio, multiprocessing).
+Actor process spawning machinery using multiple backends.
-NOTE: to avoid circular imports, this ``__init__``
-does NOT eagerly import submodules. Use direct
-module paths like ``tractor.spawn._spawn`` or
-``tractor.spawn._entry`` instead.
+Layout
+------
+- `._spawn`: the "core" supervisor machinery — spawn-method
+ registry (`SpawnMethodKey`, `_methods`, `_spawn_method`,
+ `_ctx`, `try_set_start_method`), the `new_proc` dispatcher,
+ and the cross-backend helpers `exhaust_portal`,
+ `cancel_on_completion`, `hard_kill`, `soft_kill`,
+ `proc_waiter`.
+
+Per-backend submodules (each exposes a single `*_proc()`
+coroutine registered in `_spawn._methods`):
+
+- `._trio`: the `trio`-native subprocess backend (default,
+ all platforms), spawns via `trio.lowlevel.open_process()`.
+- `._mp`: the stdlib `multiprocessing` backends —
+ `'mp_spawn'` and `'mp_forkserver'` variants — driven by
+ the `mp.context` bound to `_spawn._ctx`.
+
+Entry-point helpers live in `._entry`/`._mp_fixup_main`/
+`._forkserver_override`.
+
+NOTE: to avoid circular imports this ``__init__`` does NOT
+eagerly import submodules; always import ``tractor.spawn._spawn``
+(which pulls in the backend submodules at its bottom) first.
'''
diff --git a/tractor/spawn/_mp.py b/tractor/spawn/_mp.py
new file mode 100644
index 00000000..addc8996
--- /dev/null
+++ b/tractor/spawn/_mp.py
@@ -0,0 +1,235 @@
+# tractor: structured concurrent "actors".
+# Copyright 2018-eternity Tyler Goodlet.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+'''
+The `multiprocessing` subprocess spawning backends (`spawn`
+and `forkserver` variants).
+
+Driven by the stdlib `multiprocessing` context selected via
+`try_set_start_method()` in the `_spawn` core module, which
+sets the module-global `_ctx` and `_spawn_method` read here.
+
+'''
+from __future__ import annotations
+import multiprocessing as mp
+from typing import (
+ Any,
+ TYPE_CHECKING,
+)
+
+import trio
+from trio import TaskStatus
+
+from tractor.runtime._state import (
+ current_actor,
+ is_main_process,
+)
+from tractor.log import get_logger
+from tractor.discovery._addr import UnwrappedAddress
+from tractor.runtime._portal import Portal
+from tractor.runtime._runtime import Actor
+from tractor._exceptions import ActorFailure
+from ._entry import _mp_main
+# NOTE: module-import (not from-import) so we dynamically see
+# the *current* `_ctx` / `_spawn_method` values, which are mutated
+# by `try_set_start_method()` after module load time.
+from . import _spawn
+from ._spawn import (
+ cancel_on_completion,
+ proc_waiter,
+ soft_kill,
+)
+
+
+if TYPE_CHECKING:
+ from tractor.ipc import (
+ _server,
+ Channel,
+ )
+ from tractor.runtime._supervise import ActorNursery
+
+
+log = get_logger('tractor')
+
+
+async def mp_proc(
+ name: str,
+ actor_nursery: ActorNursery, # type: ignore # noqa
+ subactor: Actor,
+ errors: dict[tuple[str, str], Exception],
+ # passed through to actor main
+ bind_addrs: list[UnwrappedAddress],
+ parent_addr: UnwrappedAddress,
+ _runtime_vars: dict[str, Any], # serialized and sent to _child
+ *,
+ infect_asyncio: bool = False,
+ task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
+    proc_kwargs: dict[str, Any] = {}
+
+) -> None:
+
+ # uggh zone
+ try:
+ from multiprocessing import semaphore_tracker # type: ignore
+ resource_tracker = semaphore_tracker
+ resource_tracker._resource_tracker = resource_tracker._semaphore_tracker # noqa
+ except ImportError:
+ # 3.8 introduces a more general version that also tracks shared mems
+ from multiprocessing import resource_tracker # type: ignore
+
+ assert _spawn._ctx
+ start_method = _spawn._ctx.get_start_method()
+ if start_method == 'forkserver':
+
+ from multiprocessing import forkserver # type: ignore
+ # XXX do our hackery on the stdlib to avoid multiple
+ # forkservers (one at each subproc layer).
+ fs = forkserver._forkserver
+ curr_actor = current_actor()
+ if is_main_process() and not curr_actor._forkserver_info:
+ # if we're the "main" process start the forkserver
+ # only once and pass its ipc info to downstream
+ # children
+ # forkserver.set_forkserver_preload(enable_modules)
+ forkserver.ensure_running()
+ fs_info = (
+ fs._forkserver_address, # type: ignore # noqa
+ fs._forkserver_alive_fd, # type: ignore # noqa
+ getattr(fs, '_forkserver_pid', None),
+ getattr(
+ resource_tracker._resource_tracker, '_pid', None),
+ resource_tracker._resource_tracker._fd,
+ )
+        else: # request to forkserver to fork a new child
+ assert curr_actor._forkserver_info
+ fs_info = (
+ fs._forkserver_address, # type: ignore # noqa
+ fs._forkserver_alive_fd, # type: ignore # noqa
+ fs._forkserver_pid, # type: ignore # noqa
+ resource_tracker._resource_tracker._pid,
+ resource_tracker._resource_tracker._fd,
+ ) = curr_actor._forkserver_info
+ else:
+ # spawn method
+ fs_info = (None, None, None, None, None)
+
+ proc: mp.Process = _spawn._ctx.Process( # type: ignore
+ target=_mp_main,
+ args=(
+ subactor,
+ bind_addrs,
+ fs_info,
+ _spawn._spawn_method,
+ parent_addr,
+ infect_asyncio,
+ ),
+ # daemon=True,
+ name=name,
+ )
+
+ # `multiprocessing` only (since no async interface):
+ # register the process before start in case we get a cancel
+ # request before the actor has fully spawned - then we can wait
+ # for it to fully come up before sending a cancel request
+ actor_nursery._children[subactor.aid.uid] = (subactor, proc, None)
+
+ proc.start()
+ if not proc.is_alive():
+ raise ActorFailure("Couldn't start sub-actor?")
+
+ log.runtime(f"Started {proc}")
+
+ ipc_server: _server.Server = actor_nursery._actor.ipc_server
+ try:
+ # wait for actor to spawn and connect back to us
+ # channel should have handshake completed by the
+ # local actor by the time we get a ref to it
+ event, chan = await ipc_server.wait_for_peer(
+ subactor.aid.uid,
+ )
+
+ # XXX: monkey patch poll API to match the ``subprocess`` API..
+ # not sure why they don't expose this but kk.
+ proc.poll = lambda: proc.exitcode # type: ignore
+
+ # except:
+ # TODO: in the case we were cancelled before the sub-proc
+ # registered itself back we must be sure to try and clean
+ # any process we may have started.
+
+ portal = Portal(chan)
+ actor_nursery._children[subactor.aid.uid] = (subactor, proc, portal)
+
+ # unblock parent task
+ task_status.started(portal)
+
+ # wait for ``ActorNursery`` block to signal that
+ # subprocesses can be waited upon.
+ # This is required to ensure synchronization
+ # with user code that may want to manually await results
+ # from nursery spawned sub-actors. We don't want the
+ # containing nurseries here to collect results or error
+    # while user code is still doing its thing. Only after the
+ # nursery block closes do we allow subactor results to be
+ # awaited and reported upwards to the supervisor.
+ with trio.CancelScope(shield=True):
+ await actor_nursery._join_procs.wait()
+
+ async with trio.open_nursery() as nursery:
+ if portal in actor_nursery._cancel_after_result_on_exit:
+ nursery.start_soon(
+ cancel_on_completion,
+ portal,
+ subactor,
+ errors
+ )
+
+ # This is a "soft" (cancellable) join/reap which
+ # will remote cancel the actor on a ``trio.Cancelled``
+ # condition.
+ await soft_kill(
+ proc,
+ proc_waiter,
+ portal
+ )
+
+ # cancel result waiter that may have been spawned in
+ # tandem if not done already
+ log.warning(
+ "Cancelling existing result waiter task for "
+ f"{subactor.aid.uid}")
+ nursery.cancel_scope.cancel()
+
+ finally:
+ # hard reap sequence
+ if proc.is_alive():
+ log.cancel(f"Attempting to hard kill {proc}")
+ with trio.move_on_after(0.1) as cs:
+ cs.shield = True
+ await proc_waiter(proc)
+
+ if cs.cancelled_caught:
+ proc.terminate()
+
+ proc.join()
+ log.debug(f"Joined {proc}")
+
+ # pop child entry to indicate we are no longer managing subactor
+ actor_nursery._children.pop(subactor.aid.uid)
+
+ # TODO: prolly report to ``mypy`` how this causes all sorts of
+ # false errors..
+ # subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
diff --git a/tractor/spawn/_spawn.py b/tractor/spawn/_spawn.py
index 9d89648c..d040813a 100644
--- a/tractor/spawn/_spawn.py
+++ b/tractor/spawn/_spawn.py
@@ -20,7 +20,6 @@ Machinery for actor process spawning using multiple backends.
"""
from __future__ import annotations
import multiprocessing as mp
-import sys
import platform
from typing import (
Any,
@@ -34,14 +33,8 @@ from typing import (
import trio
from trio import TaskStatus
-from ..devx import (
- debug,
- pformat as _pformat
-)
+from ..devx import debug
from tractor.runtime._state import (
- current_actor,
- is_main_process,
- is_root_process,
debug_mode,
_runtime_vars,
)
@@ -49,12 +42,7 @@ from tractor.log import get_logger
from tractor.discovery._addr import UnwrappedAddress
from tractor.runtime._portal import Portal
from tractor.runtime._runtime import Actor
-from ._entry import _mp_main
-from tractor._exceptions import ActorFailure
-from tractor.msg import (
- types as msgtypes,
- pretty_struct,
-)
+from tractor.msg import types as msgtypes
if TYPE_CHECKING:
@@ -445,398 +433,11 @@ async def new_proc(
)
-async def trio_proc(
- name: str,
- actor_nursery: ActorNursery,
- subactor: Actor,
- errors: dict[tuple[str, str], Exception],
-
- # passed through to actor main
- bind_addrs: list[UnwrappedAddress],
- parent_addr: UnwrappedAddress,
- _runtime_vars: dict[str, Any], # serialized and sent to _child
- *,
- infect_asyncio: bool = False,
- task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
- proc_kwargs: dict[str, any] = {}
-
-) -> None:
- '''
- Create a new ``Process`` using a "spawn method" as (configured using
- ``try_set_start_method()``).
-
- This routine should be started in a actor runtime task and the logic
- here is to be considered the core supervision strategy.
-
- '''
- spawn_cmd = [
- sys.executable,
- "-m",
- # Hardcode this (instead of using ``_child.__name__`` to avoid a
- # double import warning: https://stackoverflow.com/a/45070583
- "tractor._child",
- # We provide the child's unique identifier on this exec/spawn
- # line for debugging purposes when viewing the process tree from
- # the OS; it otherwise can be passed via the parent channel if
- # we prefer in the future (for privacy).
- "--uid",
- # TODO, how to pass this over "wire" encodings like
- # cmdline args?
- # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ?
- str(subactor.aid.uid),
- # Address the child must connect to on startup
- "--parent_addr",
- str(parent_addr)
- ]
-
- if subactor.loglevel:
- spawn_cmd += [
- "--loglevel",
- subactor.loglevel
- ]
- # Tell child to run in guest mode on top of ``asyncio`` loop
- if infect_asyncio:
- spawn_cmd.append("--asyncio")
-
- cancelled_during_spawn: bool = False
- proc: trio.Process|None = None
- ipc_server: _server.Server = actor_nursery._actor.ipc_server
- try:
- try:
- proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
- log.runtime(
- f'Started new child subproc\n'
- f'(>\n'
- f' |_{proc}\n'
- )
-
- # wait for actor to spawn and connect back to us
- # channel should have handshake completed by the
- # local actor by the time we get a ref to it
- event, chan = await ipc_server.wait_for_peer(
- subactor.aid.uid
- )
-
- except trio.Cancelled:
- cancelled_during_spawn = True
- # we may cancel before the child connects back in which
- # case avoid clobbering the pdb tty.
- if debug_mode():
- with trio.CancelScope(shield=True):
- # don't clobber an ongoing pdb
- if is_root_process():
- await debug.maybe_wait_for_debugger()
-
- elif proc is not None:
- async with debug.acquire_debug_lock(
- subactor_uid=subactor.aid.uid
- ):
- # soft wait on the proc to terminate
- with trio.move_on_after(0.5):
- await proc.wait()
- raise
-
- # a sub-proc ref **must** exist now
- assert proc
-
- portal = Portal(chan)
- actor_nursery._children[subactor.aid.uid] = (
- subactor,
- proc,
- portal,
- )
-
- # send a "spawning specification" which configures the
- # initial runtime state of the child.
- sspec = msgtypes.SpawnSpec(
- _parent_main_data=subactor._parent_main_data,
- enable_modules=subactor.enable_modules,
- reg_addrs=subactor.reg_addrs,
- bind_addrs=bind_addrs,
- _runtime_vars=_runtime_vars,
- )
- log.runtime(
- f'Sending spawn spec to child\n'
- f'{{}}=> {chan.aid.reprol()!r}\n'
- f'\n'
- f'{pretty_struct.pformat(sspec)}\n'
- )
- await chan.send(sspec)
-
- # track subactor in current nursery
- curr_actor: Actor = current_actor()
- curr_actor._actoruid2nursery[subactor.aid.uid] = actor_nursery
-
- # resume caller at next checkpoint now that child is up
- task_status.started(portal)
-
- # wait for ActorNursery.wait() to be called
- with trio.CancelScope(shield=True):
- await actor_nursery._join_procs.wait()
-
- async with trio.open_nursery() as nursery:
- if portal in actor_nursery._cancel_after_result_on_exit:
- nursery.start_soon(
- cancel_on_completion,
- portal,
- subactor,
- errors
- )
-
- # This is a "soft" (cancellable) join/reap which
- # will remote cancel the actor on a ``trio.Cancelled``
- # condition.
- await soft_kill(
- proc,
- trio.Process.wait, # XXX, uses `pidfd_open()` below.
- portal
- )
-
- # cancel result waiter that may have been spawned in
- # tandem if not done already
- log.cancel(
- 'Cancelling portal result reaper task\n'
- f'c)> {subactor.aid.reprol()!r}\n'
- )
- nursery.cancel_scope.cancel()
-
- finally:
- # XXX NOTE XXX: The "hard" reap since no actor zombies are
- # allowed! Do this **after** cancellation/teardown to avoid
- # killing the process too early.
- if proc:
- reap_repr: str = _pformat.nest_from_op(
- input_op='>x)',
- text=subactor.pformat(),
- )
- log.cancel(
- f'Hard reap sequence starting for subactor\n'
- f'{reap_repr}'
- )
-
- with trio.CancelScope(shield=True):
- # don't clobber an ongoing pdb
- if cancelled_during_spawn:
- # Try again to avoid TTY clobbering.
- async with debug.acquire_debug_lock(
- subactor_uid=subactor.aid.uid
- ):
- with trio.move_on_after(0.5):
- await proc.wait()
-
- await debug.maybe_wait_for_debugger(
- child_in_debug=_runtime_vars.get(
- '_debug_mode', False
- ),
- header_msg=(
- 'Delaying subproc reaper while debugger locked..\n'
- ),
-
- # TODO: need a diff value then default?
- # poll_steps=9999999,
- )
- # TODO: solve the following issue where we need
- # to do a similar wait like this but in an
- # "intermediary" parent actor that itself isn't
- # in debug but has a child that is, and we need
- # to hold off on relaying SIGINT until that child
- # is complete.
- # https://github.com/goodboy/tractor/issues/320
- # -[ ] we need to handle non-root parent-actors specially
- # by somehow determining if a child is in debug and then
- # avoiding cancel/kill of said child by this
- # (intermediary) parent until such a time as the root says
- # the pdb lock is released and we are good to tear down
- # (our children)..
- #
- # -[ ] so maybe something like this where we try to
- # acquire the lock and get notified of who has it,
- # check that uid against our known children?
- # this_uid: tuple[str, str] = current_actor().uid
- # await debug.acquire_debug_lock(this_uid)
-
- if proc.poll() is None:
- log.cancel(f"Attempting to hard kill {proc}")
- await hard_kill(proc)
-
- log.debug(f"Joined {proc}")
- else:
- log.warning('Nursery cancelled before sub-proc started')
-
- if not cancelled_during_spawn:
- # pop child entry to indicate we no longer managing this
- # subactor
- actor_nursery._children.pop(subactor.aid.uid)
-
-
-async def mp_proc(
- name: str,
- actor_nursery: ActorNursery, # type: ignore # noqa
- subactor: Actor,
- errors: dict[tuple[str, str], Exception],
- # passed through to actor main
- bind_addrs: list[UnwrappedAddress],
- parent_addr: UnwrappedAddress,
- _runtime_vars: dict[str, Any], # serialized and sent to _child
- *,
- infect_asyncio: bool = False,
- task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
- proc_kwargs: dict[str, any] = {}
-
-) -> None:
-
- # uggh zone
- try:
- from multiprocessing import semaphore_tracker # type: ignore
- resource_tracker = semaphore_tracker
- resource_tracker._resource_tracker = resource_tracker._semaphore_tracker # noqa
- except ImportError:
- # 3.8 introduces a more general version that also tracks shared mems
- from multiprocessing import resource_tracker # type: ignore
-
- assert _ctx
- start_method = _ctx.get_start_method()
- if start_method == 'forkserver':
-
- from multiprocessing import forkserver # type: ignore
- # XXX do our hackery on the stdlib to avoid multiple
- # forkservers (one at each subproc layer).
- fs = forkserver._forkserver
- curr_actor = current_actor()
- if is_main_process() and not curr_actor._forkserver_info:
- # if we're the "main" process start the forkserver
- # only once and pass its ipc info to downstream
- # children
- # forkserver.set_forkserver_preload(enable_modules)
- forkserver.ensure_running()
- fs_info = (
- fs._forkserver_address, # type: ignore # noqa
- fs._forkserver_alive_fd, # type: ignore # noqa
- getattr(fs, '_forkserver_pid', None),
- getattr(
- resource_tracker._resource_tracker, '_pid', None),
- resource_tracker._resource_tracker._fd,
- )
- else: # request to forkerserver to fork a new child
- assert curr_actor._forkserver_info
- fs_info = (
- fs._forkserver_address, # type: ignore # noqa
- fs._forkserver_alive_fd, # type: ignore # noqa
- fs._forkserver_pid, # type: ignore # noqa
- resource_tracker._resource_tracker._pid,
- resource_tracker._resource_tracker._fd,
- ) = curr_actor._forkserver_info
- else:
- # spawn method
- fs_info = (None, None, None, None, None)
-
- proc: mp.Process = _ctx.Process( # type: ignore
- target=_mp_main,
- args=(
- subactor,
- bind_addrs,
- fs_info,
- _spawn_method,
- parent_addr,
- infect_asyncio,
- ),
- # daemon=True,
- name=name,
- )
-
- # `multiprocessing` only (since no async interface):
- # register the process before start in case we get a cancel
- # request before the actor has fully spawned - then we can wait
- # for it to fully come up before sending a cancel request
- actor_nursery._children[subactor.aid.uid] = (subactor, proc, None)
-
- proc.start()
- if not proc.is_alive():
- raise ActorFailure("Couldn't start sub-actor?")
-
- log.runtime(f"Started {proc}")
-
- ipc_server: _server.Server = actor_nursery._actor.ipc_server
- try:
- # wait for actor to spawn and connect back to us
- # channel should have handshake completed by the
- # local actor by the time we get a ref to it
- event, chan = await ipc_server.wait_for_peer(
- subactor.aid.uid,
- )
-
- # XXX: monkey patch poll API to match the ``subprocess`` API..
- # not sure why they don't expose this but kk.
- proc.poll = lambda: proc.exitcode # type: ignore
-
- # except:
- # TODO: in the case we were cancelled before the sub-proc
- # registered itself back we must be sure to try and clean
- # any process we may have started.
-
- portal = Portal(chan)
- actor_nursery._children[subactor.aid.uid] = (subactor, proc, portal)
-
- # unblock parent task
- task_status.started(portal)
-
- # wait for ``ActorNursery`` block to signal that
- # subprocesses can be waited upon.
- # This is required to ensure synchronization
- # with user code that may want to manually await results
- # from nursery spawned sub-actors. We don't want the
- # containing nurseries here to collect results or error
- # while user code is still doing it's thing. Only after the
- # nursery block closes do we allow subactor results to be
- # awaited and reported upwards to the supervisor.
- with trio.CancelScope(shield=True):
- await actor_nursery._join_procs.wait()
-
- async with trio.open_nursery() as nursery:
- if portal in actor_nursery._cancel_after_result_on_exit:
- nursery.start_soon(
- cancel_on_completion,
- portal,
- subactor,
- errors
- )
-
- # This is a "soft" (cancellable) join/reap which
- # will remote cancel the actor on a ``trio.Cancelled``
- # condition.
- await soft_kill(
- proc,
- proc_waiter,
- portal
- )
-
- # cancel result waiter that may have been spawned in
- # tandem if not done already
- log.warning(
- "Cancelling existing result waiter task for "
- f"{subactor.aid.uid}")
- nursery.cancel_scope.cancel()
-
- finally:
- # hard reap sequence
- if proc.is_alive():
- log.cancel(f"Attempting to hard kill {proc}")
- with trio.move_on_after(0.1) as cs:
- cs.shield = True
- await proc_waiter(proc)
-
- if cs.cancelled_caught:
- proc.terminate()
-
- proc.join()
- log.debug(f"Joined {proc}")
-
- # pop child entry to indicate we are no longer managing subactor
- actor_nursery._children.pop(subactor.aid.uid)
-
- # TODO: prolly report to ``mypy`` how this causes all sorts of
- # false errors..
- # subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
+# NOTE: bottom-of-module to avoid a circular import since the
+# backend submodules pull `cancel_on_completion`/`soft_kill`/
+# `hard_kill`/`proc_waiter` from here; import this module first!
+from ._trio import trio_proc
+from ._mp import mp_proc
# proc spawning backend target map
diff --git a/tractor/spawn/_trio.py b/tractor/spawn/_trio.py
new file mode 100644
index 00000000..a79d1c8d
--- /dev/null
+++ b/tractor/spawn/_trio.py
@@ -0,0 +1,292 @@
+# tractor: structured concurrent "actors".
+# Copyright 2018-eternity Tyler Goodlet.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+'''
+The `trio` subprocess spawning backend.
+
+Spawns sub-actors as fresh OS processes driven by
+`trio.lowlevel.open_process()` — our default, cross-platform
+spawn method.
+
+'''
+from __future__ import annotations
+import sys
+from typing import (
+ Any,
+ TYPE_CHECKING,
+)
+
+import trio
+from trio import TaskStatus
+
+from ..devx import (
+ debug,
+ pformat as _pformat,
+)
+from tractor.runtime._state import (
+ current_actor,
+ is_root_process,
+ debug_mode,
+ _runtime_vars,
+)
+from tractor.log import get_logger
+from tractor.discovery._addr import UnwrappedAddress
+from tractor.runtime._portal import Portal
+from tractor.runtime._runtime import Actor
+from tractor.msg import (
+ types as msgtypes,
+ pretty_struct,
+)
+from ._spawn import (
+ cancel_on_completion,
+ hard_kill,
+ soft_kill,
+)
+
+
+if TYPE_CHECKING:
+ from tractor.ipc import (
+ _server,
+ Channel,
+ )
+ from tractor.runtime._supervise import ActorNursery
+
+
+log = get_logger('tractor')
+
+
+async def trio_proc(
+ name: str,
+ actor_nursery: ActorNursery,
+ subactor: Actor,
+ errors: dict[tuple[str, str], Exception],
+
+ # passed through to actor main
+ bind_addrs: list[UnwrappedAddress],
+ parent_addr: UnwrappedAddress,
+ _runtime_vars: dict[str, Any], # serialized and sent to _child
+ *,
+ infect_asyncio: bool = False,
+ task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
+    proc_kwargs: dict[str, Any] = {}
+
+) -> None:
+ '''
+    Create a new ``Process`` using a "spawn method" (as configured using
+ ``try_set_start_method()``).
+
+    This routine should be started in an actor runtime task and the logic
+ here is to be considered the core supervision strategy.
+
+ '''
+ spawn_cmd = [
+ sys.executable,
+ "-m",
+ # Hardcode this (instead of using ``_child.__name__`` to avoid a
+ # double import warning: https://stackoverflow.com/a/45070583
+ "tractor._child",
+ # We provide the child's unique identifier on this exec/spawn
+ # line for debugging purposes when viewing the process tree from
+ # the OS; it otherwise can be passed via the parent channel if
+ # we prefer in the future (for privacy).
+ "--uid",
+ # TODO, how to pass this over "wire" encodings like
+ # cmdline args?
+ # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ?
+ str(subactor.aid.uid),
+ # Address the child must connect to on startup
+ "--parent_addr",
+ str(parent_addr)
+ ]
+
+ if subactor.loglevel:
+ spawn_cmd += [
+ "--loglevel",
+ subactor.loglevel
+ ]
+ # Tell child to run in guest mode on top of ``asyncio`` loop
+ if infect_asyncio:
+ spawn_cmd.append("--asyncio")
+
+ cancelled_during_spawn: bool = False
+ proc: trio.Process|None = None
+ ipc_server: _server.Server = actor_nursery._actor.ipc_server
+ try:
+ try:
+ proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
+ log.runtime(
+ f'Started new child subproc\n'
+ f'(>\n'
+ f' |_{proc}\n'
+ )
+
+ # wait for actor to spawn and connect back to us
+ # channel should have handshake completed by the
+ # local actor by the time we get a ref to it
+ event, chan = await ipc_server.wait_for_peer(
+ subactor.aid.uid
+ )
+
+ except trio.Cancelled:
+ cancelled_during_spawn = True
+ # we may cancel before the child connects back in which
+ # case avoid clobbering the pdb tty.
+ if debug_mode():
+ with trio.CancelScope(shield=True):
+ # don't clobber an ongoing pdb
+ if is_root_process():
+ await debug.maybe_wait_for_debugger()
+
+ elif proc is not None:
+ async with debug.acquire_debug_lock(
+ subactor_uid=subactor.aid.uid
+ ):
+ # soft wait on the proc to terminate
+ with trio.move_on_after(0.5):
+ await proc.wait()
+ raise
+
+ # a sub-proc ref **must** exist now
+ assert proc
+
+ portal = Portal(chan)
+ actor_nursery._children[subactor.aid.uid] = (
+ subactor,
+ proc,
+ portal,
+ )
+
+ # send a "spawning specification" which configures the
+ # initial runtime state of the child.
+ sspec = msgtypes.SpawnSpec(
+ _parent_main_data=subactor._parent_main_data,
+ enable_modules=subactor.enable_modules,
+ reg_addrs=subactor.reg_addrs,
+ bind_addrs=bind_addrs,
+ _runtime_vars=_runtime_vars,
+ )
+ log.runtime(
+ f'Sending spawn spec to child\n'
+ f'{{}}=> {chan.aid.reprol()!r}\n'
+ f'\n'
+ f'{pretty_struct.pformat(sspec)}\n'
+ )
+ await chan.send(sspec)
+
+ # track subactor in current nursery
+ curr_actor: Actor = current_actor()
+ curr_actor._actoruid2nursery[subactor.aid.uid] = actor_nursery
+
+ # resume caller at next checkpoint now that child is up
+ task_status.started(portal)
+
+ # wait for ActorNursery.wait() to be called
+ with trio.CancelScope(shield=True):
+ await actor_nursery._join_procs.wait()
+
+ async with trio.open_nursery() as nursery:
+ if portal in actor_nursery._cancel_after_result_on_exit:
+ nursery.start_soon(
+ cancel_on_completion,
+ portal,
+ subactor,
+ errors
+ )
+
+ # This is a "soft" (cancellable) join/reap which
+ # will remote cancel the actor on a ``trio.Cancelled``
+ # condition.
+ await soft_kill(
+ proc,
+ trio.Process.wait, # XXX, uses `pidfd_open()` below.
+ portal
+ )
+
+ # cancel result waiter that may have been spawned in
+ # tandem if not done already
+ log.cancel(
+ 'Cancelling portal result reaper task\n'
+ f'c)> {subactor.aid.reprol()!r}\n'
+ )
+ nursery.cancel_scope.cancel()
+
+ finally:
+ # XXX NOTE XXX: The "hard" reap since no actor zombies are
+ # allowed! Do this **after** cancellation/teardown to avoid
+ # killing the process too early.
+ if proc:
+ reap_repr: str = _pformat.nest_from_op(
+ input_op='>x)',
+ text=subactor.pformat(),
+ )
+ log.cancel(
+ f'Hard reap sequence starting for subactor\n'
+ f'{reap_repr}'
+ )
+
+ with trio.CancelScope(shield=True):
+ # don't clobber an ongoing pdb
+ if cancelled_during_spawn:
+ # Try again to avoid TTY clobbering.
+ async with debug.acquire_debug_lock(
+ subactor_uid=subactor.aid.uid
+ ):
+ with trio.move_on_after(0.5):
+ await proc.wait()
+
+ await debug.maybe_wait_for_debugger(
+ child_in_debug=_runtime_vars.get(
+ '_debug_mode', False
+ ),
+ header_msg=(
+ 'Delaying subproc reaper while debugger locked..\n'
+ ),
+
+            # TODO: need a diff value than the default?
+ # poll_steps=9999999,
+ )
+ # TODO: solve the following issue where we need
+ # to do a similar wait like this but in an
+ # "intermediary" parent actor that itself isn't
+ # in debug but has a child that is, and we need
+ # to hold off on relaying SIGINT until that child
+ # is complete.
+ # https://github.com/goodboy/tractor/issues/320
+ # -[ ] we need to handle non-root parent-actors specially
+ # by somehow determining if a child is in debug and then
+ # avoiding cancel/kill of said child by this
+ # (intermediary) parent until such a time as the root says
+ # the pdb lock is released and we are good to tear down
+ # (our children)..
+ #
+ # -[ ] so maybe something like this where we try to
+ # acquire the lock and get notified of who has it,
+ # check that uid against our known children?
+ # this_uid: tuple[str, str] = current_actor().uid
+ # await debug.acquire_debug_lock(this_uid)
+
+ if proc.poll() is None:
+ log.cancel(f"Attempting to hard kill {proc}")
+ await hard_kill(proc)
+
+ log.debug(f"Joined {proc}")
+ else:
+ log.warning('Nursery cancelled before sub-proc started')
+
+ if not cancelled_during_spawn:
+ # pop child entry to indicate we no longer managing this
+ # subactor
+ actor_nursery._children.pop(subactor.aid.uid)