Simplify parent-main replay opt-out.
Keep actor-owned parent-main capture and let `_mp_figure_out_main()` decide whether to return `__main__` bootstrap data, avoiding the extra SpawnSpec plumbing while preserving the per-actor flag.subint_spawner_backend
parent
6309c2e6fc
commit
83b6c4270a
|
|
@ -218,7 +218,11 @@ def test_start_actor_can_skip_parent_main_replay(
|
|||
monkeypatch.setattr(
|
||||
_mp_fixup_main,
|
||||
'_mp_figure_out_main',
|
||||
lambda: {'init_main_from_name': __name__},
|
||||
lambda replay_parent_main=True: (
|
||||
{'init_main_from_name': __name__}
|
||||
if replay_parent_main
|
||||
else {}
|
||||
),
|
||||
)
|
||||
|
||||
async def main() -> None:
|
||||
|
|
|
|||
|
|
@ -217,6 +217,8 @@ class Actor:
|
|||
'''
|
||||
return self._ipc_server
|
||||
|
||||
# Information about `__main__` from parent
|
||||
_parent_main_data: dict[str, str]
|
||||
_parent_chan_cs: CancelScope|None = None
|
||||
_spawn_spec: msgtypes.SpawnSpec|None = None
|
||||
|
||||
|
|
@ -242,6 +244,7 @@ class Actor:
|
|||
loglevel: str|None = None,
|
||||
registry_addrs: list[Address]|None = None,
|
||||
spawn_method: str|None = None,
|
||||
replay_parent_main: bool = True,
|
||||
|
||||
arbiter_addr: UnwrappedAddress|None = None,
|
||||
|
||||
|
|
@ -263,6 +266,12 @@ class Actor:
|
|||
self._cancel_called_by: tuple[str, tuple]|None = None
|
||||
self._cancel_called: bool = False
|
||||
|
||||
# retrieve and store parent `__main__` data which
|
||||
# will be passed to children
|
||||
self._parent_main_data = _mp_fixup_main._mp_figure_out_main(
|
||||
replay_parent_main,
|
||||
)
|
||||
|
||||
# TODO? only add this when `is_debug_mode() == True` no?
|
||||
# always include debugging tools module
|
||||
if _state.is_root_process():
|
||||
|
|
@ -529,7 +538,6 @@ class Actor:
|
|||
|
||||
def load_modules(
|
||||
self,
|
||||
parent_main_data: dict[str, str]|None = None,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
|
@ -543,20 +551,13 @@ class Actor:
|
|||
'''
|
||||
try:
|
||||
if self._spawn_method == 'trio':
|
||||
if (
|
||||
parent_main_data is not None
|
||||
and
|
||||
'init_main_from_name' in parent_main_data
|
||||
):
|
||||
parent_data = self._parent_main_data
|
||||
if 'init_main_from_name' in parent_data:
|
||||
_mp_fixup_main._fixup_main_from_name(
|
||||
parent_main_data['init_main_from_name'])
|
||||
elif (
|
||||
parent_main_data is not None
|
||||
and
|
||||
'init_main_from_path' in parent_main_data
|
||||
):
|
||||
parent_data['init_main_from_name'])
|
||||
elif 'init_main_from_path' in parent_data:
|
||||
_mp_fixup_main._fixup_main_from_path(
|
||||
parent_main_data['init_main_from_path'])
|
||||
parent_data['init_main_from_path'])
|
||||
|
||||
status: str = 'Attempting to import enabled modules:\n'
|
||||
|
||||
|
|
@ -842,7 +843,6 @@ class Actor:
|
|||
Channel,
|
||||
list[UnwrappedAddress]|None,
|
||||
list[str]|None, # preferred tpts
|
||||
dict[str, str]|None,
|
||||
]:
|
||||
'''
|
||||
Bootstrap this local actor's runtime config from its parent by
|
||||
|
|
@ -863,7 +863,6 @@ class Actor:
|
|||
await chan._do_handshake(aid=self.aid)
|
||||
|
||||
accept_addrs: list[UnwrappedAddress]|None = None
|
||||
parent_main_data: dict[str, str]|None = None
|
||||
|
||||
if self._spawn_method == "trio":
|
||||
|
||||
|
|
@ -1024,13 +1023,12 @@ class Actor:
|
|||
spawnspec.enable_modules
|
||||
)
|
||||
|
||||
parent_main_data = spawnspec._parent_main_data
|
||||
self._parent_main_data = spawnspec._parent_main_data
|
||||
|
||||
return (
|
||||
chan,
|
||||
accept_addrs,
|
||||
_state._runtime_vars['_enable_tpts'],
|
||||
parent_main_data,
|
||||
_state._runtime_vars['_enable_tpts']
|
||||
)
|
||||
|
||||
# failed to connect back?
|
||||
|
|
@ -1523,7 +1521,6 @@ async def async_main(
|
|||
|
||||
# establish primary connection with immediate parent
|
||||
actor._parent_chan: Channel|None = None
|
||||
parent_main_data: dict[str, str]|None = None
|
||||
|
||||
# is this a sub-actor?
|
||||
# get runtime info from parent.
|
||||
|
|
@ -1532,7 +1529,6 @@ async def async_main(
|
|||
actor._parent_chan,
|
||||
set_accept_addr_says_rent,
|
||||
maybe_preferred_transports_says_rent,
|
||||
parent_main_data,
|
||||
) = await actor._from_parent(parent_addr)
|
||||
|
||||
accept_addrs: list[UnwrappedAddress] = []
|
||||
|
|
@ -1612,9 +1608,7 @@ async def async_main(
|
|||
# XXX: do this **after** establishing a channel to the parent
|
||||
# but **before** starting the message loop for that channel
|
||||
# such that import errors are properly propagated upwards
|
||||
actor.load_modules(
|
||||
parent_main_data=parent_main_data,
|
||||
)
|
||||
actor.load_modules()
|
||||
|
||||
# XXX TODO XXX: figuring out debugging of this
|
||||
# would somemwhat guarantee "self-hosted" runtime
|
||||
|
|
|
|||
|
|
@ -247,6 +247,7 @@ class ActorNursery:
|
|||
# modules allowed to invoked funcs from
|
||||
enable_modules=enable_modules,
|
||||
loglevel=loglevel,
|
||||
replay_parent_main=replay_parent_main,
|
||||
|
||||
# verbatim relay this actor's registrar addresses
|
||||
registry_addrs=current_actor().registry_addrs,
|
||||
|
|
@ -270,7 +271,6 @@ class ActorNursery:
|
|||
parent_addr,
|
||||
_rtv, # run time vars
|
||||
infect_asyncio=infect_asyncio,
|
||||
replay_parent_main=replay_parent_main,
|
||||
proc_kwargs=proc_kwargs
|
||||
)
|
||||
)
|
||||
|
|
|
|||
|
|
@ -33,11 +33,16 @@ import runpy
|
|||
ORIGINAL_DIR = os.path.abspath(os.getcwd())
|
||||
|
||||
|
||||
def _mp_figure_out_main() -> dict[str, str]:
|
||||
def _mp_figure_out_main(
|
||||
replay_parent_main: bool = True,
|
||||
) -> dict[str, str]:
|
||||
"""Taken from ``multiprocessing.spawn.get_preparation_data()``.
|
||||
|
||||
Retrieve parent actor `__main__` module data.
|
||||
"""
|
||||
if not replay_parent_main:
|
||||
return {}
|
||||
|
||||
d = {}
|
||||
# Figure out whether to initialise main in the subprocess as a module
|
||||
# or through direct execution (or to leave it alone entirely)
|
||||
|
|
|
|||
|
|
@ -50,7 +50,6 @@ from tractor.discovery._addr import UnwrappedAddress
|
|||
from tractor.runtime._portal import Portal
|
||||
from tractor.runtime._runtime import Actor
|
||||
from ._entry import _mp_main
|
||||
from . import _mp_fixup_main
|
||||
from tractor._exceptions import ActorFailure
|
||||
from tractor.msg import (
|
||||
types as msgtypes,
|
||||
|
|
@ -421,7 +420,6 @@ async def new_proc(
|
|||
*,
|
||||
|
||||
infect_asyncio: bool = False,
|
||||
replay_parent_main: bool = True,
|
||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
|
||||
proc_kwargs: dict[str, any] = {}
|
||||
|
||||
|
|
@ -442,7 +440,6 @@ async def new_proc(
|
|||
parent_addr,
|
||||
_runtime_vars, # run time vars
|
||||
infect_asyncio=infect_asyncio,
|
||||
replay_parent_main=replay_parent_main,
|
||||
task_status=task_status,
|
||||
proc_kwargs=proc_kwargs
|
||||
)
|
||||
|
|
@ -460,7 +457,6 @@ async def trio_proc(
|
|||
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
||||
*,
|
||||
infect_asyncio: bool = False,
|
||||
replay_parent_main: bool = True,
|
||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
|
||||
proc_kwargs: dict[str, any] = {}
|
||||
|
||||
|
|
@ -553,11 +549,7 @@ async def trio_proc(
|
|||
# send a "spawning specification" which configures the
|
||||
# initial runtime state of the child.
|
||||
sspec = msgtypes.SpawnSpec(
|
||||
_parent_main_data=(
|
||||
_mp_fixup_main._mp_figure_out_main()
|
||||
if replay_parent_main
|
||||
else {}
|
||||
),
|
||||
_parent_main_data=subactor._parent_main_data,
|
||||
enable_modules=subactor.enable_modules,
|
||||
reg_addrs=subactor.reg_addrs,
|
||||
bind_addrs=bind_addrs,
|
||||
|
|
@ -688,7 +680,6 @@ async def mp_proc(
|
|||
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
||||
*,
|
||||
infect_asyncio: bool = False,
|
||||
replay_parent_main: bool = True,
|
||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
|
||||
proc_kwargs: dict[str, any] = {}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue