Route parent-main replay through SpawnSpec
Keep trio child bootstrap data in the spawn handshake instead of stashing it on Actor state so the replay opt-out stays explicit and avoids stale-looking runtime fields.subint_spawner_backend
parent
f5301d3fb0
commit
6309c2e6fc
|
|
@ -168,6 +168,11 @@ async def check_loglevel(level):
|
||||||
log.critical('yoyoyo')
|
log.critical('yoyoyo')
|
||||||
|
|
||||||
|
|
||||||
|
async def get_main_mod_name() -> str:
|
||||||
|
import sys
|
||||||
|
return sys.modules['__main__'].__name__
|
||||||
|
|
||||||
|
|
||||||
def test_loglevel_propagated_to_subactor(
|
def test_loglevel_propagated_to_subactor(
|
||||||
start_method,
|
start_method,
|
||||||
capfd,
|
capfd,
|
||||||
|
|
@ -199,48 +204,40 @@ def test_loglevel_propagated_to_subactor(
|
||||||
assert 'yoyoyo' in captured.err
|
assert 'yoyoyo' in captured.err
|
||||||
|
|
||||||
|
|
||||||
def test_start_actor_can_skip_parent_main_replay(monkeypatch, reg_addr):
|
def test_start_actor_can_skip_parent_main_replay(
|
||||||
captured_parent_main_data: list[dict[str, str]] = []
|
start_method,
|
||||||
from tractor.runtime import _supervise as supervise_module
|
reg_addr,
|
||||||
|
monkeypatch,
|
||||||
async def fake_new_proc(
|
):
|
||||||
name: str,
|
if start_method != 'trio':
|
||||||
actor_nursery,
|
pytest.skip(
|
||||||
subactor,
|
'parent main replay opt-out only affects the trio spawn backend'
|
||||||
errors,
|
)
|
||||||
bind_addrs,
|
from tractor.spawn import _mp_fixup_main
|
||||||
parent_addr,
|
|
||||||
_runtime_vars,
|
|
||||||
*,
|
|
||||||
infect_asyncio: bool = False,
|
|
||||||
task_status=trio.TASK_STATUS_IGNORED,
|
|
||||||
proc_kwargs: dict[str, Any] = {},
|
|
||||||
) -> None:
|
|
||||||
captured_parent_main_data.append(dict(subactor._parent_main_data))
|
|
||||||
task_status.started(object())
|
|
||||||
|
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
supervise_module._spawn,
|
_mp_fixup_main,
|
||||||
'new_proc',
|
'_mp_figure_out_main',
|
||||||
fake_new_proc,
|
lambda: {'init_main_from_name': __name__},
|
||||||
)
|
)
|
||||||
|
|
||||||
async def main() -> None:
|
async def main() -> None:
|
||||||
async with tractor.open_root_actor(
|
async with tractor.open_nursery(
|
||||||
|
name='registrar',
|
||||||
|
start_method=start_method,
|
||||||
registry_addrs=[reg_addr],
|
registry_addrs=[reg_addr],
|
||||||
):
|
) as an:
|
||||||
async with tractor.open_nursery() as an:
|
replaying = await an.run_in_actor(
|
||||||
await an.start_actor(
|
get_main_mod_name,
|
||||||
'replaying-parent-main',
|
name='replaying-parent-main',
|
||||||
enable_modules=[__name__],
|
)
|
||||||
)
|
isolated = await an.run_in_actor(
|
||||||
await an.start_actor(
|
get_main_mod_name,
|
||||||
'isolated-parent-main',
|
name='isolated-parent-main',
|
||||||
enable_modules=[__name__],
|
replay_parent_main=False,
|
||||||
replay_parent_main=False,
|
)
|
||||||
)
|
|
||||||
|
assert await replaying.result() == '__mp_main__'
|
||||||
|
assert await isolated.result() == '__main__'
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
assert captured_parent_main_data[0]
|
|
||||||
assert captured_parent_main_data[1] == {}
|
|
||||||
|
|
|
||||||
|
|
@ -217,8 +217,6 @@ class Actor:
|
||||||
'''
|
'''
|
||||||
return self._ipc_server
|
return self._ipc_server
|
||||||
|
|
||||||
# Information about `__main__` from parent
|
|
||||||
_parent_main_data: dict[str, str]
|
|
||||||
_parent_chan_cs: CancelScope|None = None
|
_parent_chan_cs: CancelScope|None = None
|
||||||
_spawn_spec: msgtypes.SpawnSpec|None = None
|
_spawn_spec: msgtypes.SpawnSpec|None = None
|
||||||
|
|
||||||
|
|
@ -265,10 +263,6 @@ class Actor:
|
||||||
self._cancel_called_by: tuple[str, tuple]|None = None
|
self._cancel_called_by: tuple[str, tuple]|None = None
|
||||||
self._cancel_called: bool = False
|
self._cancel_called: bool = False
|
||||||
|
|
||||||
# retreive and store parent `__main__` data which
|
|
||||||
# will be passed to children
|
|
||||||
self._parent_main_data = _mp_fixup_main._mp_figure_out_main()
|
|
||||||
|
|
||||||
# TODO? only add this when `is_debug_mode() == True` no?
|
# TODO? only add this when `is_debug_mode() == True` no?
|
||||||
# always include debugging tools module
|
# always include debugging tools module
|
||||||
if _state.is_root_process():
|
if _state.is_root_process():
|
||||||
|
|
@ -535,6 +529,7 @@ class Actor:
|
||||||
|
|
||||||
def load_modules(
|
def load_modules(
|
||||||
self,
|
self,
|
||||||
|
parent_main_data: dict[str, str]|None = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
|
@ -548,13 +543,20 @@ class Actor:
|
||||||
'''
|
'''
|
||||||
try:
|
try:
|
||||||
if self._spawn_method == 'trio':
|
if self._spawn_method == 'trio':
|
||||||
parent_data = self._parent_main_data
|
if (
|
||||||
if 'init_main_from_name' in parent_data:
|
parent_main_data is not None
|
||||||
|
and
|
||||||
|
'init_main_from_name' in parent_main_data
|
||||||
|
):
|
||||||
_mp_fixup_main._fixup_main_from_name(
|
_mp_fixup_main._fixup_main_from_name(
|
||||||
parent_data['init_main_from_name'])
|
parent_main_data['init_main_from_name'])
|
||||||
elif 'init_main_from_path' in parent_data:
|
elif (
|
||||||
|
parent_main_data is not None
|
||||||
|
and
|
||||||
|
'init_main_from_path' in parent_main_data
|
||||||
|
):
|
||||||
_mp_fixup_main._fixup_main_from_path(
|
_mp_fixup_main._fixup_main_from_path(
|
||||||
parent_data['init_main_from_path'])
|
parent_main_data['init_main_from_path'])
|
||||||
|
|
||||||
status: str = 'Attempting to import enabled modules:\n'
|
status: str = 'Attempting to import enabled modules:\n'
|
||||||
|
|
||||||
|
|
@ -840,6 +842,7 @@ class Actor:
|
||||||
Channel,
|
Channel,
|
||||||
list[UnwrappedAddress]|None,
|
list[UnwrappedAddress]|None,
|
||||||
list[str]|None, # preferred tpts
|
list[str]|None, # preferred tpts
|
||||||
|
dict[str, str]|None,
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Bootstrap this local actor's runtime config from its parent by
|
Bootstrap this local actor's runtime config from its parent by
|
||||||
|
|
@ -860,6 +863,7 @@ class Actor:
|
||||||
await chan._do_handshake(aid=self.aid)
|
await chan._do_handshake(aid=self.aid)
|
||||||
|
|
||||||
accept_addrs: list[UnwrappedAddress]|None = None
|
accept_addrs: list[UnwrappedAddress]|None = None
|
||||||
|
parent_main_data: dict[str, str]|None = None
|
||||||
|
|
||||||
if self._spawn_method == "trio":
|
if self._spawn_method == "trio":
|
||||||
|
|
||||||
|
|
@ -1020,17 +1024,13 @@ class Actor:
|
||||||
spawnspec.enable_modules
|
spawnspec.enable_modules
|
||||||
)
|
)
|
||||||
|
|
||||||
self._parent_main_data = spawnspec._parent_main_data
|
parent_main_data = spawnspec._parent_main_data
|
||||||
# XXX QUESTION(s)^^^
|
|
||||||
# -[ ] already set in `.__init__()` right, but how is
|
|
||||||
# it diff from this blatant parent copy?
|
|
||||||
# -[ ] do we need/want the .__init__() value in
|
|
||||||
# just the root case orr?
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
chan,
|
chan,
|
||||||
accept_addrs,
|
accept_addrs,
|
||||||
_state._runtime_vars['_enable_tpts']
|
_state._runtime_vars['_enable_tpts'],
|
||||||
|
parent_main_data,
|
||||||
)
|
)
|
||||||
|
|
||||||
# failed to connect back?
|
# failed to connect back?
|
||||||
|
|
@ -1523,6 +1523,7 @@ async def async_main(
|
||||||
|
|
||||||
# establish primary connection with immediate parent
|
# establish primary connection with immediate parent
|
||||||
actor._parent_chan: Channel|None = None
|
actor._parent_chan: Channel|None = None
|
||||||
|
parent_main_data: dict[str, str]|None = None
|
||||||
|
|
||||||
# is this a sub-actor?
|
# is this a sub-actor?
|
||||||
# get runtime info from parent.
|
# get runtime info from parent.
|
||||||
|
|
@ -1531,6 +1532,7 @@ async def async_main(
|
||||||
actor._parent_chan,
|
actor._parent_chan,
|
||||||
set_accept_addr_says_rent,
|
set_accept_addr_says_rent,
|
||||||
maybe_preferred_transports_says_rent,
|
maybe_preferred_transports_says_rent,
|
||||||
|
parent_main_data,
|
||||||
) = await actor._from_parent(parent_addr)
|
) = await actor._from_parent(parent_addr)
|
||||||
|
|
||||||
accept_addrs: list[UnwrappedAddress] = []
|
accept_addrs: list[UnwrappedAddress] = []
|
||||||
|
|
@ -1610,7 +1612,9 @@ async def async_main(
|
||||||
# XXX: do this **after** establishing a channel to the parent
|
# XXX: do this **after** establishing a channel to the parent
|
||||||
# but **before** starting the message loop for that channel
|
# but **before** starting the message loop for that channel
|
||||||
# such that import errors are properly propagated upwards
|
# such that import errors are properly propagated upwards
|
||||||
actor.load_modules()
|
actor.load_modules(
|
||||||
|
parent_main_data=parent_main_data,
|
||||||
|
)
|
||||||
|
|
||||||
# XXX TODO XXX: figuring out debugging of this
|
# XXX TODO XXX: figuring out debugging of this
|
||||||
# would somemwhat guarantee "self-hosted" runtime
|
# would somemwhat guarantee "self-hosted" runtime
|
||||||
|
|
|
||||||
|
|
@ -251,8 +251,6 @@ class ActorNursery:
|
||||||
# verbatim relay this actor's registrar addresses
|
# verbatim relay this actor's registrar addresses
|
||||||
registry_addrs=current_actor().registry_addrs,
|
registry_addrs=current_actor().registry_addrs,
|
||||||
)
|
)
|
||||||
if not replay_parent_main:
|
|
||||||
subactor._parent_main_data = {}
|
|
||||||
parent_addr: UnwrappedAddress = self._actor.accept_addr
|
parent_addr: UnwrappedAddress = self._actor.accept_addr
|
||||||
assert parent_addr
|
assert parent_addr
|
||||||
|
|
||||||
|
|
@ -272,6 +270,7 @@ class ActorNursery:
|
||||||
parent_addr,
|
parent_addr,
|
||||||
_rtv, # run time vars
|
_rtv, # run time vars
|
||||||
infect_asyncio=infect_asyncio,
|
infect_asyncio=infect_asyncio,
|
||||||
|
replay_parent_main=replay_parent_main,
|
||||||
proc_kwargs=proc_kwargs
|
proc_kwargs=proc_kwargs
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,7 @@ from tractor.discovery._addr import UnwrappedAddress
|
||||||
from tractor.runtime._portal import Portal
|
from tractor.runtime._portal import Portal
|
||||||
from tractor.runtime._runtime import Actor
|
from tractor.runtime._runtime import Actor
|
||||||
from ._entry import _mp_main
|
from ._entry import _mp_main
|
||||||
|
from . import _mp_fixup_main
|
||||||
from tractor._exceptions import ActorFailure
|
from tractor._exceptions import ActorFailure
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
types as msgtypes,
|
types as msgtypes,
|
||||||
|
|
@ -420,6 +421,7 @@ async def new_proc(
|
||||||
*,
|
*,
|
||||||
|
|
||||||
infect_asyncio: bool = False,
|
infect_asyncio: bool = False,
|
||||||
|
replay_parent_main: bool = True,
|
||||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
|
||||||
proc_kwargs: dict[str, any] = {}
|
proc_kwargs: dict[str, any] = {}
|
||||||
|
|
||||||
|
|
@ -440,6 +442,7 @@ async def new_proc(
|
||||||
parent_addr,
|
parent_addr,
|
||||||
_runtime_vars, # run time vars
|
_runtime_vars, # run time vars
|
||||||
infect_asyncio=infect_asyncio,
|
infect_asyncio=infect_asyncio,
|
||||||
|
replay_parent_main=replay_parent_main,
|
||||||
task_status=task_status,
|
task_status=task_status,
|
||||||
proc_kwargs=proc_kwargs
|
proc_kwargs=proc_kwargs
|
||||||
)
|
)
|
||||||
|
|
@ -457,6 +460,7 @@ async def trio_proc(
|
||||||
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
||||||
*,
|
*,
|
||||||
infect_asyncio: bool = False,
|
infect_asyncio: bool = False,
|
||||||
|
replay_parent_main: bool = True,
|
||||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
|
||||||
proc_kwargs: dict[str, any] = {}
|
proc_kwargs: dict[str, any] = {}
|
||||||
|
|
||||||
|
|
@ -549,7 +553,11 @@ async def trio_proc(
|
||||||
# send a "spawning specification" which configures the
|
# send a "spawning specification" which configures the
|
||||||
# initial runtime state of the child.
|
# initial runtime state of the child.
|
||||||
sspec = msgtypes.SpawnSpec(
|
sspec = msgtypes.SpawnSpec(
|
||||||
_parent_main_data=subactor._parent_main_data,
|
_parent_main_data=(
|
||||||
|
_mp_fixup_main._mp_figure_out_main()
|
||||||
|
if replay_parent_main
|
||||||
|
else {}
|
||||||
|
),
|
||||||
enable_modules=subactor.enable_modules,
|
enable_modules=subactor.enable_modules,
|
||||||
reg_addrs=subactor.reg_addrs,
|
reg_addrs=subactor.reg_addrs,
|
||||||
bind_addrs=bind_addrs,
|
bind_addrs=bind_addrs,
|
||||||
|
|
@ -680,6 +688,7 @@ async def mp_proc(
|
||||||
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
||||||
*,
|
*,
|
||||||
infect_asyncio: bool = False,
|
infect_asyncio: bool = False,
|
||||||
|
replay_parent_main: bool = True,
|
||||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
|
||||||
proc_kwargs: dict[str, any] = {}
|
proc_kwargs: dict[str, any] = {}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue