A partial revert of commit c05d08e426 since it seem we already
suppress tpt-closed errors lower down in `.ipc.Channel.send()`; given
that i'm pretty sure this new handler code should basically never run?
Left in a todo to remove the masked content once i'm done more
thoroughly testing under `piker`.
The correct ordering is to de-alloc the surrounding `service_n`
+ `trio.Event` **after** the `mng` teardown ensuring the
`mng.__aexit__()` never can hit a ref-error if it touches either (like
if a `tn` is passed to `maybe_open_context()`!
For IPC-disconnects-during-teardown edge cases, augment some `._rpc`
machinery,
- in `._invoke()` around the `await chan.send(return_msg)` where we
suppress if the underlying `Channel` already disconnected.
- add a disjoint handler in `_errors_relayed_via_ipc()` which just
reports-n-reraises the exc (same as prior behaviour).
* originally i thought it needed to be handled specially (to avoid
being crash handled) but turns out that isn't necessary?
* hence the also-added-bu-masked-out `debug_filter` / guard expression
around the `await debug._maybe_enter_pm()` line.
- show the `._invoke()` frame for the moment.
Such that key->value pairs can be defined which should *never be*
unmasked where values of
- the keys are exc-types which might be masked, and
- the values are exc-types which masked the equivalent key.
For example, the default includes:
- KBI->taskc: a kbi should never be unmasked from its masking
`trio.Cancelled`.
For the impl, a new `do_warn: bool` in the fn-body determines the
primary guard for whether a warning or re-raising is necessary.
Including all caller usage throughout. Moving to a non-`except*` impl
means it's never needed as a signal from the caller - we can just catch
the beg outright (like we should have always been doing)..
That is from `maybe_raise_from_masking_exc()` thus minimizing us to
a single `except BaseException` block with logic branching for the beg
vs. `unmask_from` exc cases.
Also,
- raise val-err when `unmask_from` is not a `tuple`.
- tweak the exc-note warning format.
- drop all pausing from dev work.
Turns out we weren't despite the optional `stream_handler_nursery` input
to `Server.listen_on()`; fail over to the `Server._stream_handler_tn`
allocated during server setup in those cases.
Turns out I didn't read my own internals docs/comments and despite it
not being used previously, this adds the real use case: a root,
per-actor, scope which ensures parent comms are the last conc-thing to
be cancelled.
Also, the impl changes here make the test from 6410e45 (or wtv
it's rebased to) pass, i.e. we can support crash handling in the root
actor despite the root-tn having been (self) cancelled.
Superficial adjustments,
- rename `Actor._service_n` -> `._service_tn` everywhere.
- add asserts to `._runtime.async_main()` which ensure that the any
`.trionics.maybe_open_nursery()` calls against optionally passed
`._[root/service]_tn` are allocated-if-not-provided (the
`._service_tn`-case being an i-guess-prep-for-the-future-anti-pattern
Bp).
- obvi adjust all internal usage to match new naming.
Serious/real-use-case changes,
- add (back) a `Actor._root_tn` which sits a scope "above" the
service-tn and is either,
+ assigned in `._runtime.async_main()` for sub-actors OR,
+ assigned in `._root.open_root_actor()` for the root actor.
**THE primary reason** to keep this "upper" tn is that during
a full-`Actor`-cancellation condition (more details below) we want to
ensure that the IPC connection with a sub-actor's parent is **the last
thing to be cancelled**; this is most simply implemented by ensuring
that the `Actor._parent_chan: .ipc.Channel` is handled in an upper
scope in `_rpc.process_messages()`-subtask-terms.
- for the root actor this `root_tn` is allocated in `.open_root_actor()`
body and assigned as such.
- extend `Actor.cancel_soon()` to be cohesive with this entire teardown
"policy" by scheduling a task in the `._root_tn` which,
* waits for the `._service_tn` to complete and then,
* cancels the `._root_tn.cancel_scope`,
* includes "sclangy" console logging throughout.
Such that we audit the `shield=root_tn.cancel_scope.cancel_called,`
passed to `await debug._maybe_enter_pm()` in the `open_root_actor()`
exit handler block.
I masked it bc it doesn't seem to actually work for the case I was
testing (`emsd` clobbering a `paperboi` in `piker`..) but figured I'd
leave it as a reminder for solving this problem more generally (#320)
since this is likely the place in the code for a soln.
When i tested it in my case it just resulted in a hang around the `with
debug.acquire_debug_lock()` for some reason? Can't remember if the child
ended up being able to REPL without issue though..
Such that we handle them despite a cancellation condition. This is
almost always the case, that `root_tn.cancel_scope.cancel_called` is
set, by the time the `debug._maybe_enter_pm()` hits. Previous I guess we
just weren't actually ever REPL-debugging such cases?
TODO, still needs a test obvi!
That is the `target` can declare a `chan: LinkedTaskChannel` instead of
`to_trio`/`from_aio`.
To support it,
- change `.started()` -> the more appropriate `.started_nowait()` which
can be called sync from the aio child task.
- adjust the `provide_channels` assert to accept either fn sig
declaration (for now).
Still needs test(s) obvi..
Makes the newly added `test_aio_side_raises_before_started` test pass by
ensuring errors raised by any `.to_asyncio.open_channel_from()` spawned
child-`asyncio.Task` are relayed by any caught `trio.EndOfChannel` by
checking for a new `LinkedTaskChannel._closed_by_aio_task: bool`.
Impl deats,
- obvi add `LinkedTaskChannel._closed_by_aio_task: bool = False`
- in `translate_aio_errors()` always check for the new flag on EOC
conditions and in such cases set `chan._trio_to_raise = aio_err` such
that the `trio`-parent-task always raises the child's exception
directly, OW keep original EoC passthrough in place.
- include *very* detailed per-case comments around the extended handler.
- adjust re-raising logic with a new `raise_from` where we only give the
`aio_err` priority if it's not already set as to `trio_to_raise`.
Also,
- hide the `_run_asyncio_task()` frame by def.
Verifying that if any exc is raised pre `chan.send_nowait()` (our
currentlly shite version of a `chan.started()`) then that exc is indeed
raised through on the `trio`-parent task side. This case was reproduced
from a `piker.brokers.ib` issue with a similar embedded
`.trionics.maybe_open_context()` call.
Deats,
- call the suite `test_aio_side_raises_before_started`.
- mk the `@context` simply `maybe_open_context(acm_func=open_channel_from)`
with a `target=raise_before_started` which,
- simply sleeps then immediately raises a RTE.
- expect the RTE from the aio-child-side to propagate all the way up to
the root-actor's task right up through the `trio.run()`.
Been meaning to do this forever and a recent test hang finally drove me
to it Bp
Like it sounds, adopt the "cancel-status" properties on `ActorNursery`
use already on our `Context` and derived from `trio.CancelScope`:
- add new private `._cancel_called` (set in the head of `.cancel()`)
& `._cancelled_caught` (set in the tail) instance vars with matching
read-only `@properties`.
- drop the instance-var and instead delegate a `.cancelled: bool`
property to `._cancel_called` and add a usage deprecation warning
(since removing it breaks a buncha tests).
So i need to either adjust the tests or figure out if/why this is needed
to avoid the crashing in `pikerd` i found when killin the chart during
a long backfill with `binance` backend..
Just like in the BRE case (for UDS) it seems when a peer closes the
(UDS?) socket `trio` instead raises a `ClosedResourceError` which we now
catch and re-raise as a `TransportClosed`. This again results in
`tpt.send()` calls from the rpc-runtime **not** raising when it's known
that the IPC channel is disconnected.
Shift around comments and expressions for better reading, assign
`tpt_closed` for easier introspection from REPL during debug oh and fix
the `MsgpackTransport.pformat()` to render '|_peers: 1' .. XD
Similar to what was just changed for `Context.repr_state`, when the
child task is cancelled but by a different "layer" of the runtime (i.e.
a `Portal.cancel_actor()` / `SIGINT`-to-process canceller) we don't
dump a traceback instead just `log.cancel()` emit.
Such that if the local task hasn't resolved but is `trio.Cancelled` and
a `.canceller` was set, we report a `'actor-cancelled'` from
`.repr_state: str`. Bit of formatting to avoid needless newlines too!
Since it's merely a local-file-sys subdirectory and there should be no
reason file creation conflicts with other bind spaces.
Also add 2 test suites to match,
- `tests/ipc/test_each_tpt::test_uds_bindspace_created_implicitly` to
verify the dir creation when DNE.
- `..test_uds_double_listen_raises_connerr` to ensure a double bind
raises a `ConnectionError` from the src `OSError`.
Using `.get_stream_addrs()` such that we always (*can*) assign the peer
end's PID in the `._raddr`.
Also factor common `ConnectionError` re-raising into
a `_reraise_as_connerr()`-@cm.
Such that the caller can be responsible for their own (nursery) scoping
as needed and, for the latter fn's case with
a `trio.Nursery.CancelStatus.encloses()` check to ensure the `tn` is
a valid parent-ish.
Some deats,
- in `gather_contexts()`, mv the `try/finally` outside the nursery block
to ensure we always do the `parent_exit`.
- for `maybe_open_context()` we do a naive task-tree hierarchy audit to
ensure the provided scope is not *too* child-ish (with what APIs `trio`
gives us, see above), OW go with the old approach of using the actor's
private service nursery.
Also,
* better report `trio.Cancelled` around the cache-miss `yield`
cases and ensure we **never** unmask triggering key-errors.
* report on any stale-state with the mutex in the `finally` block.
Call it `test_lock_not_corrupted_on_fast_cancel()` and includes
a detailed doc string to explain. Implemented it "cleverly" by having
the target `@acm` cancel its parent nursery after a peer, cache-hitting
task, is already waiting on the task mutex release.
Since the `await service_n.start()` on key-err can be cancel-masked
(checkpoint interrupted before `_Cache.run_ctx` completes), we need to
always `lock.release()` in to avoid lock-owner-state corruption and/or
inf-hangs in peer cache-hitting tasks.
Deats,
- add a `try/except/finally` around the key-err triggered cache-miss
`service_n.start(_Cache.run_ctx, ..)` call, reporting on any taskc
and always `finally` unlocking.
- fill out some log msg content and use `.debug()` level.
Here I was thinking the bcaster (usage) maybe required a rework but,
NOPE it's just bc a checkpoint was needed in the parent task owning the
`tn` which spawns `get_sub_and_pull()` tasks to ensure the bg allocated
`an`/portal is eventually cancel-called..
Ah well, at least i started a patch for `MsgStream.subscribe()` to make
it multicast revertible.. XD
Anyway, I tossed in some checks & notes related to all that unnecessary
effort since I do think i'll move forward implementing it:
- for the `cache_hit` case always verify that the `bcast` clone is
unregistered from the common state subs after
`.subscribe().__aexit__()`.
- do a light check that the implicit `MsgStream._broadcaster` is always
the only bcrx instance left-leaked into that state.. that is until
i get the proper de-allocation/reversion from multicast -> unicast
working.
- put in mega detailed note about the required parent-task checkpoint.
Since I recently discovered a very subtle race-case that can sometimes
cause the suite to hang, seemingly due to the `an: ActorNursery`
allocated *behind* the `.trionics.maybe_open_context()` usage; this can
result in never cancelling the 'streamer' subactor despite the `main()`
timeout-guard?
This led me to dig in and find that the underlying issue was 2-fold,
- our `BroadcastReceiver` termination-mgmt semantics in
`MsgStream.subscribe()` can result in the first subscribing task to
always keep the `MsgStream._broadcaster` instance allocated; it's
never `.aclose()`ed, which makes it tough to determine (and thus
trace) when all subscriber-tasks are actually complete and
exited-from-`.subscribe()`..
- i was shield waiting `.ipc._server.Server.wait_for_no_more_peers()` in
`._runtime.async_main()`'s shutdown sequence which would then compound
the issue resulting in a SIGINT-shielded hang.. the worst kind XD
Actual changes here are just styling, printing, and some mucking with
passing the `an`-ref up to the parent task in the root-actor where i was
doing a conditional `ActorNursery.cancel()` to mk sure that was actually
the problem. Presuming this is fixed the `.pause()` i left unmasked
should never hit.
As mentioned in prior testing commit, it can cause the worst kind of
hangs, the SIGINT ignoring kind.. Pretty sure there was never any reason
outside some esoteric multi-actor debugging case, and pretty sure that
already was solved?
Was failing due to the `.fail_after()` timeout being *too short* and
somehow the new interplay of that with strict-exception groups resulting
in the `TooSlowError` never raising but instead an eg with the embedded
`AssertionError`?? I still don't really get it honestly..
I've written up lengthy notes around the different `delay` settings that
can be used to see the diff outcomes, the failing case being the one
i still don't really grok and think is justification for `trio` to
bubble inner `Cancelled`s differently possibly?
For now i've included the original failing case as an `xfail`
parametrization for now which will hopefully drive a follow lowlevel
`trio` test in `test_trioisms`!
Namely `test_empty_mngrs_input_raises()` was failing due to
lazy-iterator use as input to `mngrs` which i guess i added support for
a while back (by it doing a `list(mngrs)` internally)? So just change it
to `gather_contexts(mngrs=())` and also tweak the `trio.fail_after(3)`
since it appears that the prior 1sec was causing
too-fast-of-a-cancellation (before the cluster fully spawned) and thus
the expected `ValueError` never to show..
Also, mask the `tractor.trionics.collapse_eg()` usage (again?) in
`open_actor_cluster()` since it seems unnecessary.
Seems that the way the actor-nursery interacts with the
`.trionics.gather_contexts()` API on cancellation makes our
`.trionics.collapse_eg()` not work as intended?
I need to dig into how `ActorNursery.cancel()` and `.__aexit__()` might
be causing this discrepancy..
Consider this a commit-of-my-index type save for rn.
It was originally this way; I forgot to flip it back when discarding the
`except*` handler impl..
Specially handle the `exc.__cause__` case where we raise from any
detected underlying cause and OW `from None` to suppress the eg's tb.
Since it turns out the semantics are basically inverse of normal
`except` (particularly for re-raising) which is hard to get right, and
bc it's a lot easier to just delegate to what `trio` already has behind
the `strict_exception_groups=False` setting, Bp
I added a rant here which will get removed shortly likely, but i think
going forward recommending against use of `except*` is prudent for
anything low level enough in the runtime (like trying to filter begs).
Dirty deats,
- copy `trio._core._run.collapse_exception_group()` to here with only
a slight mod to remove the notes check and tb concatting for the
collapse case.
- rename `maybe_collapse_eg()` - > `get_collapsed_eg()` and delegate it
directly to the former `trio` fn; return `None` when it returns the
same beg without collapse.
- simplify our own `collapse_eg()` to either raise the collapsed `exc`
or original `beg`.
I dunno what exactly I was thinking but we definitely don't want to
**ever** raise from the original exc-group, instead always raise from
any original `.__cause__` to be consistent with the embedded src-error's
context.
Also, adjust `maybe_collapse_eg()` to return `False` in the non-single
`.exceptions` case, again don't know what I was trying to do but this
simplifies caller logic and the prior return-semantic had no real
value..
This fixes some final usage in the runtime (namely top level nursery
usage in `._root`/`._runtime`) which was previously causing test suite
failures prior to this fix.
Seems to cause the following test suites to fail however..
- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_clustering.py::test_empty_mngrs_input_raises'
Also tweak some ctxc request logging content.
That is just throughout the core library, not the tests yet. Again, we
simply change over to using our (nearly equivalent?)
`.trionics.collapse_eg()` in place of the already deprecated
`strict_exception_groups=False` flag in the following internals,
- the conc-fan-out tn use in `._discovery.find_actor()`.
- `._portal.open_portal()`'s internal tn used to spawn a bg rpc-msg-loop
task.
- the daemon and "run-in-actor" layered tn pair allocated in
`._supervise._open_and_supervise_one_cancels_all_nursery()`.
The remaining loose-eg usage in `._root` and `._runtime` seem to be
necessary to keep the test suite green?? For the moment these are left
out.
- drop the (never/un)used `.get_chans()`.
- add #TODO for factoring many methods into a new `.rpc`-subsys/pkg
primitive, like an `RPCMngr/Server` type eventually.
- add todo to maybe mv `.get_parent()` elsewhere?
- move masked `._hard_mofo_kill()` to bottom.
Namely inside various bootup-sequences in `._root` and `._runtime`
particularly in the root actor to support both better tpt-address
denoting in our logging and as part of clarifying logic around setting
the root's registry addresses which is soon to be much better factored
out of the core and into an explicit subsystem + API.
Some `_root.open_root_actor()` deats,
- set `registry_addrs` to a new `uw_reg_addrs` (uw: unwrapped) to be
more explicit about wrapped addr types thoughout.
- instead ensure `registry_addrs` are the wrapped types and pass down
into the root `Actor` singleton-instance.
- factor the root-actor check + rt-vars update (updating the `'_root_addrs'`)
out of `._runtime.async_main()` into this fn.
- as previous, set `trans_bind_addrs = uw_reg_addrs` in unwrapped form since it will
be passed down both through rt-vars as `'_root_addrs'` and to
`._runtim.async_main()` as `accept_addrs` (which is then passed to the
IPC server).
- adjust/simplify much logging.
- shield the `await actor.cancel(None) # self cancel` to avoid any
finally-footguns.
- as mentioned convert the
For `_runtime.async_main()` tweaks,
- expect `registry_addrs: list[Address]|None = None` with appropriate
unwrapping prior to setting both `.reg_addrs` and the equiv rt-var.
- add a new `.registry_addrs` prop for the wrapped form.
- convert a final loose-eg for the `service_nursery` to use
`collapse_eg()`.
- simplify teardown report logging.
Mostly just multi-line code styling again: always putting standalone
`'f\n'` on separate LOC so it reads like it renders to console. Oh and
and a level drop to `.runtime()` for rx-msg reports.
Again using `Channel.aid.reprol()`, `.devx.pformat.nest_from_op()` and
converting to multi-line code style an ' for str-report-contents. Tweak
some imports to sub-mod level as well.
Again for nicer console logging. Also fix a double `req_chan` arg bug
when passed to `_invoke` in the `self.cancel()` rt-ep; don't update the
`kwargs: dict` just merge in `req_chan` input at call time.
Including a new one-line `_shutdown_msg: str` which we mod-var-set for
testing usage and some denoising at `.info()` level. Adjust `Actor()`
instantiating input to the new `.registry_addrs` wrapped addrs property.
Mostly adjusting indentation, noise level, and clarity via `.pformat()`
tweaks more general use of `.devx.pformat.nest_from_op()`.
Specific impl deats,
- use `pformat.ppfmt()/`nest_from_op()` more seriously throughout
`._server`.
- add a `._server.Endpoint.pformat()`.
- add `._server.Server.len_peers()` and `.repr_state()`.
- polish `Server.pformat()`.
- drop some redundant `log.runtime()`s from `._serve_ipc_eps()` instead
leaving-them-only/putting-them in the caller pub meth.
- `._tcp.start_listener()` log the bound addr, not the input (which may
be the 0-port.
- only generate a repr in `.from_addr()` when log level is >= 'runtime'.
|_ add a todo about supporting this optimization more generally on our
adapter.
- fix `Channel.pformat()` to show unknown peer field line fmt correctly.
- add a `Channel.maddr: str` which just delegates directly to the
`._transport` like other pass-thru property fields.
To start trying out,
- using in the `Start`-msg handler-block to repr the msg coming
*from* a `repr(Channel)` using '<=)` sclang op.
- for a completed RPC task in `_invoke_non_context()`.
- for the msg loop task's termination report.
Such as the `MsgTransport.stream` and `.drain` attrs since they're
rarely that important at the chan level. Also start adopting
a `.<attr>=` style for actual attrs of the type versus a `<name>:
` style for meta-field info lines.
To simplify `.pformat()` output when the new `privates: bool` is unset
(the default) this adds new public attrs to wrap an actor's
cancellation status as well as provide a `.repr_state: str` (similar to
our equiv on `Context`). Rework `.pformat()` to render a much simplified
repr using all these new refinements.
Further, port the `.cancel()` method to use `.msg.types.Aid` for all
internal `requesting_uid` refs (now renamed with `_aid`) and in all
called downstream methods.
New cancel-state iface deats,
- rename `._cancel_called_by_remote` -> `._cancel_called_by` and expect
it to be set as an `Aid`.
- add `.cancel_complete: bool` which flags whether `.cancel()` ran to
completion.
- add `.cancel_called: bool` which just wraps `._cancel_called` (and
which likely will just be dropped since we already have
`._cancel_called_by`).
- add `.cancel_caller: Aid|None` which wraps `._cancel_called_by`.
In terms of using `Aid` in cancel methods,
- rename vars with `_aid` suffix in `.cancel()` (and wherever else).
- change `.cancel_rpc_tasks()` input param to `req_aid: msgtypes.Aid`.
- do the same for `._cancel_task()` and (for now until we adjust its
internals as well) use the `Aid.uid` remap property when assigning
`Context._canceller`.
- adjust all log msg refs to match obvi.
Providing the legacy `.uid -> tuple` style id (since still used for the
`Actor._contexts` table) and a `repr-one-line` method `.reprol() -> str`
for rendering a compact unique actor ID summary (useful in
logging/.pformat()s at the least).
2025-08-15 16:07:39 -04:00
21 changed files with 345 additions and 152 deletions