Compare commits

..

107 Commits

Author SHA1 Message Date
Tyler Goodlet f7901a73ce WIP, "revertible" or "dynamic" multicast streams
TODO, write up the deats, prolly by distilling (todo) notes from
`tests/test_resource_cache.py::test_open_local_sub_to_stream` comments!
2025-08-20 13:03:57 -04:00
Bd c9a55c2d46
Merge pull request #397 from goodboy/post_mortems
Fix root-actor crash handling despite runtime cancellation
2025-08-20 12:45:06 -04:00
Tyler Goodlet 548855b4f5 Comment/docs tweaks per copilot review
Add a micro glossary to clarify questioned terms and refine out some
patch specific comment regions.
2025-08-20 12:36:08 -04:00
Tyler Goodlet 5322861d6d Clean out old-commented tn-opens and ipc-server settings checks 2025-08-20 11:35:31 -04:00
Tyler Goodlet 46a2fa7074 Always pass a `tn` to `._server._serve_ipc_eps()`
Turns out we weren't despite the optional `stream_handler_nursery` input
to `Server.listen_on()`; fail over to the `Server._stream_handler_tn`
allocated during server setup in those cases.
2025-08-20 11:30:58 -04:00
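
A minimal sketch of the described fallback rule (hypothetical class body, not the actual `.ipc._server` source): prefer the caller's `stream_handler_nursery` but always fail over to the nursery allocated at server setup so the ep-serving task is never handed `None`.

import trio

class Server:
    '''
    Hypothetical stand-in for the real `.ipc._server.Server`.

    '''
    def __init__(self, stream_handler_tn: trio.Nursery):
        # nursery allocated once during server setup
        self._stream_handler_tn = stream_handler_tn

    def pick_handler_tn(
        self,
        stream_handler_nursery: trio.Nursery|None = None,
    ) -> trio.Nursery:
        # fail over to the setup-time nursery when the caller
        # didn't provide one via `.listen_on()`.
        return (
            stream_handler_nursery
            or
            self._stream_handler_tn
        )
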
Tyler Goodlet bfe5b2dde6 Hide `collapse_eg()` frame as used from `open_root_actor()` 2025-08-20 10:44:42 -04:00
Tyler Goodlet a9f06df3fb Heh, add back `Actor._root_tn`, it has purpose..
Turns out I didn't read my own internals docs/comments and despite it
not being used previously, this adds the real use case: a root,
per-actor, scope which ensures parent comms are the last conc-thing to
be cancelled.

Also, the impl changes here make the test from 6410e45 (or wtv
it's rebased to) pass, i.e. we can support crash handling in the root
actor despite the root-tn having been (self) cancelled.

Superficial adjustments,
- rename `Actor._service_n` -> `._service_tn` everywhere.
- add asserts to `._runtime.async_main()` which ensure that any
  `.trionics.maybe_open_nursery()` calls against optionally passed
  `._[root/service]_tn` are allocated-if-not-provided (the
  `._service_tn`-case being an i-guess-prep-for-the-future-anti-pattern
  Bp).
- obvi adjust all internal usage to match new naming.

Serious/real-use-case changes,
- add (back) a `Actor._root_tn` which sits a scope "above" the
  service-tn and is either,
  + assigned in `._runtime.async_main()` for sub-actors OR,
  + assigned in `._root.open_root_actor()` for the root actor.
  **THE primary reason** to keep this "upper" tn is that during
  a full-`Actor`-cancellation condition (more details below) we want to
  ensure that the IPC connection with a sub-actor's parent is **the last
  thing to be cancelled**; this is most simply implemented by ensuring
  that the `Actor._parent_chan: .ipc.Channel` is handled in an upper
  scope in `_rpc.process_messages()`-subtask-terms.
- for the root actor this `root_tn` is allocated in `.open_root_actor()`
  body and assigned as such.
- extend `Actor.cancel_soon()` to be cohesive with this entire teardown
  "policy" by scheduling a task in the `._root_tn` which,
  * waits for the `._service_tn` to complete and then,
  * cancels the `._root_tn.cancel_scope`,
  * includes "sclangy" console logging throughout.
2025-08-20 10:18:52 -04:00
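
A toy, runnable sketch of that teardown policy using bare `trio` (the names only mirror the commit; none of this is the actual `._runtime` code): parent comms live in the outer "root" scope so they're cancelled last, and a `cancel_soon()`-style task waits for the service scope before cancelling the root scope.

import trio

async def main():
    service_done = trio.Event()

    async with trio.open_nursery() as root_tn:          # ~ `Actor._root_tn`
        # stand-in for the parent-chan msg-loop task: must be torn
        # down *after* everything else.
        root_tn.start_soon(trio.sleep_forever)

        async def reap_then_cancel_root():
            # roughly what `Actor.cancel_soon()` is described as
            # scheduling: wait for the service scope, then cancel
            # the root scope.
            await service_done.wait()
            root_tn.cancel_scope.cancel()

        root_tn.start_soon(reap_then_cancel_root)

        async with trio.open_nursery() as service_tn:   # ~ `Actor._service_tn`
            service_tn.start_soon(trio.sleep, 0.1)       # "service" work

        service_done.set()

trio.run(main)
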
Tyler Goodlet ee32bc433c Add a root-already-cancelled crash handling test
Such that we audit the `shield=root_tn.cancel_scope.cancel_called,`
passed to `await debug._maybe_enter_pm()` in the `open_root_actor()`
exit handler block.
2025-08-20 10:18:52 -04:00
Tyler Goodlet 561954594e Add attempt at non-root-parent REPL guarding
I masked it bc it doesn't seem to actually work for the case I was
testing (`emsd` clobbering a `paperboi` in `piker`..) but figured I'd
leave it as a reminder for solving this problem more generally (#320)
since this is likely the place in the code for a soln.

When i tested it in my case it just resulted in a hang around the `with
debug.acquire_debug_lock()` for some reason? Can't remember if the child
ended up being able to REPL without issue though..
2025-08-19 14:15:14 -04:00
Tyler Goodlet 28a6354e81 Set `shield` when `.cancel_called` for root crashes
Such that we handle them despite a cancellation condition. This is
almost always the case, that `root_tn.cancel_scope.cancel_called` is
set, by the time the `debug._maybe_enter_pm()` hits. Previously I guess
we just weren't actually ever REPL-debugging such cases?

TODO, still needs a test obvi!
2025-08-19 14:14:38 -04:00
Tyler Goodlet d1599449e7 Mk `pause_from_sync()` raise `InternalError` on no `greenback` init 2025-08-19 14:14:27 -04:00
Tyler Goodlet 2d27c94dec Hide `_maybe_enter_pm()` frame (again?) 2025-08-19 14:14:27 -04:00
Tyler Goodlet 6e4c76245b Add LoC pattern matches for `test_post_mortem_api` 2025-08-19 14:14:27 -04:00
Bd a6f599901c
Merge pull request #395 from goodboy/to_asyncio_eoc_signal
`to_asyncio` eoc signal: use `trio.EndOfChannel` to indicate (maybe non-graceful) `asyncio.Task` termination
2025-08-19 12:45:23 -04:00
Tyler Goodlet 0fafd25f0d Comment tweaks per copilot review 2025-08-19 12:33:47 -04:00
Tyler Goodlet b74e93ee55 Change one infected-aio test to use `chan` in fn sig 2025-08-18 22:32:51 -04:00
Tyler Goodlet 961504b657 Support `chan.started_nowait()` in `.open_channel_from()` target
That is, the `target` can declare a `chan: LinkedTaskChannel` instead of
`to_trio`/`from_aio`.

To support it,
- change `.started()` -> the more appropriate `.started_nowait()` which
  can be called sync from the aio child task.
- adjust the `provide_channels` assert to accept either fn sig
  declaration (for now).

Still needs test(s) obvi..
2025-08-18 22:32:51 -04:00
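
A hypothetical `target` sketch matching the new signature described above (the exact `LinkedTaskChannel` method set is assumed from this commit's wording, not verified):

import asyncio
from tractor import to_asyncio

async def aio_child(
    chan: to_asyncio.LinkedTaskChannel,
) -> None:
    # sync-callable "first value" handshake from the aio child task,
    # replacing the legacy `to_trio.send_nowait()` usage.
    chan.started_nowait('hello from asyncio')
    await asyncio.sleep(float('inf'))

# the `trio` parent side would then look something like:
#
#   async with to_asyncio.open_channel_from(aio_child) as (first, chan):
#       assert first == 'hello from asyncio'
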
Tyler Goodlet bd148300c5 Relay `asyncio` errors via EoC and raise from rent
Makes the newly added `test_aio_side_raises_before_started` test pass by
ensuring errors raised by any `.to_asyncio.open_channel_from()` spawned
child-`asyncio.Task` are relayed on any caught `trio.EndOfChannel` by
checking for a new `LinkedTaskChannel._closed_by_aio_task: bool`.

Impl deats,
- obvi add `LinkedTaskChannel._closed_by_aio_task: bool = False`
- in `translate_aio_errors()` always check for the new flag on EOC
  conditions and in such cases set `chan._trio_to_raise = aio_err` such
  that the `trio`-parent-task always raises the child's exception
  directly, OW keep original EoC passthrough in place.
- include *very* detailed per-case comments around the extended handler.
- adjust re-raising logic with a new `raise_from` where we only give the
  `aio_err` priority if it's not already set as `trio_to_raise`.

Also,
- hide the `_run_asyncio_task()` frame by def.
2025-08-18 22:32:51 -04:00
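
Roughly the decision being described, as a standalone helper sketch (attr names are assumed from this commit's wording):

import trio

def maybe_relay_aio_err(
    chan,                    # duck-typed `LinkedTaskChannel`-like object
    eoc: trio.EndOfChannel,
) -> None:
    # when the EoC was emitted because the aio-side task closed the
    # channel due to an error, stash that error so the `trio` parent
    # raises it directly; otherwise keep the EoC-passthrough.
    if (
        getattr(chan, '_closed_by_aio_task', False)
        and
        (aio_err := getattr(chan, '_aio_err', None))
    ):
        chan._trio_to_raise = aio_err
    else:
        raise eoc
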
Tyler Goodlet 4a7491bda4 Add "raises-pre-started" `open_channel_from()` test
Verifying that if any exc is raised pre `chan.send_nowait()` (our
currently shite version of a `chan.started()`) then that exc is indeed
raised through on the `trio`-parent task side. This case was reproduced
from a `piker.brokers.ib` issue with a similar embedded
`.trionics.maybe_open_context()` call.

Deats,
- call the suite `test_aio_side_raises_before_started`.
- mk the `@context` simply `maybe_open_context(acm_func=open_channel_from)`
  with a `target=raise_before_started` which,
- simply sleeps then immediately raises an RTE.
- expect the RTE from the aio-child-side to propagate all the way up to
  the root-actor's task right up through the `trio.run()`.
2025-08-18 22:32:51 -04:00
Bd 62415518fc
Merge pull request #394 from goodboy/nursery_cleaning
A bit of (actor) nursery cleaning
2025-08-18 22:32:19 -04:00
Tyler Goodlet 5c7d930a9a Drop unused `Actor._root_n`.. 2025-08-18 22:16:03 -04:00
Tyler Goodlet c46986504d Switch nursery to `CancelScope`-status properties
Been meaning to do this forever and a recent test hang finally drove me
to it Bp

Like it sounds, adopt the "cancel-status" properties on `ActorNursery`
use already on our `Context` and derived from `trio.CancelScope`:

- add new private `._cancel_called` (set in the head of `.cancel()`)
  & `._cancelled_caught` (set in the tail) instance vars with matching
  read-only `@properties`.

- drop the instance-var and instead delegate a `.cancelled: bool`
  property to `._cancel_called` and add a usage deprecation warning
  (since removing it breaks a buncha tests).
2025-08-18 22:16:03 -04:00
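
The property pattern being adopted, as a self-contained sketch (only the attr/property names come from the commit; the body is illustrative):

import warnings

class NurseryCancelStatus:
    '''
    Minimal sketch of the `trio.CancelScope`-style status properties
    described above, not the real `ActorNursery` impl.

    '''
    def __init__(self) -> None:
        self._cancel_called: bool = False
        self._cancelled_caught: bool = False

    @property
    def cancel_called(self) -> bool:
        return self._cancel_called

    @property
    def cancelled_caught(self) -> bool:
        return self._cancelled_caught

    async def cancel(self) -> None:
        self._cancel_called = True        # set in the "head"
        ...                               # do the actual cancellation
        self._cancelled_caught = True     # set in the "tail"

    @property
    def cancelled(self) -> bool:
        # legacy read-only alias, kept since dropping it breaks tests
        warnings.warn(
            '`.cancelled` is deprecated, use `.cancel_called`',
            DeprecationWarning,
            stacklevel=2,
        )
        return self._cancel_called
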
Tyler Goodlet e05a4d3cac Enforce named-args only to `.open_nursery()` 2025-08-18 22:16:03 -04:00
Bd a9aa5ec04e
Merge pull request #392 from goodboy/introspect_ipc
Introspect-ipc: some `.ipc` subpkg iface refinements for reading cancel statuses and `Address.__repr__()`
2025-08-18 22:15:40 -04:00
Tyler Goodlet 5021514a6a Disable shm resource tracker via flag on 3.13+
As per the newly added support,
https://docs.python.org/3/library/multiprocessing.shared_memory.html
2025-08-18 22:04:40 -04:00
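
The stdlib flag in question, version-gated roughly like so (the exact call-site inside `.ipc._shm` is assumed):

import sys
from multiprocessing import shared_memory

extra_kwargs: dict = {}
if sys.version_info >= (3, 13):
    # 3.13+ only: opt out of the resource-tracker for this handle.
    extra_kwargs['track'] = False

shm = shared_memory.SharedMemory(
    create=True,
    size=4096,
    **extra_kwargs,
)
shm.close()
shm.unlink()
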
Tyler Goodlet 79f502034f Don't hard code runtime-dir, read it with `._state.get_rt_dir()` 2025-08-18 21:30:48 -04:00
Tyler Goodlet 331921f612 Hmm disable CRE case for now, causes test fails
So I need to either adjust the tests or figure out if/why this is needed
to avoid the crashing in `pikerd` I found when killing the chart during
a long backfill with the `binance` backend..
2025-08-18 21:30:48 -04:00
Tyler Goodlet df0d00abf4 Translate CRE's due to socket-close to tpt-closed
Just like in the BRE case (for UDS) it seems when a peer closes the
(UDS?) socket `trio` instead raises a `ClosedResourceError` which we now
catch and re-raise as a `TransportClosed`. This again results in
`tpt.send()` calls from the rpc-runtime **not** raising when it's known
that the IPC channel is disconnected.
2025-08-18 21:30:48 -04:00
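
A minimal sketch of that translation, assuming a local stand-in for the runtime's tpt-closed error type:

import trio

class TransportClosed(Exception):
    '''
    Stand-in for the runtime's tpt-closed error; shown only to
    illustrate the re-raise described above.

    '''

async def tpt_send(stream: trio.abc.SendStream, data: bytes) -> None:
    # when the peer already closed the (UDS) socket `trio` raises
    # `ClosedResourceError`; re-raise it as a "graceful" transport
    # closed condition so rpc-runtime `tpt.send()` callers don't crash.
    try:
        await stream.send_all(data)
    except trio.ClosedResourceError as cre:
        raise TransportClosed(
            'IPC channel already disconnected by peer?'
        ) from cre
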
Tyler Goodlet a72d1e6c48 Multi-line-style up the UDS fast-connect handler
Shift around comments and expressions for better reading, assign
`tpt_closed` for easier introspection from REPL during debug, oh and fix
the `MsgpackTransport.pformat()` to render '|_peers: 1' .. XD
2025-08-18 21:30:48 -04:00
Tyler Goodlet 5931c59aef Log "out-of-layer" cancellation in `._rpc._invoke()`
Similar to what was just changed for `Context.repr_state`, when the
child task is cancelled but by a different "layer" of the runtime (i.e.
a `Portal.cancel_actor()` / `SIGINT`-to-process canceller) we don't
dump a traceback and instead just emit a `log.cancel()` report.
2025-08-18 21:30:48 -04:00
Tyler Goodlet ba08052ddf Handle "out-of-layer" remote `Context` cancellation
Such that if the local task hasn't resolved but is `trio.Cancelled` and
a `.canceller` was set, we report an `'actor-cancelled'` from
`.repr_state: str`. Bit of formatting to avoid needless newlines too!
2025-08-18 21:30:48 -04:00
Tyler Goodlet 00112edd58 UDS: implicitly create `Address.bindspace: Path`
Since it's merely a local-file-sys subdirectory and there should be no
reason file creation conflicts with other bind spaces.

Also add 2 test suites to match,
- `tests/ipc/test_each_tpt::test_uds_bindspace_created_implicitly` to
  verify the dir creation when DNE.
- `..test_uds_double_listen_raises_connerr` to ensure a double bind
  raises a `ConnectionError` from the src `OSError`.
2025-08-18 21:30:48 -04:00
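
The implicit-creation step amounts to something like the helper below (the real hook lives in `.ipc._uds.start_listener()` per the tests; this is only a sketch):

from pathlib import Path

def ensure_bindspace(bindspace: str|Path) -> Path:
    # since the bind space is just a runtime-dir subdirectory,
    # create it on demand before binding the UDS socket file in it.
    bs_dir = Path(bindspace)
    bs_dir.mkdir(parents=True, exist_ok=True)
    return bs_dir
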
Tyler Goodlet 1d706bddda Rm `assert` from `Channel.from_addr()`, for UDS we re-created to extract the peer PID 2025-08-18 21:30:48 -04:00
Tyler Goodlet 3c30c559d5 `ipc._uds`: assign `.l/raddr` in `.connect_to()`
Using `.get_stream_addrs()` such that we always (*can*) assign the peer
end's PID in the `._raddr`.

Also factor common `ConnectionError` re-raising into
a `_reraise_as_connerr()`-@cm.
2025-08-18 21:30:48 -04:00
Tyler Goodlet 599020c2c5 Rename all lingering ctx-side bits
As before but more thoroughly in comments and var names finally changing
all,
- caller -> parent
- callee -> child
2025-08-18 21:30:48 -04:00
Tyler Goodlet 50f6543ee7 Add `Channel.closed/.cancel_called`
I.e. the public properties for the private instance var equivs; improves
expected introspection usage.
2025-08-18 21:30:48 -04:00
Tyler Goodlet c0854fd221 Set `Channel._cancel_called` via `chan` var
In `Portal.cancel_actor()` that is, at the least to make it easier to
ref search from an editor Bp
2025-08-18 21:30:48 -04:00
Tyler Goodlet e875b62869 Add `.ipc._shm` todo-idea for `@actor_fixture` API 2025-08-18 21:30:48 -04:00
Tyler Goodlet 3ab7498893 Add todo for py3.13+ `.shared_memory`'s new `track=False` support.. finally they added it XD 2025-08-18 21:30:48 -04:00
Bd dd041b0a01
Merge pull request #393 from goodboy/trionics_tweaks
Trionics tweaks: some `._mngrs` refinements and fix a `test_resource_cache` hang
2025-08-18 21:20:33 -04:00
Tyler Goodlet 4e252526b5 Accept `tn` to `gather_contexts()/maybe_open_context()`
Such that the caller can be responsible for their own (nursery) scoping
as needed and, for the latter fn's case with
a `trio.Nursery.CancelStatus.encloses()` check to ensure the `tn` is
a valid parent-ish.

Some deats,
- in `gather_contexts()`, mv the `try/finally` outside the nursery block
  to ensure we always do the `parent_exit`.
- for `maybe_open_context()` we do a naive task-tree hierarchy audit to
  ensure the provided scope is not *too* child-ish (with what APIs `trio`
  gives us, see above), OW go with the old approach of using the actor's
  private service nursery.
  Also,
  * better report `trio.Cancelled` around the cache-miss `yield`
    cases and ensure we **never** unmask triggering key-errors.
  * report on any stale-state with the mutex in the `finally` block.
2025-08-18 21:07:12 -04:00
Tyler Goodlet 4ba3590450 Add `.trionics.maybe_open_context()` locking test
Call it `test_lock_not_corrupted_on_fast_cancel()` and include
a detailed doc string to explain. Implemented it "cleverly" by having
the target `@acm` cancel its parent nursery after a peer, cache-hitting
task, is already waiting on the task mutex release.
2025-08-18 21:07:12 -04:00
Tyler Goodlet f1ff79a4e6 Always `finally` invoke cache-miss `lock.release()`s
Since the `await service_n.start()` on key-err can be cancel-masked
(checkpoint interrupted before `_Cache.run_ctx` completes), we need to
always `lock.release()` in a `finally` to avoid lock-owner-state
corruption and/or inf-hangs in peer cache-hitting tasks.

Deats,
- add a `try/except/finally` around the key-err triggered cache-miss
  `service_n.start(_Cache.run_ctx, ..)` call, reporting on any taskc
  and always `finally` unlocking.
- fill out some log msg content and use `.debug()` level.
2025-08-18 21:07:12 -04:00
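
The always-release rule as a standalone sketch (names mirror the commit; the real code lives inside `maybe_open_context()`):

import trio

async def start_cache_miss(
    lock: trio.Lock,
    service_tn: trio.Nursery,
    run_ctx,            # the cache-miss allocator task fn
):
    # even if the `.start()` checkpoint is cancelled mid-cache-miss
    # the mutex must be released, so peer cache-hitting tasks never
    # hang on `lock.acquire()` or try to release a lock they don't own.
    await lock.acquire()
    try:
        await service_tn.start(run_ctx)
    except trio.Cancelled:
        print('cache-miss allocation was cancelled!')
        raise
    finally:
        lock.release()
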
Tyler Goodlet 70664b98de Well then, I guess it just needed, a checkpoint XD
Here I was thinking the bcaster (usage) maybe required a rework but,
NOPE it's just bc a checkpoint was needed in the parent task owning the
`tn` which spawns `get_sub_and_pull()` tasks to ensure the bg allocated
`an`/portal is eventually cancel-called..

Ah well, at least i started a patch for `MsgStream.subscribe()` to make
it multicast revertible.. XD

Anyway, I tossed in some checks & notes related to all that unnecessary
effort since I do think i'll move forward implementing it:
- for the `cache_hit` case always verify that the `bcast` clone is
  unregistered from the common state subs after
  `.subscribe().__aexit__()`.
- do a light check that the implicit `MsgStream._broadcaster` is always
  the only bcrx instance left-leaked into that state.. that is until
  i get the proper de-allocation/reversion from multicast -> unicast
  working.
- put in mega detailed note about the required parent-task checkpoint.
2025-08-18 21:07:12 -04:00
Tyler Goodlet 1c425cbd22 Tool-up `test_resource_cache.test_open_local_sub_to_stream`
Since I recently discovered a very subtle race-case that can sometimes
cause the suite to hang, seemingly due to the `an: ActorNursery`
allocated *behind* the `.trionics.maybe_open_context()` usage; this can
result in never cancelling the 'streamer' subactor despite the `main()`
timeout-guard?

This led me to dig in and find that the underlying issue was 2-fold,

- our `BroadcastReceiver` termination-mgmt semantics in
  `MsgStream.subscribe()` can result in the first subscribing task to
  always keep the `MsgStream._broadcaster` instance allocated; it's
  never `.aclose()`ed, which makes it tough to determine (and thus
  trace) when all subscriber-tasks are actually complete and
  exited-from-`.subscribe()`..

- i was shield waiting `.ipc._server.Server.wait_for_no_more_peers()` in
  `._runtime.async_main()`'s shutdown sequence which would then compound
  the issue resulting in a SIGINT-shielded hang.. the worst kind XD

Actual changes here are just styling, printing, and some mucking with
passing the `an`-ref up to the parent task in the root-actor where i was
doing a conditional `ActorNursery.cancel()` to mk sure that was actually
the problem. Presuming this is fixed the `.pause()` i left unmasked
should never hit.
2025-08-18 21:07:06 -04:00
Tyler Goodlet edc2211444 Go multi-line-style tuples in `maybe_enter_context()`
Allows for an inline comment of the first "cache hit" bool element.
2025-08-18 20:55:18 -04:00
Bd b05abea51e
Merge pull request #390 from goodboy/strict_egs_everywhere
Strict egs everywhere: drop use of `strict_exception_groups=False` throughout!
2025-08-18 14:15:49 -04:00
Tyler Goodlet 88c1c083bd Add timeout to inf-streamer test 2025-08-18 13:31:15 -04:00
Tyler Goodlet b096867d40 Remove lingering seg=False-flags from tests 2025-08-18 12:03:32 -04:00
Tyler Goodlet a3c9822602 Remove lingering seg=False-flags from examples 2025-08-18 12:03:10 -04:00
Tyler Goodlet e3a542f2b5 Never shield-wait `ipc_server.wait_for_no_more_peers()`
As mentioned in a prior testing commit, it can cause the worst kind of
hangs, the SIGINT ignoring kind.. Pretty sure there was never any reason
outside some esoteric multi-actor debugging case, and pretty sure that
already was solved?
2025-08-18 10:46:37 -04:00
Tyler Goodlet 0ffcea1033 Adjust `test_trio_prestarted_task_bubbles()` suite to expect non-eg raises 2025-08-18 10:46:37 -04:00
Tyler Goodlet a7bdf0486c Styling tweaks to quadruple streaming test fn 2025-08-18 10:46:37 -04:00
Tyler Goodlet d2ac9ecf95 Resolve `test_cancel_while_childs_child_in_sync_sleep`
Was failing due to the `.fail_after()` timeout being *too short* and
somehow the new interplay of that with strict-exception groups resulted
in the `TooSlowError` never raising but instead an eg with the embedded
`AssertionError`?? I still don't really get it honestly..

I've written up lengthy notes around the different `delay` settings that
can be used to see the diff outcomes, the failing case being the one
i still don't really grok and think is justification for `trio` to
bubble inner `Cancelled`s differently possibly?

For now I've included the original failing case as an `xfail`
parametrization which will hopefully drive a follow-up lowlevel
`trio` test in `test_trioisms`!
2025-08-18 10:46:37 -04:00
Tyler Goodlet dcb1062bb8 Fix cluster suite, chng to new `gather_contexts()`
Namely `test_empty_mngrs_input_raises()` was failing due to
lazy-iterator use as input to `mngrs` which i guess i added support for
a while back (by it doing a `list(mngrs)` internally)? So just change it
to `gather_contexts(mngrs=())` and also tweak the `trio.fail_after(3)`
since it appears that the prior 1sec was causing
too-fast-of-a-cancellation (before the cluster fully spawned) and thus
the expected `ValueError` never to show..

Also, mask the `tractor.trionics.collapse_eg()` usage (again?) in
`open_actor_cluster()` since it seems unnecessary.
2025-08-18 10:46:37 -04:00
Tyler Goodlet 05d865c0f1 WIP tinkering with strict-eg-tns and cluster API
Seems that the way the actor-nursery interacts with the
`.trionics.gather_contexts()` API on cancellation makes our
`.trionics.collapse_eg()` not work as intended?

I need to dig into how `ActorNursery.cancel()` and `.__aexit__()` might
be causing this discrepancy..

Consider this a commit-of-my-index type save for rn.
2025-08-18 10:46:37 -04:00
Tyler Goodlet 8218f0f51f Bit of multi-line styling / name tweaks in cancellation suites 2025-08-18 10:46:37 -04:00
Tyler Goodlet 8f19f5d3a8 Mk temp collapser bp work outside runtime as well.. 2025-08-18 10:46:37 -04:00
Tyler Goodlet 64c27a914b Add temp breakpoint support to `collapse_eg()` 2025-08-18 10:46:37 -04:00
Tyler Goodlet d9c8d543b3 Suppress beg tbs from `collapse_eg()`
It was originally this way; I forgot to flip it back when discarding the
`except*` handler impl..

Specially handle the `exc.__cause__` case where we raise from any
detected underlying cause and OW `from None` to suppress the eg's tb.
2025-08-18 10:46:37 -04:00
Tyler Goodlet 048b154f00 Rework `collapse_eg()` to NOT use `except*`..
Since it turns out the semantics are basically inverse of normal
`except` (particularly for re-raising) which is hard to get right, and
bc it's a lot easier to just delegate to what `trio` already has behind
the `strict_exception_groups=False` setting, Bp

I added a rant here which will get removed shortly likely, but i think
going forward recommending against use of `except*` is prudent for
anything low level enough in the runtime (like trying to filter begs).

Dirty deats,
- copy `trio._core._run.collapse_exception_group()` to here with only
  a slight mod to remove the notes check and tb concatting for the
  collapse case.
- rename `maybe_collapse_eg()` -> `get_collapsed_eg()` and delegate it
  directly to the former `trio` fn; return `None` when it returns the
  same beg without collapse.
- simplify our own `collapse_eg()` to either raise the collapsed `exc`
  or original `beg`.
2025-08-18 10:46:37 -04:00
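
A simplified sketch of the collapse behaviour described (an approximation of what `trio` does behind `strict_exception_groups=False`, not the verbatim `.trionics` impl):

from contextlib import asynccontextmanager as acm

def get_collapsed_eg(
    beg: BaseExceptionGroup,
) -> BaseException|None:
    # a group wrapping exactly one sub-exc collapses to that sub-exc,
    # otherwise report "no collapse" with `None`.
    if len(beg.exceptions) == 1:
        return beg.exceptions[0]
    return None

@acm
async def collapse_eg():
    # plain `except` (not `except*`) so re-raise semantics stay sane.
    try:
        yield
    except BaseExceptionGroup as beg:
        if exc := get_collapsed_eg(beg):
            # raise from any underlying cause, OW `from None` to
            # suppress the eg's tb.
            raise exc from (exc.__cause__ or None)
        raise beg
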
Tyler Goodlet 88828e9f99 Couple more `._root` logging tweaks.. 2025-08-18 10:46:37 -04:00
Tyler Goodlet 25ff195c17 Use collapser around `root_tn` in `async_main()`
Replacing yet another loose-eg-flag. Also toss in a todo to maybe use
the unmasker around the `open_root_actor()` body.
2025-08-18 10:46:37 -04:00
Tyler Goodlet f60cc646ff Facepalm, fix `raise from` in `collapse_eg()`
I dunno what exactly I was thinking but we definitely don't want to
**ever** raise from the original exc-group, instead always raise from
any original `.__cause__` to be consistent with the embedded src-error's
context.

Also, adjust `maybe_collapse_eg()` to return `False` in the non-single
`.exceptions` case, again don't know what I was trying to do but this
simplifies caller logic and the prior return-semantic had no real
value..

This fixes some final usage in the runtime (namely top level nursery
usage in `._root`/`._runtime`) which was previously causing test suite
failures prior to this fix.
2025-08-18 10:46:37 -04:00
Tyler Goodlet a2b754b5f5 Just import `._runtime` ns in `._root`; be a bit more explicit 2025-08-18 10:46:37 -04:00
Tyler Goodlet 5e13588aed Use collapse in `._root.open_root_actor()` too
Seems to add one more cancellation suite failure as well as now cause
the discovery test to error instead of fail?
2025-08-18 10:46:37 -04:00
Tyler Goodlet 0a56f40bab Use collapser around root tn in `.async_main()`
Seems to cause the following test suites to fail however..

- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_clustering.py::test_empty_mngrs_input_raises'

Also tweak some ctxc request logging content.
2025-08-18 10:46:37 -04:00
Tyler Goodlet f776c47cb4 Drop msging-err patt from `subactor_breakpoint` ex
Since the `bdb` module was added to the namespace lookup set in
`._exceptions.get_err_type()` we can now relay a RAE-boxed
`bdb.BdbQuit`.
2025-08-18 10:46:37 -04:00
Tyler Goodlet 7f584d4f54 Switch to strict-eg nurseries almost everywhere
That is just throughout the core library, not the tests yet. Again, we
simply change over to using our (nearly equivalent?)
`.trionics.collapse_eg()` in place of the already deprecated
`strict_exception_groups=False` flag in the following internals,
- the conc-fan-out tn use in `._discovery.find_actor()`.
- `._portal.open_portal()`'s internal tn used to spawn a bg rpc-msg-loop
  task.
- the daemon and "run-in-actor" layered tn pair allocated in
  `._supervise._open_and_supervise_one_cancels_all_nursery()`.

The remaining loose-eg usage in `._root` and `._runtime` seem to be
necessary to keep the test suite green?? For the moment these are left
out.
2025-08-18 10:46:37 -04:00
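
The swap pattern, which the diffs below repeat across tests and examples, boils down to the following (wrapped here in a tiny runnable `main()` for illustration):

import trio
import tractor

async def main():
    # before (deprecated `trio` flag):
    #
    #   async with trio.open_nursery(
    #       strict_exception_groups=False,
    #   ) as tn:
    #       ...
    #
    # after: a strict-eg nursery stacked under the collapser.
    async with (
        tractor.trionics.collapse_eg(),
        trio.open_nursery() as tn,
    ):
        tn.start_soon(trio.sleep, 0)

trio.run(main)
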
Tyler Goodlet d650dda0fa Use collapser in rent side of `Context` 2025-08-18 10:46:37 -04:00
Tyler Goodlet f6598e8400 Add some tooling params to `collapse_eg()` 2025-08-18 10:46:37 -04:00
Bd 59822ff093
Merge pull request #389 from goodboy/better_reprs
Better `repr()`s: more console friendly representations of internal primitives
2025-08-16 17:20:02 -04:00
Tyler Goodlet ca427aec7e More prep-to-reduce the `Actor` method-iface
- drop the (never/un)used `.get_chans()`.
- add #TODO for factoring many methods into a new `.rpc`-subsys/pkg
  primitive, like an `RPCMngr/Server` type eventually.
- add todo to maybe mv `.get_parent()` elsewhere?
- move masked `._hard_mofo_kill()` to bottom.
2025-08-16 17:06:23 -04:00
Tyler Goodlet f53aa992af .log: expose `at_least_level()` as `StackLevelAdapter` meth 2025-08-15 17:29:22 -04:00
Tyler Goodlet 69e0afccf0 Use `Address` where possible in (root) actor boot
Namely inside various bootup-sequences in `._root` and `._runtime`
particularly in the root actor to support both better tpt-address
denoting in our logging and as part of clarifying logic around setting
the root's registry addresses which is soon to be much better factored
out of the core and into an explicit subsystem + API.

Some `_root.open_root_actor()` deats,
- set `registry_addrs` to a new `uw_reg_addrs` (uw: unwrapped) to be
  more explicit about wrapped addr types throughout.
- instead ensure `registry_addrs` are the wrapped types and pass down
  into the root `Actor` singleton-instance.
- factor the root-actor check + rt-vars update (updating the `'_root_addrs'`)
  out of `._runtime.async_main()` into this fn.
- as previously, set `trans_bind_addrs = uw_reg_addrs` in unwrapped form since it will
  be passed down both through rt-vars as `'_root_addrs'` and to
  `._runtime.async_main()` as `accept_addrs` (which is then passed to the
  IPC server).
- adjust/simplify much logging.
- shield the `await actor.cancel(None)  # self cancel` to avoid any
  finally-footguns.
- as mentioned convert the

For `_runtime.async_main()` tweaks,
- expect `registry_addrs: list[Address]|None = None` with appropriate
  unwrapping prior to setting both `.reg_addrs` and the equiv rt-var.
- add a new `.registry_addrs` prop for the wrapped form.
- convert a final loose-eg for the `service_nursery` to use
  `collapse_eg()`.
- simplify teardown report logging.
2025-08-15 17:29:10 -04:00
Tyler Goodlet e275c49b23 Stackscope import fail msg dun need braces.. 2025-08-15 16:34:03 -04:00
Tyler Goodlet 48fbf38c1d Drop duplicated (masked) debugging-`terminate_after`, prolly a rebase slip.. 2025-08-15 16:33:31 -04:00
Tyler Goodlet defd6e28d2 Facepalm, actually use `.log.cancel()`-level to report parent-side taskc.. 2025-08-15 16:31:52 -04:00
Tyler Goodlet 414b0e2bae Update buncha log msg fmting in `.msg._ops`
Mostly just multi-line code styling again: always putting standalone
`'f\n'` on separate LOC so it reads like it renders to console. Oh and
a level drop to `.runtime()` for rx-msg reports.
2025-08-15 16:30:10 -04:00
Tyler Goodlet d34fb54f7c Update buncha log msg fmting in `._spawn`
Again using `Channel.aid.reprol()`, `.devx.pformat.nest_from_op()` and
 converting to multi-line code style an ' for str-report-contents. Tweak
 some imports to sub-mod level as well.
2025-08-15 16:29:17 -04:00
Tyler Goodlet 5d87f63377 Update buncha log msg fmting in `._portal`
Namely to use `Channel.aid.reprol()` and converting to our newer style
multi-line code style for str-reports.
2025-08-15 16:29:11 -04:00
Tyler Goodlet 0ca3d50602 Use `._supervise._shutdown_msg` in tooling test 2025-08-15 16:29:05 -04:00
Tyler Goodlet 8880a80e3e Use `nest_from_op()`/`pretty_struct` in `._rpc`
Again for nicer console logging. Also fix a double `req_chan` arg bug
when passed to `_invoke` in the `self.cancel()` rt-ep; don't update the
`kwargs: dict` just merge in `req_chan` input at call time.
2025-08-15 16:28:46 -04:00
Tyler Goodlet 7be713ee1e Use `nest_from_op()` in actor-nursery shutdown
Including a new one-line `_shutdown_msg: str` which we mod-var-set for
testing usage and some denoising at `.info()` level. Adjust `Actor()`
instantiating input to the new `.registry_addrs` wrapped addrs property.
2025-08-15 16:28:30 -04:00
Tyler Goodlet 4bd8211abb Add #TODO for `._context` to use `.msg.Aid` 2025-08-15 16:24:35 -04:00
Tyler Goodlet a23a98886c Even more `.ipc.*` repr refinements
Mostly adjusting indentation, noise level, and clarity via `.pformat()`
tweaks and more general use of `.devx.pformat.nest_from_op()`.

Specific impl deats,
- use `pformat.ppfmt()`/`nest_from_op()` more seriously throughout
  `._server`.
- add a `._server.Endpoint.pformat()`.
- add `._server.Server.len_peers()` and `.repr_state()`.
- polish `Server.pformat()`.
- drop some redundant `log.runtime()`s from `._serve_ipc_eps()` instead
  leaving-them-only/putting-them in the caller pub meth.
- `._tcp.start_listener()` log the bound addr, not the input (which may
  be the 0-port).
2025-08-15 16:24:27 -04:00
Tyler Goodlet 31544c862c More `.ipc.Channel`-repr related tweaks
- only generate a repr in `.from_addr()` when log level is >= 'runtime'.
 |_ add a todo about supporting this optimization more generally on our
   adapter.
- fix `Channel.pformat()` to show unknown peer field line fmt correctly.
- add a `Channel.maddr: str` which just delegates directly to the
  `._transport` like other pass-thru property fields.
2025-08-15 16:24:22 -04:00
Tyler Goodlet 7d320c4e1e Mk `Aid` hashable, use pretty-`.__repr__()`
Hash on the `.uuid: str` and delegate verbatim to
`msg.pretty_struct.Struct`'s equiv method.
2025-08-15 16:24:15 -04:00
Tyler Goodlet 38944ad1d2 Drop `actor_info: str` from `._entry` logs 2025-08-15 16:24:06 -04:00
Tyler Goodlet 9260909fe1 Try `nest_from_op()` in some `._rpc` spots
To start trying out,
- using in the `Start`-msg handler-block to repr the msg coming
  *from* a `repr(Channel)` using the `<=)` sclang op.
- for a completed RPC task in `_invoke_non_context()`.
- for the msg loop task's termination report.
2025-08-15 16:23:59 -04:00
Tyler Goodlet c00b3c86ea Hide more `Channel._transport` privates for repr
Such as the `MsgTransport.stream` and `.drain` attrs since they're
rarely that important at the chan level. Also start adopting
a `.<attr>=` style for actual attrs of the type versus a `<name>:
` style for meta-field info lines.
2025-08-15 16:23:54 -04:00
Tyler Goodlet 808a336508 Refine `Actor` status iface, use `Aid` throughout
To simplify `.pformat()` output when the new `privates: bool` is unset
(the default) this adds new public attrs to wrap an actor's
cancellation status as well as provide a `.repr_state: str` (similar to
our equiv on `Context`). Rework `.pformat()` to render a much simplified
repr using all these new refinements.

Further, port the `.cancel()` method to use `.msg.types.Aid` for all
internal `requesting_uid` refs (now renamed with `_aid`) and in all
called downstream methods.

New cancel-state iface deats,
- rename `._cancel_called_by_remote` -> `._cancel_called_by` and expect
  it to be set as an `Aid`.
- add `.cancel_complete: bool` which flags whether `.cancel()` ran to
  completion.
- add `.cancel_called: bool` which just wraps `._cancel_called` (and
  which likely will just be dropped since we already have
  `._cancel_called_by`).
- add `.cancel_caller: Aid|None` which wraps `._cancel_called_by`.

In terms of using `Aid` in cancel methods,
- rename vars with `_aid` suffix in `.cancel()` (and wherever else).
- change `.cancel_rpc_tasks()` input param to `req_aid: msgtypes.Aid`.
- do the same for `._cancel_task()` and (for now until we adjust its
  internals as well) use the `Aid.uid` remap property when assigning
  `Context._canceller`.
- adjust all log msg refs to match obvi.
2025-08-15 16:08:53 -04:00
Tyler Goodlet 679d999185 Add flag to toggle private vars in `Channel.pformat()`
Call it `privates: bool` and only show certain internal instance vars
when set in the `repr()` output.
2025-08-15 16:07:39 -04:00
Tyler Goodlet a8428d7de3 Extend `.msg.types.Aid` method interface
Providing the legacy `.uid -> tuple` style id (since still used for the
`Actor._contexts` table) and a `repr-one-line` method `.reprol() -> str`
for rendering a compact unique actor ID summary (useful in
logging/.pformat()s at the least).
2025-08-15 16:07:39 -04:00
Tyler Goodlet e9f2fecd66 Fix `nest_from_op()` call sigs, already changed upstream
In `._runtime/_root` and since the latest fn-signature changes were
already landed onto main branch via the 65b7956: #384-patch.
2025-07-18 00:35:35 -04:00
Tyler Goodlet 547cf5a210 Drop stale comment from inter-peer suite 2025-07-18 00:35:35 -04:00
Tyler Goodlet b5e3fa7370 Use `nest_from_op()` in some runtime logs for actor-state-repring 2025-07-18 00:35:35 -04:00
Bd cd16748598
Merge pull request #387 from goodboy/the_finally_footgun
Coping with "`finally` footguns": avoiding `trio.Cancelled` exc masking as best we can..
2025-07-17 22:33:33 -04:00
Tyler Goodlet 1af35f8170 Add back loose-tn in `gather_contexts()`, mk tests green 2025-07-16 18:18:34 -04:00
Tyler Goodlet 4569d11052 Move `.is_multi_cancelled()` to `.trionics._beg`
Since it's for beg filtering, the current impl should be renamed anyway;
it's not just for filtering cancelled excs.

Deats,
- added a real doc string, links to official eg docs and fixed the
  return typing.
- adjust all internal imports to match.
2025-07-16 15:49:18 -04:00
Tyler Goodlet 6ba76ab700 .trionics: link in `finally`-footgun `trio` GH ish 2025-07-15 07:23:21 -04:00
Tyler Goodlet 734dda35e9 Hide `._rpc._errors_relayed_via_ipc()` frame by def 2025-07-15 07:23:21 -04:00
Tyler Goodlet b7e04525cc Always `Cancelled`-unmask ctx endpoint excs
To resolve the recently added and failing
`test_remote_exc_relay::test_unmasked_remote_exc`: never allow
`trio.Cancelled` to mask an underlying user-code exception, ever.

Our first real-world (runtime internal) use case for the new
`.trionics.maybe_raise_from_masking_exc()` such that the failing
test now passes with a properly relayed remote RTE unmasking B)

Details,
- flip the `Context._scope_nursery` to the default strict-eg behaviour
  and instead stack its outer scope with a `.trionics.collapse_eg()`.
- wrap the inner-most scope (after `msgops.maybe_limit_plds()`) with
  a `maybe_raise_from_masking_exc()` to ensure user-code errors are
  never masked by `trio.Cancelled`s.

Some err-reporting refinement,
- always capture any `scope_err` from the entire block for debug
  purposes; report it in the `finally` block's log.
- always capture any suppressed `maybe_re`, output from
  `ctx.maybe_raise()`, and `log.cancel()` report it.
2025-07-15 07:23:21 -04:00
Tyler Goodlet 35977dcebb Adjust ep-masking-suite for the real-use-case
Namely that the more common-and-pertinent case is when
a `@context`-ep-fn contains the `finally`-footgun but without
a surrounding embedded `tn` (which currently still requires its own
scope embedded `trionics.maybe_raise_from_masking_exc()`) which can't
be compensated-for by `._rpc._invoke()` easily. Instead the test is
composed where the `._invoke()`-internal `tn` is the machinery being
addressed in terms of masking user-code excs with `trio.Cancelled`.

Deats,
- rename the test -> `test_unmasked_remote_exc` to reflect what the
  runtime should actually be addressing/solving.
- drop the embedded `tn` from `sleep_n_chkpt_in_finally()` (for now)
  since that case can't currently easily be addressed without the user
  code using its own `trionics.maybe_raise_from_masking_exc()` inside
  the nursery scope.
- as such drop all `tn` related params/logic/usage from the ep.
- add in a `Cancelled` handler block which checks for RTE masking and
  always prints the occurrence loudly.

Follow up,
- obvi this suite will currently fail until the appropriate adjustment
  is made to `._rpc._invoke()` to do the unmasking; coming next.
- we probably still need a case with an embedded user `tn` where if
  the default strict-eg mode is used then a ctxc from the parent might
  cause a non-graceful `Context.cancel()` outcome?
 |_since the embedded user-`tn` will raise
   `ExceptionGroup[trio.Cancelled]` upward despite the parent nursery's
   scope being the canceller, or will a `collapse_eg()` inside the
   `._invoke()` scope handle this as well?
2025-07-15 07:23:21 -04:00
Tyler Goodlet e1f26f9611 Extend `._taskc.maybe_raise_from_masking_exc()`
To handle captured non-egs (when the now optional `tn` isn't provided)
as well as yield up a `BoxedMaybeException` which contains any detected
and un-masked `exc_ctx` as its `.value`.

Also add some additional tooling,
- a `raise_unmasked: bool` toggle for when the caller just wants to
  report the masked exc and not raise-it-in-place of the masker.
- `extra_note: str` which by default is tuned to the default
  `unmask_from = (trio.Cancelled,)` but which can be used to deliver
  custom exception msg content.
- `always_warn_on: tuple[BaseException]` which will always emit
  a warning log of what would have been the raised-in-place-of
  `ctx_exc`'s msg for special cases where you want to report
  a masking case that might not be otherwise noticed by the runtime
  (cough like a `Cancelled` masking another `Cancelled`) but which
  you'd still like to warn the caller about.
- factor out the masked-`exc_ctx` predicate logic into
  a `find_masked_excs()` and also use it for non-eg cases.

Still maybe todo?
- rewrapping multiple masked sub-excs in an eg back into an eg? left in
  #TODOs and a pause-point where applicable.
2025-07-15 07:23:21 -04:00
Tyler Goodlet 63c5b7696a Mv `maybe_raise_from_masking_exc()` to `.trionics`
Factor the `@acm`-closure it out of the
`test_trioisms::test_acm_embedded_nursery_propagates_enter_err` suite
for real use internally.
2025-07-15 07:23:21 -04:00
Tyler Goodlet 5f94f52226 Add ctx-ep suite for `trio`'s *finally-footgun*
Deats are documented within, but basically a subtlety we already track
with `trio`, its masking of excs by a checkpoint-in-`finally`, can cause
compounded issues with our `@context` endpoints, mostly in terms of
remote error and cancel-ack relay semantics.
2025-07-15 07:23:21 -04:00
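
For reference, the "footgun" itself reproduces with nothing but `trio` (a minimal sketch, not part of the suite being added):

import trio

async def main():
    with trio.move_on_after(0.1) as cs:
        try:
            raise RuntimeError('the error you actually care about')
        finally:
            # a checkpoint inside `finally`: once the timeout fires
            # this raises `trio.Cancelled`, which the cancel scope
            # then absorbs, so the RTE above is silently masked!
            await trio.sleep(1)

    assert cs.cancelled_caught
    print('the RuntimeError never propagated..')

trio.run(main)
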
36 changed files with 1389 additions and 413 deletions

View File

@ -16,6 +16,7 @@ from tractor import (
ContextCancelled,
MsgStream,
_testing,
trionics,
)
import trio
import pytest
@ -62,9 +63,8 @@ async def recv_and_spawn_net_killers(
await ctx.started()
async with (
ctx.open_stream() as stream,
trio.open_nursery(
strict_exception_groups=False,
) as tn,
trionics.collapse_eg(),
trio.open_nursery() as tn,
):
async for i in stream:
print(f'child echoing {i}')

View File

@ -0,0 +1,35 @@
import trio
import tractor


async def main():
    async with tractor.open_root_actor(
        debug_mode=True,
        loglevel='cancel',
    ) as _root:

        # manually trigger self-cancellation and wait
        # for it to fully trigger.
        _root.cancel_soon()
        await _root._cancel_complete.wait()
        print('root cancelled')

        # now ensure we can still use the REPL
        try:
            await tractor.pause()
        except trio.Cancelled as _taskc:
            assert (root_cs := _root._root_tn.cancel_scope).cancel_called
            # NOTE^^ above logic but inside `open_root_actor()` and
            # passed to the `shield=` expression is effectively what
            # we're testing here!
            await tractor.pause(shield=root_cs.cancel_called)

            # XXX, if shield logic *is wrong* inside `open_root_actor()`'s
            # crash-handler block this should never be interacted,
            # instead `trio.Cancelled` would be bubbled up: the original
            # BUG.
            assert 0


if __name__ == '__main__':
    trio.run(main)

View File

@ -23,9 +23,8 @@ async def main():
modules=[__name__]
) as portal_map,
trio.open_nursery(
strict_exception_groups=False,
) as tn,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
for (name, portal) in portal_map.items():

View File

@ -1,8 +1,8 @@
"""
That "native" debug mode better work!
All these tests can be understood (somewhat) by running the equivalent
`examples/debugging/` scripts manually.
All these tests can be understood (somewhat) by running the
equivalent `examples/debugging/` scripts manually.
TODO:
- none of these tests have been run successfully on windows yet but
@ -925,6 +925,7 @@ def test_post_mortem_api(
"<Task 'name_error'",
"NameError",
"('child'",
'getattr(doggypants)', # exc-LoC
]
)
if ctlc:
@ -941,8 +942,8 @@ def test_post_mortem_api(
"<Task '__main__.main'",
"('root'",
"NameError",
"tractor.post_mortem()",
"src_uid=('child'",
"tractor.post_mortem()", # in `main()`-LoC
]
)
if ctlc:
@ -960,6 +961,10 @@ def test_post_mortem_api(
"('root'",
"NameError",
"src_uid=('child'",
# raising line in `main()` but from crash-handling
# in `tractor.open_nursery()`.
'async with p.open_context(name_error) as (ctx, first):',
]
)
if ctlc:
@ -1151,6 +1156,54 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
)
def test_crash_handling_within_cancelled_root_actor(
spawn: PexpectSpawner,
):
'''
Ensure that when only a root-actor is started via `open_root_actor()`
we can crash-handle in debug-mode despite self-cancellation.
More-or-less ensures we conditionally shield the pause in
`._root.open_root_actor()`'s `await debug._maybe_enter_pm()`
call.
'''
child = spawn('root_self_cancelled_w_error')
child.expect(PROMPT)
assert_before(
child,
[
"Actor.cancel_soon()` was called!",
"root cancelled",
_pause_msg,
"('root'", # actor name
]
)
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
[
_crash_msg,
"('root'", # actor name
"AssertionError",
"assert 0",
]
)
child.sendline('c')
child.expect(EOF)
assert_before(
child,
[
"AssertionError",
"assert 0",
]
)
# TODO: better error for "non-ideal" usage from the root actor.
# -[ ] if called from an async scope emit a message that suggests
# using `await tractor.pause()` instead since it's less overhead

View File

@ -0,0 +1,114 @@
'''
Unit-ish tests for specific IPC transport protocol backends.

'''
from __future__ import annotations
from pathlib import Path

import pytest
import trio
import tractor
from tractor import (
    Actor,
    _state,
    _addr,
)


@pytest.fixture
def bindspace_dir_str() -> str:
    rt_dir: Path = tractor._state.get_rt_dir()
    bs_dir: Path = rt_dir / 'doggy'
    bs_dir_str: str = str(bs_dir)
    assert not bs_dir.is_dir()

    yield bs_dir_str

    # delete it on suite teardown.
    # ?TODO? should we support this internally
    # or is leaking it ok?
    if bs_dir.is_dir():
        bs_dir.rmdir()


def test_uds_bindspace_created_implicitly(
    debug_mode: bool,
    bindspace_dir_str: str,
):
    registry_addr: tuple = (
        f'{bindspace_dir_str}',
        'registry@doggy.sock',
    )
    bs_dir_str: str = registry_addr[0]

    # XXX, ensure bindspace-dir DNE beforehand!
    assert not Path(bs_dir_str).is_dir()

    async def main():
        async with tractor.open_nursery(
            enable_transports=['uds'],
            registry_addrs=[registry_addr],
            debug_mode=debug_mode,
        ) as _an:

            # XXX MUST be created implicitly by
            # `.ipc._uds.start_listener()`!
            assert Path(bs_dir_str).is_dir()

            root: Actor = tractor.current_actor()
            assert root.is_registrar
            assert registry_addr in root.reg_addrs
            assert (
                registry_addr
                in
                _state._runtime_vars['_registry_addrs']
            )
            assert (
                _addr.wrap_address(registry_addr)
                in
                root.registry_addrs
            )

    trio.run(main)


def test_uds_double_listen_raises_connerr(
    debug_mode: bool,
    bindspace_dir_str: str,
):
    registry_addr: tuple = (
        f'{bindspace_dir_str}',
        'registry@doggy.sock',
    )

    async def main():
        async with tractor.open_nursery(
            enable_transports=['uds'],
            registry_addrs=[registry_addr],
            debug_mode=debug_mode,
        ) as _an:

            # runtime up
            root: Actor = tractor.current_actor()

            from tractor.ipc._uds import (
                start_listener,
                UDSAddress,
            )
            ya_bound_addr: UDSAddress = root.registry_addrs[0]
            try:
                await start_listener(
                    addr=ya_bound_addr,
                )
            except ConnectionError as connerr:
                assert type(src_exc := connerr.__context__) is OSError
                assert 'Address already in use' in src_exc.args
                # complete, exit test.
            else:
                pytest.fail('It dint raise a connerr !?')

    trio.run(main)

View File

@ -313,9 +313,8 @@ async def inf_streamer(
# `trio.EndOfChannel` doesn't propagate directly to the above
# .open_stream() parent, resulting in it also raising instead
# of gracefully absorbing as normal.. so how to handle?
trio.open_nursery(
strict_exception_groups=False,
) as tn,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
async def close_stream_on_sentinel():
async for msg in stream:

View File

@ -236,7 +236,10 @@ async def stream_forever():
async def test_cancel_infinite_streamer(start_method):
# stream for at most 1 seconds
with trio.move_on_after(1) as cancel_scope:
with (
trio.fail_after(4),
trio.move_on_after(1) as cancel_scope
):
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'donny',
@ -284,20 +287,32 @@ async def test_cancel_infinite_streamer(start_method):
],
)
@tractor_test
async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
"""Verify a subset of failed subactors causes all others in
async def test_some_cancels_all(
num_actors_and_errs: tuple,
start_method: str,
loglevel: str,
):
'''
Verify a subset of failed subactors causes all others in
the nursery to be cancelled just like the strategy in trio.
This is the first and only supervisory strategy at the moment.
"""
num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
'''
(
num_actors,
first_err,
err_type,
ria_func,
da_func,
) = num_actors_and_errs
try:
async with tractor.open_nursery() as n:
async with tractor.open_nursery() as an:
# spawn the same number of deamon actors which should be cancelled
dactor_portals = []
for i in range(num_actors):
dactor_portals.append(await n.start_actor(
dactor_portals.append(await an.start_actor(
f'deamon_{i}',
enable_modules=[__name__],
))
@ -307,7 +322,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
for i in range(num_actors):
# start actor(s) that will fail immediately
riactor_portals.append(
await n.run_in_actor(
await an.run_in_actor(
func,
name=f'actor_{i}',
**kwargs
@ -337,7 +352,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
# should error here with a ``RemoteActorError`` or ``MultiError``
except first_err as err:
except first_err as _err:
err = _err
if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors
for exc in err.exceptions:
@ -348,8 +364,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
elif isinstance(err, tractor.RemoteActorError):
assert err.boxed_type == err_type
assert n.cancelled is True
assert not n._children
assert an.cancelled is True
assert not an._children
else:
pytest.fail("Should have gotten a remote assertion error?")
@ -519,10 +535,15 @@ def test_cancel_via_SIGINT_other_task(
async def main():
# should never timeout since SIGINT should cancel the current program
with trio.fail_after(timeout):
async with trio.open_nursery(
async with (
# XXX ?TODO? why no work!?
# tractor.trionics.collapse_eg(),
trio.open_nursery(
strict_exception_groups=False,
) as n:
await n.start(spawn_and_sleep_forever)
) as tn,
):
await tn.start(spawn_and_sleep_forever)
if 'mp' in spawn_backend:
time.sleep(0.1)
os.kill(pid, signal.SIGINT)
@ -533,38 +554,123 @@ def test_cancel_via_SIGINT_other_task(
async def spin_for(period=3):
"Sync sleep."
print(f'sync sleeping in sub-sub for {period}\n')
time.sleep(period)
async def spawn():
async with tractor.open_nursery() as tn:
await tn.run_in_actor(
async def spawn_sub_with_sync_blocking_task():
async with tractor.open_nursery() as an:
print('starting sync blocking subactor..\n')
await an.run_in_actor(
spin_for,
name='sleeper',
)
print('exiting first subactor layer..\n')
@pytest.mark.parametrize(
'man_cancel_outer',
[
False, # passes if delay != 2
# always causes an unexpected eg-w-embedded-assert-err?
pytest.param(True,
marks=pytest.mark.xfail(
reason=(
'always causes an unexpected eg-w-embedded-assert-err?'
)
),
),
],
)
@no_windows
def test_cancel_while_childs_child_in_sync_sleep(
loglevel,
start_method,
spawn_backend,
loglevel: str,
start_method: str,
spawn_backend: str,
debug_mode: bool,
reg_addr: tuple,
man_cancel_outer: bool,
):
"""Verify that a child cancelled while executing sync code is torn
'''
Verify that a child cancelled while executing sync code is torn
down even when that cancellation is triggered by the parent
2 nurseries "up".
"""
Though the grandchild should stay blocking its actor runtime, its
parent should issue a "zombie reaper" to hard kill it after
sufficient timeout.
'''
if start_method == 'forkserver':
pytest.skip("Forksever sux hard at resuming from sync sleep...")
async def main():
with trio.fail_after(2):
async with tractor.open_nursery() as tn:
await tn.run_in_actor(
spawn,
name='spawn',
#
# XXX BIG TODO NOTE XXX
#
# it seems there's a strange race that can happen
# where where the fail-after will trigger outer scope
# .cancel() which then causes the inner scope to raise,
#
# BaseExceptionGroup('Exceptions from Trio nursery', [
# BaseExceptionGroup('Exceptions from Trio nursery',
# [
# Cancelled(),
# Cancelled(),
# ]
# ),
# AssertionError('assert 0')
# ])
#
# WHY THIS DOESN'T MAKE SENSE:
# ---------------------------
# - it should raise too-slow-error when too slow..
# * verified that using simple-cs and manually cancelling
# you get same outcome -> indicates that the fail-after
# can have its TooSlowError overriden!
# |_ to check this it's easy, simplly decrease the timeout
# as per the var below.
#
# - when using the manual simple-cs the outcome is different
# DESPITE the `assert 0` which means regardless of the
# inner scope effectively failing in the same way, the
# bubbling up **is NOT the same**.
#
# delays trigger diff outcomes..
# ---------------------------
# as seen by uncommenting various lines below there is from
# my POV an unexpected outcome due to the delay=2 case.
#
# delay = 1 # no AssertionError in eg, TooSlowError raised.
# delay = 2 # is AssertionError in eg AND no TooSlowError !?
delay = 4 # is AssertionError in eg AND no _cs cancellation.
with trio.fail_after(delay) as _cs:
# with trio.CancelScope() as cs:
# ^XXX^ can be used instead to see same outcome.
async with (
# tractor.trionics.collapse_eg(), # doesn't help
tractor.open_nursery(
hide_tb=False,
debug_mode=debug_mode,
registry_addrs=[reg_addr],
) as an,
):
await an.run_in_actor(
spawn_sub_with_sync_blocking_task,
name='sync_blocking_sub',
)
await trio.sleep(1)
if man_cancel_outer:
print('Cancelling manually in root')
_cs.cancel()
# trigger exc-srced taskc down
# the actor tree.
print('RAISING IN ROOT')
assert 0
with pytest.raises(AssertionError):

View File

@ -117,9 +117,10 @@ async def open_actor_local_nursery(
ctx: tractor.Context,
):
global _nursery
async with trio.open_nursery(
strict_exception_groups=False,
) as tn:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
_nursery = tn
await ctx.started()
await trio.sleep(10)

View File

@ -13,26 +13,24 @@ MESSAGE = 'tractoring at full speed'
def test_empty_mngrs_input_raises() -> None:
async def main():
with trio.fail_after(1):
with trio.fail_after(3):
async with (
open_actor_cluster(
modules=[__name__],
# NOTE: ensure we can passthrough runtime opts
loglevel='info',
# debug_mode=True,
loglevel='cancel',
debug_mode=False,
) as portals,
gather_contexts(
# NOTE: it's the use of inline-generator syntax
# here that causes the empty input.
mngrs=(
p.open_context(worker) for p in portals.values()
),
),
gather_contexts(mngrs=()),
):
assert 0
# should fail before this?
assert portals
# test should fail if we mk it here!
assert 0, 'Should have raised val-err !?'
with pytest.raises(ValueError):
trio.run(main)

View File

@ -11,6 +11,7 @@ import psutil
import pytest
import subprocess
import tractor
from tractor.trionics import collapse_eg
from tractor._testing import tractor_test
import trio
@ -193,10 +194,10 @@ async def spawn_and_check_registry(
try:
async with tractor.open_nursery() as an:
async with trio.open_nursery(
strict_exception_groups=False,
) as trion:
async with (
collapse_eg(),
trio.open_nursery() as trion,
):
portals = {}
for i in range(3):
name = f'a{i}'
@ -338,11 +339,12 @@ async def close_chans_before_nursery(
async with portal2.open_stream_from(
stream_forever
) as agen2:
async with trio.open_nursery(
strict_exception_groups=False,
) as n:
n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5)
async with (
collapse_eg(),
trio.open_nursery() as tn,
):
tn.start_soon(streamer, agen1)
tn.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:

View File

@ -234,10 +234,8 @@ async def trio_ctx(
with trio.fail_after(1 + delay):
try:
async with (
trio.open_nursery(
# TODO, for new `trio` / py3.13
# strict_exception_groups=False,
) as tn,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
tractor.to_asyncio.open_channel_from(
sleep_and_err,
) as (first, chan),
@ -573,6 +571,8 @@ def test_basic_interloop_channel_stream(
fan_out: bool,
):
async def main():
# TODO, figure out min timeout here!
with trio.fail_after(6):
async with tractor.open_nursery() as an:
portal = await an.run_in_actor(
stream_from_aio,
@ -1088,6 +1088,108 @@ def test_sigint_closes_lifetime_stack(
trio.run(main)
# ?TODO asyncio.Task fn-deco?
# -[ ] do sig checkingat import time like @context?
# -[ ] maybe name it @aio_task ??
# -[ ] chan: to_asyncio.InterloopChannel ??
async def raise_before_started(
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,
chan: to_asyncio.LinkedTaskChannel,
) -> None:
'''
`asyncio.Task` entry point which RTEs before calling
`to_trio.send_nowait()`.
'''
await asyncio.sleep(0.2)
raise RuntimeError('Some shite went wrong before `.send_nowait()`!!')
# to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??')
chan.started_nowait('Uhh we shouldve RTE-d ^^ ??')
await asyncio.sleep(float('inf'))
@tractor.context
async def caching_ep(
ctx: tractor.Context,
):
log = tractor.log.get_logger('caching_ep')
log.info('syncing via `ctx.started()`')
await ctx.started()
# XXX, allocate the `open_channel_from()` inside
# a `.trionics.maybe_open_context()`.
chan: to_asyncio.LinkedTaskChannel
async with (
tractor.trionics.maybe_open_context(
acm_func=tractor.to_asyncio.open_channel_from,
kwargs={
'target': raise_before_started,
# ^XXX, kwarg to `open_channel_from()`
},
# lock around current actor task access
key=tractor.current_actor().uid,
) as (cache_hit, (clients, chan)),
):
if cache_hit:
log.error(
'Re-using cached `.open_from_channel()` call!\n'
)
else:
log.info(
'Allocating SHOULD-FAIL `.open_from_channel()`\n'
)
await trio.sleep_forever()
def test_aio_side_raises_before_started(
reg_addr: tuple[str, int],
debug_mode: bool,
loglevel: str,
):
'''
Simulates connection-err from `piker.brokers.ib.api`..
Ensure any error raised by child-`asyncio.Task` BEFORE
`chan.started()`
'''
# delay = 999 if debug_mode else 1
async def main():
with trio.fail_after(3):
an: tractor.ActorNursery
async with tractor.open_nursery(
debug_mode=debug_mode,
loglevel=loglevel,
) as an:
p: tractor.Portal = await an.start_actor(
'lchan_cacher_that_raises_fast',
enable_modules=[__name__],
infect_asyncio=True,
)
async with p.open_context(
caching_ep,
) as (ctx, first):
assert not first
with pytest.raises(
expected_exception=(RemoteActorError),
) as excinfo:
trio.run(main)
# ensure `asyncio.Task` exception is bubbled
# allll the way erp!!
rae = excinfo.value
assert rae.boxed_type is RuntimeError
# TODO: debug_mode tests once we get support for `asyncio`!
#
# -[ ] need tests to wrap both scripts:

View File

@ -235,10 +235,16 @@ async def cancel_after(wait, reg_addr):
@pytest.fixture(scope='module')
def time_quad_ex(reg_addr, ci_env, spawn_backend):
def time_quad_ex(
reg_addr: tuple,
ci_env: bool,
spawn_backend: str,
):
if spawn_backend == 'mp':
"""no idea but the mp *nix runs are flaking out here often...
"""
'''
no idea but the mp *nix runs are flaking out here often...
'''
pytest.skip("Test is too flaky on mp in CI")
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
@ -249,12 +255,24 @@ def time_quad_ex(reg_addr, ci_env, spawn_backend):
return results, diff
def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
"""This also serves as a kind of "we'd like to be this fast test"."""
def test_a_quadruple_example(
time_quad_ex: tuple,
ci_env: bool,
spawn_backend: str,
):
'''
This also serves as a kind of "we'd like to be this fast test".
'''
results, diff = time_quad_ex
assert results
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
this_fast = (
6 if platform.system() in (
'Windows',
'Darwin',
)
else 3
)
assert diff < this_fast

View File

@ -1,5 +1,6 @@
'''
Async context manager cache api testing: ``trionics.maybe_open_context():``
Suites for our `.trionics.maybe_open_context()` multi-task
shared-cached `@acm` API.
'''
from contextlib import asynccontextmanager as acm
@ -9,6 +10,15 @@ from typing import Awaitable
import pytest
import trio
import tractor
from tractor.trionics import (
maybe_open_context,
)
from tractor.log import (
get_console_log,
get_logger,
)
log = get_logger(__name__)
_resource: int = 0
@ -52,7 +62,7 @@ def test_resource_only_entered_once(key_on):
# different task names per task will be used
kwargs = {'task_name': name}
async with tractor.trionics.maybe_open_context(
async with maybe_open_context(
maybe_increment_counter,
kwargs=kwargs,
key=key,
@ -140,7 +150,7 @@ async def open_stream() -> Awaitable[
@acm
async def maybe_open_stream(taskname: str):
async with tractor.trionics.maybe_open_context(
async with maybe_open_context(
# NOTE: all secondary tasks should cache hit on the same key
acm_func=open_stream,
) as (
@ -305,3 +315,92 @@ def test_open_local_sub_to_stream(
print('exiting main.')
trio.run(main)
@acm
async def cancel_outer_cs(
cs: trio.CancelScope|None = None,
delay: float = 0,
):
# on the first task, delay long enough to block the 2nd task
# but then cancel it mid-sleep such that the `tn.start()` inside
# the key-err handler block is cancelled and would previously
# corrupt the mutex state.
log.info(f'task entering sleep({delay})')
await trio.sleep(delay)
if cs:
log.info('task calling cs.cancel()')
cs.cancel()
trio.lowlevel.checkpoint()
yield
await trio.sleep_forever()
def test_lock_not_corrupted_on_fast_cancel(
debug_mode: bool,
loglevel: str,
):
'''
Verify that if the caching-task (the first to enter
`maybe_open_context()`) is cancelled mid-cache-miss, the embedded
mutex can never be left in a corrupted state.
That is, the lock is always eventually released ensuring a peer
(cache-hitting) task will never,
- be left to inf-block/hang on the `lock.acquire()`.
- try to release the lock when still owned by the caching-task
due to it having erroneously exited without calling
`lock.release()`.
'''
delay: float = 1.
async def use_moc(
cs: trio.CancelScope|None,
delay: float,
):
log.info('task entering moc')
async with maybe_open_context(
cancel_outer_cs,
kwargs={
'cs': cs,
'delay': delay,
},
) as (cache_hit, _null):
if cache_hit:
log.info('2nd task entered')
else:
log.info('1st task entered')
await trio.sleep_forever()
async def main():
with trio.fail_after(delay + 2):
async with (
tractor.open_root_actor(
debug_mode=debug_mode,
loglevel=loglevel,
),
trio.open_nursery() as tn,
):
get_console_log('info')
log.info('yo starting')
cs = tn.cancel_scope
tn.start_soon(
use_moc,
cs,
delay,
name='child',
)
with trio.CancelScope() as rent_cs:
await use_moc(
cs=rent_cs,
delay=delay,
)
trio.run(main)

View File

@ -147,8 +147,7 @@ def test_trio_prestarted_task_bubbles(
await trio.sleep_forever()
async def _trio_main():
# with trio.fail_after(2):
with trio.fail_after(999):
with trio.fail_after(2 if not debug_mode else 999):
first: str
chan: to_asyncio.LinkedTaskChannel
aio_ev = asyncio.Event()
@ -217,32 +216,25 @@ def test_trio_prestarted_task_bubbles(
):
aio_ev.set()
with pytest.raises(
expected_exception=ExceptionGroup,
) as excinfo:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
eg = excinfo.value
rte_eg, rest_eg = eg.split(RuntimeError)
# ensure the trio-task's error bubbled despite the aio-side
# having (maybe) errored first.
if aio_err_trigger in (
'after_trio_task_starts',
'after_start_point',
):
assert len(errs := rest_eg.exceptions) == 1
typerr = errs[0]
assert (
type(typerr) is TypeError
and
'trio-side' in typerr.args
)
patt: str = 'trio-side'
expect_exc = TypeError
# when aio errors BEFORE (last) trio task is scheduled, we should
# never see anything but the aio-side.
else:
assert len(rtes := rte_eg.exceptions) == 1
assert 'asyncio-side' in rtes[0].args[0]
patt: str = 'asyncio-side'
expect_exc = RuntimeError
with pytest.raises(expect_exc) as excinfo:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
caught_exc = excinfo.value
assert patt in caught_exc.args

View File

@ -8,6 +8,7 @@ from contextlib import (
)
import pytest
from tractor.trionics import collapse_eg
import trio
from trio import TaskStatus
@ -64,9 +65,8 @@ def test_stashed_child_nursery(use_start_soon):
async def main():
async with (
trio.open_nursery(
strict_exception_groups=False,
) as pn,
collapse_eg(),
trio.open_nursery() as pn,
):
cn = await pn.start(mk_child_nursery)
assert cn
@ -197,10 +197,8 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
async with (
# XXX should ensure ONLY the KBI
# is relayed upward
trionics.collapse_eg(),
trio.open_nursery(
# strict_exception_groups=False,
), # as tn,
collapse_eg(),
trio.open_nursery(), # as tn,
trionics.gather_contexts([
open_memchan(),

View File

@ -55,10 +55,17 @@ async def open_actor_cluster(
raise ValueError(
f'Number of names is {len(names)} but count is {count}')
async with tractor.open_nursery(
async with (
# tractor.trionics.collapse_eg(),
tractor.open_nursery(
**runtime_kwargs,
) as an:
async with trio.open_nursery() as n:
) as an
):
async with (
# tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
tractor.trionics.maybe_raise_from_masking_exc()
):
uid = tractor.current_actor().uid
async def _start(name: str) -> None:
@ -69,9 +76,8 @@ async def open_actor_cluster(
)
for name in names:
n.start_soon(_start, name)
tn.start_soon(_start, name)
assert len(portals) == count
yield portals
await an.cancel(hard_kill=hard_kill)

View File

@ -154,7 +154,7 @@ class Context:
2 cancel-scope-linked, communicating and parallel executing
`Task`s. Contexts are allocated on each side of any task
RPC-linked msg dialog, i.e. for every request to a remote
actor from a `Portal`. On the "callee" side a context is
actor from a `Portal`. On the "child" side a context is
always allocated inside `._rpc._invoke()`.
TODO: more detailed writeup on cancellation, error and
@ -222,8 +222,8 @@ class Context:
# `._runtime.invoke()`.
_remote_func_type: str | None = None
# NOTE: (for now) only set (a portal) on the caller side since
# the callee doesn't generally need a ref to one and should
# NOTE: (for now) only set (a portal) on the parent side since
# the child doesn't generally need a ref to one and should
# normally explicitly ask for a handle to its peer if
# more than the `Context` is needed?
_portal: Portal | None = None
@ -252,12 +252,12 @@ class Context:
_outcome_msg: Return|Error|ContextCancelled = Unresolved
# on a clean exit there should be a final value
# delivered from the far end "callee" task, so
# delivered from the far end "child" task, so
# this value is only set on one side.
# _result: Any | int = None
_result: PayloadT|Unresolved = Unresolved
# if the local "caller" task errors this value is always set
# if the local "parent" task errors this value is always set
# to the error that was captured in the
# `Portal.open_context().__aexit__()` teardown block OR, in
# 2 special cases when an (maybe) expected remote error
@ -293,7 +293,7 @@ class Context:
# a `ContextCancelled` due to a call to `.cancel()` triggering
# "graceful closure" on either side:
# - `._runtime._invoke()` will check this flag before engaging
# the crash handler REPL in such cases where the "callee"
# the crash handler REPL in such cases where the "child"
# raises the cancellation,
# - `.devx.debug.lock_stdio_for_peer()` will set it to `False` if
# the global tty-lock has been configured to filter out some
@ -307,8 +307,8 @@ class Context:
_stream_opened: bool = False
_stream: MsgStream|None = None
# caller of `Portal.open_context()` for
# logging purposes mostly
# the parent-task's calling-fn's frame-info, the frame above
# `Portal.open_context()`, for introspection/logging.
_caller_info: CallerInfo|None = None
# overrun handling machinery
@ -529,11 +529,11 @@ class Context:
'''
Exactly the value of `self._scope.cancelled_caught`
(delegation) and should only be (able to be read as)
`True` for a `.side == "caller"` ctx wherein the
`True` for a `.side == "parent"` ctx wherein the
`Portal.open_context()` block was exited due to a call to
`._scope.cancel()` - which should only occur in 2 cases:
- a caller side calls `.cancel()`, the far side cancels
- a parent side calls `.cancel()`, the far side cancels
and delivers back a `ContextCancelled` (making
`.cancel_acked == True`) and `._scope.cancel()` is
called by `._maybe_cancel_and_set_remote_error()` which
@ -542,20 +542,20 @@ class Context:
=> `._scope.cancelled_caught == True` by normal `trio`
cs semantics.
- a caller side is delivered a `._remote_error:
- a parent side is delivered a `._remote_error:
RemoteActorError` via `._deliver_msg()` and a transitive
call to `_maybe_cancel_and_set_remote_error()` calls
`._scope.cancel()` and that cancellation eventually
results in `trio.Cancelled`(s) caught in the
`.open_context()` handling around the @acm's `yield`.
Only as an FYI, in the "callee" side case it can also be
Only as an FYI, in the "child" side case it can also be
set but is never readable by any task outside the RPC
machinery in `._invoke()` since:
- when a callee side calls `.cancel()`, `._scope.cancel()`
- when a child side calls `.cancel()`, `._scope.cancel()`
is called immediately and handled specially inside
`._invoke()` to raise a `ContextCancelled` which is then
sent to the caller side.
sent to the parent side.
However, `._scope.cancelled_caught` can NEVER be
accessed/read as `True` by any RPC invoked task since it
@ -666,7 +666,7 @@ class Context:
when called/closed by actor local task(s).
NOTEs:
- It is expected that the caller has previously unwrapped
- It is expected that the parent has previously unwrapped
the remote error using a call to `unpack_error()` and
provides that output exception value as the input
`error` argument *here*.
@ -676,7 +676,7 @@ class Context:
`Portal.open_context()` (ideally) we want to interrupt
any ongoing local tasks operating within that
`Context`'s cancel-scope so as to be notified ASAP of
the remote error and engage any caller handling (eg.
the remote error and engage any parent handling (eg.
for cross-process task supervision).
- In some cases we may want to raise the remote error
@ -886,6 +886,11 @@ class Context:
@property
def repr_caller(self) -> str:
'''
Render a "namespace-path" style representation of the calling
task-fn.
'''
ci: CallerInfo|None = self._caller_info
if ci:
return (
@ -899,7 +904,7 @@ class Context:
def repr_api(self) -> str:
return 'Portal.open_context()'
# TODO: use `.dev._frame_stack` scanning to find caller!
# TODO: use `.dev._frame_stack` scanning to find caller fn!
# ci: CallerInfo|None = self._caller_info
# if ci:
# return (
@ -934,7 +939,7 @@ class Context:
=> That is, an IPC `Context` (this) **does not**
have the same semantics as a `trio.CancelScope`.
If the caller (who entered the `Portal.open_context()`)
If the parent (who entered the `Portal.open_context()`)
desires that the internal block's cancel-scope be
cancelled it should open its own `trio.CancelScope` and
manage it as needed.
@ -1006,7 +1011,6 @@ class Context:
else:
log.cancel(
f'Timed out on cancel request of remote task?\n'
f'\n'
f'{reminfo}'
)
@ -1017,7 +1021,7 @@ class Context:
# `_invoke()` RPC task.
#
# NOTE: on this side we ALWAYS cancel the local scope
# since the caller expects a `ContextCancelled` to be sent
# since the parent expects a `ContextCancelled` to be sent
# from `._runtime._invoke()` back to the other side. The
# logic for catching the result of the below
# `._scope.cancel()` is inside the `._runtime._invoke()`
@ -1190,8 +1194,8 @@ class Context:
) -> Any|Exception:
'''
From some (caller) side task, wait for and return the final
result from the remote (callee) side's task.
From some (parent) side task, wait for and return the final
result from the remote (child) side's task.
This provides a mechanism for one task running in some actor to wait
on another task at the other side, in some other actor, to terminate.
@ -1487,6 +1491,12 @@ class Context:
):
status = 'peer-cancelled'
case (
Unresolved,
trio.Cancelled(), # any error-type
) if self.canceller:
status = 'actor-cancelled'
# (remote) error condition
case (
Unresolved,
@ -1600,7 +1610,7 @@ class Context:
raise err
# TODO: maybe a flag to by-pass encode op if already done
# here in caller?
# here in parent?
await self.chan.send(started_msg)
# set msg-related internal runtime-state
@ -1676,7 +1686,7 @@ class Context:
XXX RULES XXX
------ - ------
- NEVER raise remote errors from this method; a runtime task caller.
- NEVER raise remote errors from this method; a calling runtime-task.
An error "delivered" to a ctx should always be raised by
the corresponding local task operating on the
`Portal`/`Context` APIs.
@ -1752,7 +1762,7 @@ class Context:
else:
report = (
'Queueing OVERRUN msg on caller task:\n\n'
'Queueing OVERRUN msg on parent task:\n\n'
+ report
)
log.debug(report)
@ -1948,12 +1958,12 @@ async def open_context_from_portal(
IPC protocol.
The yielded `tuple` is a pair delivering a `tractor.Context`
and any first value "sent" by the "callee" task via a call
and any first value "sent" by the "child" task via a call
to `Context.started(<value: Any>)`; this side of the
context does not unblock until the "callee" task calls
context does not unblock until the "child" task calls
`.started()` in similar style to `trio.Nursery.start()`.
When the "callee" (side that is "called"/started by a call
to *this* method) returns, the caller side (this) unblocks
When the "child" (side that is "called"/started by a call
to *this* method) returns, the parent side (this) unblocks
and any final value delivered from the other end can be
retrieved using the `Contex.wait_for_result()` api.
@ -1966,7 +1976,7 @@ async def open_context_from_portal(
__tracebackhide__: bool = hide_tb
# denote this frame as a "runtime frame" for stack
# introspection where we report the caller code in logging
# introspection where we report the parent code in logging
# and error message content.
# NOTE: 2 bc of the wrapping `@acm`
__runtimeframe__: int = 2 # noqa
@ -2025,7 +2035,7 @@ async def open_context_from_portal(
# placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure.
scope_err: BaseException|None = None
ctxc_from_callee: ContextCancelled|None = None
ctxc_from_child: ContextCancelled|None = None
try:
async with (
collapse_eg(),
@ -2104,7 +2114,7 @@ async def open_context_from_portal(
# that we can re-use it around the `yield` ^ here
# or vice versa?
#
# maybe TODO NOTE: between the caller exiting and
# maybe TODO NOTE: between the parent exiting and
# arriving here the far end may have sent a ctxc-msg or
# other error, so the question is whether we should check
# for it here immediately and maybe raise so as to engage
@ -2170,16 +2180,16 @@ async def open_context_from_portal(
# request in which case we DO let the error bubble to the
# opener.
#
# 2-THIS "caller" task somewhere invoked `Context.cancel()`
# and received a `ContextCanclled` from the "callee"
# 2-THIS "parent" task somewhere invoked `Context.cancel()`
# and received a `ContextCancelled` from the "child"
# task, in which case we mask the `ContextCancelled` from
# bubbling to this "caller" (much like how `trio.Nursery`
# bubbling to this "parent" (much like how `trio.Nursery`
# swallows any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`)
except ContextCancelled as ctxc:
scope_err = ctxc
ctx._local_error: BaseException = scope_err
ctxc_from_callee = ctxc
ctxc_from_child = ctxc
# XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
# using this code and then resuming the REPL will
@ -2216,11 +2226,11 @@ async def open_context_from_portal(
# the above `._scope` can be cancelled due to:
# 1. an explicit self cancel via `Context.cancel()` or
# `Actor.cancel()`,
# 2. any "callee"-side remote error, possibly also a cancellation
# 2. any "child"-side remote error, possibly also a cancellation
# request by some peer,
# 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
# 3. any "parent" (aka THIS scope's) local error raised in the above `yield`
except (
# CASE 3: standard local error in this caller/yieldee
# CASE 3: standard local error in this parent/yieldee
Exception,
# CASES 1 & 2: can manifest as a `ctx._scope_nursery`
@ -2234,9 +2244,9 @@ async def open_context_from_portal(
# any `Context._maybe_raise_remote_err()` call.
#
# 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
# from any error delivered from the "callee" side
# from any error delivered from the "child" side
# AND a group-exc is only raised if there was > 1
# tasks started *here* in the "caller" / opener
# tasks started *here* in the "parent" / opener
# block. If any one of those tasks calls
# `.wait_for_result()` or `MsgStream.receive()`
# `._maybe_raise_remote_err()` will be transitively
@ -2249,8 +2259,8 @@ async def open_context_from_portal(
trio.Cancelled, # NOTE: NOT from inside the ctx._scope
KeyboardInterrupt,
) as caller_err:
scope_err = caller_err
) as rent_err:
scope_err = rent_err
ctx._local_error: BaseException = scope_err
# XXX: ALWAYS request the context to CANCEL ON any ERROR.
@ -2260,7 +2270,7 @@ async def open_context_from_portal(
# await debug.pause()
# log.cancel(
match scope_err:
case trio.Cancelled:
case trio.Cancelled():
logmeth = log.cancel
# XXX explicitly report on any non-graceful-taskc cases
@ -2268,7 +2278,7 @@ async def open_context_from_portal(
logmeth = log.exception
logmeth(
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
)
if debug_mode():
@ -2289,9 +2299,9 @@ async def open_context_from_portal(
'Calling `ctx.cancel()`!\n'
)
# we don't need to cancel the callee if it already
# we don't need to cancel the child if it already
# told us it's cancelled ;p
if ctxc_from_callee is None:
if ctxc_from_child is None:
try:
await ctx.cancel()
except (
@ -2322,8 +2332,8 @@ async def open_context_from_portal(
# via a call to
# `Context._maybe_cancel_and_set_remote_error()`.
# As per `Context._deliver_msg()`, that error IS
# ALWAYS SET any time "callee" side fails and causes "caller
# side" cancellation via a `ContextCancelled` here.
# ALWAYS SET any time "child" side fails and causes
# "parent side" cancellation via a `ContextCancelled` here.
try:
result_or_err: Exception|Any = await ctx.wait_for_result()
except BaseException as berr:
@ -2359,7 +2369,7 @@ async def open_context_from_portal(
)
case (None, _):
log.runtime(
'Context returned final result from callee task:\n'
'Context returned final result from child task:\n'
f'<= peer: {uid}\n'
f' |_ {nsf}()\n\n'
@ -2454,7 +2464,7 @@ async def open_context_from_portal(
)
# TODO: should we add a `._cancel_req_received`
# flag to determine if the callee manually called
# flag to determine if the child manually called
# `ctx.cancel()`?
# -[ ] going to need a cid check no?
@ -2510,7 +2520,7 @@ def mk_context(
recv_chan: trio.MemoryReceiveChannel
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
# TODO: only scan caller-info if log level so high!
# TODO: only scan parent-info if log level so high!
from .devx._frame_stack import find_caller_info
caller_info: CallerInfo|None = find_caller_info()

View File

@ -481,10 +481,11 @@ async def open_root_actor(
collapse_eg(),
trio.open_nursery() as root_tn,
# XXX, finally-footgun below?
# ?TODO? finally-footgun below?
# -> see note on why shielding.
# maybe_raise_from_masking_exc(),
):
actor._root_tn = root_tn
# `_runtime.async_main()` creates an internal nursery
# and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called
@ -523,6 +524,11 @@ async def open_root_actor(
err,
api_frame=inspect.currentframe(),
debug_filter=debug_filter,
# XXX NOTE, required to debug root-actor
# crashes under cancellation conditions; so
# most of them!
shield=root_tn.cancel_scope.cancel_called,
)
if (
@ -562,6 +568,7 @@ async def open_root_actor(
f'{op_nested_actor_repr}'
)
# XXX, THIS IS A *finally-footgun*!
# (also mentioned in with-block above)
# -> though it already shields internally it can
# taskc here and mask underlying errors raised in
# the try-block above?

View File

@ -384,7 +384,7 @@ async def _errors_relayed_via_ipc(
# RPC task bookkeeping.
# since RPC tasks are scheduled inside a flat
# `Actor._service_n`, we add "handles" to each such that
# `Actor._service_tn`, we add "handles" to each such that
# they can be individually cancelled.
finally:
@ -462,7 +462,7 @@ async def _invoke(
connected IPC channel.
This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_n: Nursery`.
remotely invoked function, normally in `Actor._service_tn: Nursery`.
'''
__tracebackhide__: bool = hide_tb
@ -642,7 +642,7 @@ async def _invoke(
tn: Nursery
rpc_ctx_cs: CancelScope
async with (
collapse_eg(),
collapse_eg(hide_tb=False),
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
ctx=ctx,
@ -823,24 +823,44 @@ async def _invoke(
f'after having {ctx.repr_state!r}\n'
)
if merr:
logmeth: Callable = log.error
if isinstance(merr, ContextCancelled):
logmeth: Callable = log.runtime
if (
# ctxc: by `Context.cancel()`
isinstance(merr, ContextCancelled)
if not isinstance(merr, RemoteActorError):
tb_str: str = ''.join(traceback.format_exception(merr))
# out-of-layer cancellation, one of:
# - actorc: by `Portal.cancel_actor()`
# - OSc: by SIGINT or `Process.signal()`
or (
isinstance(merr, trio.Cancelled)
and
ctx.canceller
)
):
logmeth: Callable = log.cancel
descr_str += (
f' with {merr!r}\n'
)
elif (
not isinstance(merr, RemoteActorError)
):
tb_str: str = ''.join(
traceback.format_exception(merr)
)
descr_str += (
f'\n{merr!r}\n' # needed?
f'{tb_str}\n'
f'\n'
f'scope_error:\n'
f'{scope_err!r}\n'
)
else:
descr_str += f'\n{merr!r}\n'
descr_str += (
f'{merr!r}\n'
)
else:
descr_str += f'\nwith final result {ctx.outcome!r}\n'
descr_str += (
f'\n'
f'with final result {ctx.outcome!r}\n'
)
logmeth(
f'{message}\n'
@ -916,7 +936,7 @@ async def process_messages(
Receive (multiplexed) per-`Channel` RPC requests as msgs from
remote processes; schedule target async funcs as local
`trio.Task`s inside the `Actor._service_n: Nursery`.
`trio.Task`s inside the `Actor._service_tn: Nursery`.
Depending on msg type, non-`cmd` (task spawning/starting)
request payloads (eg. `started`, `yield`, `return`, `error`)
@ -941,7 +961,7 @@ async def process_messages(
'''
actor: Actor = _state.current_actor()
assert actor._service_n # runtime state sanity
assert actor._service_tn # runtime state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we
# should use it?
@ -1152,7 +1172,7 @@ async def process_messages(
start_status += '->( scheduling new task..\n'
log.runtime(start_status)
try:
ctx: Context = await actor._service_n.start(
ctx: Context = await actor._service_tn.start(
partial(
_invoke,
actor,
@ -1292,7 +1312,7 @@ async def process_messages(
) as err:
if nursery_cancelled_before_task:
sn: Nursery = actor._service_n
sn: Nursery = actor._service_tn
assert sn and sn.cancel_scope.cancel_called # sanity
log.cancel(
f'Service nursery cancelled before it handled {funcname}'

View File

@ -35,6 +35,15 @@ for running all lower level spawning, supervision and msging layers:
SC-transitive RPC via scheduling of `trio` tasks.
- registration of newly spawned actors with the discovery sys.
Glossary:
--------
- tn: a `trio.Nursery` or "task nursery".
- an: an `ActorNursery` or "actor nursery".
- root: top/parent-most scope/task/process/actor (or other runtime
primitive) in a hierarchical tree.
- parent-ish: "higher-up" in the runtime-primitive hierarchy.
- child-ish: "lower-down" in the runtime-primitive hierarchy.
'''
from __future__ import annotations
from contextlib import (
@ -76,6 +85,7 @@ from tractor.msg import (
)
from .trionics import (
collapse_eg,
maybe_open_nursery,
)
from .ipc import (
Channel,
@ -173,10 +183,11 @@ class Actor:
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()` after fork
_root_n: Nursery|None = None
_service_n: Nursery|None = None
# nursery placeholders filled in by `async_main()`,
# - after fork for subactors.
# - during boot for the root actor.
_root_tn: Nursery|None = None
_service_tn: Nursery|None = None
_ipc_server: _server.IPCServer|None = None
@property
@ -1010,12 +1021,48 @@ class Actor:
the RPC service nursery.
'''
assert self._service_n
self._service_n.start_soon(
actor_repr: str = _pformat.nest_from_op(
input_op='>c(',
text=self.pformat(),
nest_indent=1,
)
log.cancel(
'`Actor.cancel_soon()` was called!\n'
f'>> scheduling `Actor.cancel()`\n'
f'{actor_repr}'
)
assert self._service_tn
self._service_tn.start_soon(
self.cancel,
None, # self cancel all rpc tasks
)
# schedule a "canceller task" in the `._root_tn` once the
# `._service_tn` is fully shutdown; task waits for child-ish
# scopes to fully exit then finally cancels its parent,
# root-most, scope.
async def cancel_root_tn_after_services():
log.runtime(
'Waiting on service-tn to cancel..\n'
f'c>)\n'
f'|_{self._service_tn.cancel_scope!r}\n'
)
await self._cancel_complete.wait()
log.cancel(
f'`._service_tn` cancelled\n'
f'>c)\n'
f'|_{self._service_tn.cancel_scope!r}\n'
f'\n'
f'>> cancelling `._root_tn`\n'
f'c>(\n'
f' |_{self._root_tn.cancel_scope!r}\n'
)
self._root_tn.cancel_scope.cancel()
self._root_tn.start_soon(
cancel_root_tn_after_services
)
@property
def cancel_complete(self) -> bool:
return self._cancel_complete.is_set()
@ -1120,8 +1167,8 @@ class Actor:
await ipc_server.wait_for_shutdown()
# cancel all rpc tasks permanently
if self._service_n:
self._service_n.cancel_scope.cancel()
if self._service_tn:
self._service_tn.cancel_scope.cancel()
log_meth(msg)
self._cancel_complete.set()
@ -1258,7 +1305,7 @@ class Actor:
'''
Cancel all ongoing RPC tasks owned/spawned for a given
`parent_chan: Channel` or simply all tasks (inside
`._service_n`) when `parent_chan=None`.
`._service_tn`) when `parent_chan=None`.
'''
tasks: dict = self._rpc_tasks
@ -1470,46 +1517,55 @@ async def async_main(
accept_addrs.append(addr.unwrap())
assert accept_addrs
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in
# a deterministic way.
ya_root_tn: bool = bool(actor._root_tn)
ya_service_tn: bool = bool(actor._service_tn)
# NOTE, a top-most "root" nursery in each actor-process
# enables a lifetime priority for the IPC-channel connection
# with a sub-actor's immediate parent. I.e. this connection
# is kept alive as a resilient service connection until all
# other machinery has exited, cancellation of all
# embedded/child scopes have completed. This helps ensure
# a deterministic (and thus "graceful")
# first-class-supervision style teardown where a parent actor
# (vs. say peers) is always the last to be contacted before
# disconnect.
root_tn: trio.Nursery
async with (
collapse_eg(),
trio.open_nursery() as root_tn,
maybe_open_nursery(
nursery=actor._root_tn,
) as root_tn,
):
actor._root_n = root_tn
assert actor._root_n
if ya_root_tn:
assert root_tn is actor._root_tn
else:
actor._root_tn = root_tn
ipc_server: _server.IPCServer
async with (
collapse_eg(),
trio.open_nursery() as service_nursery,
maybe_open_nursery(
nursery=actor._service_tn,
) as service_tn,
_server.open_ipc_server(
parent_tn=service_nursery,
stream_handler_tn=service_nursery,
parent_tn=service_tn, # ?TODO, why can't this be the root-tn
stream_handler_tn=service_tn,
) as ipc_server,
# ) as actor._ipc_server,
# ^TODO? prettier?
):
if ya_service_tn:
assert service_tn is actor._service_tn
else:
# This nursery is used to handle all inbound
# connections to us such that if the TCP server
# is killed, connections can continue to process
# in the background until this nursery is cancelled.
actor._service_n = service_nursery
actor._service_tn = service_tn
# set after allocate
actor._ipc_server = ipc_server
assert (
actor._service_n
and (
actor._service_n
is
actor._ipc_server._parent_tn
is
ipc_server._stream_handler_tn
)
)
# load exposed/allowed RPC modules
# XXX: do this **after** establishing a channel to the parent
@ -1535,10 +1591,11 @@ async def async_main(
# - root actor: the ``accept_addr`` passed to this method
# TODO: why is this not with the root nursery?
# - see above that the `._service_tn` is what's used?
try:
eps: list = await ipc_server.listen_on(
accept_addrs=accept_addrs,
stream_handler_nursery=service_nursery,
stream_handler_nursery=service_tn,
)
log.runtime(
f'Booted IPC server\n'
@ -1546,7 +1603,7 @@ async def async_main(
)
assert (
(eps[0].listen_tn)
is not service_nursery
is not service_tn
)
except OSError as oserr:
@ -1708,7 +1765,7 @@ async def async_main(
# XXX TODO but hard XXX
# we can't actually do this bc the debugger uses the
# _service_n to spawn the lock task, BUT, in theory if we had
# _service_tn to spawn the lock task, BUT, in theory if we had
# the root nursery surround this finally block it might be
# actually possible to debug THIS machinery in the same way
# as user task code?

View File

@ -236,10 +236,6 @@ async def hard_kill(
# whilst also hacking on it XD
# terminate_after: int = 99999,
# NOTE: for mucking with `.pause()`-ing inside the runtime
# whilst also hacking on it XD
# terminate_after: int = 99999,
) -> None:
'''
Un-gracefully terminate an OS level `trio.Process` after timeout.
@ -301,6 +297,23 @@ async def hard_kill(
# zombies (as a feature) we ask the OS to do send in the
# removal swad as the last resort.
if cs.cancelled_caught:
# TODO? attempt at intermediary-rent-sub
# with child in debug lock?
# |_https://github.com/goodboy/tractor/issues/320
#
# if not is_root_process():
# log.warning(
# 'Attempting to acquire debug-REPL-lock before zombie reap!'
# )
# with trio.CancelScope(shield=True):
# async with debug.acquire_debug_lock(
# subactor_uid=current_actor().uid,
# ) as _ctx:
# log.warning(
# 'Acquired debug lock, child ready to be killed ??\n'
# )
# TODO: toss in the skynet-logo face as ascii art?
log.critical(
# 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'

View File

@ -643,8 +643,9 @@ _shutdown_msg: str = (
'Actor-runtime-shutdown'
)
# @api_frame
@acm
# @api_frame
async def open_nursery(
*, # named params only!
hide_tb: bool = True,

View File

@ -237,9 +237,9 @@ def enable_stack_on_sig(
try:
import stackscope
except ImportError:
log.error(
'`stackscope` not installed for use in debug mode!\n'
'`Ignoring {enable_stack_on_sig!r} call!\n'
log.warning(
'The `stackscope` lib is not installed!\n'
'Ignoring `enable_stack_on_sig()` call!\n'
)
return None

View File

@ -250,7 +250,7 @@ async def _maybe_enter_pm(
*,
tb: TracebackType|None = None,
api_frame: FrameType|None = None,
hide_tb: bool = False,
hide_tb: bool = True,
# only enter debugger REPL when returns `True`
debug_filter: Callable[

View File

@ -58,6 +58,7 @@ from tractor._context import Context
from tractor import _state
from tractor._exceptions import (
NoRuntime,
InternalError,
)
from tractor._state import (
current_actor,
@ -79,6 +80,9 @@ from ._sigint import (
sigint_shield as sigint_shield,
_ctlc_ignore_header as _ctlc_ignore_header
)
from ..pformat import (
ppfmt,
)
if TYPE_CHECKING:
from trio.lowlevel import Task
@ -477,12 +481,12 @@ async def _pause(
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below:
# ```python
# actor._service_n.cancel_scope.shield = shield
# actor._service_tn.cancel_scope.shield = shield
# ```
# but not entirely sure if that's a sane way to implement it?
# NOTE currently we spawn the lock request task inside this
# subactor's global `Actor._service_n` so that the
# subactor's global `Actor._service_tn` so that the
# lifetime of the lock-request can outlive the current
# `._pause()` scope while the user steps through their
# application code and when they finally exit the
@ -506,7 +510,7 @@ async def _pause(
f'|_{task}\n'
)
with trio.CancelScope(shield=shield):
req_ctx: Context = await actor._service_n.start(
req_ctx: Context = await actor._service_tn.start(
partial(
request_root_stdio_lock,
actor_uid=actor.uid,
@ -540,7 +544,7 @@ async def _pause(
_repl_fail_report = None
# when the actor is mid-runtime cancellation the
# `Actor._service_n` might get closed before we can spawn
# `Actor._service_tn` might get closed before we can spawn
# the request task, so just ignore expected RTE.
elif (
isinstance(pause_err, RuntimeError)
@ -985,7 +989,7 @@ def pause_from_sync(
# that output and assign the `repl` created above!
bg_task, _ = trio.from_thread.run(
afn=partial(
actor._service_n.start,
actor._service_tn.start,
partial(
_pause_from_bg_root_thread,
behalf_of_thread=thread,
@ -1153,9 +1157,10 @@ def pause_from_sync(
'use_greenback',
False,
):
raise RuntimeError(
'`greenback` was never initialized in this actor!?\n\n'
f'{_state._runtime_vars}\n'
raise InternalError(
f'`greenback` was never initialized in this actor?\n'
f'\n'
f'{ppfmt(_state._runtime_vars)}\n'
) from rte
raise

View File

@ -185,7 +185,9 @@ class Channel:
addr,
**kwargs,
)
assert transport.raddr == addr
# XXX, for UDS *no!* since we recv the peer-pid and build out
# a new addr..
# assert transport.raddr == addr
chan = Channel(transport=transport)
# ?TODO, compact this into adapter level-methods?
@ -301,7 +303,7 @@ class Channel:
self,
payload: Any,
hide_tb: bool = True,
hide_tb: bool = False,
) -> None:
'''

View File

@ -17,20 +17,38 @@
Utils to tame mp non-SC madness
'''
import platform
# !TODO! in 3.13 this can be disabled (the-same/similarly) using
# a flag,
# - [ ] soo if it works like this, drop this module entirely for
# 3.13+ B)
# |_https://docs.python.org/3/library/multiprocessing.shared_memory.html
#
def disable_mantracker():
'''
Disable all `multiprocessing` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness.
'''
from multiprocessing import resource_tracker as mantracker
from multiprocessing.shared_memory import SharedMemory
# 3.13+ only.. can pass `track=False` to disable
# all the resource tracker bs.
# https://docs.python.org/3/library/multiprocessing.shared_memory.html
if (_py_313 := (
platform.python_version_tuple()[:-1]
>=
('3', '13')
)
):
from functools import partial
return partial(
SharedMemory,
track=False,
)
# !TODO, once we drop 3.12- we can obvi remove all this!
else:
from multiprocessing import (
resource_tracker as mantracker,
)
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
@ -50,3 +68,8 @@ def disable_mantracker():
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
# use std type verbatim
shmT = SharedMemory
return shmT
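For ref, a rough usage sketch of the factory now returned by `disable_mantracker()` (assuming the 3.13+ `track=False` branch above); callers treat it just like the stdlib class, only no resource-tracker process gets spawned:

# hedged sketch: on 3.13+ this is a `partial(SharedMemory, track=False)`,
# on older pythons the (patched) class itself.
SharedMemory = disable_mantracker()

shm = SharedMemory(create=True, size=4096)
try:
    shm.buf[:5] = b'hello'
finally:
    shm.close()
    shm.unlink()  # explicit cleanup since nothing is tracking it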

View File

@ -1001,7 +1001,11 @@ class Server(Struct):
partial(
_serve_ipc_eps,
server=self,
stream_handler_tn=stream_handler_nursery,
stream_handler_tn=(
stream_handler_nursery
or
self._stream_handler_tn
),
listen_addrs=accept_addrs,
)
)
@ -1145,13 +1149,17 @@ async def open_ipc_server(
async with maybe_open_nursery(
nursery=parent_tn,
) as rent_tn:
) as parent_tn:
no_more_peers = trio.Event()
no_more_peers.set()
ipc_server = IPCServer(
_parent_tn=rent_tn,
_stream_handler_tn=stream_handler_tn or rent_tn,
_parent_tn=parent_tn,
_stream_handler_tn=(
stream_handler_tn
or
parent_tn
),
_no_more_peers=no_more_peers,
)
try:

View File

@ -23,14 +23,15 @@ considered optional within the context of this runtime-library.
"""
from __future__ import annotations
from multiprocessing import shared_memory as shm
from multiprocessing.shared_memory import (
# SharedMemory,
ShareableList,
)
import platform
from sys import byteorder
import time
from typing import Optional
from multiprocessing import shared_memory as shm
from multiprocessing.shared_memory import (
SharedMemory,
ShareableList,
)
from msgspec import (
Struct,
@ -61,7 +62,7 @@ except ImportError:
log = get_logger(__name__)
disable_mantracker()
SharedMemory = disable_mantracker()
class SharedInt:
@ -797,7 +798,14 @@ def open_shm_list(
# "close" attached shm on actor teardown
try:
actor = tractor.current_actor()
actor.lifetime_stack.callback(shml.shm.close)
# XXX on 3.13+ we don't need to call this?
# -> bc we pass `track=False` for `SharedMemory`, or?
if (
platform.python_version_tuple()[:-1] < ('3', '13')
):
actor.lifetime_stack.callback(shml.shm.unlink)
except RuntimeError:
log.warning('tractor runtime not active, skipping teardown steps')

View File

@ -430,20 +430,25 @@ class MsgpackTransport(MsgTransport):
return await self.stream.send_all(size + bytes_data)
except (
trio.BrokenResourceError,
) as bre:
trans_err = bre
trio.ClosedResourceError,
) as _re:
trans_err = _re
tpt_name: str = f'{type(self).__name__!r}'
match trans_err:
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' in trans_err.args[0]
# ^XXX, specifc to UDS transport and its,
# XXX, specific to UDS transport and its,
# well, "speediness".. XD
# |_ likely to do with races related to how fast
# the socket is setup/torn-down on linux
# as it pertains to rando pings from the
# `.discovery` subsys and protos.
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe'
in
trans_err.args[0]
):
raise TransportClosed.from_src_exc(
tpt_closed = TransportClosed.from_src_exc(
message=(
f'{tpt_name} already closed by peer\n'
),
@ -451,14 +456,31 @@ class MsgpackTransport(MsgTransport):
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
) from bre
)
raise tpt_closed from trans_err
# case trio.ClosedResourceError() if (
# 'this socket was already closed'
# in
# trans_err.args[0]
# ):
# tpt_closed = TransportClosed.from_src_exc(
# message=(
# f'{tpt_name} already closed by peer\n'
# ),
# body=f'{self}\n',
# src_exc=trans_err,
# raise_on_report=True,
# loglevel='transport',
# )
# raise tpt_closed from trans_err
# unless the disconnect condition falls under "a
# normal operation breakage" we usually console warn
# about it.
case _:
log.exception(
'{tpt_name} layer failed pre-send ??\n'
f'{tpt_name} layer failed pre-send ??\n'
)
raise trans_err
@ -503,7 +525,7 @@ class MsgpackTransport(MsgTransport):
def pformat(self) -> str:
return (
f'<{type(self).__name__}(\n'
f' |_peers: 2\n'
f' |_peers: 1\n'
f' laddr: {self._laddr}\n'
f' raddr: {self._raddr}\n'
# f'\n'

View File

@ -18,6 +18,9 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco
'''
from __future__ import annotations
from contextlib import (
contextmanager as cm,
)
from pathlib import Path
import os
from socket import (
@ -29,6 +32,7 @@ from socket import (
)
import struct
from typing import (
Type,
TYPE_CHECKING,
ClassVar,
)
@ -99,8 +103,6 @@ class UDSAddress(
self.filedir
or
self.def_bindspace
# or
# get_rt_dir()
)
@property
@ -205,12 +207,35 @@ class UDSAddress(
f']'
)
@cm
def _reraise_as_connerr(
src_excs: tuple[Type[Exception]],
addr: UDSAddress,
):
try:
yield
except src_excs as src_exc:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {addr.sockpath}\n'
f'\n'
f'from src: {src_exc!r}\n'
) from src_exc
async def start_listener(
addr: UDSAddress,
**kwargs,
) -> SocketListener:
# sock = addr._sock = socket.socket(
'''
Start listening for inbound connections via
a `trio.SocketListener` (task) which `socket.bind()`s on `addr`.
Note, if the `UDSAddress.bindspace: Path` directory dne it is
implicitly created.
'''
sock = socket.socket(
socket.AF_UNIX,
socket.SOCK_STREAM
@ -221,17 +246,25 @@ async def start_listener(
f'|_{addr}\n'
)
# ?TODO? should we use the `actor.lifetime_stack`
# to rm on shutdown?
bindpath: Path = addr.sockpath
try:
await sock.bind(str(bindpath))
except (
if not (bs := addr.bindspace).is_dir():
log.info(
'Creating bindspace dir in file-sys\n'
f'>{{\n'
f'|_{bs!r}\n'
)
bs.mkdir()
with _reraise_as_connerr(
src_excs=(
FileNotFoundError,
) as fdne:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {addr.sockpath}\n'
) from fdne
OSError,
),
addr=addr
):
await sock.bind(str(bindpath))
sock.listen(1)
log.info(
@ -356,27 +389,30 @@ class MsgpackUDSStream(MsgpackTransport):
# `.setsockopt()` call tells the OS provide it; the client
# pid can then be read on server/listen() side via
# `get_peer_info()` above.
try:
with _reraise_as_connerr(
src_excs=(
FileNotFoundError,
),
addr=addr
):
stream = await open_unix_socket_w_passcred(
str(sockpath),
**kwargs
)
except (
FileNotFoundError,
) as fdne:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {sockpath}\n'
) from fdne
stream = MsgpackUDSStream(
tpt_stream = MsgpackUDSStream(
stream,
prefix_size=prefix_size,
codec=codec
)
stream._raddr = addr
return stream
# XXX assign from new addrs after peer-PID extract!
(
tpt_stream._laddr,
tpt_stream._raddr,
) = cls.get_stream_addrs(stream)
return tpt_stream
@classmethod
def get_stream_addrs(

View File

@ -130,6 +130,7 @@ class LinkedTaskChannel(
_trio_task: trio.Task
_aio_task_complete: trio.Event
_closed_by_aio_task: bool = False
_suppress_graceful_exits: bool = True
_trio_err: BaseException|None = None
@ -208,10 +209,15 @@ class LinkedTaskChannel(
async def aclose(self) -> None:
await self._from_aio.aclose()
def started(
# ?TODO? async version of this?
def started_nowait(
self,
val: Any = None,
) -> None:
'''
Synchronize aio-side with its trio-parent.
'''
self._aio_started_val = val
return self._to_trio.send_nowait(val)
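As a minimal (hedged) sketch of the renamed API in use, where the target fn and values are purely illustrative: an `asyncio`-side task accepting a `chan` kwarg syncs its "first value" to the `trio` parent opened via `open_channel_from()`:

import asyncio
from tractor import to_asyncio

async def aio_child(chan: to_asyncio.LinkedTaskChannel):
    # asyncio-side: deliver the "first value" to the trio parent.
    chan.started_nowait('ready')
    await asyncio.sleep(float('inf'))

async def trio_side():
    # NOTE, must run inside an actor spawned with `infect_asyncio=True`.
    async with to_asyncio.open_channel_from(aio_child) as (first, chan):
        assert first == 'ready'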
@ -242,6 +248,7 @@ class LinkedTaskChannel(
# cycle on the trio side?
# await trio.lowlevel.checkpoint()
return await self._from_aio.receive()
except BaseException as err:
async with translate_aio_errors(
chan=self,
@ -319,7 +326,7 @@ def _run_asyncio_task(
qsize: int = 1,
provide_channels: bool = False,
suppress_graceful_exits: bool = True,
hide_tb: bool = False,
hide_tb: bool = True,
**kwargs,
) -> LinkedTaskChannel:
@ -347,18 +354,6 @@ def _run_asyncio_task(
# value otherwise it would just return ;P
assert qsize > 1
if provide_channels:
assert 'to_trio' in args
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = from_trio
coro = func(**kwargs)
trio_task: trio.Task = trio.lowlevel.current_task()
trio_cs = trio.CancelScope()
aio_task_complete = trio.Event()
@ -373,6 +368,25 @@ def _run_asyncio_task(
_suppress_graceful_exits=suppress_graceful_exits,
)
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = from_trio
if 'chan' in args:
kwargs['chan'] = chan
if provide_channels:
assert (
'to_trio' in args
or
'chan' in args
)
coro = func(**kwargs)
async def wait_on_coro_final_result(
to_trio: trio.MemorySendChannel,
coro: Awaitable,
@ -445,9 +459,23 @@ def _run_asyncio_task(
f'Task exited with final result: {result!r}\n'
)
# only close the sender side which will relay
# a `trio.EndOfChannel` to the trio (consumer) side.
# XXX ALWAYS close the child-`asyncio`-task-side's
# `to_trio` handle which will in turn relay
# a `trio.EndOfChannel` to the `trio`-parent.
# Consequently the parent `trio` task MUST ALWAYS
# check for any `chan._aio_err` to be raised when it
# receives an EoC.
#
# NOTE, there are 2 EoC cases,
# - normal/graceful EoC due to the aio-side actually
# terminating its "streaming", but the task did not
# error and is not yet complete.
#
# - the aio-task terminated and we specially mark the
# closure as due to the `asyncio.Task`'s exit.
#
to_trio.close()
chan._closed_by_aio_task = True
aio_task_complete.set()
log.runtime(
@ -645,8 +673,9 @@ def _run_asyncio_task(
not trio_cs.cancel_called
):
log.cancel(
f'Cancelling `trio` side due to aio-side src exc\n'
f'{curr_aio_err}\n'
f'Cancelling trio-side due to aio-side src exc\n'
f'\n'
f'{curr_aio_err!r}\n'
f'\n'
f'(c>\n'
f' |_{trio_task}\n'
@ -758,6 +787,7 @@ async def translate_aio_errors(
aio_done_before_trio: bool = aio_task.done()
assert aio_task
trio_err: BaseException|None = None
eoc: trio.EndOfChannel|None = None
try:
yield # back to one of the cross-loop apis
except trio.Cancelled as taskc:
@ -789,12 +819,48 @@ async def translate_aio_errors(
# )
# raise
# XXX always passthrough EoC since this translator is often
# called from `LinkedTaskChannel.receive()` which we want
# passthrough and further we have no special meaning for it in
# terms of relaying errors or signals from the aio side!
except trio.EndOfChannel as eoc:
# XXX EoC is a special SIGNAL from the aio-side here!
# There are 2 cases to handle:
# 1. the "EoC passthrough" case.
# - the aio-task actually closed the channel "gracefully" and
# the trio-task should unwind any ongoing channel
# iteration/receiving,
# |_this exc-translator wraps calls to `LinkedTaskChannel.receive()`
# in which case we want to relay the actual "end-of-chan" for
# iteration purposes.
#
# 2. relaying the "asyncio.Task termination" case.
# - if the aio-task terminates, maybe with an error, AND the
# `open_channel_from()` API was used, it will always signal
# that termination.
# |_`wait_on_coro_final_result()` always calls
# `to_trio.close()` when `provide_channels=True` so we need to
# always check if there is an aio-side exc which needs to be
# relayed to the parent trio side!
# |_in this case the special `chan._closed_by_aio_task` is
# ALWAYS set.
#
except trio.EndOfChannel as _eoc:
eoc = _eoc
if (
chan._closed_by_aio_task
and
aio_err
):
log.cancel(
f'The asyncio-child task terminated due to error\n'
f'{aio_err!r}\n'
)
chan._trio_to_raise = aio_err
trio_err = chan._trio_err = eoc
#
# ?TODO?, raise something like a,
# chan._trio_to_raise = AsyncioErrored()
# BUT, with the tb rewritten to reflect the underlying
# call stack?
else:
trio_err = chan._trio_err = eoc
raise eoc
# NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
@ -1047,7 +1113,7 @@ async def translate_aio_errors(
#
if wait_on_aio_task:
await chan._aio_task_complete.wait()
log.info(
log.debug(
'asyncio-task is done and unblocked trio-side!\n'
)
@ -1064,11 +1130,17 @@ async def translate_aio_errors(
trio_to_raise: (
AsyncioCancelled|
AsyncioTaskExited|
Exception| # relayed from aio-task
None
) = chan._trio_to_raise
raise_from: Exception = (
trio_err if (aio_err is trio_to_raise)
else aio_err
)
if not suppress_graceful_exits:
raise trio_to_raise from (aio_err or trio_err)
raise trio_to_raise from raise_from
if trio_to_raise:
match (
@ -1101,7 +1173,7 @@ async def translate_aio_errors(
)
return
case _:
raise trio_to_raise from (aio_err or trio_err)
raise trio_to_raise from raise_from
# Check if the asyncio-side is the cause of the trio-side
# error.
@ -1167,7 +1239,6 @@ async def run_task(
@acm
async def open_channel_from(
target: Callable[..., Any],
suppress_graceful_exits: bool = True,
**target_kwargs,
@ -1201,7 +1272,6 @@ async def open_channel_from(
# deliver stream handle upward
yield first, chan
except trio.Cancelled as taskc:
# await tractor.pause(shield=True) # ya it worx ;)
if cs.cancel_called:
if isinstance(chan._trio_to_raise, AsyncioCancelled):
log.cancel(

View File

@ -31,7 +31,7 @@ from ._broadcast import (
)
from ._beg import (
collapse_eg as collapse_eg,
maybe_collapse_eg as maybe_collapse_eg,
get_collapsed_eg as get_collapsed_eg,
is_multi_cancelled as is_multi_cancelled,
)
from ._taskc import (

View File

@ -15,8 +15,9 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
`BaseExceptionGroup` related utils and helpers pertaining to
first-class-`trio` from a historical perspective B)
`BaseExceptionGroup` utils and helpers pertaining to
first-class-`trio` from a "historical" perspective, like "loose
exception group" task-nurseries.
'''
from contextlib import (
@ -24,28 +25,83 @@ from contextlib import (
)
from typing import (
Literal,
Type,
)
import trio
# from trio._core._concat_tb import (
# concat_tb,
# )
def maybe_collapse_eg(
# XXX NOTE
# taken verbatim from `trio._core._run` except,
# - remove the NONSTRICT_EXCEPTIONGROUP_NOTE deprecation-note
# guard-check; we know we want an explicit collapse.
# - mask out tb rewriting in collapse case, i don't think it really
# matters?
#
def collapse_exception_group(
excgroup: BaseExceptionGroup[BaseException],
) -> BaseException:
"""Recursively collapse any single-exception groups into that single contained
exception.
"""
exceptions = list(excgroup.exceptions)
modified = False
for i, exc in enumerate(exceptions):
if isinstance(exc, BaseExceptionGroup):
new_exc = collapse_exception_group(exc)
if new_exc is not exc:
modified = True
exceptions[i] = new_exc
if (
len(exceptions) == 1
and isinstance(excgroup, BaseExceptionGroup)
# XXX trio's loose-setting condition..
# and NONSTRICT_EXCEPTIONGROUP_NOTE in getattr(excgroup, "__notes__", ())
):
# exceptions[0].__traceback__ = concat_tb(
# excgroup.__traceback__,
# exceptions[0].__traceback__,
# )
return exceptions[0]
elif modified:
return excgroup.derive(exceptions)
else:
return excgroup
def get_collapsed_eg(
beg: BaseExceptionGroup,
) -> BaseException|bool:
) -> BaseException|None:
'''
If the input beg can collapse to a single non-eg sub-exception,
return it instead.
If the input beg can collapse to a single sub-exception which is
itself **not** an eg, return it.
'''
if len(excs := beg.exceptions) == 1:
return excs[0]
maybe_exc = collapse_exception_group(beg)
if maybe_exc is beg:
return None
return False
return maybe_exc
@acm
async def collapse_eg(
hide_tb: bool = True,
# XXX, for ex. will always show begs containing single taskc
ignore: set[Type[BaseException]] = {
# trio.Cancelled,
},
add_notes: bool = True,
bp: bool = False,
):
'''
If `BaseExceptionGroup` raised in the body scope is
@ -57,16 +113,55 @@ async def collapse_eg(
__tracebackhide__: bool = hide_tb
try:
yield
except* BaseException as beg:
except BaseExceptionGroup as _beg:
beg = _beg
if (
exc := maybe_collapse_eg(beg)
bp
and
len(beg.exceptions) > 1
):
import tractor
if tractor.current_actor(
err_on_no_runtime=False,
):
await tractor.pause(shield=True)
else:
breakpoint()
if (
(exc := get_collapsed_eg(beg))
and
type(exc) not in ignore
):
# TODO? report number of nested groups it was collapsed
# *from*?
if add_notes:
from_group_note: str = (
'( ^^^ this exc was collapsed from a group ^^^ )\n'
)
if (
from_group_note
not in
getattr(exc, "__notes__", ())
):
exc.add_note(from_group_note)
# raise exc
# ^^ this will leave the orig beg tb above with the
# "during the handling of <beg> the following.."
# So, instead do..
#
if cause := exc.__cause__:
raise exc from cause
else:
# suppress "during handling of <the beg>"
# output in tb/console.
raise exc from None
raise exc
raise beg
# keep original
raise # beg
def is_multi_cancelled(
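A short (hedged) end-user sketch of the collapsing behavior implemented above, using the `tractor.trionics.collapse_eg` import path seen elsewhere in this diff: a lone child-task error propagates bare rather than wrapped in a single-member group:

import trio
from tractor.trionics import collapse_eg

async def boom():
    raise ValueError('single failure')

async def main():
    try:
        async with (
            collapse_eg(),  # collapse any 1-exc group raised below
            trio.open_nursery() as tn,
        ):
            tn.start_soon(boom)
    except ValueError as exc:
        # without `collapse_eg()` this would be an ExceptionGroup
        print(f'caught bare exc: {exc!r}')

trio.run(main)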

View File

@ -40,7 +40,10 @@ from typing import (
import trio
from tractor._state import current_actor
from tractor.log import get_logger
from ._beg import collapse_eg
# from ._beg import collapse_eg
# from ._taskc import (
# maybe_raise_from_masking_exc,
# )
if TYPE_CHECKING:
@ -106,6 +109,9 @@ async def _enter_and_wait(
async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]],
# caller can provide their own scope
tn: trio.Nursery|None = None,
) -> AsyncGenerator[
tuple[
T | None,
@ -148,12 +154,22 @@ async def gather_contexts(
'`.trionics.gather_contexts()` input mngrs is empty?\n'
'\n'
'Did you try to use inline generator syntax?\n'
'Use a non-lazy iterator or sequence-type intead!\n'
'Check that list({mngrs}) works!\n'
# 'or sequence-type intead!\n'
# 'Use a non-lazy iterator or sequence-type intead!\n'
)
try:
async with (
collapse_eg(),
trio.open_nursery() as tn,
#
# ?TODO, does including these (eg-collapsing,
# taskc-unmasking) improve tb noise-reduction/legibility?
#
# collapse_eg(),
maybe_open_nursery(
nursery=tn,
) as tn,
# maybe_raise_from_masking_exc(),
):
for mngr in mngrs:
tn.start_soon(
@ -165,11 +181,12 @@ async def gather_contexts(
seed,
)
# deliver control once all managers have started up
# deliver control to caller once all ctx-managers have
# started (yielded back to us).
await all_entered.wait()
try:
yield tuple(unwrapped.values())
parent_exit.set()
finally:
# XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug:
@ -187,7 +204,7 @@ class _Cache:
a kept-alive-while-in-use async resource.
'''
service_n: Optional[trio.Nursery] = None
service_tn: Optional[trio.Nursery] = None
locks: dict[Hashable, trio.Lock] = {}
users: int = 0
values: dict[Any, Any] = {}
@ -228,6 +245,9 @@ async def maybe_open_context(
kwargs: dict = {},
key: Hashable | Callable[..., Hashable] = None,
# caller can provide their own scope
tn: trio.Nursery|None = None,
) -> AsyncIterator[tuple[bool, T]]:
'''
Maybe open an async-context-manager (acm) if there is not already
@ -260,43 +280,94 @@ async def maybe_open_context(
# have it not be closed until all consumers have exited (which is
# currently difficult to implement any other way besides using our
# pre-allocated runtime instance..)
service_n: trio.Nursery = current_actor()._service_n
if tn:
# TODO, assert tn is eventual parent of this task!
task: trio.Task = trio.lowlevel.current_task()
task_tn: trio.Nursery = task.parent_nursery
if not tn._cancel_status.encloses(
task_tn._cancel_status
):
raise RuntimeError(
f'Mis-nesting of task under provided {tn} !?\n'
f'Current task is NOT a child(-ish)!!\n'
f'\n'
f'task: {task}\n'
f'task_tn: {task_tn}\n'
)
service_tn = tn
else:
service_tn: trio.Nursery = current_actor()._service_tn
# TODO: is there any way to allocate
# a 'stays-open-till-last-task-finished' nursery?
# service_n: trio.Nursery
# async with maybe_open_nursery(_Cache.service_n) as service_n:
# _Cache.service_n = service_n
# service_tn: trio.Nursery
# async with maybe_open_nursery(_Cache.service_tn) as service_tn:
# _Cache.service_tn = service_tn
cache_miss_ke: KeyError|None = None
maybe_taskc: trio.Cancelled|None = None
try:
# **critical section** that should prevent other tasks from
# checking the _Cache until complete otherwise the scheduler
# may switch and by accident we create more then one resource.
yielded = _Cache.values[ctx_key]
except KeyError:
log.debug(f'Allocating new {acm_func} for {ctx_key}')
except KeyError as _ke:
# XXX, stay mutexed up to cache-miss yield
try:
cache_miss_ke = _ke
log.debug(
f'Allocating new @acm-func entry\n'
f'ctx_key={ctx_key}\n'
f'acm_func={acm_func}\n'
)
mngr = acm_func(**kwargs)
resources = _Cache.resources
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
resources[ctx_key] = (service_n, trio.Event())
# sync up to the mngr's yielded value
yielded = await service_n.start(
resources[ctx_key] = (service_tn, trio.Event())
yielded: Any = await service_tn.start(
_Cache.run_ctx,
mngr,
ctx_key,
)
_Cache.users += 1
finally:
# XXX, since this runs from an `except` it's a checkpoint
# which can be `trio.Cancelled`-masked.
#
# NOTE, in that case the mutex is never released by the
# (first and) caching task and **we can't** simply shield
# bc that will inf-block on the `await
# no_more_users.wait()`.
#
# SO just always unlock!
lock.release()
try:
yield (
False, # cache_hit = "no"
yielded,
)
except trio.Cancelled as taskc:
maybe_taskc = taskc
log.cancel(
f'Cancelled from cache-miss entry\n'
f'\n'
f'ctx_key: {ctx_key!r}\n'
f'mngr: {mngr!r}\n'
)
# XXX, always unset ke from cancelled context
# since we never consider it a masked exc case!
# - bc this can be called directly by `._rpc._invoke()`?
#
if maybe_taskc.__context__ is cache_miss_ke:
maybe_taskc.__context__ = None
raise taskc
else:
_Cache.users += 1
log.runtime(
log.debug(
f'Re-using cached resource for user {_Cache.users}\n\n'
f'{ctx_key!r} -> {type(yielded)}\n'
@ -312,6 +383,13 @@ async def maybe_open_context(
)
finally:
if lock.locked():
stats: trio.LockStatistics = lock.statistics()
log.error(
f'Lock left locked by last owner !?\n'
f'{stats}\n'
)
_Cache.users -= 1
if yielded is not None:
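And as a quick (hedged) refresher on the API being hardened here, with `open_resource` a stand-in name for any shareable `@acm`: the first entrant allocates on the cache-miss path, every later task keyed the same sees `cache_hit=True` and re-uses the yielded value:

from contextlib import asynccontextmanager as acm

import trio
import tractor
from tractor.trionics import maybe_open_context

@acm
async def open_resource():
    yield object()  # stand-in for an expensive, shareable resource

async def user(name: str):
    async with maybe_open_context(
        acm_func=open_resource,
    ) as (cache_hit, res):
        # peers after the first see `cache_hit=True` and the same `res`
        await trio.sleep(0.1)

async def main():
    async with (
        tractor.open_root_actor(),  # provides the fallback service-tn
        trio.open_nursery() as tn,
    ):
        for name in ('a', 'b', 'c'):
            tn.start_soon(user, name)

trio.run(main)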

View File

@ -60,8 +60,8 @@ def find_masked_excs(
return None
# XXX, relevant ish discussion @ `trio`-core,
# https://github.com/python-trio/trio/issues/455#issuecomment-2785122216
# XXX, relevant discussion @ `trio`-core,
# https://github.com/python-trio/trio/issues/455
#
@acm
async def maybe_raise_from_masking_exc(
@ -113,7 +113,6 @@ async def maybe_raise_from_masking_exc(
)
matching: list[BaseException]|None = None
maybe_eg: ExceptionGroup|None
maybe_eg: ExceptionGroup|None
if tn:
try: # handle egs