Prefer fresh conn for unreg, fall back to `_parent_chan`
The prior approach eagerly reused `_parent_chan` when the parent IS the registrar, but that channel may still carry ctx/stream teardown protocol traffic — sending a concurrent `unregister_actor` RPC over it causes protocol conflicts. Now try a fresh `get_registry()` conn first; only fall back to the parent channel on `OSError` (listener already closed/unlinked).

Details:
- a fresh `get_registry()` conn is the primary path for all addrs, regardless of `parent_is_reg`
- the `OSError` handler checks `parent_is_reg` + `rent_chan.connected()` before falling back
- the fallback path has its own nested handler catching `OSError` and `trio.ClosedResourceError`
- drop the unused `reg_addr: Address` annotation

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])

[claude-code-gh]: https://github.com/anthropics/claude-code

subint_spawner_backend
parent
70dc60a199
commit
8817032c90
|
|
@ -1848,17 +1848,21 @@ async def async_main(
|
||||||
failed_unreg: bool = False
|
failed_unreg: bool = False
|
||||||
rent_chan: Channel|None = actor._parent_chan
|
rent_chan: Channel|None = actor._parent_chan
|
||||||
|
|
||||||
# XXX check if the parent IS the registrar so we
|
# XXX, detect whether the parent IS the registrar
|
||||||
# can reuse the existing `_parent_chan` (avoids
|
# so we can FALL BACK to `_parent_chan` when a new
|
||||||
# opening a new connection which fails when the
|
# connection attempt fails (e.g. UDS transport
|
||||||
# listener socket is already closed, e.g. UDS
|
# `os.unlink()`s the socket file during teardown).
|
||||||
# transport `os.unlink()`s the socket file during
|
#
|
||||||
# teardown).
|
# IMPORTANT: we do NOT eagerly reuse `_parent_chan`
|
||||||
|
# because it may still be carrying context/stream
|
||||||
|
# teardown protocol traffic — sending an
|
||||||
|
# `unregister_actor` RPC over it concurrently
|
||||||
|
# causes protocol-level conflicts. Instead we try
|
||||||
|
# a fresh `get_registry()` connection first and
|
||||||
|
# only fall back to the parent channel on failure.
|
||||||
#
|
#
|
||||||
# See `ipc._uds.close_listener()` for details on
|
# See `ipc._uds.close_listener()` for details on
|
||||||
# the UDS socket-file lifecycle and why this
|
# the UDS socket-file lifecycle.
|
||||||
# optimization is necessary for the local-registrar
|
|
||||||
# case.
|
|
||||||
parent_is_reg: bool = False
|
parent_is_reg: bool = False
|
||||||
if (
|
if (
|
||||||
rent_chan is not None
|
rent_chan is not None
|
||||||
|
|
@ -1867,7 +1871,6 @@ async def async_main(
|
||||||
):
|
):
|
||||||
pchan_raddr: Address|None = rent_chan.raddr
|
pchan_raddr: Address|None = rent_chan.raddr
|
||||||
if pchan_raddr is not None:
|
if pchan_raddr is not None:
|
||||||
reg_addr: Address
|
|
||||||
for reg_addr in actor.reg_addrs:
|
for reg_addr in actor.reg_addrs:
|
||||||
if (
|
if (
|
||||||
pchan_raddr.unwrap()
|
pchan_raddr.unwrap()
|
||||||
|
|
@ -1883,24 +1886,39 @@ async def async_main(
|
||||||
with trio.move_on_after(0.5) as cs:
|
with trio.move_on_after(0.5) as cs:
|
||||||
cs.shield = True
|
cs.shield = True
|
||||||
try:
|
try:
|
||||||
if parent_is_reg:
|
async with get_registry(
|
||||||
reg_portal = Portal(rent_chan)
|
addr,
|
||||||
|
) as reg_portal:
|
||||||
await reg_portal.run_from_ns(
|
await reg_portal.run_from_ns(
|
||||||
'self',
|
'self',
|
||||||
'unregister_actor',
|
'unregister_actor',
|
||||||
uid=actor.aid.uid,
|
uid=actor.aid.uid,
|
||||||
)
|
)
|
||||||
else:
|
except OSError:
|
||||||
async with get_registry(
|
# Connection to registrar failed
|
||||||
addr,
|
# (listener socket likely already
|
||||||
) as reg_portal:
|
# closed/unlinked). Fall back to
|
||||||
|
# parent channel if parent IS the
|
||||||
|
# registrar.
|
||||||
|
if (
|
||||||
|
parent_is_reg
|
||||||
|
and
|
||||||
|
rent_chan.connected()
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
reg_portal = Portal(rent_chan)
|
||||||
await reg_portal.run_from_ns(
|
await reg_portal.run_from_ns(
|
||||||
'self',
|
'self',
|
||||||
'unregister_actor',
|
'unregister_actor',
|
||||||
uid=actor.aid.uid,
|
uid=actor.aid.uid,
|
||||||
)
|
)
|
||||||
except OSError:
|
except (
|
||||||
failed_unreg = True
|
OSError,
|
||||||
|
trio.ClosedResourceError,
|
||||||
|
):
|
||||||
|
failed_unreg = True
|
||||||
|
else:
|
||||||
|
failed_unreg = True
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
failed_unreg = True
|
failed_unreg = True
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue