Use multi-addr `dict` registry, drop `bidict`
Replace `Registrar._registry: bidict[uid, addr]` with `dict[uid, list[UnwrappedAddress]]` to support actors binding on multiple transports simultaneously (multi-homed). Deats, - `find_actor_addr()` returns first addr from the uid's list - `get_registry()` now returns per-uid addr lists - `find_actor_addrs()` uses `.extend()` to collect all addrs for a given actor name - `register_actor_addr()` appends to the uid's list (dedup'd) and evicts stale entries where a different uid claims the same addr - `delete_actor_addr()` does a linear scan + `.remove()` instead of `bidict.inverse.pop()`; deletes the uid entry entirely when no addrs remain (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-codesubint_spawner_backend
parent
23677f8a3c
commit
cb7b76c44f
|
|
@ -27,7 +27,6 @@ name-to-address mappings so peers can discover each other.
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from bidict import bidict
|
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from ..runtime._runtime import Actor
|
from ..runtime._runtime import Actor
|
||||||
|
|
@ -83,10 +82,10 @@ class Registrar(Actor):
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
self._registry: bidict[
|
self._registry: dict[
|
||||||
tuple[str, str],
|
tuple[str, str],
|
||||||
UnwrappedAddress,
|
list[UnwrappedAddress],
|
||||||
] = bidict({})
|
] = {}
|
||||||
self._waiters: dict[
|
self._waiters: dict[
|
||||||
str,
|
str,
|
||||||
# either an event to sync to receiving an
|
# either an event to sync to receiving an
|
||||||
|
|
@ -104,16 +103,15 @@ class Registrar(Actor):
|
||||||
|
|
||||||
) -> UnwrappedAddress|None:
|
) -> UnwrappedAddress|None:
|
||||||
|
|
||||||
for uid, addr in self._registry.items():
|
for uid, addrs in self._registry.items():
|
||||||
if name in uid:
|
if name in uid:
|
||||||
return addr
|
return addrs[0] if addrs else None
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def get_registry(
|
async def get_registry(
|
||||||
self
|
self
|
||||||
|
) -> dict[str, list[UnwrappedAddress]]:
|
||||||
) -> dict[str, UnwrappedAddress]:
|
|
||||||
'''
|
'''
|
||||||
Return current name registry.
|
Return current name registry.
|
||||||
|
|
||||||
|
|
@ -144,18 +142,17 @@ class Registrar(Actor):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
addrs: list[UnwrappedAddress] = []
|
addrs: list[UnwrappedAddress] = []
|
||||||
addr: UnwrappedAddress
|
|
||||||
|
|
||||||
mailbox_info: str = (
|
mailbox_info: str = (
|
||||||
'Actor registry contact infos:\n'
|
'Actor registry contact infos:\n'
|
||||||
)
|
)
|
||||||
for uid, addr in self._registry.items():
|
for uid, uid_addrs in self._registry.items():
|
||||||
mailbox_info += (
|
mailbox_info += (
|
||||||
f'|_uid: {uid}\n'
|
f'|_uid: {uid}\n'
|
||||||
f'|_addr: {addr}\n\n'
|
f'|_addrs: {uid_addrs}\n\n'
|
||||||
)
|
)
|
||||||
if name == uid[0]:
|
if name == uid[0]:
|
||||||
addrs.append(addr)
|
addrs.extend(uid_addrs)
|
||||||
|
|
||||||
if not addrs:
|
if not addrs:
|
||||||
waiter = trio.Event()
|
waiter = trio.Event()
|
||||||
|
|
@ -166,7 +163,7 @@ class Registrar(Actor):
|
||||||
|
|
||||||
for uid in self._waiters[name]:
|
for uid in self._waiters[name]:
|
||||||
if not isinstance(uid, trio.Event):
|
if not isinstance(uid, trio.Event):
|
||||||
addrs.append(
|
addrs.extend(
|
||||||
self._registry[uid]
|
self._registry[uid]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -187,13 +184,24 @@ class Registrar(Actor):
|
||||||
# should never be 0-dynamic-os-alloc
|
# should never be 0-dynamic-os-alloc
|
||||||
await debug.pause()
|
await debug.pause()
|
||||||
|
|
||||||
# XXX NOTE, value must also be hashable AND since
|
addr_tup: tuple = tuple(addr)
|
||||||
# `._registry` is a `bidict` values must be unique;
|
|
||||||
# use `.forceput()` to replace any prior (stale)
|
# Evict stale entries: if a *different* uid claims
|
||||||
# entries that might map a different uid to the same
|
# this addr (e.g. after unclean shutdown or
|
||||||
# addr (e.g. after an unclean shutdown or
|
# actor-restart reusing the same address), remove
|
||||||
# actor-restart reusing the same address).
|
# it from the old uid's addr list.
|
||||||
self._registry.forceput(uid, tuple(addr))
|
for other_uid, other_addrs in self._registry.items():
|
||||||
|
if (
|
||||||
|
other_uid != uid
|
||||||
|
and addr_tup in other_addrs
|
||||||
|
):
|
||||||
|
other_addrs.remove(addr_tup)
|
||||||
|
break
|
||||||
|
|
||||||
|
# Append to this uid's addr list (avoid duplicates)
|
||||||
|
entry: list = self._registry.setdefault(uid, [])
|
||||||
|
if addr_tup not in entry:
|
||||||
|
entry.append(addr_tup)
|
||||||
|
|
||||||
# pop and signal all waiter events
|
# pop and signal all waiter events
|
||||||
events = self._waiters.pop(name, [])
|
events = self._waiters.pop(name, [])
|
||||||
|
|
@ -210,7 +218,7 @@ class Registrar(Actor):
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
uid = (str(uid[0]), str(uid[1]))
|
uid = (str(uid[0]), str(uid[1]))
|
||||||
entry: tuple = self._registry.pop(
|
entry: list|None = self._registry.pop(
|
||||||
uid, None
|
uid, None
|
||||||
)
|
)
|
||||||
if entry is None:
|
if entry is None:
|
||||||
|
|
@ -225,13 +233,20 @@ class Registrar(Actor):
|
||||||
) -> tuple[str, str]|None:
|
) -> tuple[str, str]|None:
|
||||||
# NOTE: `addr` arrives as a `list` over IPC
|
# NOTE: `addr` arrives as a `list` over IPC
|
||||||
# (msgpack deserializes tuples -> lists) so
|
# (msgpack deserializes tuples -> lists) so
|
||||||
# coerce to `tuple` for the bidict hash lookup.
|
# coerce to `tuple` for the linear scan.
|
||||||
uid: tuple[str, str]|None = (
|
addr = tuple(addr)
|
||||||
self._registry.inverse.pop(
|
uid: tuple[str, str]|None = None
|
||||||
tuple(addr),
|
|
||||||
None,
|
for _uid, addrs in self._registry.items():
|
||||||
)
|
if addr in addrs:
|
||||||
)
|
addrs.remove(addr)
|
||||||
|
uid = _uid
|
||||||
|
# remove the uid entry entirely when it
|
||||||
|
# has no remaining addrs.
|
||||||
|
if not addrs:
|
||||||
|
del self._registry[_uid]
|
||||||
|
break
|
||||||
|
|
||||||
if uid:
|
if uid:
|
||||||
report: str = (
|
report: str = (
|
||||||
'Deleting registry-entry for,\n'
|
'Deleting registry-entry for,\n'
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue