Compare commits

...

4 Commits

Author SHA1 Message Date
Gud Boi 30c2a50b7e Clear rtvs state on root shutdown..
Fixes the bug discovered in last test update, not sure how this wasn't
caught already XD
2026-02-11 18:30:09 -05:00
Gud Boi 2e1865b8a3 Catch-n-fail on stale `_root_addrs` state..
Turns out we aren't clearing the `._state._runtime_vars` entries in
between `open_root_actor` calls.. This test refinement catches that by
adding runtime-vars asserts on the expected root-addrs value; ensure
`_runtime_vars['_root_addrs']` ONLY match the values provided by the
test's CURRENT root actor.

This causes a failure when the (just added)
`test_non_registrar_spawns_child` is run as part of the module suite,
it's fine when run standalone.
2026-02-11 18:20:59 -05:00
Gud Boi 74eb402e40 Fix when root-actor addrs is set as rtvs
Move `_root_addrs` assignment to after `async_main()` unblocks (via
`.started()`) which now delivers the bind addrs, ensuring correct
`UnwrappedAddress` propagation into `._state._runtime_vars` for
non-registrar root actors..

Previously for non-registrar root actors the `._state._runtime_vars`
entries were being set as `Address` values which ofc IPC serialize
incorrectly rn vs. the unwrapped versions, (well until we add a msgspec
for their structs anyway) and thus are passed in incorrect form to
children/subactors during spawning..

This fixes the issue by waiting for the `.ipc.*` stack to
bind-and-resolve any randomly allocated addrs (by the OS) until after
the initial `Actor` startup is complete.

Deats,
- primarily, mv `_root_addrs` assignment from before `root_tn.start()`
  to after, using started(-ed) `accept_addrs` now delivered from
  `._runtime.async_main()`..
- update `task_status` type hints to match.
- unpack and set the `(accept_addrs, reg_addrs)` tuple from
  `root_tn.start()` call into `._state._runtime_vars` entries.
- improve and embolden comments distinguishing registrar vs non-registrar
  init paths, ensure typing reflects wrapped vs. unwrapped addrs.

Also,
- add a masked `mk_pdb().set_trace()` for debugging `raddrs` values
  being "off".
- add TODO about using UDS on linux for root mailbox
- rename `trans_bind_addrs` -> `tpt_bind_addrs` for clarity.
- expand comment about random port allocation for
  non-registrar case

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 17:48:01 -05:00
Gud Boi 99c66d1495 Add test for non-registrar root sub-spawning
Ensure non-registrar root actors can spawn children and that
those children receive correct parent contact info. This test
catches the bug reported in,

https://github.com/goodboy/tractor/issues/410

Add new `test_non_registrar_spawns_child()` which spawns a sub-actor
from a non-registrar root and verifies the child can manually connect
back to its parent using `get_root()` API, auditing
`._state._runtime_vars` addr propagation from parent to child.

Also,
- improve type hints throughout test suites
  (`subprocess.Popen`, `UnwrappedAddress`, `Aid` etc.)
- rename `n` -> `an` for actor nursery vars
- use multiline style for function signatures

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 17:47:29 -05:00
3 changed files with 207 additions and 36 deletions

View File

@ -1,8 +1,13 @@
""" """
Multiple python programs invoking the runtime. Multiple python programs invoking the runtime.
""" """
from __future__ import annotations
import platform import platform
import subprocess
import time import time
from typing import (
TYPE_CHECKING,
)
import pytest import pytest
import trio import trio
@ -10,14 +15,29 @@ import tractor
from tractor._testing import ( from tractor._testing import (
tractor_test, tractor_test,
) )
from tractor import (
current_actor,
_state,
Actor,
Context,
Portal,
)
from .conftest import ( from .conftest import (
sig_prog, sig_prog,
_INT_SIGNAL, _INT_SIGNAL,
_INT_RETURN_CODE, _INT_RETURN_CODE,
) )
if TYPE_CHECKING:
from tractor.msg import Aid
from tractor._addr import (
UnwrappedAddress,
)
def test_abort_on_sigint(daemon):
def test_abort_on_sigint(
daemon: subprocess.Popen,
):
assert daemon.returncode is None assert daemon.returncode is None
time.sleep(0.1) time.sleep(0.1)
sig_prog(daemon, _INT_SIGNAL) sig_prog(daemon, _INT_SIGNAL)
@ -30,8 +50,11 @@ def test_abort_on_sigint(daemon):
@tractor_test @tractor_test
async def test_cancel_remote_arbiter(daemon, reg_addr): async def test_cancel_remote_arbiter(
assert not tractor.current_actor().is_arbiter daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
):
assert not current_actor().is_arbiter
async with tractor.get_registry(reg_addr) as portal: async with tractor.get_registry(reg_addr) as portal:
await portal.cancel_actor() await portal.cancel_actor()
@ -45,24 +68,106 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
pass pass
def test_register_duplicate_name(daemon, reg_addr): def test_register_duplicate_name(
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
):
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
) as n: ) as an:
assert not tractor.current_actor().is_arbiter assert not current_actor().is_arbiter
p1 = await n.start_actor('doggy') p1 = await an.start_actor('doggy')
p2 = await n.start_actor('doggy') p2 = await an.start_actor('doggy')
async with tractor.wait_for_actor('doggy') as portal: async with tractor.wait_for_actor('doggy') as portal:
assert portal.channel.uid in (p2.channel.uid, p1.channel.uid) assert portal.channel.uid in (p2.channel.uid, p1.channel.uid)
await n.cancel() await an.cancel()
# run it manually since we want to start **after** # XXX, run manually since we want to start this root **after**
# the other "daemon" program # the other "daemon" program with it's own root.
trio.run(main)
@tractor.context
async def get_root_portal(
ctx: Context,
):
'''
Connect back to the root actor manually (using the `._discovery`
API) and ensure its contact info is the same as our immediate
parent's.

Checks performed, in order:
- `_state._runtime_vars['_root_addrs']` holds exactly one entry and
  the unwrapped remote addr of our parent channel is in it.
- the portal delivered by `get_root()` reports an `Aid` named
  'root' which equals the `Aid` on our parent portal's channel.

`ctx.started()` is called with no value.
'''
sub: Actor = current_actor()
rtvs: dict = _state._runtime_vars
raddrs: list[UnwrappedAddress] = rtvs['_root_addrs']
# await tractor.pause()
# XXX, in case the sub->root discovery breaks you might need
# this (i know i did Xp)!!
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
# NOTE(review): the `.unwrap()` result is coerced to a `list` before
# the membership test; presumably the rtvs entries arrive as lists
# (not tuples) after IPC delivery - confirm.
assert (
len(raddrs) == 1
and
list(sub._parent_chan.raddr.unwrap()) in raddrs
)
# connect back to our immediate parent which should also
# be the actor-tree's root.
from tractor._discovery import get_root
ptl: Portal
async with get_root() as ptl:
root_aid: Aid = ptl.chan.aid
parent_ptl: Portal = current_actor().get_parent()
# the root's id must be named 'root' and must be the same id as
# the one on the channel to our parent.
assert (
root_aid.name == 'root'
and
parent_ptl.chan.aid == root_aid
)
await ctx.started()
def test_non_registrar_spawns_child(
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
loglevel: str,
debug_mode: bool,
):
'''
Ensure a non-registrar (serving) root actor can spawn a sub and
that sub can connect back (manually) to its parent, which is the
root, without issue.
More or less this audits the global contact info in
`._state._runtime_vars`.
'''
# NOTE(review): `daemon` is never referenced in this body; presumably
# the fixture's process is what serves the registry at `reg_addr` -
# confirm against `conftest`.
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
loglevel=loglevel,
debug_mode=debug_mode,
) as an:
actor: Actor = tractor.current_actor()
# this root must NOT be the registrar for the test to be meaningful.
assert not actor.is_registrar
# `enable_modules=[__name__]` names this test module so the sub can
# be asked to run `get_root_portal`, which is defined here.
sub_ptl: Portal = await an.start_actor(
name='sub',
enable_modules=[__name__],
)
# `first` is unused here; `get_root_portal` calls `ctx.started()`
# with no value.
async with sub_ptl.open_context(
get_root_portal,
) as (ctx, first):
print('Waiting for `sub` to connect back to us..')
await an.cancel()
# XXX, run manually since we want to start this root **after**
# the other "daemon" program with its own root.
trio.run(main) trio.run(main)

View File

@ -88,7 +88,8 @@ async def maybe_block_bp(
bp_blocked: bool bp_blocked: bool
if ( if (
debug_mode debug_mode
and maybe_enable_greenback and
maybe_enable_greenback
and ( and (
maybe_mod := await debug.maybe_init_greenback( maybe_mod := await debug.maybe_init_greenback(
raise_not_found=False, raise_not_found=False,
@ -385,10 +386,13 @@ async def open_root_actor(
addr, addr,
) )
trans_bind_addrs: list[UnwrappedAddress] = [] tpt_bind_addrs: list[
Address # `Address.get_random()` case
|UnwrappedAddress # registrar case `= uw_reg_addrs`
] = []
# Create a new local root-actor instance which IS NOT THE # ------ NON-REGISTRAR ------
# REGISTRAR # create a new root-actor instance.
if ponged_addrs: if ponged_addrs:
if ensure_registry: if ensure_registry:
raise RuntimeError( raise RuntimeError(
@ -415,12 +419,21 @@ async def open_root_actor(
# XXX INSTEAD, bind random addrs using the same tpt # XXX INSTEAD, bind random addrs using the same tpt
# proto. # proto.
for addr in ponged_addrs: for addr in ponged_addrs:
trans_bind_addrs.append( tpt_bind_addrs.append(
# XXX, these are `Address` NOT `UnwrappedAddress`.
#
# NOTE, in the case of posix/berkley socket
# protos we allocate port=0 such that the system
# allocates a random value at bind time; this
# happens in the `.ipc.*` stack's backend.
addr.get_random( addr.get_random(
bindspace=addr.bindspace, bindspace=addr.bindspace,
) )
) )
# ------ REGISTRAR ------
# create a new "registry providing" root-actor instance.
#
# Start this local actor as the "registrar", aka a regular # Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of # actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors. # other process-tree-local sub-actors.
@ -429,7 +442,7 @@ async def open_root_actor(
# following init steps are taken: # following init steps are taken:
# - the tranport layer server is bound to each addr # - the tranport layer server is bound to each addr
# pair defined in provided registry_addrs, or the default. # pair defined in provided registry_addrs, or the default.
trans_bind_addrs = uw_reg_addrs tpt_bind_addrs = uw_reg_addrs
# - it is normally desirable for any registrar to stay up # - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub) # indefinitely until either all registered (child/sub)
@ -449,20 +462,10 @@ async def open_root_actor(
enable_modules=enable_modules, enable_modules=enable_modules,
) )
# XXX, in case the root actor runtime was actually run from # XXX, in case the root actor runtime was actually run from
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt # `tractor.to_asyncio.run_as_asyncio_guest()` and NOT
# `.trio.run()`. # `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio'] actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# NOTE, only set the loopback addr for the
# process-tree-global "root" mailbox since all sub-actors
# should be able to speak to their root actor over that
# channel.
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
raddrs.extend(trans_bind_addrs)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Start up main task set via core actor-runtime nurseries. # Start up main task set via core actor-runtime nurseries.
try: try:
# assign process-local actor # assign process-local actor
@ -499,14 +502,39 @@ async def open_root_actor(
# "actor runtime" primitives are SC-compat and thus all # "actor runtime" primitives are SC-compat and thus all
# transitively spawned actors/processes must be as # transitively spawned actors/processes must be as
# well. # well.
await root_tn.start( accept_addrs: list[UnwrappedAddress]
reg_addrs: list[UnwrappedAddress]
(
accept_addrs,
reg_addrs,
) = await root_tn.start(
partial( partial(
_runtime.async_main, _runtime.async_main,
actor, actor,
accept_addrs=trans_bind_addrs, accept_addrs=tpt_bind_addrs,
parent_addr=None parent_addr=None
) )
) )
# NOTE, only set a local-host addr (i.e. like
# `lo`-loopback for TCP) for the process-tree-global
# "root"-process (its tree-wide "mailbox") since all
# sub-actors should be able to speak to their root
# actor over that channel.
#
# ?TODO, per-OS non-network-proto alt options?
# -[ ] on linux we should be able to always use UDS?
#
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
raddrs.extend(
accept_addrs,
)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# if 'chart' in actor.aid.name:
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
try: try:
yield actor yield actor
except ( except (
@ -588,6 +616,13 @@ async def open_root_actor(
): ):
_state._runtime_vars['_debug_mode'] = False _state._runtime_vars['_debug_mode'] = False
# !XXX, clear ALL prior contact info state, this is MEGA
# important if you are opening the runtime multiple times
# from the same parent process (like in our test
# harness)!
_state._runtime_vars['_root_addrs'].clear()
_state._runtime_vars['_root_mailbox'] = None
_state._current_actor = None _state._current_actor = None
_state._last_actor_terminated = actor _state._last_actor_terminated = actor

View File

@ -147,6 +147,8 @@ def get_mod_nsps2fps(mod_ns_paths: list[str]) -> dict[str, str]:
return nsp2fp return nsp2fp
_bp = False
class Actor: class Actor:
''' '''
The fundamental "runtime" concurrency primitive. The fundamental "runtime" concurrency primitive.
@ -272,7 +274,9 @@ class Actor:
stacklevel=2, stacklevel=2,
) )
registry_addrs: list[Address] = [wrap_address(arbiter_addr)] registry_addrs: list[Address] = [
wrap_address(arbiter_addr)
]
# marked by the process spawning backend at startup # marked by the process spawning backend at startup
# will be None for the parent most process started manually # will be None for the parent most process started manually
@ -959,6 +963,21 @@ class Actor:
rvs['_is_root'] = False # obvi XD rvs['_is_root'] = False # obvi XD
# TODO, remove! left in just while protoing init fix!
# global _bp
# if (
# 'chart' in self.aid.name
# and
# isinstance(
# rvs['_root_addrs'][0],
# dict,
# )
# and
# not _bp
# ):
# _bp = True
# breakpoint()
_state._runtime_vars.update(rvs) _state._runtime_vars.update(rvs)
# `SpawnSpec.reg_addrs` # `SpawnSpec.reg_addrs`
@ -1455,7 +1474,12 @@ async def async_main(
# be False when running as root actor and True when as # be False when running as root actor and True when as
# a subactor. # a subactor.
parent_addr: UnwrappedAddress|None = None, parent_addr: UnwrappedAddress|None = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[
tuple[
list[UnwrappedAddress], # accept_addrs
list[UnwrappedAddress], # reg_addrs
]
] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' '''
@ -1634,6 +1658,7 @@ async def async_main(
# if addresses point to the same actor.. # if addresses point to the same actor..
# So we need a way to detect that? maybe iterate # So we need a way to detect that? maybe iterate
# only on unique actor uids? # only on unique actor uids?
addr: UnwrappedAddress
for addr in actor.reg_addrs: for addr in actor.reg_addrs:
try: try:
waddr = wrap_address(addr) waddr = wrap_address(addr)
@ -1642,7 +1667,9 @@ async def async_main(
await debug.pause() await debug.pause()
# !TODO, get rid of the local-portal crap XD # !TODO, get rid of the local-portal crap XD
reg_portal: Portal
async with get_registry(addr) as reg_portal: async with get_registry(addr) as reg_portal:
accept_addr: UnwrappedAddress
for accept_addr in accept_addrs: for accept_addr in accept_addrs:
accept_addr = wrap_address(accept_addr) accept_addr = wrap_address(accept_addr)
@ -1658,8 +1685,12 @@ async def async_main(
is_registered: bool = True is_registered: bool = True
# init steps complete # init steps complete, deliver IPC-server and
task_status.started() # registrar addrs back to caller.
task_status.started((
accept_addrs,
actor.reg_addrs,
))
# Begin handling our new connection back to our # Begin handling our new connection back to our
# parent. This is done last since we don't want to # parent. This is done last since we don't want to