Compare commits

...

50 Commits

Author SHA1 Message Date
Tyler Goodlet f923551391 Add timeout around inf-streamer suite
Since with the new actorc injection seems to be hanging?
Not sure what exactly the issue is but likely races again
during teardown between the `.run_in_actor()` remote-exc capture
and any actorc after the `portal.cancel()`..

Also tossed in a bp to figure out why actorcs aren't actually showing
outside the `trio.run()`..?
2026-02-19 19:24:52 -05:00
Tyler Goodlet 4943438780 Adjust nested-subs debug test for tbs output
Such that we don't require every single src/relay_uid in the final
output but instead at some point in the pre-output of some prompt.
Added some comments to match each actor sub-layer.
2026-02-19 19:24:52 -05:00
Tyler Goodlet 7ee95cf709 WIP, actor-nursery non-graceful-cancel raises EG
Attempting a rework of the post-cancellation "raising semantics" such
that subactors which are `ActorCancelled` as a result of a non-graceful
in-scope error, are acked via a re-raised
`ExceptionGroup[ActorCancelled*N, Exception]`
*outside the an-block*. Eventually, the idea is to have `ActorCancelled`
be relayed from each subactor in response to any
`Actor.cancel()/Portal.cancel_actor()` request much like
`Context.cancel()/ContextCancelled`.

This is a WIP bc it does break a few tests and requires related
`_spawn`-mod-machinery changes to match some of which I'm not yet sure
are required; need to dig into to the details of the currently failing
suites first.

`._supervise` patch deats,
- add `ActorNursery.maybe_error` which delivers the maybe-EG or
  `._scope_error` depending on `.errors` (now `._errors`, a mapping from
  `Aid`-keys) has entries seet for subs.
- raise ^ if non-null in a new outer-`finally` in
  `_open_and_supervise_one_cancels_all_nursery()`; an "outer" block is
  added to ensure all sub-actor-excs are emited/captured as part of
  `ActorNursery.cancel()` being called (as prior) as well as the
  `da_nursery` being explicitly cancelled alongside it (to unblock the
  tn-block, but still not sure why this is necessary yet?..).
- (now masked) tried injecting actorcs from `.cancel()` loop, but (again
  per more explanation in section below) seems to be suffering a race
  issue with RAE relay?
- left in buncha notes obvi for all this..

`._spawn` patch deats,
- as above, expect `errors: dict` to map from `Aid`-keys.
- pass `errors: dict` into `soft_kill()` since it seemed like we'd want
  to (for now) inject `ActoreCancelled` in some cases (but now i'm not
  sure XD).
- tried out a couple spots (which are now masked) to inject
  `ActorCancelled` after calling `Portal.cancel()` in various
  subactor-supervision routines whenev an RAE is not set..
  - oddly seems to be overwriting actual errors (likely due to racing
    with RAE receive and/or actorc-request timeout?) despite the guard
    logic..which clearly doesn't resolve the issue..
- buncha `tn`-style renaming.
2026-02-19 19:24:52 -05:00
Tyler Goodlet e08bf4812c Add todo for `tn` to `gather_contexts()` from `find_actor()`? 2026-02-19 19:24:52 -05:00
Tyler Goodlet aa46685f38 Use `an` var name in nested subactor debugging ex. 2026-02-19 19:24:52 -05:00
Tyler Goodlet a59a3290aa TOSQUASH 313ad93: yeah dun use `._message` as tb-str.. 2026-02-19 19:24:52 -05:00
Tyler Goodlet ddd8861fce Add an `actorc` test-driven-dev suite
Defining how an actor-nursery should emit an eg based on non-graceful
cancellation in a new `test_actor_nursery` module. Obviously fails atm
until the implementation is completed.
2026-02-19 19:24:52 -05:00
Tyler Goodlet 7db5b801d3 Add `ActorCancelled` as an runtime-wide-signal
As in a layer "above" a KBI/SIGINT but "below" a `ContextCancelled` and
generally signalling an interrupt which requests cancellation of the
actor's `trio.run()`.

Impl deats,
- mk the new exc type inherit from our ctxc (for now) but overriding the
  `.canceller` impl to,
  * pull from the `RemoteActorError._extra_msgdata: dict` when no
    `._ipc_msg` is set (which is always to start, until we incorporate
    a new `CancelActor` msg type).
  * not allow a `None` value since we should key-error if not set per
    prev bullet.
- Mk adjustments (related) to parent `RemoteActorError.pformat()` to
  accommodate showing the `.canceller` field in repr output,
  * change `.relay_uid` to not crash when `._ipc_msg` is unset.
  * support `.msg.types.Aid` and use its `.reprol()` from `._mk_fields_str()`.
  * always call `._mk_fields_str()`, not just when `tb_str` is provided,
    and for now use any `._message` in-place of a `tb_str` when
    undefined.
2026-02-19 19:24:52 -05:00
Bd 70bb77280e
Merge pull request #411 from goodboy/tpt_tolerance
Tpt-tolerance: more lowlevel `trio` CRE/BRE -> `TransportClosed` translations
2026-02-19 16:40:17 -05:00
Gud Boi 916f88a070 Less newlines in `._rpc` log msg 2026-02-19 16:31:54 -05:00
Gud Boi 91f2f3ec10 Use test-harness `loglevel` in inter-peer suite 2026-02-19 16:29:20 -05:00
Tyler Goodlet 3e5124e184 Hide `._rpc._invoke()` frame, again.. 2026-02-19 16:28:22 -05:00
Gud Boi fa86269e30 Stuff from auto-review in https://github.com/goodboy/tractor/pull/412 .. 2026-02-19 16:20:21 -05:00
Gud Boi d0b92bbeba Clean up `._transport` error-case comment
Expand and clarify the comment for the default `case _`
block in the `.send()` error matcher, noting that we
console-error and raise-thru for unexpected disconnect
conditions.

(this patch was suggested by copilot in,
 https://github.com/goodboy/tractor/pull/411)

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 16:18:39 -05:00
Gud Boi 9470815f5a Fix `spawn` fixture cleanup + test assertions
Improve the `spawn` fixture teardown logic in
`tests/devx/conftest.py` fixing the while-else bug, and fix
`test_advanced_faults` genexp for `TransportClosed` exc type
checking.

Deats,
- replace broken `while-else` pattern with direct
  `if ptyproc.isalive()` check after the SIGINT loop.
- fix undefined `spawned` ref -> `ptyproc.isalive()` in
  while condition.
- improve walrus expr formatting in timeout check (multiline
  style).

Also fix `test_ipc_channel_break_during_stream()` assertion,
- wrap genexp in `all()` call so it actually checks all excs
  are `TransportClosed` instead of just creating an unused
  generator.

(this patch was suggested by copilot in,
 https://github.com/goodboy/tractor/pull/411)

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 16:14:11 -05:00
Gud Boi 592d918394 Tweak `test_inter_peer_cancellation` for races
Adjust `basic_echo_server()` default sequence len to avoid the race
where the 'tell_little_bro()` finished streaming **before** the
echo-server sub is cancelled by its peer subactor (which is the whole
thing we're testing!).

Deats,
- bump `rng_seed` default from 50 -> 100 to ensure peer
  cancel req arrives before echo dialog completes on fast hw.
- add `trio.sleep(0.001)` between send/receive in msg loop on the
  "client" streamer side to give cancel request transit more time to
  arrive.

Also,
- add more native `tractor`-type hints.
- reflow `basic_echo_server()` doc-string for 67 char limit
- add masked `pause()` call with comment about unreachable
  code path
- alphabetize imports: mv `current_actor` and `open_nursery`
  below typed imports

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 15:24:42 -05:00
Gud Boi 0cddc67bdb Add doc-strs to `get_root()` + `maybe_open_portal()`
Brief descriptions for both fns in `._discovery` clarifying
what each delivers and under what conditions.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 052fe2435f Improve `Channel` doc-strs + minor cleanups
Flesh out missing method doc-strings, improve log msg formatting and
assert -> `RuntimeError` for un-inited tpt layer.

Deats,
- add doc-string to `.send()` noting `TransportClosed` raise
  on comms failures.
- add doc-string to `.recv()`.
- expand `._aiter_msgs()` doc-string, line-len reflow.
- add doc-string to `.connected()`.
- convert `assert self._transport` -> `RuntimeError` raise
  in `._aiter_msgs()` for more explicit crashing.
- expand `_connect_chan()` doc-string, note it's lowlevel
  and suggest `.open_portal()` to user instead.
- factor out `src_exc_str` in `TransportClosed` log handler
  to avoid double-call
- use multiline style for `.connected()` return expr.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 28819bf5d3 Add `Actor.is_root()` convenience predicate meth 2026-02-19 13:55:02 -05:00
Gud Boi 07c2ba5c0d Drop `trio`-exc-catching if tpt-closed covers them
Remove the `trio.ClosedResourceError` and `trio.BrokenResourceError`
handling that should now be subsumed by `TransportClosed` re-raising out
of the `.ipc` stack.

Deats,
- drop CRE and BRE from `._streaming.MsgStream.aclose()/.send()` blocks.
- similarly rm from `._context.open_context_from_portal()`.
- also from `._portal.Portal.cancel_actor()` and drop the
  (now-completed-todo) comment about this exact thing.

Also add comment in `._rpc.try_ship_error_to_remote()` noting the
remaining `trio` catches there are bc the `.ipc` layers *should* be
wrapping them; thus `log.critical()` use is warranted.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 50f40f427b Include `TransportClosed` in tpt-layer err handling
Add `TransportClosed` to except clauses where `trio`'s own
resource-closed errors are already caught, ensuring our
higher-level tpt exc is also tolerated in those same spots.
Likely i will follow up with a removal of the `trio` variants since most
*should be* caught and re-raised as tpt-closed out of the `.ipc` stack
now?

Add `TransportClosed` to various handler blocks,
- `._streaming.MsgStream.aclose()/.send()` except blocks.
- the broken-channel except in `._context.open_context_from_portal()`.
- obvi import it where necessary in those ^ mods.

Adjust `test_advanced_faults` suite + exs-script to match,
- update `ipc_failure_during_stream.py` example to catch
  `TransportClosed` alongside `trio.ClosedResourceError`
  in both the break and send-check paths.
- shield the `trio.sleep(0.01)` after tpt close in example to avoid
  taskc-raise/masking on that checkpoint since we want to simulate
  waiting for a user to send a KBI.
- loosen `ExceptionGroup` assertion to `len(excs) <= 2` and ensure all
  excs are `TransportClosed`.
- improve multi-line formatting, minor style/formatting fixes in
  condition expressions.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi bf6de55865 Improve tpt-closed msg-fmt/content and CRE case matching
Refine tpt-error reporting to include closure attribution (`'locally'`
vs `'by peer'`), tighten match conditions and reduce needless newlines
in exc reprs.

Deats,
- factor out `trans_err_msg: str` and `by_whom: str` into a `dict`
  lookup before the `match:` block to pair specific err msgs to closure
  attribution strings.
- use `by_whom` directly as `CRE` case guard condition
  (truthy when msg matches known underlying CRE msg content).
- conveniently include `by_whom!r` in `TransportClosed` message.
- fix `'locally ?'` -> `'locally?'` in send-side `CRE`
  handler (drop errant space).
- add masked `maybe_pause_bp()` calls at both `CRE` sites (from when
  i was tracing a test harness issue where the UDS socket path wasn't
  being cleaned up on teardown).
- drop trailing `\n` from `body=` args to `TransportClosed`.
- reuse `trans_err_msg` for the `BRE`/broken-pipe guard.

Also adjust testing, namely `test_ctxep_pauses_n_maybe_ipc_breaks`'s
expected patts-set for new msg formats to be raised out of
`.ipc._transport`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 5ded99a886 Add a `._trace.maybe_pause_bp()` for tpt-broken cases
Internal helper which falls back to sync `pdb` when the
child actor can't reach root to acquire the TTY lock.

Useful when debugging tpt layer failures (intentional or
otherwise) where a sub-actor can no longer IPC-contact the
root to coordinate REPL access; root uses `.pause()` as
normal while non-root falls back to `mk_pdb().set_trace()`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 7145fa364f Add `SIGINT` cleanup to `spawn` fixture in `devx/conftest`
Convert `spawn` fixture to a generator and add post-test graceful
subproc cleanup via `SIGINT`/`SIGKILL` to avoid leaving stale `pexpect`
child procs around between test runs as well as any UDS-tpt socket files
under the system runtime-dir.

Deats,
- convert `return _spawn` -> `yield _spawn` to enable
  post-yield teardown logic.
- add a new `nonlocal spawned` ref so teardown logic can access the last
  spawned child from outside the delivered spawner fn-closure.
- add `SIGINT`-loop after yield with 5s timeout, then
  `SIGKILL` if proc still alive.
- add masked `breakpoint()` and TODO about UDS path cleanup

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi f8e25688c7 Unmask `ClosedResourceError` handling in `._transport`
Unmask the CRE case block for peer-closed socket errors which already
had a TODO about reproducing the condition. It appears this case can
happen during inter-actor comms teardowns in `piker`, but i haven't been
able to figure out exactly what reproduces it yet..

So activate the block again for that 'socket already closed'-msg case,
and add a TODO questioning how to reproduce it.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-12 00:51:50 -05:00
Tyler Goodlet c3f455a8ec Mask tpt-closed handling of `chan.send(return_msg)`
A partial revert of commit c05d08e426 since it seem we already
suppress tpt-closed errors lower down in `.ipc.Channel.send()`; given
that i'm pretty sure this new handler code should basically never run?

Left in a todo to remove the masked content once i'm done more
thoroughly testing under `piker`.
2026-02-12 00:51:50 -05:00
Tyler Goodlet f78e842fba More `TransportClosed`-handling around IPC-IO
For IPC-disconnects-during-teardown edge cases, augment some `._rpc`
machinery,
- in `._invoke()` around the `await chan.send(return_msg)` where we
  suppress if the underlying `Channel` already disconnected.
- add a disjoint handler in `_errors_relayed_via_ipc()` which just
  reports-n-reraises the exc (same as prior behaviour).
  * originally i thought it needed to be handled specially (to avoid
    being crash handled) but turns out that isn't necessary?
  * hence the also-added-bu-masked-out `debug_filter` / guard expression
    around the `await debug._maybe_enter_pm()` line.
- show the `._invoke()` frame for the moment.
2026-02-12 00:51:50 -05:00
Bd 3638b80c9d
Merge pull request #412 from goodboy/root_actor_raddrs_fix
Non-registrar, root actor `_root_addrs` runtime-vars fix
2026-02-12 00:49:40 -05:00
Gud Boi 2ed9e65530 Clear rtvs state on root shutdown..
Fixes the bug discovered in last test update, not sure how this wasn't
caught already XD
2026-02-11 22:17:26 -05:00
Gud Boi 6cab363c51 Catch-n-fail on stale `_root_addrs` state..
Turns out we aren't clearing the `._state._runtime_vars` entries in
between `open_root_actor` calls.. This test refinement catches that by
adding runtime-vars asserts on the expected root-addrs value; ensure
`_runtime_vars['_root_addrs'] ONLY match the values provided by the
test's CURRENT root actor.

This causes a failure when the (just added)
`test_non_registrar_spawns_child` is run as part of the module suite,
it's fine when run standalone.
2026-02-11 22:17:26 -05:00
Gud Boi 8aee24e83f Fix when root-actor addrs is set as rtvs
Move `_root_addrs` assignment to after `async_main()` unblocks (via
`.started()`) which now delivers the bind addrs , ensuring correct
`UnwrappedAddress` propagation into `._state._runtime_vars` for
non-registar root actors..

Previously for non-registrar root actors the `._state._runtime_vars`
entries were being set as `Address` values which ofc IPC serialize
incorrectly rn vs. the unwrapped versions, (well until we add a msgspec
for their structs anyway) and thus are passed in incorrect form to
children/subactors during spawning..

This fixes the issue by waiting for the `.ipc.*` stack to
bind-and-resolve any randomly allocated addrs (by the OS) until after
the initial `Actor` startup is complete.

Deats,
- primarily, mv `_root_addrs` assignment from before `root_tn.start()`
  to after, using started(-ed) `accept_addrs` now delivered from
  `._runtime.async_main()`..
- update `task_status` type hints to match.
- unpack and set the `(accept_addrs, reg_addrs)` tuple from
  `root_tn.start()` call into `._state._runtime_vars` entries.
- improve and embolden comments distinguishing registrar vs non-registrar
  init paths, ensure typing reflects wrapped vs. unwrapped addrs.

Also,
- add a masked `mk_pdb().set_trace()` for debugging `raddrs` values
  being "off".
- add TODO about using UDS on linux for root mailbox
- rename `trans_bind_addrs` -> `tpt_bind_addrs` for clarity.
- expand comment about random port allocation for
  non-registrar case

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 22:17:26 -05:00
Gud Boi cdcc1b42fc Add test for non-registrar root sub-spawning
Ensure non-registrar root actors can spawn children and that
those children receive correct parent contact info. This test
catches the bug reported in,

https://github.com/goodboy/tractor/issues/410

Add new `test_non_registrar_spawns_child()` which spawns a sub-actor
from a non-registrar root and verifies the child can manually connect
back to its parent using `get_root()` API, auditing
`._state._runtime_vars` addr propagation from rent to child.

Also,
- improve type hints throughout test suites
  (`subprocess.Popen`, `UnwrappedAddress`, `Aid` etc.)
- rename `n` -> `an` for actor nursery vars
- use multiline style for function signatures

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 22:17:26 -05:00
Bd 51ac0c623e
Merge pull request #402 from goodboy/log_sys_testing
Log sys testing, starting to get "serious" about it..
2026-02-11 22:13:17 -05:00
Gud Boi 3f0bde1bf8 Use bare `get_logger()` in `.to_asyncio` 2026-02-11 22:02:41 -05:00
Gud Boi fa1a15dce8 Cleaups per copilot PR review 2026-02-11 21:51:40 -05:00
Gud Boi 5850844297 Mk `test_implicit_mod_name_applied_for_child()` check init-mods
Test pkg-level init module and sub-pkg module logger naming
to better validate auto-naming logic.

Deats,
- create `pkg_init_mod` and write `mod_code` to it for
  testing pkg-level `__init__.py` logger instance creation.
  * assert `snakelib.__init__` logger name is `proj_name`.
- write `mod_code` to `subpkg/__init__.py`` as well and check the same.

Also,
- rename some vars,
  * `pkg_mod` -> `pkg_submod`,
  * `pkgmod` -> `subpkgmod`
- add `ModuleType` import for type hints
- improve comments explaining pkg init vs first-level
  sub-module naming expectations.
- drop trailing whitespace and unused TODO comment
- remove masked `breakpoint()` call

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 21:43:37 -05:00
Gud Boi ff02939213 Toss in some `colorlog` alts to try 2026-02-11 21:05:16 -05:00
Gud Boi d61e8caab2 Improve `test_log_sys` for new auto-naming logic
Add assertions and comments to better test the reworked
implicit module-name detection in `get_logger()`.

Deats,
- add `assert not tractor.current_actor()` check to verify
  no runtime is active during test.
- import `.log` submod directly for use.
- add masked `breakpoint()` for debugging mod loading.
- add comment about using `ranger` to inspect `testdir` layout
  of auto-generated py pkg + module-files.
- improve comments explaining pkg-root-log creation.
- add TODO for testing `get_logger()` call from pkg
  `__init__.py`
- add comment about first-pkg-level module naming.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 21:05:07 -05:00
Gud Boi 0b0c83e9da Drop `name=__name__` from all `get_logger()` calls
Use new implicit module-name detection throughout codebase to simplify
logger creation and leverage auto-naming from caller mod .

Main changes,
- drop `name=__name__` arg from all `get_logger()` calls
  (across 29 modules).
- update `get_console_log()` calls to include `name='tractor'` for
  enabling root logger in test harness and entry points; this ensures
  logic in `get_logger()` triggers so that **all** `tractor`-internal
  logging emits to console.
- add info log msg in test `conftest.py` showing test-harness
  log level

Also,
- fix `.actor.uid` ref to `.actor.aid.uid` in `._trace`.
- adjust a `._context` log msg formatting for clarity.
- add TODO comments in `._addr`, `._uds` for when we mv to
  using `multiaddr`.
- add todo for `RuntimeVars` type hint TODO in `.msg.types` (once we
  eventually get that all going obvi!)

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 21:04:49 -05:00
Gud Boi 5e7c0f264d Rework `.get_logger()`, better autonaming, deduping
Overhaul of the automatic-calling-module-name detection and sub-log
creation logic to avoid (at least warn) on duplication(s) and still
handle the common usage of a call with `name=__name__` from a mod's top
level scope.

Main `get_logger()` changes,
- refactor auto-naming logic for implicit `name=None` case such that we
  handle at least `tractor` internal "bare" calls from internal submods.
- factor out the `get_caller_mod()` closure (still inside
  `get_logger()`)for introspecting caller's module with configurable
  frame depth.
- use `.removeprefix()` instead of `.lstrip()` for stripping pkg-name
  from mod paths
- mv root-logger creation before sub-logger name processing
- improve duplicate detection for `pkg_name` in `name`
- add `_strict_debug=True`-only-emitted warnings for duplicate
  pkg/leaf-mod names.
- use `print()` fallback for warnings when no actor runtime is up at
  call time.

Surrounding tweaks,
- add `.level` property to `StackLevelAdapter` for getting
  current emit level as lowercase `str`.
- mv `_proj_name` def to just above `get_logger()`
- use `_curr_actor_no_exc` partial in `_conc_name_getters`
  to avoid runtime errors
- improve comments/doc-strings throughout
- keep some masked `breakpoint()` calls for future debugging

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 21:04:29 -05:00
Gud Boi edf1189fe0 Multi-line styling in `test.devx.conftest` 2026-02-11 21:04:22 -05:00
Tyler Goodlet de24bfe052 Mv `load_module_from_path()` to a new `._code_load` submod 2026-02-11 21:03:29 -05:00
Tyler Goodlet e235b96894 Use new `pkg_name` in log-sys test suites 2026-02-11 21:03:07 -05:00
Tyler Goodlet dea4b9fd93 Implicitly name sub-logs by caller's mod
That is when no `name` is passed to `get_logger()`, try to introspect
the caller's `module.__name__` and use it to infer/get the "namespace
path" to that module the same as if using `name=__name__` as in the most
common usage.

Further, change the `_root_name` to be `pkg_name: str`, a public and
more obvious param name, and deprecate the former. This obviously adds
the necessary impl to make the new
`test_sys_log::test_implicit_mod_name_applied_for_child` test pass.

Impl detalles for `get_logger()`,
- add `pkg_name` and deprecate `_root_name`, include failover logic
  and a warning.
- implement calling module introspection using
  `inspect.stack()/getmodule()` to get both the `.__name__` and
  `.__package__` info alongside adjusted logic to set the `name`
  when not provided but only when a new `mk_sublog: bool` is set.
- tweak the `name` processing for implicitly set case,
  - rename `sub_name` -> `pkg_path: str` which is the path
    to the calling module minus that module's name component.
  - only partition `name` if `pkg_name` is `in` it.
  - use the `_root_log` for `pkg_name` duplication warnings.

Other/related,
- add types to various public mod vars missing them.
- rename `.log.log` -> `.log._root_log`.
2026-02-11 21:03:07 -05:00
Tyler Goodlet 557e2cec6a Add an implicit-pkg-path-as-logger-name test
A bit of test driven dev to anticipate support  of `.log.get_logger()`
usage such that it can be called from arbitrary sub-modules, themselves
embedded in arbitrary sub-pkgs, of some project; the when not provided,
the `sub_name` passed to the `Logger.getChild(<sub_name>)` will be set
as the sub-pkg path "down to" the calling module.

IOW if you call something like,

`log = tractor.log.get_logger(pkg_name='mypylib')`

from some `submod.py` in a project-dir that looks like,

mypylib/
  mod.py
  subpkg/
    submod.py  <- calling module

the `log: StackLevelAdapter` child-`Logger` instance will have a
`.name: str = 'mypylib.subpkg'`, discluding the `submod` part since this
already rendered as the `{filename}` header in `log.LOG_FORMAT`.

Previously similar behaviour would be obtained by passing
`get_logger(name=__name__)` in the calling module and so much so it
motivated me to make this the default, presuming we can introspect for
the info.

Impl deats,
- duplicated a `load_module_from_path()` from `modden` to load the
  `testdir` rendered py project dir from its path.
 |_should prolly factor it down to this lib anyway bc we're going to
   need it for hot code reload? (well that and `watchfiles` Bp)
- in each of `mod.py` and `submod.py` render the `get_logger()` code
  sin `name`, expecting the (coming shortly) implicit introspection
  feat to do this.
- do `.name` and `.parent` checks against expected sub-logger values
  from `StackLevelAdapter.logger.getChildren()`.
2026-02-11 21:03:07 -05:00
Tyler Goodlet 0e3229f16d Start a logging-sys unit-test module
To start ensuring that when `name=__name__` is passed we try to
de-duplicate the `_root_name` and any `leaf_mod: str` since it's already
included in the headers as `{filename}`.

Deats,
- heavily document the de-duplication `str.partition()`s in
  `.log.get_logger()` and provide the end fix by changing the predicate,
  `if rname == 'tractor':` -> `if rname == _root_name`.
  * also toss in some warnings for when we still detect duplicates.
- add todo comments around logging "filters" (vs. our "adapter").
- create the new `test_log_sys.test_root_pkg_not_duplicated()` which
  runs green with the fixes from ^.
- add a ton of test-suite todos both for existing and anticipated
  logging sys feats in the new mod.
2026-02-11 21:03:07 -05:00
Bd 448d25aef4
Merge pull request #409 from goodboy/nixos_flake
Nixos flake, for the *too-hip-for-arch-ers*
2026-02-11 21:02:37 -05:00
Gud Boi 343c9e0034 Tweaks per the `copilot` PR review 2026-02-11 20:55:08 -05:00
Gud Boi 1dc27c5161 Add a dev-overlay nix flake
Based on the impure template from `pyproject.nix` and providing
a dev-shell for easy bypass-n-hack on nix(os) using `uv`.

Deats,
- include bash completion pkgs for devx/happiness.
- pull `ruff` from <nixpkgs> to avoid wheel (build) issues.
- pin to py313 `cpython` for now.
2026-01-23 16:27:19 -05:00
Gud Boi 14aefa4b11 Reorg dev deps into nested groups
Namely,
- `devx` for console debugging extras used in `tractor.devx`.
- `repl` for @goodboy's `xonsh` hackin utils.
- `testing` for harness stuffs.
- `lint` for whenever we start doing that; it requires special
  separation on nixos in order to pull `ruff` from pkgs.

Oh and bump the lock file.
2026-01-23 16:24:24 -05:00
51 changed files with 1960 additions and 519 deletions

View File

@ -17,6 +17,7 @@ from tractor import (
MsgStream,
_testing,
trionics,
TransportClosed,
)
import trio
import pytest
@ -208,12 +209,16 @@ async def main(
# TODO: is this needed or no?
raise
except trio.ClosedResourceError:
except (
trio.ClosedResourceError,
TransportClosed,
) as _tpt_err:
# NOTE: don't send if we already broke the
# connection to avoid raising a closed-error
# such that we drop through to the ctl-c
# mashing by user.
await trio.sleep(0.01)
with trio.CancelScope(shield=True):
await trio.sleep(0.01)
# timeout: int = 1
# with trio.move_on_after(timeout) as cs:
@ -247,6 +252,7 @@ async def main(
await stream.send(i)
pytest.fail('stream not closed?')
except (
TransportClosed,
trio.ClosedResourceError,
trio.EndOfChannel,
) as send_err:

View File

@ -21,12 +21,12 @@ async def breakpoint_forever():
async def spawn_until(depth=0):
""""A nested nursery that triggers another ``NameError``.
"""
async with tractor.open_nursery() as n:
async with tractor.open_nursery() as an:
if depth < 1:
await n.run_in_actor(breakpoint_forever)
await an.run_in_actor(breakpoint_forever)
p = await n.run_in_actor(
p = await an.run_in_actor(
name_error,
name='name_error'
)
@ -38,7 +38,7 @@ async def spawn_until(depth=0):
# recusrive call to spawn another process branching layer of
# the tree
depth -= 1
await n.run_in_actor(
await an.run_in_actor(
spawn_until,
depth=depth,
name=f'spawn_until_{depth}',

27
flake.lock 100644
View File

@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1769018530,
"narHash": "sha256-MJ27Cy2NtBEV5tsK+YraYr2g851f3Fl1LpNHDzDX15c=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "88d3861acdd3d2f0e361767018218e51810df8a1",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

70
flake.nix 100644
View File

@ -0,0 +1,70 @@
# An "impure" template thx to `pyproject.nix`,
# https://pyproject-nix.github.io/pyproject.nix/templates.html#impure
# https://github.com/pyproject-nix/pyproject.nix/blob/master/templates/impure/flake.nix
{
description = "An impure overlay (w dev-shell) using `uv`";
inputs = {
nixpkgs.url = "github:nixos/nixpkgs/nixos-unstable";
};
outputs =
{ nixpkgs, ... }:
let
inherit (nixpkgs) lib;
forAllSystems = lib.genAttrs lib.systems.flakeExposed;
in
{
devShells = forAllSystems (
system:
let
pkgs = nixpkgs.legacyPackages.${system};
# XXX NOTE XXX, for now we overlay specific pkgs via
# a major-version-pinned-`cpython`
cpython = "python313";
venv_dir = "py313";
pypkgs = pkgs."${cpython}Packages";
in
{
default = pkgs.mkShell {
packages = [
# XXX, ensure sh completions activate!
pkgs.bashInteractive
pkgs.bash-completion
# XXX, on nix(os), use pkgs version to avoid
# build/sys-sh-integration issues
pkgs.ruff
pkgs.uv
pkgs.${cpython}# ?TODO^ how to set from `cpython` above?
];
shellHook = ''
# unmask to debug **this** dev-shell-hook
# set -e
# link-in c++ stdlib for various AOT-ext-pkgs (numpy, etc.)
LD_LIBRARY_PATH="${pkgs.stdenv.cc.cc.lib}/lib:$LD_LIBRARY_PATH"
export LD_LIBRARY_PATH
# RUNTIME-SETTINGS
# ------ uv ------
# - always use the ./py313/ venv-subdir
# - sync env with all extras
export UV_PROJECT_ENVIRONMENT=${venv_dir}
uv sync --dev --all-extras
# ------ TIPS ------
# NOTE, to launch the py-venv installed `xonsh` (like @goodboy)
# run the `nix develop` cmd with,
# >> nix develop -c uv run xonsh
'';
};
}
);
};
}

View File

@ -53,22 +53,33 @@ dependencies = [
[dependency-groups]
dev = [
# test suite
# TODO: maybe some of these layout choices?
# https://docs.pytest.org/en/8.0.x/explanation/goodpractices.html#choosing-a-test-layout-import-rules
"pytest>=8.3.5",
"pexpect>=4.9.0,<5",
{include-group = 'devx'},
{include-group = 'testing'},
{include-group = 'repl'},
]
devx = [
# `tractor.devx` tooling
"greenback>=1.2.1,<2",
"stackscope>=0.2.2,<0.3",
# ^ requires this?
"typing-extensions>=4.14.1",
]
testing = [
# test suite
# TODO: maybe some of these layout choices?
# https://docs.pytest.org/en/8.0.x/explanation/goodpractices.html#choosing-a-test-layout-import-rules
"pytest>=8.3.5",
"pexpect>=4.9.0,<5",
]
repl = [
"pyperclip>=1.9.0",
"prompt-toolkit>=3.0.50",
"xonsh>=0.19.2",
"psutil>=7.0.0",
]
lint = [
"ruff>=0.9.6"
]
# TODO, add these with sane versions; were originally in
# `requirements-docs.txt`..
# docs = [

View File

@ -65,7 +65,11 @@ def loglevel(request):
import tractor
orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel
tractor.log.get_console_log(level)
log = tractor.log.get_console_log(
level=level,
name='tractor', # <- enable root logger
)
log.info(f'Test-harness logging level: {level}\n')
yield level
tractor.log._default_loglevel = orig

View File

@ -4,6 +4,7 @@
'''
from __future__ import annotations
import time
import signal
from typing import (
Callable,
TYPE_CHECKING,
@ -34,7 +35,10 @@ if TYPE_CHECKING:
# a fn that sub-instantiates a `pexpect.spawn()`
# and returns it.
type PexpectSpawner = Callable[[str], pty_spawn.spawn]
type PexpectSpawner = Callable[
[str],
pty_spawn.spawn,
]
@pytest.fixture
@ -66,12 +70,15 @@ def spawn(
import os
os.environ['PYTHON_COLORS'] = '0'
spawned: PexpectSpawner|None = None
def _spawn(
cmd: str,
**mkcmd_kwargs,
) -> pty_spawn.spawn:
nonlocal spawned
unset_colors()
return testdir.spawn(
spawned = testdir.spawn(
cmd=mk_cmd(
cmd,
**mkcmd_kwargs,
@ -81,9 +88,35 @@ def spawn(
# ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff?
)
return spawned
# such that test-dep can pass input script name.
return _spawn # the `PexpectSpawner`, type alias.
yield _spawn # the `PexpectSpawner`, type alias.
if (
spawned
and
(ptyproc := spawned.ptyproc)
):
start: float = time.time()
timeout: float = 5
while (
ptyproc.isalive()
and
(
(_time_took := (time.time() - start))
<
timeout
)
):
ptyproc.kill(signal.SIGINT)
time.sleep(0.01)
if ptyproc.isalive():
ptyproc.kill(signal.SIGKILL)
# TODO? ensure we've cleaned up any UDS-paths?
# breakpoint()
@pytest.fixture(
@ -109,7 +142,11 @@ def ctlc(
'https://github.com/goodboy/tractor/issues/320'
)
if mark.name == 'ctlcs_bish':
if (
mark.name == 'ctlcs_bish'
and
use_ctlc
):
pytest.skip(
f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n'
f'The test and/or underlying example script can *sometimes* run fine '

View File

@ -709,10 +709,41 @@ def test_multi_nested_subactors_error_through_nurseries(
child = spawn('multi_nested_subactors_error_up_through_nurseries')
# timed_out_early: bool = False
at_least_one: list[str] = [
"bdb.BdbQuit",
for send_char in itertools.cycle(['c', 'q']):
# leaf subs, which actually raise in "user code"
"src_uid=('breakpoint_forever'",
"src_uid=('name_error'",
# 2nd layer subs
"src_uid=('spawn_until_1'",
"src_uid=('spawn_until_2'",
"src_uid=('spawn_until_3'",
"relay_uid=('spawn_until_0'",
# 1st layer subs
"src_uid=('spawner0'",
"src_uid=('spawner1'",
]
for i, send_char in enumerate(
itertools.cycle(['c', 'q'])
):
try:
child.expect(PROMPT)
for patt in at_least_one.copy():
if in_prompt_msg(
child,
[patt],
):
print(
f'Found patt in prompt {i}\n'
f'patt: {patt!r}\n'
)
at_least_one.remove(patt)
child.sendline(send_char)
time.sleep(0.01)
@ -721,27 +752,15 @@ def test_multi_nested_subactors_error_through_nurseries(
assert_before(
child,
[ # boxed source errors
"NameError: name 'doggypants' is not defined",
[
# boxed source errors should show in final
# post-prompt tb to console.
"tractor._exceptions.RemoteActorError:",
"('name_error'",
"bdb.BdbQuit",
"NameError: name 'doggypants' is not defined",
# first level subtrees
# "tractor._exceptions.RemoteActorError: ('spawner0'",
"src_uid=('spawner0'",
# "tractor._exceptions.RemoteActorError: ('spawner1'",
# propagation of errors up through nested subtrees
# "tractor._exceptions.RemoteActorError: ('spawn_until_0'",
# "tractor._exceptions.RemoteActorError: ('spawn_until_1'",
# "tractor._exceptions.RemoteActorError: ('spawn_until_2'",
# ^-NOTE-^ old RAE repr, new one is below with a field
# showing the src actor's uid.
"src_uid=('spawn_until_0'",
"relay_uid=('spawn_until_1'",
"src_uid=('spawn_until_2'",
# TODO? once we get more pedantic with `relay_uid` should
# prolly include all actor-IDs we expect to see in final
# tb?
]
)
@ -1138,7 +1157,10 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
['peer IPC channel closed abruptly?',
'another task closed this fd',
'Debug lock request was CANCELLED?',
"TransportClosed: 'MsgpackUDSStream' was already closed locally ?",]
"'MsgpackUDSStream' was already closed locally?",
"TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?",
# ?TODO^? match depending on `tpt_proto(s)`?
]
# XXX races on whether these show/hit?
# 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!',

View File

@ -0,0 +1,98 @@
'''
Basic `ActorNursery` operations and closure semantics,
- basic remote error collection,
- basic multi-subactor cancellation.
'''
# import os
# import signal
# import platform
# import time
# from itertools import repeat
import pytest
import trio
import tractor
from tractor._exceptions import ActorCancelled
# from tractor._testing import (
# tractor_test,
# )
# from .conftest import no_windows
@pytest.mark.parametrize(
'num_subs',
[
1,
3,
]
)
def test_one_cancels_all(
start_method: str,
loglevel: str,
debug_mode: bool,
num_subs: int,
):
'''
Verify that ifa a single error bubbles to the an-scope the
nursery will be cancelled (just like in `trio`); this is a
one-cancels-all style strategy and are only supervision policy
at the moment.
'''
async def main():
try:
rte = RuntimeError('Uh oh something bad in parent')
async with tractor.open_nursery(
start_method=start_method,
loglevel=loglevel,
debug_mode=debug_mode,
) as an:
# spawn the same number of deamon actors which should be cancelled
dactor_portals = []
for i in range(num_subs):
name: str= f'sub_{i}'
ptl: tractor.Portal = await an.start_actor(
name=name,
enable_modules=[__name__],
)
dactor_portals.append(ptl)
# wait for booted
async with tractor.wait_for_actor(name):
print(f'{name!r} is up.')
# simulate uncaught exc
raise rte
# should error here with a ``RemoteActorError`` or ``MultiError``
except BaseExceptionGroup as _beg:
beg = _beg
# ?TODO? why can't we do `is` on beg?
assert (
beg.exceptions
==
an.maybe_error.exceptions
)
assert len(beg.exceptions) == (
num_subs
+
1 # rte from root
)
# all subactors should have been implicitly
# `Portal.cancel_actor()`ed.
excs = list(beg.exceptions)
excs.remove(rte)
for exc in excs:
assert isinstance(exc, ActorCancelled)
assert an._scope_error is rte
assert not an._children
assert an.cancelled is True
trio.run(main)

View File

@ -98,7 +98,8 @@ def test_ipc_channel_break_during_stream(
expect_final_exc = TransportClosed
mod: ModuleType = import_path(
examples_dir() / 'advanced_faults'
examples_dir()
/ 'advanced_faults'
/ 'ipc_failure_during_stream.py',
root=examples_dir(),
consider_namespace_packages=False,
@ -113,8 +114,9 @@ def test_ipc_channel_break_during_stream(
if (
# only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False
and
# AND we tell the child to call `MsgStream.aclose()`.
and pre_aclose_msgstream
pre_aclose_msgstream
):
# expect_final_exc = trio.EndOfChannel
# ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
@ -160,7 +162,8 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_child_ipc_after'] is not False
and (
ipc_break['break_parent_ipc_after']
> ipc_break['break_child_ipc_after']
>
ipc_break['break_child_ipc_after']
)
):
if pre_aclose_msgstream:
@ -248,8 +251,15 @@ def test_ipc_channel_break_during_stream(
# get raw instance from pytest wrapper
value = excinfo.value
if isinstance(value, ExceptionGroup):
excs = value.exceptions
assert len(excs) == 1
excs: tuple[Exception] = value.exceptions
assert (
len(excs) <= 2
and
all(
isinstance(exc, TransportClosed)
for exc in excs
)
)
final_exc = excs[0]
assert isinstance(final_exc, expect_final_exc)

View File

@ -11,6 +11,9 @@ from itertools import repeat
import pytest
import trio
import tractor
from tractor._exceptions import (
ActorCancelled,
)
from tractor._testing import (
tractor_test,
)
@ -124,7 +127,10 @@ def test_multierror(
) as nursery:
await nursery.run_in_actor(assert_err, name='errorer1')
portal2 = await nursery.run_in_actor(assert_err, name='errorer2')
portal2 = await nursery.run_in_actor(
assert_err,
name='errorer2',
)
# get result(s) from main task
try:
@ -137,7 +143,15 @@ def test_multierror(
# here we should get a ``BaseExceptionGroup`` containing exceptions
# from both subactors
with pytest.raises(BaseExceptionGroup):
with pytest.raises(
expected_exception=(
tractor.RemoteActorError,
# ?TODO, should it be this??
# like `trio`'s strict egs?
BaseExceptionGroup,
),
):
trio.run(main)
@ -233,8 +247,9 @@ async def stream_forever():
@tractor_test
async def test_cancel_infinite_streamer(start_method):
async def test_cancel_infinite_streamer(
start_method: str,
):
# stream for at most 1 seconds
with (
trio.fail_after(4),
@ -291,6 +306,7 @@ async def test_some_cancels_all(
num_actors_and_errs: tuple,
start_method: str,
loglevel: str,
debug_mode: bool,
):
'''
Verify a subset of failed subactors causes all others in
@ -306,68 +322,81 @@ async def test_some_cancels_all(
ria_func,
da_func,
) = num_actors_and_errs
try:
async with tractor.open_nursery() as an:
with trio.fail_after(
3
if not debug_mode
else 999
):
try:
async with tractor.open_nursery() as an:
# spawn the same number of deamon actors which should be cancelled
dactor_portals = []
for i in range(num_actors):
dactor_portals.append(await an.start_actor(
f'deamon_{i}',
enable_modules=[__name__],
))
# spawn the same number of deamon actors which should be cancelled
dactor_portals = []
for i in range(num_actors):
dactor_portals.append(await an.start_actor(
f'deamon_{i}',
enable_modules=[__name__],
))
func, kwargs = ria_func
riactor_portals = []
for i in range(num_actors):
# start actor(s) that will fail immediately
riactor_portals.append(
await an.run_in_actor(
func,
name=f'actor_{i}',
**kwargs
func, kwargs = ria_func
riactor_portals = []
for i in range(num_actors):
# start actor(s) that will fail immediately
riactor_portals.append(
await an.run_in_actor(
func,
name=f'actor_{i}',
**kwargs
)
)
)
if da_func:
func, kwargs, expect_error = da_func
for portal in dactor_portals:
# if this function fails then we should error here
# and the nursery should teardown all other actors
try:
await portal.run(func, **kwargs)
if da_func:
func, kwargs, expect_error = da_func
for portal in dactor_portals:
# if this function fails then we should error here
# and the nursery should teardown all other actors
try:
await portal.run(func, **kwargs)
except tractor.RemoteActorError as err:
assert err.boxed_type == err_type
# we only expect this first error to propogate
# (all other daemons are cancelled before they
# can be scheduled)
num_actors = 1
# reraise so nursery teardown is triggered
raise
except tractor.RemoteActorError as err:
assert err.boxed_type == err_type
# we only expect this first error to propogate
# (all other daemons are cancelled before they
# can be scheduled)
num_actors = 1
# reraise so nursery teardown is triggered
raise
else:
if expect_error:
pytest.fail(
"Deamon call should fail at checkpoint?")
# should error here with a ``RemoteActorError`` or ``MultiError``
except first_err as _err:
err = _err
if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors
for exc in err.exceptions:
# TODO, figure out why these aren't being set?
if isinstance(exc, ActorCancelled):
breakpoint()
if isinstance(exc, tractor.RemoteActorError):
assert exc.boxed_type == err_type
else:
if expect_error:
pytest.fail(
"Deamon call should fail at checkpoint?")
assert isinstance(exc, trio.Cancelled)
# should error here with a ``RemoteActorError`` or ``MultiError``
elif isinstance(err, tractor.RemoteActorError):
assert err.boxed_type == err_type
except first_err as _err:
err = _err
if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors
for exc in err.exceptions:
if isinstance(exc, tractor.RemoteActorError):
assert exc.boxed_type == err_type
else:
assert isinstance(exc, trio.Cancelled)
elif isinstance(err, tractor.RemoteActorError):
assert err.boxed_type == err_type
assert an.cancelled is True
assert not an._children
else:
pytest.fail("Should have gotten a remote assertion error?")
assert an.cancelled is True
assert not an._children
else:
pytest.fail("Should have gotten a remote assertion error?")
async def spawn_and_error(breadth, depth) -> None:

View File

@ -11,12 +11,13 @@ import trio
import tractor
from tractor import ( # typing
Actor,
current_actor,
open_nursery,
Portal,
Context,
ContextCancelled,
MsgStream,
Portal,
RemoteActorError,
current_actor,
open_nursery,
)
from tractor._testing import (
# tractor_test,
@ -796,8 +797,8 @@ async def basic_echo_server(
) -> None:
'''
Just the simplest `MsgStream` echo server which resays what
you told it but with its uid in front ;)
Just the simplest `MsgStream` echo server which resays what you
told it but with its uid in front ;)
'''
actor: Actor = tractor.current_actor()
@ -966,9 +967,14 @@ async def tell_little_bro(
caller: str = '',
err_after: float|None = None,
rng_seed: int = 50,
rng_seed: int = 100,
# NOTE, ensure ^ is large enough (on fast hw anyway)
# to ensure the peer cancel req arrives before the
# echoing dialog does itself Bp
):
# contact target actor, do a stream dialog.
lb: Portal
echo_ipc: MsgStream
async with (
tractor.wait_for_actor(
name=actor_name
@ -983,7 +989,6 @@ async def tell_little_bro(
else None
),
) as (sub_ctx, first),
sub_ctx.open_stream() as echo_ipc,
):
actor: Actor = current_actor()
@ -994,6 +999,7 @@ async def tell_little_bro(
i,
)
await echo_ipc.send(msg)
await trio.sleep(0.001)
resp = await echo_ipc.receive()
print(
f'{caller} => {actor_name}: {msg}\n'
@ -1006,6 +1012,9 @@ async def tell_little_bro(
assert sub_uid != uid
assert _i == i
# XXX, usually should never get here!
# await tractor.pause()
@pytest.mark.parametrize(
'raise_client_error',
@ -1020,6 +1029,9 @@ def test_peer_spawns_and_cancels_service_subactor(
raise_client_error: str,
reg_addr: tuple[str, int],
raise_sub_spawn_error_after: float|None,
loglevel: str,
# ^XXX, set to 'warning' to see masked-exc warnings
# that may transpire during actor-nursery teardown.
):
# NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx
@ -1049,6 +1061,7 @@ def test_peer_spawns_and_cancels_service_subactor(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
debug_mode=debug_mode,
registry_addrs=[reg_addr],
loglevel=loglevel,
) as an:
server: Portal = await an.start_actor(
(server_name := 'spawn_server'),

View File

@ -0,0 +1,185 @@
'''
`tractor.log`-wrapping unit tests.
'''
from pathlib import Path
import shutil
from types import ModuleType
import pytest
import tractor
from tractor import (
_code_load,
log,
)
def test_root_pkg_not_duplicated_in_logger_name():
'''
When both `pkg_name` and `name` are passed and they have
a common `<root_name>.< >` prefix, ensure that it is not
duplicated in the child's `StackLevelAdapter.name: str`.
'''
project_name: str = 'pylib'
pkg_path: str = 'pylib.subpkg.mod'
assert not tractor.current_actor(
err_on_no_runtime=False,
)
proj_log = log.get_logger(
pkg_name=project_name,
mk_sublog=False,
)
sublog = log.get_logger(
pkg_name=project_name,
name=pkg_path,
)
assert proj_log is not sublog
assert sublog.name.count(proj_log.name) == 1
assert 'mod' not in sublog.name
def test_implicit_mod_name_applied_for_child(
testdir: pytest.Pytester,
loglevel: str,
):
'''
Verify that when `.log.get_logger(pkg_name='pylib')` is called
from a given sub-mod from within the `pylib` pkg-path, we
implicitly set the equiv of `name=__name__` from the caller's
module.
'''
# tractor.log.get_console_log(level=loglevel)
proj_name: str = 'snakelib'
mod_code: str = (
f'import tractor\n'
f'\n'
# if you need to trace `testdir` stuff @ import-time..
# f'breakpoint()\n'
f'log = tractor.log.get_logger(pkg_name="{proj_name}")\n'
)
# create a sub-module for each pkg layer
_lib = testdir.mkpydir(proj_name)
pkg: Path = Path(_lib)
pkg_init_mod: Path = pkg / "__init__.py"
pkg_init_mod.write_text(mod_code)
subpkg: Path = pkg / 'subpkg'
subpkg.mkdir()
subpkgmod: Path = subpkg / "__init__.py"
subpkgmod.touch()
subpkgmod.write_text(mod_code)
_submod: Path = testdir.makepyfile(
_mod=mod_code,
)
pkg_submod = pkg / 'mod.py'
pkg_subpkg_submod = subpkg / 'submod.py'
shutil.copyfile(
_submod,
pkg_submod,
)
shutil.copyfile(
_submod,
pkg_subpkg_submod,
)
testdir.chdir()
# NOTE, to introspect the py-file-module-layout use (in .xsh
# syntax): `ranger @str(testdir)`
# XXX NOTE, once the "top level" pkg mod has been
# imported, we can then use `import` syntax to
# import it's sub-pkgs and modules.
subpkgmod: ModuleType = _code_load.load_module_from_path(
Path(pkg / '__init__.py'),
module_name=proj_name,
)
pkg_root_log = log.get_logger(
pkg_name=proj_name,
mk_sublog=False,
)
# the top level pkg-mod, created just now,
# by above API call.
assert pkg_root_log.name == proj_name
assert not pkg_root_log.logger.getChildren()
#
# ^TODO! test this same output but created via a `get_logger()`
# call in the `snakelib.__init__py`!!
# NOTE, the pkg-level "init mod" should of course
# have the same name as the package ns-path.
import snakelib as init_mod
assert init_mod.log.name == proj_name
# NOTE, a first-pkg-level sub-module should only
# use the package-name since the leaf-node-module
# will be included in log headers by default.
from snakelib import mod
assert mod.log.name == proj_name
from snakelib import subpkg
assert (
subpkg.log.name
==
subpkg.__package__
==
f'{proj_name}.subpkg'
)
from snakelib.subpkg import submod
assert (
submod.log.name
==
submod.__package__
==
f'{proj_name}.subpkg'
)
sub_logs = pkg_root_log.logger.getChildren()
assert len(sub_logs) == 1 # only one nested sub-pkg module
assert submod.log.logger in sub_logs
# TODO, moar tests against existing feats:
# ------ - ------
# - [ ] color settings?
# - [ ] header contents like,
# - actor + thread + task names from various conc-primitives,
# - [ ] `StackLevelAdapter` extensions,
# - our custom levels/methods: `transport|runtime|cance|pdb|devx`
# - [ ] custom-headers support?
#
# TODO, test driven dev of new-ideas/long-wanted feats,
# ------ - ------
# - [ ] https://github.com/goodboy/tractor/issues/244
# - [ ] @catern mentioned using a sync / deterministic sys
# and in particular `svlogd`?
# |_ https://smarden.org/runit/svlogd.8
# - [ ] using adapter vs. filters?
# - https://stackoverflow.com/questions/60691759/add-information-to-every-log-message-in-python-logging/61830838#61830838
# - [ ] `.at_least_level()` optimization which short circuits wtv
# `logging` is doing behind the scenes when the level filters
# the emission..?
# - [ ] use of `.log.get_console_log()` in subactors and the
# subtleties of ensuring it actually emits from a subproc.
# - [ ] this idea of activating per-subsys emissions with some
# kind of `.name` filter passed to the runtime or maybe configured
# via the root `StackLevelAdapter`?
# - [ ] use of `logging.dict.dictConfig()` to simplify the impl
# of any of ^^ ??
# - https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig
# - https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema
# - https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig

View File

@ -1,8 +1,13 @@
"""
Multiple python programs invoking the runtime.
"""
from __future__ import annotations
import platform
import subprocess
import time
from typing import (
TYPE_CHECKING,
)
import pytest
import trio
@ -10,14 +15,29 @@ import tractor
from tractor._testing import (
tractor_test,
)
from tractor import (
current_actor,
_state,
Actor,
Context,
Portal,
)
from .conftest import (
sig_prog,
_INT_SIGNAL,
_INT_RETURN_CODE,
)
if TYPE_CHECKING:
from tractor.msg import Aid
from tractor._addr import (
UnwrappedAddress,
)
def test_abort_on_sigint(daemon):
def test_abort_on_sigint(
daemon: subprocess.Popen,
):
assert daemon.returncode is None
time.sleep(0.1)
sig_prog(daemon, _INT_SIGNAL)
@ -30,8 +50,11 @@ def test_abort_on_sigint(daemon):
@tractor_test
async def test_cancel_remote_arbiter(daemon, reg_addr):
assert not tractor.current_actor().is_arbiter
async def test_cancel_remote_arbiter(
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
):
assert not current_actor().is_arbiter
async with tractor.get_registry(reg_addr) as portal:
await portal.cancel_actor()
@ -45,24 +68,106 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
pass
def test_register_duplicate_name(daemon, reg_addr):
def test_register_duplicate_name(
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
):
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
) as n:
) as an:
assert not tractor.current_actor().is_arbiter
assert not current_actor().is_arbiter
p1 = await n.start_actor('doggy')
p2 = await n.start_actor('doggy')
p1 = await an.start_actor('doggy')
p2 = await an.start_actor('doggy')
async with tractor.wait_for_actor('doggy') as portal:
assert portal.channel.uid in (p2.channel.uid, p1.channel.uid)
await n.cancel()
await an.cancel()
# run it manually since we want to start **after**
# the other "daemon" program
# XXX, run manually since we want to start this root **after**
# the other "daemon" program with it's own root.
trio.run(main)
@tractor.context
async def get_root_portal(
ctx: Context,
):
'''
Connect back to the root actor manually (using `._discovery` API)
and ensure it's contact info is the same as our immediate parent.
'''
sub: Actor = current_actor()
rtvs: dict = _state._runtime_vars
raddrs: list[UnwrappedAddress] = rtvs['_root_addrs']
# await tractor.pause()
# XXX, in case the sub->root discovery breaks you might need
# this (i know i did Xp)!!
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
assert (
len(raddrs) == 1
and
list(sub._parent_chan.raddr.unwrap()) in raddrs
)
# connect back to our immediate parent which should also
# be the actor-tree's root.
from tractor._discovery import get_root
ptl: Portal
async with get_root() as ptl:
root_aid: Aid = ptl.chan.aid
parent_ptl: Portal = current_actor().get_parent()
assert (
root_aid.name == 'root'
and
parent_ptl.chan.aid == root_aid
)
await ctx.started()
def test_non_registrar_spawns_child(
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
loglevel: str,
debug_mode: bool,
):
'''
Ensure a non-regristar (serving) root actor can spawn a sub and
that sub can connect back (manually) to it's rent that is the
root without issue.
More or less this audits the global contact info in
`._state._runtime_vars`.
'''
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
loglevel=loglevel,
debug_mode=debug_mode,
) as an:
actor: Actor = tractor.current_actor()
assert not actor.is_registrar
sub_ptl: Portal = await an.start_actor(
name='sub',
enable_modules=[__name__],
)
async with sub_ptl.open_context(
get_root_portal,
) as (ctx, _):
print('Waiting for `sub` to connect back to us..')
await an.cancel()
# XXX, run manually since we want to start this root **after**
# the other "daemon" program with it's own root.
trio.run(main)

View File

@ -17,9 +17,8 @@ from tractor.log import (
get_console_log,
get_logger,
)
log = get_logger(__name__)
log = get_logger()
_resource: int = 0

View File

@ -37,7 +37,7 @@ from .ipc._uds import UDSAddress
if TYPE_CHECKING:
from ._runtime import Actor
log = get_logger(__name__)
log = get_logger()
# TODO, maybe breakout the netns key to a struct?
@ -259,6 +259,8 @@ def wrap_address(
case _:
# import pdbp; pdbp.set_trace()
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
raise TypeError(
f'Can not wrap unwrapped-address ??\n'
f'type(addr): {type(addr)!r}\n'

View File

@ -0,0 +1,48 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
(Hot) coad (re-)load utils for python.
'''
import importlib
from pathlib import Path
import sys
from types import ModuleType
# ?TODO, move this into internal libs?
# -[ ] we already use it in `modden.config._pymod` as well
def load_module_from_path(
path: Path,
module_name: str|None = None,
) -> ModuleType:
'''
Taken from SO,
https://stackoverflow.com/a/67208147
which is based on stdlib docs,
https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly
'''
module_name = module_name or path.stem
spec = importlib.util.spec_from_file_location(
module_name,
str(path),
)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return module

View File

@ -70,6 +70,7 @@ from ._exceptions import (
MsgTypeError,
RemoteActorError,
StreamOverrun,
TransportClosed,
pack_from_raise,
unpack_error,
)
@ -113,7 +114,7 @@ if TYPE_CHECKING:
CallerInfo,
)
log = get_logger(__name__)
log = get_logger()
class Unresolved:
@ -2391,16 +2392,18 @@ async def open_context_from_portal(
case trio.Cancelled():
logmeth = log.cancel
cause: str = 'cancelled'
msg: str = (
f'ctx {ctx.side!r}-side {cause!r} with,\n'
f'{ctx.repr_outcome()!r}\n'
)
# XXX explicitly report on any non-graceful-taskc cases
case _:
cause: str = 'errored'
logmeth = log.exception
msg: str = f'ctx {ctx.side!r}-side {cause!r} with,\n'
logmeth(
f'ctx {ctx.side!r}-side {cause!r} with,\n'
f'{ctx.repr_outcome()!r}\n'
)
logmeth(msg)
if debug_mode():
# async with debug.acquire_debug_lock(portal.actor.uid):
@ -2426,10 +2429,7 @@ async def open_context_from_portal(
try:
# await pause(shield=True)
await ctx.cancel()
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
):
except TransportClosed:
log.warning(
'IPC connection for context is broken?\n'
f'task: {ctx.cid}\n'

View File

@ -27,7 +27,7 @@ from typing import (
)
from contextlib import asynccontextmanager as acm
from tractor.log import get_logger
from .log import get_logger
from .trionics import (
gather_contexts,
collapse_eg,
@ -53,7 +53,7 @@ if TYPE_CHECKING:
from ._runtime import Actor
log = get_logger(__name__)
log = get_logger()
@acm
@ -91,10 +91,13 @@ async def get_registry(
@acm
async def get_root(
**kwargs,
) -> AsyncGenerator[Portal, None]:
async def get_root(**kwargs) -> AsyncGenerator[Portal, None]:
'''
Deliver the current actor's "root process" actor (yes in actor
and proc tree terms) by delivering a `Portal` from the spawn-time
provided contact address.
'''
# TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs?
addr = _runtime_vars['_root_mailbox']
@ -193,6 +196,11 @@ async def maybe_open_portal(
addr: UnwrappedAddress,
name: str,
):
'''
Open a `Portal` to the actor serving @ `addr` or `None` if no
peer can be contacted or found.
'''
async with query_actor(
name=name,
regaddr=addr,
@ -217,7 +225,7 @@ async def find_actor(
raise_on_none: bool = False,
) -> AsyncGenerator[
Portal | list[Portal] | None,
Portal|list[Portal]|None,
None,
]:
'''
@ -259,6 +267,7 @@ async def find_actor(
collapse_eg(),
gather_contexts(
mngrs=maybe_portals,
# tn=tn, # ?TODO, helps to pass rent tn here?
) as portals,
):
# log.runtime(

View File

@ -50,7 +50,7 @@ if TYPE_CHECKING:
from ._spawn import SpawnMethodKey
log = get_logger(__name__)
log = get_logger()
def _mp_main(
@ -72,11 +72,15 @@ def _mp_main(
spawn_ctx: mp.context.BaseContext = try_set_start_method(start_method)
assert spawn_ctx
# XXX, enable root log at level
if actor.loglevel is not None:
log.info(
f'Setting loglevel for {actor.uid} to {actor.loglevel}'
f'Setting loglevel for {actor.uid} to {actor.loglevel!r}'
)
get_console_log(
level=actor.loglevel,
name='tractor',
)
get_console_log(actor.loglevel)
# TODO: use scops headers like for `trio` below!
# (well after we libify it maybe..)
@ -126,8 +130,12 @@ def _trio_main(
parent_addr=parent_addr
)
# XXX, enable root log at level
if actor.loglevel is not None:
get_console_log(actor.loglevel)
get_console_log(
level=actor.loglevel,
name='tractor',
)
log.info(
f'Starting `trio` subactor from parent @ '
f'{parent_addr}\n'

View File

@ -46,6 +46,7 @@ from msgspec import (
from tractor._state import current_actor
from tractor.log import get_logger
from tractor.msg import (
Aid,
Error,
PayloadMsg,
MsgType,
@ -479,9 +480,10 @@ class RemoteActorError(Exception):
@property
def relay_uid(self) -> tuple[str, str]|None:
return tuple(
self._ipc_msg.relay_path[-1]
)
if msg := self._ipc_msg:
return tuple(
msg.relay_path[-1]
)
@property
def src_uid(self) -> tuple[str, str]|None:
@ -521,7 +523,8 @@ class RemoteActorError(Exception):
for key in fields:
if (
key == 'relay_uid'
and not self.is_inception()
and
not self.is_inception()
):
continue
@ -534,6 +537,13 @@ class RemoteActorError(Exception):
None,
)
)
if (
key == 'canceller'
and
isinstance(val, Aid)
):
val: str = val.reprol(sin_uuid=False)
# TODO: for `.relay_path` on multiline?
# if not isinstance(val, str):
# val_str = pformat(val)
@ -623,12 +633,22 @@ class RemoteActorError(Exception):
# IFF there is an embedded traceback-str we always
# draw the ascii-box around it.
body: str = ''
if tb_str := self.tb_str:
fields: str = self._mk_fields_str(
_body_fields
+
self.extra_body_fields,
)
fields: str = self._mk_fields_str(
_body_fields
+
self.extra_body_fields,
)
tb_str: str = (
self.tb_str
#
# ^TODO? what to use instead? if anything?
# -[ ] ensure the `.message` doesn't show up 2x in output ya?
# -[ ] ._message isn't really right?
# or
# self._message
)
if tb_str:
from tractor.devx import (
pformat_boxed_tb,
)
@ -640,7 +660,7 @@ class RemoteActorError(Exception):
# just after <Type(
# |___ ..
tb_body_indent=1,
boxer_header=self.relay_uid,
boxer_header=self.relay_uid or '-',
)
# !TODO, it'd be nice to import these top level without
@ -713,6 +733,10 @@ class RemoteActorError(Exception):
class ContextCancelled(RemoteActorError):
'''
IPC context cancellation signal/msg.
Often reffed with the short-hand: "ctxc".
Inter-actor task context was cancelled by either a call to
``Portal.cancel_actor()`` or ``Context.cancel()``.
@ -737,8 +761,8 @@ class ContextCancelled(RemoteActorError):
- (simulating) an IPC transport network outage
- a (malicious) pkt sent specifically to cancel an actor's
runtime non-gracefully without ensuring ongoing RPC tasks are
incrementally cancelled as is done with:
runtime non-gracefully without ensuring ongoing RPC tasks
are incrementally cancelled as is done with:
`Actor`
|_`.cancel()`
|_`.cancel_soon()`
@ -759,6 +783,59 @@ class ContextCancelled(RemoteActorError):
# src_actor_uid = canceller
class ActorCancelled(ContextCancelled):
'''
Runtime-layer cancellation signal/msg.
Indicates a "graceful interrupt" of the machinery scheduled by
the py-proc's `trio.run()`.
Often reffed with the short-hand: "actorc".
Raised from within `an: ActorNursery` (via an `ExceptionGroup`)
when an actor has been "process wide" cancel-called using any of,
- `ActorNursery.cancel()`
- `Portal.cancel_actor()`
**and** that cancel request was part of a "non graceful" cancel
condition.
That is, whenever an exception is to be raised outside an `an`
scope-block due to some error raised-in/relayed-to that scope. In
such cases for every subactor which was cancelledand subsequently
( and according to the `an`'s supervision strat ) this is
normally raised per subactor portal.
'''
@property
def canceller(self) -> Aid:
'''
Return the (maybe) `Actor.aid: Aid` for the requesting-author
of this actorc.
Emit a warning msg when `.canceller` has not been set.
See additional relevant notes in
`ContextCancelled.canceller`.
'''
value: tuple[str, str]|None
if msg := self._ipc_msg:
value = msg.canceller
else:
value = self._extra_msgdata['canceller']
if value:
return value
log.warning(
'IPC Context cancelled without a requesting actor?\n'
'Maybe the IPC transport ended abruptly?\n\n'
f'{self}'
)
class MsgTypeError(
RemoteActorError,
):

View File

@ -69,7 +69,7 @@ from ._streaming import (
if TYPE_CHECKING:
from ._runtime import Actor
log = get_logger(__name__)
log = get_logger()
class Portal:
@ -329,18 +329,7 @@ class Portal:
# if we get here some weird cancellation case happened
return False
except (
# XXX, should never really get raised unless we aren't
# wrapping them in the below type by mistake?
#
# Leaving the catch here for now until we're very sure
# all the cases (for various tpt protos) have indeed been
# re-wrapped ;p
trio.ClosedResourceError,
trio.BrokenResourceError,
TransportClosed,
) as tpt_err:
except TransportClosed as tpt_err:
ipc_borked_report: str = (
f'IPC for actor already closed/broken?\n\n'
f'\n'

View File

@ -88,7 +88,8 @@ async def maybe_block_bp(
bp_blocked: bool
if (
debug_mode
and maybe_enable_greenback
and
maybe_enable_greenback
and (
maybe_mod := await debug.maybe_init_greenback(
raise_not_found=False,
@ -289,10 +290,12 @@ async def open_root_actor(
for uw_addr in uw_reg_addrs
]
loglevel = (
loglevel: str = (
loglevel
or log._default_loglevel
).upper()
or
log._default_loglevel
)
loglevel: str = loglevel.upper()
if (
debug_mode
@ -323,7 +326,10 @@ async def open_root_actor(
)
assert loglevel
_log = log.get_console_log(loglevel)
_log = log.get_console_log(
level=loglevel,
name='tractor',
)
assert _log
# TODO: factor this into `.devx._stackscope`!!
@ -380,10 +386,13 @@ async def open_root_actor(
addr,
)
trans_bind_addrs: list[UnwrappedAddress] = []
tpt_bind_addrs: list[
Address # `Address.get_random()` case
|UnwrappedAddress # registrar case `= uw_reg_addrs`
] = []
# Create a new local root-actor instance which IS NOT THE
# REGISTRAR
# ------ NON-REGISTRAR ------
# create a new root-actor instance.
if ponged_addrs:
if ensure_registry:
raise RuntimeError(
@ -410,12 +419,21 @@ async def open_root_actor(
# XXX INSTEAD, bind random addrs using the same tpt
# proto.
for addr in ponged_addrs:
trans_bind_addrs.append(
tpt_bind_addrs.append(
# XXX, these are `Address` NOT `UnwrappedAddress`.
#
# NOTE, in the case of posix/berkley socket
# protos we allocate port=0 such that the system
# allocates a random value at bind time; this
# happens in the `.ipc.*` stack's backend.
addr.get_random(
bindspace=addr.bindspace,
)
)
# ------ REGISTRAR ------
# create a new "registry providing" root-actor instance.
#
# Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors.
@ -424,7 +442,7 @@ async def open_root_actor(
# following init steps are taken:
# - the tranport layer server is bound to each addr
# pair defined in provided registry_addrs, or the default.
trans_bind_addrs = uw_reg_addrs
tpt_bind_addrs = uw_reg_addrs
# - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub)
@ -444,20 +462,10 @@ async def open_root_actor(
enable_modules=enable_modules,
)
# XXX, in case the root actor runtime was actually run from
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOT
# `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# NOTE, only set the loopback addr for the
# process-tree-global "root" mailbox since all sub-actors
# should be able to speak to their root actor over that
# channel.
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
raddrs.extend(trans_bind_addrs)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Start up main task set via core actor-runtime nurseries.
try:
# assign process-local actor
@ -494,14 +502,39 @@ async def open_root_actor(
# "actor runtime" primitives are SC-compat and thus all
# transitively spawned actors/processes must be as
# well.
await root_tn.start(
accept_addrs: list[UnwrappedAddress]
reg_addrs: list[UnwrappedAddress]
(
accept_addrs,
reg_addrs,
) = await root_tn.start(
partial(
_runtime.async_main,
actor,
accept_addrs=trans_bind_addrs,
accept_addrs=tpt_bind_addrs,
parent_addr=None
)
)
# NOTE, only set a local-host addr (i.e. like
# `lo`-loopback for TCP) for the process-tree-global
# "root"-process (its tree-wide "mailbox") since all
# sub-actors should be able to speak to their root
# actor over that channel.
#
# ?TODO, per-OS non-network-proto alt options?
# -[ ] on linux we should be able to always use UDS?
#
raddrs: list[UnwrappedAddress] = _state._runtime_vars['_root_addrs']
raddrs.extend(
accept_addrs,
)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# if 'chart' in actor.aid.name:
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
try:
yield actor
except (
@ -583,6 +616,13 @@ async def open_root_actor(
):
_state._runtime_vars['_debug_mode'] = False
# !XXX, clear ALL prior contact info state, this is MEGA
# important if you are opening the runtime multiple times
# from the same parent process (like in our test
# harness)!
_state._runtime_vars['_root_addrs'].clear()
_state._runtime_vars['_root_mailbox'] = None
_state._current_actor = None
_state._last_actor_terminated = actor

View File

@ -284,6 +284,15 @@ async def _errors_relayed_via_ipc(
try:
yield # run RPC invoke body
# NOTE, never REPL any pseudo-expected tpt-disconnect.
except TransportClosed as err:
rpc_err = err
log.warning(
f'Tpt disconnect during remote-exc relay due to,\n'
f'{err!r}\n'
)
raise err
# box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel.
except (
@ -327,10 +336,15 @@ async def _errors_relayed_via_ipc(
# recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going
# to keep this open ended for now.
log.debug(
'RPC task crashed, attempting to enter debugger\n'
f'|_{ctx}'
)
if _state.debug_mode():
log.exception(
f'RPC task crashed!\n'
f'Attempting to enter debugger\n'
f'\n'
f'{ctx}'
)
entered_debug = await debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
@ -419,7 +433,7 @@ async def _errors_relayed_via_ipc(
# cancel scope will not have been inserted yet
if is_rpc:
log.warning(
'RPC task likely errored or cancelled before start?\n'
'RPC task likely crashed or cancelled before start?\n'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
@ -862,9 +876,9 @@ async def _invoke(
)
logmeth(
f'{message}\n'
f'{message}'
f'\n'
f'{descr_str}\n'
f'{descr_str}'
)
@ -900,6 +914,11 @@ async def try_ship_error_to_remote(
# XXX NOTE XXX in SC terms this is one of the worst things
# that can happen and provides for a 2-general's dilemma..
#
# FURHTER, we should never really have to handle these
# lowlevel excs from `trio` since the `Channel.send()` layers
# downward should be mostly wrapping such cases in a
# tpt-closed; the `.critical()` usage is warranted.
except (
trio.ClosedResourceError,
trio.BrokenResourceError,

View File

@ -147,6 +147,8 @@ def get_mod_nsps2fps(mod_ns_paths: list[str]) -> dict[str, str]:
return nsp2fp
_bp = False
class Actor:
'''
The fundamental "runtime" concurrency primitive.
@ -181,6 +183,14 @@ class Actor:
def is_registrar(self) -> bool:
return self.is_arbiter
@property
def is_root(self) -> bool:
'''
This actor is the parent most in the tree?
'''
return _state.is_root_process()
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()`,
@ -272,7 +282,9 @@ class Actor:
stacklevel=2,
)
registry_addrs: list[Address] = [wrap_address(arbiter_addr)]
registry_addrs: list[Address] = [
wrap_address(arbiter_addr)
]
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
@ -959,6 +971,21 @@ class Actor:
rvs['_is_root'] = False # obvi XD
# TODO, remove! left in just while protoing init fix!
# global _bp
# if (
# 'chart' in self.aid.name
# and
# isinstance(
# rvs['_root_addrs'][0],
# dict,
# )
# and
# not _bp
# ):
# _bp = True
# breakpoint()
_state._runtime_vars.update(rvs)
# `SpawnSpec.reg_addrs`
@ -1455,7 +1482,12 @@ async def async_main(
# be False when running as root actor and True when as
# a subactor.
parent_addr: UnwrappedAddress|None = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[
tuple[
list[UnwrappedAddress], # accept_addrs
list[UnwrappedAddress], # reg_addrs
]
] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
@ -1634,6 +1666,7 @@ async def async_main(
# if addresses point to the same actor..
# So we need a way to detect that? maybe iterate
# only on unique actor uids?
addr: UnwrappedAddress
for addr in actor.reg_addrs:
try:
waddr = wrap_address(addr)
@ -1642,7 +1675,9 @@ async def async_main(
await debug.pause()
# !TODO, get rid of the local-portal crap XD
reg_portal: Portal
async with get_registry(addr) as reg_portal:
accept_addr: UnwrappedAddress
for accept_addr in accept_addrs:
accept_addr = wrap_address(accept_addr)
@ -1658,8 +1693,12 @@ async def async_main(
is_registered: bool = True
# init steps complete
task_status.started()
# init steps complete, deliver IPC-server and
# registrar addrs back to caller.
task_status.started((
accept_addrs,
actor.reg_addrs,
))
# Begin handling our new connection back to our
# parent. This is done last since we don't want to

View File

@ -50,7 +50,11 @@ from tractor._addr import UnwrappedAddress
from tractor._portal import Portal
from tractor._runtime import Actor
from tractor._entry import _mp_main
from tractor._exceptions import ActorFailure
from tractor._exceptions import (
ActorCancelled,
ActorFailure,
# NoResult,
)
from tractor.msg import (
types as msgtypes,
pretty_struct,
@ -137,7 +141,6 @@ def try_set_start_method(
async def exhaust_portal(
portal: Portal,
actor: Actor
@ -185,10 +188,12 @@ async def exhaust_portal(
async def cancel_on_completion(
portal: Portal,
actor: Actor,
errors: dict[tuple[str, str], Exception],
errors: dict[
msgtypes.Aid,
Exception,
],
) -> None:
'''
@ -209,24 +214,57 @@ async def cancel_on_completion(
portal,
actor,
)
aid: msgtypes.Aid = actor.aid
repr_aid: str = aid.reprol(sin_uuid=False)
if isinstance(result, Exception):
errors[actor.uid]: Exception = result
errors[aid]: Exception = result
log.cancel(
'Cancelling subactor runtime due to error:\n\n'
f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
f'error: {result}\n'
'Cancelling subactor {repr_aid!r} runtime due to error\n'
f'\n'
f'Portal.cancel_actor() => {portal.channel.uid}\n'
f'\n'
f'{result!r}\n'
)
else:
log.runtime(
'Cancelling subactor gracefully:\n\n'
f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
f'result: {result}\n'
report: str = (
f'Cancelling subactor {repr_aid!r} gracefully..\n'
f'\n'
)
canc_info: str = (
f'Portal.cancel_actor() => {portal.chan.uid}\n'
f'\n'
f'final-result => {result!r}\n'
)
log.cancel(
report
+
canc_info
)
# cancel the process now that we have a final result
await portal.cancel_actor()
if (
not errors.get(aid)
# and
# result is NoResult
):
pass
# await debug.pause(shield=True)
# errors[aid] = ActorCancelled(
# message=(
# f'Cancelled subactor {repr_aid!r}\n'
# f'{canc_info}\n'
# ),
# canceller=current_actor().aid,
# # TODO? should we have a ack-msg?
# # ipc_msg=??
# # boxed_type=trio.Cancelled,
# )
async def hard_kill(
proc: trio.Process,
@ -331,6 +369,10 @@ async def soft_kill(
Awaitable,
],
portal: Portal,
errors: dict[
msgtypes.Aid,
Exception,
],
) -> None:
'''
@ -374,8 +416,8 @@ async def soft_kill(
# below. This means we try to do a graceful teardown
# via sending a cancel message before getting out
# zombie killing tools.
async with trio.open_nursery() as n:
n.cancel_scope.shield = True
async with trio.open_nursery() as tn:
tn.cancel_scope.shield = True
async def cancel_on_proc_deth():
'''
@ -385,24 +427,35 @@ async def soft_kill(
'''
await wait_func(proc)
n.cancel_scope.cancel()
tn.cancel_scope.cancel()
# start a task to wait on the termination of the
# process by itself waiting on a (caller provided) wait
# function which should unblock when the target process
# has terminated.
n.start_soon(cancel_on_proc_deth)
tn.start_soon(cancel_on_proc_deth)
# send the actor-runtime a cancel request.
await portal.cancel_actor()
# if not errors.get(peer_aid):
# errors[peer_aid] = ActorCancelled(
# message=(
# 'Sub-actor cancelled gracefully by parent\n'
# ),
# canceller=current_actor().aid,
# # TODO? should we have a ack-msg?
# # ipc_msg=??
# # boxed_type=trio.Cancelled,
# )
if proc.poll() is None: # type: ignore
log.warning(
'Subactor still alive after cancel request?\n\n'
f'uid: {peer_aid}\n'
f'|_{proc}\n'
)
n.cancel_scope.cancel()
tn.cancel_scope.cancel()
raise
@ -410,7 +463,10 @@ async def new_proc(
name: str,
actor_nursery: ActorNursery,
subactor: Actor,
errors: dict[tuple[str, str], Exception],
errors: dict[
msgtypes.Aid,
Exception,
],
# passed through to actor main
bind_addrs: list[UnwrappedAddress],
@ -449,7 +505,10 @@ async def trio_proc(
name: str,
actor_nursery: ActorNursery,
subactor: Actor,
errors: dict[tuple[str, str], Exception],
errors: dict[
msgtypes.Aid,
Exception,
],
# passed through to actor main
bind_addrs: list[UnwrappedAddress],
@ -572,9 +631,9 @@ async def trio_proc(
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
async with trio.open_nursery() as nursery:
async with trio.open_nursery() as ptl_reaper_tn:
if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon(
ptl_reaper_tn.start_soon(
cancel_on_completion,
portal,
subactor,
@ -587,7 +646,8 @@ async def trio_proc(
await soft_kill(
proc,
trio.Process.wait, # XXX, uses `pidfd_open()` below.
portal
portal,
errors,
)
# cancel result waiter that may have been spawned in
@ -596,7 +656,7 @@ async def trio_proc(
'Cancelling portal result reaper task\n'
f'c)> {subactor.aid.reprol()!r}\n'
)
nursery.cancel_scope.cancel()
ptl_reaper_tn.cancel_scope.cancel()
finally:
# XXX NOTE XXX: The "hard" reap since no actor zombies are
@ -669,7 +729,10 @@ async def mp_proc(
name: str,
actor_nursery: ActorNursery, # type: ignore # noqa
subactor: Actor,
errors: dict[tuple[str, str], Exception],
errors: dict[
msgtypes.Aid,
Exception,
],
# passed through to actor main
bind_addrs: list[UnwrappedAddress],
parent_addr: UnwrappedAddress,
@ -794,7 +857,7 @@ async def mp_proc(
cancel_on_completion,
portal,
subactor,
errors
errors,
)
# This is a "soft" (cancellable) join/reap which
@ -803,7 +866,8 @@ async def mp_proc(
await soft_kill(
proc,
proc_waiter,
portal
portal,
errors,
)
# cancel result waiter that may have been spawned in

View File

@ -38,6 +38,7 @@ import trio
from ._exceptions import (
ContextCancelled,
RemoteActorError,
TransportClosed,
)
from .log import get_logger
from .trionics import (
@ -59,7 +60,7 @@ if TYPE_CHECKING:
from .ipc import Channel
log = get_logger(__name__)
log = get_logger()
# TODO: the list
@ -409,10 +410,8 @@ class MsgStream(trio.abc.Channel):
# it).
with trio.CancelScope(shield=True):
await self._ctx.send_stop()
except (
trio.BrokenResourceError,
trio.ClosedResourceError
TransportClosed,
) as re:
# the underlying channel may already have been pulled
# in which case our stop message is meaningless since
@ -593,9 +592,8 @@ class MsgStream(trio.abc.Channel):
),
)
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
TransportClosed,
) as _trans_err:
trans_err = _trans_err
if (

View File

@ -30,6 +30,9 @@ import warnings
import trio
from .msg import (
types as msgtypes,
)
from .devx import (
debug,
pformat as _pformat,
@ -48,6 +51,7 @@ from .trionics import (
)
from ._exceptions import (
ContextCancelled,
ActorCancelled,
)
from ._root import (
open_root_actor,
@ -62,7 +66,7 @@ if TYPE_CHECKING:
from .ipc import IPCServer
log = get_logger(__name__)
log = get_logger()
class ActorNursery:
@ -99,7 +103,10 @@ class ActorNursery:
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: dict[tuple[str, str], BaseException],
errors: dict[
msgtypes.Aid,
BaseException,
],
) -> None:
# self.supervisor = supervisor # TODO
@ -117,9 +124,11 @@ class ActorNursery:
]
] = {}
# signals when it is ok to start waiting o subactor procs
# for termination.
self._join_procs = trio.Event()
self._at_least_one_child_in_debug: bool = False
self.errors = errors
self._errors = errors
self._scope_error: BaseException|None = None
self.exited = trio.Event()
@ -260,7 +269,7 @@ class ActorNursery:
name,
self,
subactor,
self.errors,
self._errors,
bind_addrs,
parent_addr,
_rtv, # run time vars
@ -364,7 +373,9 @@ class ActorNursery:
# then `._children`..
children: dict = self._children
child_count: int = len(children)
msg: str = f'Cancelling actor nursery with {child_count} children\n'
msg: str = (
f'Cancelling actor-nursery with {child_count} children\n'
)
server: IPCServer = self._actor.ipc_server
@ -391,7 +402,9 @@ class ActorNursery:
else:
if portal is None: # actor hasn't fully spawned yet
event: trio.Event = server._peer_connected[subactor.uid]
event: trio.Event = server._peer_connected[
subactor.uid
]
log.warning(
f"{subactor.uid} never 't finished spawning?"
)
@ -416,7 +429,20 @@ class ActorNursery:
# spawn cancel tasks for each sub-actor
assert portal
if portal.channel.connected():
tn.start_soon(portal.cancel_actor)
async def canc_subactor():
await portal.cancel_actor()
# aid: msgtypes.Aid = subactor.aid
# reprol: str = aid.reprol(sin_uuid=False)
# if not self._errors.get(aid):
# self._errors[aid] = ActorCancelled(
# message=(
# f'Sub-actor {reprol!r} cancelled gracefully by parent nursery\n'
# ),
# canceller=self._actor.aid,
# )
tn.start_soon(canc_subactor)
log.cancel(msg)
# if we cancelled the cancel (we hung cancelling remote actors)
@ -442,6 +468,47 @@ class ActorNursery:
# mark ourselves as having (tried to have) cancelled all subactors
self._join_procs.set()
@property
def maybe_error(self) -> (
BaseException|
BaseExceptionGroup|
None
):
'''
Deliver any captured scope errors including those relayed
from subactors such as `ActorCancelled` during a non-graceful
cancellation scenario.
When more then a "graceful cancel" occurrs wrap all collected
sub-exceptions in a raised `ExceptionGroup`.
'''
scope_exc: BaseException|None = self._scope_error
# XXX NOTE, only pack an eg if there i at least one
# non-actorc exception received from a subactor, OR
# return `._scope_error` verbatim.
if (errors := self._errors):
# use `BaseExceptionGroup` as needed
excs: list[BaseException] = list(errors.values())
if (
len(excs) > 1
and
any(
type(exc) not in {ActorCancelled,}
for exc in excs
)
):
return ExceptionGroup(
'ActorNursery multi-errored with',
tuple(excs),
)
# raise the lone subactor exc
return list(excs)[0]
return scope_exc
@acm
async def _open_and_supervise_one_cancels_all_nursery(
@ -457,7 +524,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
inner_err: BaseException|None = None
# the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], BaseException] = {}
errors: dict[
msgtypes.Aid,
BaseException,
] = {}
# This is the outermost level "deamon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
@ -467,176 +537,212 @@ async def _open_and_supervise_one_cancels_all_nursery(
# `ActorNursery.start_actor()`).
# errors from this daemon actor nursery bubble up to caller
async with (
collapse_eg(),
trio.open_nursery() as da_nursery,
):
try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# `ActorNusery.run_in_actor()`) are expected to only
# return a single result and then complete (i.e. be canclled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with (
collapse_eg(),
trio.open_nursery() as ria_nursery,
):
an = ActorNursery(
actor,
ria_nursery,
da_nursery,
errors
)
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield an
# When we didn't error in the caller's scope,
# signal all process-monitor-tasks to conduct
# the "hard join phase".
log.runtime(
'Waiting on subactors to complete:\n'
f'>}} {len(an._children)}\n'
try:
async with (
collapse_eg(),
trio.open_nursery() as da_nursery,
):
try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# `ActorNusery.run_in_actor()`) are expected to only
# return a single result and then complete (i.e. be canclled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with (
collapse_eg(),
trio.open_nursery() as ria_nursery,
):
an = ActorNursery(
actor,
ria_nursery,
da_nursery,
errors
)
an._join_procs.set()
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield an
except BaseException as _inner_err:
inner_err = _inner_err
errors[actor.uid] = inner_err
# When we didn't error in the caller's scope,
# signal all process-monitor-tasks to conduct
# the "hard join phase".
log.runtime(
'Waiting on subactors to complete:\n'
f'>}} {len(an._children)}\n'
)
an._join_procs.set()
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
await debug.maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug
)
except BaseException as _inner_err:
inner_err = _inner_err
# errors[actor.aid] = inner_err
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
an._join_procs.set()
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
await debug.maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug
)
# XXX NOTE XXX: hypothetically an error could
# be raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype: type = type(inner_err)
if etype in (
trio.Cancelled,
KeyboardInterrupt,
) or (
is_multi_cancelled(inner_err)
):
log.cancel(
f'Actor-nursery cancelled by {etype}\n\n'
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
an._join_procs.set()
f'{current_actor().uid}\n'
f' |_{an}\n\n'
# XXX NOTE XXX: hypothetically an error could
# be raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype: type = type(inner_err)
if etype in (
trio.Cancelled,
KeyboardInterrupt,
) or (
is_multi_cancelled(inner_err)
):
log.cancel(
f'Actor-nursery cancelled by {etype}\n\n'
# TODO: show tb str?
# f'{tb_str}'
)
elif etype in {
ContextCancelled,
}:
log.cancel(
'Actor-nursery caught remote cancellation\n'
'\n'
f'{inner_err.tb_str}'
)
else:
log.exception(
'Nursery errored with:\n'
f'{current_actor().uid}\n'
f' |_{an}\n\n'
# TODO: same thing as in
# `._invoke()` to compute how to
# place this div-line in the
# middle of the above msg
# content..
# -[ ] prolly helper-func it too
# in our `.log` module..
# '------ - ------'
)
# TODO: show tb str?
# f'{tb_str}'
)
elif etype in {
ContextCancelled,
}:
log.cancel(
'Actor-nursery caught remote cancellation\n'
'\n'
f'{inner_err.tb_str}'
)
else:
log.exception(
'Nursery errored with:\n'
# cancel all subactors
await an.cancel()
# TODO: same thing as in
# `._invoke()` to compute how to
# place this div-line in the
# middle of the above msg
# content..
# -[ ] prolly helper-func it too
# in our `.log` module..
# '------ - ------'
)
# ria_nursery scope end
# cancel all subactors
await an.cancel()
# TODO: this is the handler around the ``.run_in_actor()``
# nursery. Ideally we can drop this entirely in the future as
# the whole ``.run_in_actor()`` API should be built "on top of"
# this lower level spawn-request-cancel "daemon actor" API where
# a local in-actor task nursery is used with one-to-one task
# + `await Portal.run()` calls and the results/errors are
# handled directly (inline) and errors by the local nursery.
except (
Exception,
BaseExceptionGroup,
trio.Cancelled
) as _outer_err:
outer_err = _outer_err
# ria_nursery scope end
an._scope_error = outer_err or inner_err
# TODO: this is the handler around the ``.run_in_actor()``
# nursery. Ideally we can drop this entirely in the future as
# the whole ``.run_in_actor()`` API should be built "on top of"
# this lower level spawn-request-cancel "daemon actor" API where
# a local in-actor task nursery is used with one-to-one task
# + `await Portal.run()` calls and the results/errors are
# handled directly (inline) and errors by the local nursery.
except (
Exception,
BaseExceptionGroup,
trio.Cancelled
) as _outer_err:
outer_err = _outer_err
# XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug.
await debug.maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug
)
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
if an._children:
log.cancel(
'Actor-nursery cancelling due error type:\n'
f'{outer_err}\n'
# XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug.
await debug.maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug
)
with trio.CancelScope(shield=True):
await an.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
if an._children:
log.cancel(
'Actor-nursery cancelling due error type:\n'
f'{outer_err}\n'
)
with trio.CancelScope(shield=True):
await an.cancel()
# use `BaseExceptionGroup` as needed
if len(errors) > 1:
raise BaseExceptionGroup(
'tractor.ActorNursery errored with',
tuple(errors.values()),
)
else:
raise list(errors.values())[0]
raise
# show frame on any (likely) internal error
if (
not an.cancelled
and an._scope_error
):
__tracebackhide__: bool = False
finally:
scope_exc = an._scope_error = outer_err or inner_err
# await debug.pause(shield=True)
# if scope_exc:
# errors[actor.aid] = scope_exc
# da_nursery scope end - nursery checkpoint
# final exit
# show this frame on any internal error
if (
not an.cancelled
and
scope_exc
):
__tracebackhide__: bool = False
# NOTE, it's possible no errors were raised while
# awaiting ".run_in_actor()" actors but those
# sub-actors may have delivered remote errors as
# results, normally captured via machinery in
# `._spawn.cancel_on_completion()`.
#
# Any such remote errors are collected in `an._errors`
# which is summarized via `ActorNursery.maybe_error`
# which is maybe re-raised in an outer block (below).
#
# So here we first cancel all subactors the summarize
# all errors and then later (in that outer block)
# maybe-raise on a "non-graceful" cancellation
# outcome, normally as a summary EG.
if (
scope_exc
or
errors
):
if an._children:
with trio.CancelScope(shield=True):
await an.cancel()
# cancel outer tn so we unblock outside this
# finally!
da_nursery.cance_scope.cancel()
#
# ^TODO? still don't get why needed?
# - an.cancel() should cause all spawn-subtasks
# to eventually exit?
# - also, could (instead) we sync to an event here before
# (ever) calling `an.cancel()`??
# `da_nursery` scope end, thus a checkpoint.
finally:
# raise any eg compiled from all subs
# ??TODO should we also adopt strict-egs here like
# `trio.Nursery`??
#
# XXX justification notes,
# docs: https://trio.readthedocs.io/en/stable/reference-core.html#historical-note-non-strict-exceptiongroups
# anthropic: https://discuss.python.org/t/using-exceptiongroup-at-anthropic-experience-report/20888
# gh: https://github.com/python-trio/trio/issues/611
if an_exc := an.maybe_error:
raise an_exc
if scope_exc := an._scope_error:
raise scope_exc
# @acm-fn scope exit
_shutdown_msg: str = (
@ -648,7 +754,7 @@ _shutdown_msg: str = (
# @api_frame
async def open_nursery(
*, # named params only!
hide_tb: bool = True,
hide_tb: bool = False,
**kwargs,
# ^TODO, paramspec for `open_root_actor()`
@ -684,16 +790,21 @@ async def open_nursery(
# mark us for teardown on exit
implicit_runtime: bool = True
async with open_root_actor(
hide_tb=hide_tb,
**kwargs,
) as actor:
async with (
# collapse_eg(hide_tb=hide_tb),
open_root_actor(
hide_tb=hide_tb,
**kwargs,
) as actor,
):
assert actor is current_actor()
try:
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as an:
async with (
_open_and_supervise_one_cancels_all_nursery(
actor
) as an
):
# NOTE: mark this nursery as having
# implicitly started the root actor so

View File

@ -49,7 +49,7 @@ from tractor.msg import (
import wrapt
log = get_logger(__name__)
log = get_logger()
# TODO: yeah, i don't love this and we should prolly just
# write a decorator that actually keeps a stupid ref to the func

View File

@ -51,7 +51,7 @@ from tractor import (
)
from tractor.devx import debug
log = logmod.get_logger(__name__)
log = logmod.get_logger()
if TYPE_CHECKING:

View File

@ -59,7 +59,7 @@ from ._sigint import (
_ctlc_ignore_header as _ctlc_ignore_header
)
log = get_logger(__name__)
log = get_logger()
# ----------------
# XXX PKG TODO XXX

View File

@ -84,7 +84,7 @@ _crash_msg: str = (
'Opening a pdb REPL in crashed actor'
)
log = get_logger(__package__)
log = get_logger()
class BoxedMaybeException(Struct):

View File

@ -47,7 +47,7 @@ if TYPE_CHECKING:
Actor,
)
log = get_logger(__name__)
log = get_logger()
_ctlc_ignore_header: str = (
'Ignoring SIGINT while debug REPL in use'

View File

@ -58,7 +58,7 @@ from ._sigint import (
_ctlc_ignore_header as _ctlc_ignore_header
)
log = get_logger(__package__)
log = get_logger()
async def maybe_wait_for_debugger(

View File

@ -93,7 +93,7 @@ if TYPE_CHECKING:
# from ._post_mortem import BoxedMaybeException
from ._repl import PdbREPL
log = get_logger(__package__)
log = get_logger()
_pause_msg: str = 'Opening a pdb REPL in paused actor'
_repl_fail_msg: str|None = (
@ -628,7 +628,7 @@ def _set_trace(
log.pdb(
f'{_pause_msg}\n'
f'>(\n'
f'|_{actor.uid}\n'
f'|_{actor.aid.uid}\n'
f' |_{task}\n' # @ {actor.uid}\n'
# f'|_{task}\n'
# ^-TODO-^ more compact pformating?
@ -1257,3 +1257,26 @@ async def breakpoint(
api_frame=inspect.currentframe(),
**kwargs,
)
async def maybe_pause_bp():
'''
Internal (ONLY for now) `breakpoint()`-er fn which only tries to
use the multi-actor `.pause()` API when the current actor is the
root.
?! BUT WHY !?
-------
This is useful when debugging cases where the tpt layer breaks
(or is intentionally broken, say during resiliency testing) in
the case where a child can no longer contact the root process to
acquire the process-tree-singleton TTY lock.
'''
import tractor
actor = tractor.current_actor()
if actor.aid.name == 'root':
await tractor.pause(shield=True)
else:
tractor.devx.mk_pdb().set_trace()

View File

@ -81,7 +81,7 @@ if TYPE_CHECKING:
BoxedMaybeException,
)
log = get_logger(__name__)
log = get_logger()
class LockStatus(

View File

@ -60,7 +60,7 @@ if TYPE_CHECKING:
from ._transport import MsgTransport
log = get_logger(__name__)
log = get_logger()
_is_windows = platform.system() == 'Windows'
@ -307,7 +307,12 @@ class Channel:
) -> None:
'''
Send a coded msg-blob over the transport.
Send a coded msg-blob over the underlying IPC transport.
This fn raises `TransportClosed` on comms failures and is
normally handled by higher level runtime machinery for the
expected-graceful cases, normally ephemercal
(re/dis)connects.
'''
__tracebackhide__: bool = hide_tb
@ -334,9 +339,10 @@ class Channel:
except KeyError:
raise err
case TransportClosed():
src_exc_str: str = err.repr_src_exc()
log.transport(
f'Transport stream closed due to\n'
f'{err.repr_src_exc()}\n'
f'Transport stream closed due to,\n'
f'{src_exc_str}'
)
case _:
@ -345,6 +351,11 @@ class Channel:
raise
async def recv(self) -> Any:
'''
Receive the latest (queued) msg-blob from the underlying IPC
transport.
'''
assert self._transport
return await self._transport.recv()
@ -418,16 +429,18 @@ class Channel:
self
) -> AsyncGenerator[Any, None]:
'''
Yield `MsgType` IPC msgs decoded and deliverd from
an underlying `MsgTransport` protocol.
Yield `MsgType` IPC msgs decoded and deliverd from an
underlying `MsgTransport` protocol.
This is a streaming routine alo implemented as an async-gen
func (same a `MsgTransport._iter_pkts()`) gets allocated by
a `.__call__()` inside `.__init__()` where it is assigned to
the `._aiter_msgs` attr.
This is a streaming routine alo implemented as an
async-generator func (same a `MsgTransport._iter_pkts()`)
gets allocated by a `.__call__()` inside `.__init__()` where
it is assigned to the `._aiter_msgs` attr.
'''
assert self._transport
if not self._transport:
raise RuntimeError('No IPC transport initialized!?')
while True:
try:
async for msg in self._transport:
@ -462,7 +475,15 @@ class Channel:
# continue
def connected(self) -> bool:
return self._transport.connected() if self._transport else False
'''
Predicate whether underlying IPC tpt is connected.
'''
return (
self._transport.connected()
if self._transport
else False
)
async def _do_handshake(
self,
@ -493,8 +514,11 @@ async def _connect_chan(
addr: UnwrappedAddress
) -> typing.AsyncGenerator[Channel, None]:
'''
Create and connect a channel with disconnect on context manager
teardown.
Create and connect a `Channel` to the provided `addr`, disconnect
it on cm exit.
NOTE, this is a lowlevel, normally internal-only iface. You
should likely use `.open_portal()` instead.
'''
chan = await Channel.from_addr(addr)

View File

@ -72,7 +72,7 @@ if TYPE_CHECKING:
from .._supervise import ActorNursery
log = log.get_logger(__name__)
log = log.get_logger()
async def maybe_wait_on_canced_subs(

View File

@ -59,7 +59,7 @@ except ImportError:
pass
log = get_logger(__name__)
log = get_logger()
SharedMemory = disable_mantracker()

View File

@ -41,7 +41,7 @@ from tractor.ipc._transport import (
)
log = get_logger(__name__)
log = get_logger()
class TCPAddress(

View File

@ -56,7 +56,7 @@ from tractor.msg import (
if TYPE_CHECKING:
from tractor._addr import Address
log = get_logger(__name__)
log = get_logger()
# (codec, transport)
@ -154,7 +154,6 @@ class MsgTransport(Protocol):
# ...
class MsgpackTransport(MsgTransport):
# TODO: better naming for this?
@ -278,14 +277,18 @@ class MsgpackTransport(MsgTransport):
except trio.ClosedResourceError as cre:
closure_err = cre
# await tractor.devx._trace.maybe_pause_bp()
raise TransportClosed(
message=(
f'{tpt_name} was already closed locally ?\n'
f'{tpt_name} was already closed locally?'
),
src_exc=closure_err,
loglevel='error',
raise_on_report=(
'another task closed this fd' in closure_err.args
'another task closed this fd'
in
closure_err.args
),
) from closure_err
@ -435,6 +438,11 @@ class MsgpackTransport(MsgTransport):
trans_err = _re
tpt_name: str = f'{type(self).__name__!r}'
trans_err_msg: str = trans_err.args[0]
by_whom: str = {
'another task closed this fd': 'locally',
'this socket was already closed': 'by peer',
}.get(trans_err_msg)
match trans_err:
# XXX, specifc to UDS transport and its,
@ -446,38 +454,42 @@ class MsgpackTransport(MsgTransport):
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe'
in
trans_err.args[0]
trans_err_msg
):
tpt_closed = TransportClosed.from_src_exc(
message=(
f'{tpt_name} already closed by peer\n'
),
body=f'{self}\n',
body=f'{self}',
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
)
raise tpt_closed from trans_err
# case trio.ClosedResourceError() if (
# 'this socket was already closed'
# in
# trans_err.args[0]
# ):
# tpt_closed = TransportClosed.from_src_exc(
# message=(
# f'{tpt_name} already closed by peer\n'
# ),
# body=f'{self}\n',
# src_exc=trans_err,
# raise_on_report=True,
# loglevel='transport',
# )
# raise tpt_closed from trans_err
# ??TODO??, what case in piker does this and HOW
# CAN WE RE-PRODUCE IT?!?!?
case trio.ClosedResourceError() if (
by_whom
):
tpt_closed = TransportClosed.from_src_exc(
message=(
f'{tpt_name} was already closed {by_whom!r}?\n'
),
body=f'{self}',
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
)
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
# await tractor.devx._trace.maybe_pause_bp()
raise tpt_closed from trans_err
# XXX, unless the disconnect condition falls
# under "a normal/expected operating breakage"
# (per the `trans_err_msg` guards in the cases
# above) we usualy console-error about it and
# raise-thru. about it.
case _:
log.exception(
f'{tpt_name} layer failed pre-send ??\n'

View File

@ -63,7 +63,7 @@ if TYPE_CHECKING:
from ._runtime import Actor
log = get_logger(__name__)
log = get_logger()
def unwrap_sockpath(
@ -166,6 +166,10 @@ class UDSAddress(
)
if actor:
sockname: str = '::'.join(actor.uid) + f'@{pid}'
# ?^TODO, for `multiaddr`'s parser we can't use the `::`
# above^, SO maybe a `.` or something else here?
# sockname: str = '.'.join(actor.uid) + f'@{pid}'
# -[ ] CURRENTLY using `.` BREAKS TEST SUITE tho..
else:
prefix: str = '<unknown-actor>'
if is_root_process():

View File

@ -14,11 +14,23 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Log like a forester!
'''
An enhanced logging subsys.
"""
An extended logging layer using (for now) the stdlib's `logging`
+ `colorlog` which embeds concurrency-primitive/runtime info into
records (headers) to help you better grok your distributed systems
built on `tractor`.
'''
from collections.abc import Mapping
from functools import partial
from inspect import (
FrameInfo,
getmodule,
stack,
)
import sys
import logging
from logging import (
@ -26,20 +38,24 @@ from logging import (
Logger,
StreamHandler,
)
import colorlog # type: ignore
from types import ModuleType
import warnings
import colorlog # type: ignore
# ?TODO, some other (modern) alt libs?
# import coloredlogs
# import colored_traceback.auto # ?TODO, need better config?
import trio
from ._state import current_actor
_proj_name: str = 'tractor'
_default_loglevel: str = 'ERROR'
# Super sexy formatting thanks to ``colorlog``.
# (NOTE: we use the '{' format style)
# Here, `thin_white` is just the layperson's gray.
LOG_FORMAT = (
LOG_FORMAT: str = (
# "{bold_white}{log_color}{asctime}{reset}"
"{log_color}{asctime}{reset}"
" {bold_white}{thin_white}({reset}"
@ -51,7 +67,7 @@ LOG_FORMAT = (
" {reset}{bold_white}{thin_white}{message}"
)
DATE_FORMAT = '%b %d %H:%M:%S'
DATE_FORMAT: str = '%b %d %H:%M:%S'
# FYI, ERROR is 40
# TODO: use a `bidict` to avoid the :155 check?
@ -75,7 +91,10 @@ STD_PALETTE = {
'TRANSPORT': 'cyan',
}
BOLD_PALETTE = {
BOLD_PALETTE: dict[
str,
dict[int, str],
] = {
'bold': {
level: f"bold_{color}" for level, color in STD_PALETTE.items()}
}
@ -97,9 +116,26 @@ def at_least_level(
return False
# TODO: this isn't showing the correct '{filename}'
# as it did before..
# TODO, compare with using a "filter" instead?
# - https://stackoverflow.com/questions/60691759/add-information-to-every-log-message-in-python-logging/61830838#61830838
# |_corresponding dict-config,
# https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig/7507842#7507842
# - [ ] what's the benefit/tradeoffs?
#
class StackLevelAdapter(LoggerAdapter):
'''
A (software) stack oriented logger "adapter".
'''
@property
def level(self) -> str:
'''
The currently set `str` emit level (in lowercase).
'''
return logging.getLevelName(
self.getEffectiveLevel()
).lower()
def at_least_level(
self,
@ -248,9 +284,14 @@ def pformat_task_uid(
return f'{task.name}[{tid_part}]'
_curr_actor_no_exc = partial(
current_actor,
err_on_no_runtime=False,
)
_conc_name_getters = {
'task': pformat_task_uid,
'actor': lambda: current_actor(),
'actor': lambda: _curr_actor_no_exc(),
'actor_name': lambda: current_actor().name,
'actor_uid': lambda: current_actor().uid[1][:6],
}
@ -282,9 +323,16 @@ class ActorContextInfo(Mapping):
return f'no {key} context'
_proj_name: str = 'tractor'
def get_logger(
name: str|None = None,
_root_name: str = _proj_name,
# ^NOTE, setting `name=_proj_name=='tractor'` enables the "root
# logger" for `tractor` itself.
pkg_name: str = _proj_name,
# XXX, deprecated, use ^
_root_name: str|None = None,
logger: Logger|None = None,
@ -293,49 +341,287 @@ def get_logger(
# |_https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig
# |_https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema
subsys_spec: str|None = None,
mk_sublog: bool = True,
_strict_debug: bool = False,
) -> StackLevelAdapter:
'''
Return the `tractor`-library root logger or a sub-logger for
`name` if provided.
'''
log: Logger
log = rlog = logger or logging.getLogger(_root_name)
When `name` is left null we try to auto-detect the caller's
`mod.__name__` and use that as a the sub-logger key.
This allows for example creating a module level instance like,
.. code:: python
log = tractor.log.get_logger(_root_name='mylib')
and by default all console record headers will show the caller's
(of any `log.<level>()`-method) correct sub-pkg's
+ py-module-file.
'''
if _root_name:
msg: str = (
'The `_root_name: str` param of `get_logger()` is now deprecated.\n'
'Use the new `pkg_name: str` instead, it is the same usage.\n'
)
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
pkg_name: str = _root_name
def get_caller_mod(
frames_up:int = 2
):
'''
Attempt to get the module which called `tractor.get_logger()`.
'''
callstack: list[FrameInfo] = stack()
caller_fi: FrameInfo = callstack[frames_up]
caller_mod: ModuleType = getmodule(caller_fi.frame)
return caller_mod
# --- Auto--naming-CASE ---
# -------------------------
# Implicitly introspect the caller's module-name whenever `name`
# if left as the null default.
#
# When the `pkg_name` is `in` in the `mod.__name__` we presume
# this instance can be created as a sub-`StackLevelAdapter` and
# that the intention is to get free module-path tracing and
# filtering (well once we implement that) oriented around the
# py-module code hierarchy of the consuming project.
#
if (
mk_sublog
and
name is None
and
pkg_name
):
if (caller_mod := get_caller_mod()):
# ?XXX how is this `caller_mod.__name__` defined?
# => well by how the mod is imported.. XD
# |_https://stackoverflow.com/a/15883682
#
# if pkg_name in caller_mod.__package__:
# from tractor.devx.debug import mk_pdb
# mk_pdb().set_trace()
mod_ns_path: str = caller_mod.__name__
mod_pkg_ns_path: str = caller_mod.__package__
if (
mod_pkg_ns_path in mod_ns_path
or
pkg_name in mod_ns_path
):
# proper_mod_name = mod_ns_path.lstrip(
proper_mod_name = mod_pkg_ns_path.removeprefix(
f'{pkg_name}.'
)
name = proper_mod_name
elif (
pkg_name
# and
# pkg_name in mod_ns_path
):
name = mod_ns_path
if _strict_debug:
msg: str = (
f'@ {get_caller_mod()}\n'
f'Generating sub-logger name,\n'
f'{pkg_name}.{name}\n'
)
if _curr_actor_no_exc():
_root_log.debug(msg)
elif pkg_name != _proj_name:
print(
f'=> tractor.log.get_logger():\n'
f'{msg}\n'
)
# build a root logger instance
log: Logger
rlog = log = (
logger
or
logging.getLogger(pkg_name)
)
# XXX, lowlevel debuggin..
# if pkg_name != _proj_name:
# from tractor.devx.debug import mk_pdb
# mk_pdb().set_trace()
# NOTE: for handling for modules that use the unecessary,
# `get_logger(__name__)`
#
# we make the following stylistic choice:
# - always avoid duplicate project-package token
# in msg output: i.e. tractor.tractor.ipc._chan.py in header
# looks ridiculous XD
# - never show the leaf module name in the {name} part
# since in python the {filename} is always this same
# module-file.
if (
name
and
name != _proj_name
# ?TODO? more correct?
# _proj_name not in name
name != pkg_name
):
# ex. modden.runtime.progman
# -> rname='modden', _, pkg_path='runtime.progman'
if (
pkg_name
and
pkg_name in name
):
proper_name: str = name.removeprefix(
f'{pkg_name}.'
)
msg: str = (
f'@ {get_caller_mod()}\n'
f'Duplicate pkg-name in sub-logger `name`-key?\n'
f'pkg_name = {pkg_name!r}\n'
f'name = {name!r}\n'
f'\n'
f'=> You should change your input params to,\n'
f'get_logger(\n'
f' pkg_name={pkg_name!r}\n'
f' name={proper_name!r}\n'
f')'
)
# assert _duplicate == rname
if _curr_actor_no_exc():
_root_log.warning(msg)
else:
print(
f'=> tractor.log.get_logger() ERROR:\n'
f'{msg}\n'
)
# NOTE: for handling for modules that use `get_logger(__name__)`
# we make the following stylistic choice:
# - always avoid duplicate project-package token
# in msg output: i.e. tractor.tractor.ipc._chan.py in header
# looks ridiculous XD
# - never show the leaf module name in the {name} part
# since in python the {filename} is always this same
# module-file.
name = proper_name
sub_name: None|str = None
rname, _, sub_name = name.partition('.')
pkgpath, _, modfilename = sub_name.rpartition('.')
rname: str = pkg_name
pkg_path: str = name
# NOTE: for tractor itself never include the last level
# module key in the name such that something like: eg.
# 'tractor.trionics._broadcast` only includes the first
# 2 tokens in the (coloured) name part.
if rname == 'tractor':
sub_name = pkgpath
if _root_name in sub_name:
duplicate, _, sub_name = sub_name.partition('.')
# (
# rname,
# _,
# pkg_path,
# ) = name.partition('.')
if not sub_name:
# For ex. 'modden.runtime.progman'
# -> pkgpath='runtime', _, leaf_mod='progman'
(
subpkg_path,
_,
leaf_mod,
) = pkg_path.rpartition('.')
# NOTE: special usage for passing `name=__name__`,
#
# - remove duplication of any root-pkg-name in the
# (sub/child-)logger name; i.e. never include the
# `pkg_name` *twice* in the top-most-pkg-name/level
#
# -> this happens normally since it is added to `.getChild()`
# and as the name of its root-logger.
#
# => So for ex. (module key in the name) something like
# `name='tractor.trionics._broadcast` is passed,
# only includes the first 2 sub-pkg name-tokens in the
# child-logger's name; the colored "pkg-namespace" header
# will then correctly show the same value as `name`.
if (
# XXX, TRY to remove duplication cases
# which get warn-logged on below!
(
# when, subpkg_path == pkg_path
subpkg_path
and
rname == pkg_name
)
# ) or (
# # when, pkg_path == leaf_mod
# pkg_path
# and
# leaf_mod == pkg_path
# )
):
pkg_path = subpkg_path
# XXX, do some double-checks for duplication of,
# - root-pkg-name, already in root logger
# - leaf-module-name already in `{filename}` header-field
if (
_strict_debug
and
pkg_name
and
pkg_name in pkg_path
):
_duplicate, _, pkg_path = pkg_path.partition('.')
if _duplicate:
msg: str = (
f'@ {get_caller_mod()}\n'
f'Duplicate pkg-name in sub-logger key?\n'
f'pkg_name = {pkg_name!r}\n'
f'pkg_path = {pkg_path!r}\n'
)
# assert _duplicate == rname
if _curr_actor_no_exc():
_root_log.warning(msg)
else:
print(
f'=> tractor.log.get_logger() ERROR:\n'
f'{msg}\n'
)
# XXX, should never get here?
breakpoint()
if (
_strict_debug
and
leaf_mod
and
leaf_mod in pkg_path
):
msg: str = (
f'@ {get_caller_mod()}\n'
f'Duplicate leaf-module-name in sub-logger key?\n'
f'leaf_mod = {leaf_mod!r}\n'
f'pkg_path = {pkg_path!r}\n'
)
if _curr_actor_no_exc():
_root_log.warning(msg)
else:
print(
f'=> tractor.log.get_logger() ERROR:\n'
f'{msg}\n'
)
# mk/get underlying (sub-)`Logger`
if (
not pkg_path
and
leaf_mod == pkg_name
):
# breakpoint()
log = rlog
else:
log = rlog.getChild(sub_name)
elif mk_sublog:
# breakpoint()
log = rlog.getChild(pkg_path)
log.level = rlog.level
@ -350,8 +636,13 @@ def get_logger(
for name, val in CUSTOM_LEVELS.items():
logging.addLevelName(val, name)
# ensure customs levels exist as methods
assert getattr(logger, name.lower()), f'Logger does not define {name}'
# ensure our custom adapter levels exist as methods
assert getattr(
logger,
name.lower()
), (
f'Logger does not define {name}'
)
return logger
@ -425,4 +716,4 @@ def get_loglevel() -> str:
# global module logger for tractor itself
log: StackLevelAdapter = get_logger('tractor')
_root_log: StackLevelAdapter = get_logger('tractor')

View File

@ -68,7 +68,7 @@ from tractor.log import get_logger
if TYPE_CHECKING:
from tractor._context import Context
log = get_logger(__name__)
log = get_logger()
# TODO: unify with `MsgCodec` by making `._dec` part this?

View File

@ -77,7 +77,7 @@ if TYPE_CHECKING:
from tractor._streaming import MsgStream
log = get_logger(__name__)
log = get_logger()
_def_any_pldec: MsgDec[Any] = mk_dec(spec=Any)

View File

@ -51,7 +51,7 @@ from tractor.log import get_logger
# from tractor._addr import UnwrappedAddress
log = get_logger('tractor.msgspec')
log = get_logger()
# type variable for the boxed payload field `.pld`
PayloadT = TypeVar('PayloadT')
@ -202,7 +202,10 @@ class SpawnSpec(
# TODO: similar to the `Start` kwargs spec needed below, we need
# a hard `Struct` def for all of these fields!
_parent_main_data: dict
_runtime_vars: dict[str, Any]
_runtime_vars: (
dict[str, Any]
#|RuntimeVars # !TODO
)
# ^NOTE see `._state._runtime_vars: dict`
# module import capability

View File

@ -71,7 +71,7 @@ from outcome import (
Outcome,
)
log: StackLevelAdapter = get_logger(__name__)
log: StackLevelAdapter = get_logger()
__all__ = [

View File

@ -42,7 +42,7 @@ from trio.lowlevel import current_task
from msgspec import Struct
from tractor.log import get_logger
log = get_logger(__name__)
log = get_logger()
# TODO: use new type-vars syntax from 3.12
# https://realpython.com/python312-new-features/#dedicated-type-variable-syntax

View File

@ -49,7 +49,7 @@ if TYPE_CHECKING:
from tractor import ActorNursery
log = get_logger(__name__)
log = get_logger()
# A regular invariant generic type
T = TypeVar("T")

View File

@ -34,7 +34,7 @@ from typing import (
import trio
from tractor.log import get_logger
log = get_logger(__name__)
log = get_logger()
if TYPE_CHECKING:
@ -246,23 +246,12 @@ async def maybe_raise_from_masking_exc(
type(exc_match) # masked type
)
# Add to masked `exc_ctx`
if do_warn:
exc_ctx.add_note(note)
if (
do_warn
and
type(exc_match) in always_warn_on
):
log.warning(note)
if (
do_warn
and
raise_unmasked
):
# don't unmask already known "special" cases..
if len(masked) < 2:
# don't unmask already known "special" cases..
if (
_mask_cases
and
@ -283,11 +272,26 @@ async def maybe_raise_from_masking_exc(
)
raise exc_match
raise exc_ctx from exc_match
# ^?TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
# else:
# await pause(shield=True)
if type(exc_match) in always_warn_on:
import traceback
trace: list[str] = traceback.format_exception(
type(exc_ctx),
exc_ctx,
exc_ctx.__traceback__
)
tb_str: str = ''.join(trace)
log.warning(tb_str)
# XXX, for debug
# from tractor import pause
# await pause(shield=True)
if raise_unmasked:
raise exc_ctx from exc_match
# ??TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
# else:
# await pause(shield=True)
else:
raise

62
uv.lock
View File

@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.11"
[[package]]
@ -329,6 +329,32 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/30/3d/64ad57c803f1fa1e963a7946b6e0fea4a70df53c1a7fed304586539c2bac/pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820", size = 343634, upload-time = "2025-03-02T12:54:52.069Z" },
]
[[package]]
name = "ruff"
version = "0.14.14"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2e/06/f71e3a86b2df0dfa2d2f72195941cd09b44f87711cb7fa5193732cb9a5fc/ruff-0.14.14.tar.gz", hash = "sha256:2d0f819c9a90205f3a867dbbd0be083bee9912e170fd7d9704cc8ae45824896b", size = 4515732, upload-time = "2026-01-22T22:30:17.527Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d2/89/20a12e97bc6b9f9f68343952da08a8099c57237aef953a56b82711d55edd/ruff-0.14.14-py3-none-linux_armv6l.whl", hash = "sha256:7cfe36b56e8489dee8fbc777c61959f60ec0f1f11817e8f2415f429552846aed", size = 10467650, upload-time = "2026-01-22T22:30:08.578Z" },
{ url = "https://files.pythonhosted.org/packages/a3/b1/c5de3fd2d5a831fcae21beda5e3589c0ba67eec8202e992388e4b17a6040/ruff-0.14.14-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:6006a0082336e7920b9573ef8a7f52eec837add1265cc74e04ea8a4368cd704c", size = 10883245, upload-time = "2026-01-22T22:30:04.155Z" },
{ url = "https://files.pythonhosted.org/packages/b8/7c/3c1db59a10e7490f8f6f8559d1db8636cbb13dccebf18686f4e3c9d7c772/ruff-0.14.14-py3-none-macosx_11_0_arm64.whl", hash = "sha256:026c1d25996818f0bf498636686199d9bd0d9d6341c9c2c3b62e2a0198b758de", size = 10231273, upload-time = "2026-01-22T22:30:34.642Z" },
{ url = "https://files.pythonhosted.org/packages/a1/6e/5e0e0d9674be0f8581d1f5e0f0a04761203affce3232c1a1189d0e3b4dad/ruff-0.14.14-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f666445819d31210b71e0a6d1c01e24447a20b85458eea25a25fe8142210ae0e", size = 10585753, upload-time = "2026-01-22T22:30:31.781Z" },
{ url = "https://files.pythonhosted.org/packages/23/09/754ab09f46ff1884d422dc26d59ba18b4e5d355be147721bb2518aa2a014/ruff-0.14.14-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:3c0f18b922c6d2ff9a5e6c3ee16259adc513ca775bcf82c67ebab7cbd9da5bc8", size = 10286052, upload-time = "2026-01-22T22:30:24.827Z" },
{ url = "https://files.pythonhosted.org/packages/c8/cc/e71f88dd2a12afb5f50733851729d6b571a7c3a35bfdb16c3035132675a0/ruff-0.14.14-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1629e67489c2dea43e8658c3dba659edbfd87361624b4040d1df04c9740ae906", size = 11043637, upload-time = "2026-01-22T22:30:13.239Z" },
{ url = "https://files.pythonhosted.org/packages/67/b2/397245026352494497dac935d7f00f1468c03a23a0c5db6ad8fc49ca3fb2/ruff-0.14.14-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:27493a2131ea0f899057d49d303e4292b2cae2bb57253c1ed1f256fbcd1da480", size = 12194761, upload-time = "2026-01-22T22:30:22.542Z" },
{ url = "https://files.pythonhosted.org/packages/5b/06/06ef271459f778323112c51b7587ce85230785cd64e91772034ddb88f200/ruff-0.14.14-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:01ff589aab3f5b539e35db38425da31a57521efd1e4ad1ae08fc34dbe30bd7df", size = 12005701, upload-time = "2026-01-22T22:30:20.499Z" },
{ url = "https://files.pythonhosted.org/packages/41/d6/99364514541cf811ccc5ac44362f88df66373e9fec1b9d1c4cc830593fe7/ruff-0.14.14-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1cc12d74eef0f29f51775f5b755913eb523546b88e2d733e1d701fe65144e89b", size = 11282455, upload-time = "2026-01-22T22:29:59.679Z" },
{ url = "https://files.pythonhosted.org/packages/ca/71/37daa46f89475f8582b7762ecd2722492df26421714a33e72ccc9a84d7a5/ruff-0.14.14-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bb8481604b7a9e75eff53772496201690ce2687067e038b3cc31aaf16aa0b974", size = 11215882, upload-time = "2026-01-22T22:29:57.032Z" },
{ url = "https://files.pythonhosted.org/packages/2c/10/a31f86169ec91c0705e618443ee74ede0bdd94da0a57b28e72db68b2dbac/ruff-0.14.14-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:14649acb1cf7b5d2d283ebd2f58d56b75836ed8c6f329664fa91cdea19e76e66", size = 11180549, upload-time = "2026-01-22T22:30:27.175Z" },
{ url = "https://files.pythonhosted.org/packages/fd/1e/c723f20536b5163adf79bdd10c5f093414293cdf567eed9bdb7b83940f3f/ruff-0.14.14-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:e8058d2145566510790eab4e2fad186002e288dec5e0d343a92fe7b0bc1b3e13", size = 10543416, upload-time = "2026-01-22T22:30:01.964Z" },
{ url = "https://files.pythonhosted.org/packages/3e/34/8a84cea7e42c2d94ba5bde1d7a4fae164d6318f13f933d92da6d7c2041ff/ruff-0.14.14-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:e651e977a79e4c758eb807f0481d673a67ffe53cfa92209781dfa3a996cf8412", size = 10285491, upload-time = "2026-01-22T22:30:29.51Z" },
{ url = "https://files.pythonhosted.org/packages/55/ef/b7c5ea0be82518906c978e365e56a77f8de7678c8bb6651ccfbdc178c29f/ruff-0.14.14-py3-none-musllinux_1_2_i686.whl", hash = "sha256:cc8b22da8d9d6fdd844a68ae937e2a0adf9b16514e9a97cc60355e2d4b219fc3", size = 10733525, upload-time = "2026-01-22T22:30:06.499Z" },
{ url = "https://files.pythonhosted.org/packages/6a/5b/aaf1dfbcc53a2811f6cc0a1759de24e4b03e02ba8762daabd9b6bd8c59e3/ruff-0.14.14-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:16bc890fb4cc9781bb05beb5ab4cd51be9e7cb376bf1dd3580512b24eb3fda2b", size = 11315626, upload-time = "2026-01-22T22:30:36.848Z" },
{ url = "https://files.pythonhosted.org/packages/2c/aa/9f89c719c467dfaf8ad799b9bae0df494513fb21d31a6059cb5870e57e74/ruff-0.14.14-py3-none-win32.whl", hash = "sha256:b530c191970b143375b6a68e6f743800b2b786bbcf03a7965b06c4bf04568167", size = 10502442, upload-time = "2026-01-22T22:30:38.93Z" },
{ url = "https://files.pythonhosted.org/packages/87/44/90fa543014c45560cae1fffc63ea059fb3575ee6e1cb654562197e5d16fb/ruff-0.14.14-py3-none-win_amd64.whl", hash = "sha256:3dde1435e6b6fe5b66506c1dff67a421d0b7f6488d466f651c07f4cab3bf20fd", size = 11630486, upload-time = "2026-01-22T22:30:10.852Z" },
{ url = "https://files.pythonhosted.org/packages/9e/6a/40fee331a52339926a92e17ae748827270b288a35ef4a15c9c8f2ec54715/ruff-0.14.14-py3-none-win_arm64.whl", hash = "sha256:56e6981a98b13a32236a72a8da421d7839221fa308b223b9283312312e5ac76c", size = 10920448, upload-time = "2026-01-22T22:30:15.417Z" },
]
[[package]]
name = "sniffio"
version = "1.3.1"
@ -395,6 +421,24 @@ dev = [
{ name = "typing-extensions" },
{ name = "xonsh" },
]
devx = [
{ name = "greenback" },
{ name = "stackscope" },
{ name = "typing-extensions" },
]
lint = [
{ name = "ruff" },
]
repl = [
{ name = "prompt-toolkit" },
{ name = "psutil" },
{ name = "pyperclip" },
{ name = "xonsh" },
]
testing = [
{ name = "pexpect" },
{ name = "pytest" },
]
[package.metadata]
requires-dist = [
@ -420,6 +464,22 @@ dev = [
{ name = "typing-extensions", specifier = ">=4.14.1" },
{ name = "xonsh", specifier = ">=0.19.2" },
]
devx = [
{ name = "greenback", specifier = ">=1.2.1,<2" },
{ name = "stackscope", specifier = ">=0.2.2,<0.3" },
{ name = "typing-extensions", specifier = ">=4.14.1" },
]
lint = [{ name = "ruff", specifier = ">=0.9.6" }]
repl = [
{ name = "prompt-toolkit", specifier = ">=3.0.50" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pyperclip", specifier = ">=1.9.0" },
{ name = "xonsh", specifier = ">=0.19.2" },
]
testing = [
{ name = "pexpect", specifier = ">=4.9.0,<5" },
{ name = "pytest", specifier = ">=8.3.5" },
]
[[package]]
name = "tricycle"