tractor/tractor/trionics/_mngrs.py

# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Async context manager primitives with hard ``trio``-aware semantics
'''
from __future__ import annotations
from collections import defaultdict
from contextlib import (
asynccontextmanager as acm,
)
import inspect
from types import ModuleType
from typing import (
Any,
AsyncContextManager,
AsyncGenerator,
AsyncIterator,
Callable,
Hashable,
Sequence,
TypeVar,
TYPE_CHECKING,
)
import trio
from tractor.runtime._state import current_actor
from tractor.log import get_logger
import tractor
# from ._beg import collapse_eg
# from ._taskc import (
# maybe_raise_from_masking_exc,
# )
if TYPE_CHECKING:
from tractor import ActorNursery
log = get_logger()
# A regular invariant generic type
T = TypeVar("T")
@acm
async def maybe_open_nursery(
nursery: trio.Nursery|ActorNursery|None = None,
shield: bool = False,
lib: ModuleType = trio,
**kwargs, # proxy thru
) -> AsyncGenerator[trio.Nursery, Any]:
'''
Create a new nursery if None provided.
Blocks on exit as expected if no input nursery is provided.
'''
if nursery is not None:
yield nursery
else:
async with lib.open_nursery(**kwargs) as nursery:
if lib == trio:
nursery.cancel_scope.shield = shield
yield nursery
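# NOTE (usage sketch): a hypothetical caller can pass an existing
# nursery to re-use it, or pass `None` to have one opened (and
# closed) in place; `some_task` below is a stand-in, not part of
# this module:
#
#   async with maybe_open_nursery(nursery=None, shield=True) as tn:
#       tn.start_soon(some_task)
#
# the same call with `nursery=existing_tn` simply yields back
# `existing_tn` without opening (or later closing) a new scope.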
async def _enter_and_wait(
mngr: AsyncContextManager[T],
unwrapped: dict[int, T],
all_entered: trio.Event,
parent_exit: trio.Event,
seed: int,
) -> None:
'''
Open the async context manager, deliver its value
to this task's spawner and then sleep until cancelled.
'''
async with mngr as value:
unwrapped[id(mngr)] = value
if all(
val != seed
for val in unwrapped.values()
):
all_entered.set()
await parent_exit.wait()
@acm
async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]],
# caller can provide their own scope
tn: trio.Nursery|None = None,
) -> AsyncGenerator[
tuple[
T | None,
...
],
None,
]:
'''
Concurrently enter a sequence of async context managers (`acm`s),
each scheduled in a separate `trio.Task` and deliver their
unwrapped `yield`-ed values in the same order once all `@acm`s
in every task have entered.
On exit, all `acm`s are subsequently and concurrently exited with
**no order guarantees**.
This function is somewhat similar to a batch of non-blocking
calls to `contextlib.AsyncExitStack.enter_async_context()`
(inside a loop) *in combo with* an `asyncio.gather()` to get the
`.__aenter__()`-ed values, except the managers are both
concurrently entered and exited and *cancellation-just-works*.
'''
seed: int = id(mngrs)
unwrapped: dict[int, T|None] = {}.fromkeys(
(id(mngr) for mngr in mngrs),
seed,
)
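# ^ every slot above is pre-seeded with the (per-call unique) `seed`
# sentinel; each child task overwrites its own slot from inside
# `_enter_and_wait()` once its manager has been entered, so when no
# slot still equals `seed` we know every acm has yielded and
# `all_entered` can be set.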
all_entered = trio.Event()
parent_exit = trio.Event()
# XXX: ensure greedy sequence of manager instances
# since a lazy inline generator doesn't seem to work
# with `async with` syntax.
mngrs = list(mngrs)
if not mngrs:
raise ValueError(
'`.trionics.gather_contexts()` input mngrs is empty?\n'
'\n'
'Did you try to use inline generator syntax?\n'
'Check that list({mngrs}) works!\n'
# 'or sequence-type instead!\n'
# 'Use a non-lazy iterator or sequence-type instead!\n'
)
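# NOTE (failure-mode sketch): an already-consumed generator (or an
# iterator shared across calls) coerces to an empty list above and
# lands here; prefer an eagerly built sequence such as
# `[open_thing(x) for x in xs]` where `open_thing()` is a stand-in
# @acm, not part of this module.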
try:
async with (
#
# ?TODO, does including these (eg-collapsing,
# taskc-unmasking) improve tb noise-reduction/legibility?
#
# collapse_eg(),
maybe_open_nursery(
nursery=tn,
) as tn,
# maybe_raise_from_masking_exc(),
):
for mngr in mngrs:
tn.start_soon(
_enter_and_wait,
mngr,
unwrapped,
all_entered,
parent_exit,
seed,
)
# deliver control to caller once all ctx-managers have
# started (yielded back to us).
await all_entered.wait()
yield tuple(unwrapped.values())
parent_exit.set()
finally:
# XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug:
# <tractorbugurlhere>
parent_exit.set()
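# NOTE (usage sketch): a hypothetical caller entering a few stand-in
# @acm's concurrently, `open_thing()` being illustrative only:
#
#   async with gather_contexts([
#       open_thing('a'),
#       open_thing('b'),
#       open_thing('c'),
#   ]) as (a, b, c):
#       ...  # all 3 values delivered here, in input order
#
# on exiting the `async with` block all 3 managers are torn down
# concurrently with no ordering guarantees.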
# Per actor task caching helpers.
# Further potential examples of interest:
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8
class _Cache:
'''
Globally (actor-process scoped) cached, task access to
a kept-alive-while-in-use async resource.
'''
service_tn: trio.Nursery|None = None
locks: dict[Hashable, trio.Lock] = {}
users: defaultdict[
tuple|Hashable,
int,
] = defaultdict(int)
values: dict[Any, Any] = {}
resources: dict[
Hashable,
tuple[trio.Nursery, trio.Event]
] = {}
# nurseries: dict[int, trio.Nursery] = {}
no_more_users: trio.Event|None = None
@classmethod
async def run_ctx(
cls,
mng,
ctx_key: tuple,
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None:
try:
async with mng as value:
_, no_more_users = cls.resources[ctx_key]
try:
cls.values[ctx_key] = value
task_status.started(value)
await no_more_users.wait()
finally:
value = cls.values.pop(ctx_key)
finally:
# discard nursery ref so it won't be re-used (an error)?
_rsrcs = cls.resources.pop(ctx_key)
log.error(
f'Popping ctx resources\n'
f'{_rsrcs}\n'
)
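# NOTE on the keeper-task lifecycle above: `run_ctx()` is scheduled
# via `service_tn.start()` from `maybe_open_context()` below; it
# enters the target acm, reports the yielded value back through
# `task_status.started()`, then parks on the shared `no_more_users`
# event. The last exiting user sets that event which unwinds the acm
# and pops the cached value/resources for the `ctx_key`.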
class _UnresolvedCtx:
'''
Placeholder for the maybe-value delivered from some `acm_func`,
once (first) entered by a `maybe_open_context()` task.
Enables internal teardown logic conditioned on whether the
context was actually entered successfully vs. cancelled prior.
'''
@acm
async def maybe_open_context(
acm_func: Callable[..., AsyncContextManager[T]],
# XXX: used as cache key after conversion to tuple
# and all embedded values must also be hashable
kwargs: dict = {},
key: Hashable|Callable[..., Hashable] = None,
# caller can provide their own scope
tn: trio.Nursery|None = None,
) -> AsyncIterator[tuple[bool, T]]:
'''
Maybe open an async-context-manager (acm) if there is not already
a cached version for the provided (input) `key` for *this* actor.
Return the cached value on a `_Cache` hit.
'''
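# NOTE (usage sketch): a hypothetical caller sharing one "connection"
# per actor across many tasks; `open_connection()` and its kwargs are
# stand-ins, not part of this module:
#
#   async with maybe_open_context(
#       acm_func=open_connection,
#       kwargs={'host': 'localhost', 'port': 8888},
#   ) as (cache_hit, conn):
#       if cache_hit:
#           ...  # re-using a value entered by an earlier task
#       else:
#           ...  # this task was first and actually entered the acm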
fid: int = id(acm_func)
if inspect.isfunction(key):
ctx_key = (
fid,
key(**kwargs)
)
else:
ctx_key = (
fid,
key or tuple(kwargs.items())
)
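# e.g. (illustrative), given the hypothetical kwargs from the sketch
# above and no explicit `key`, the resulting cache key would be:
#
#   ctx_key = (id(open_connection), (('host', 'localhost'), ('port', 8888)))
#
# i.e. the acm-func's id paired with the (hashable) kwargs items.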
# yielded output
# sentinel = object()
yielded: Any = _UnresolvedCtx
lock_registered: bool = False
# Lock resource acquisition around task racing / ``trio``'s
# scheduler protocol.
# NOTE: the lock is target context manager func specific in order
# to allow re-entrant use cases where one `maybe_open_context()`
# wrapped factory may want to call into another.
task: trio.Task = trio.lowlevel.current_task()
lock: trio.StrictFIFOLock|None = _Cache.locks.get(
# fid
ctx_key
)
if not lock:
lock = _Cache.locks[
ctx_key
# fid
] = trio.StrictFIFOLock()
# lock = _Cache.locks[fid] = trio.Lock()
header: str = 'Allocated NEW lock for @acm_func,\n'
lock_registered: bool = True
else:
await trio.lowlevel.checkpoint()
header: str = 'Reusing OLD lock for @acm_func,\n'
log.debug(
f'{header}'
f'Acquiring..\n'
f'task={task!r}\n'
f'fid={fid!r}\n'
f'acm_func={acm_func}\n'
)
await lock.acquire()
log.debug(
f'Acquired lock..\n'
f'task={task!r}\n'
f'fid={fid!r}\n'
f'acm_func={acm_func}\n'
)
# XXX: one singleton nursery per actor and we want to
# have it not be closed until all consumers have exited (which is
# currently difficult to implement any other way besides using our
# pre-allocated runtime instance..)
if tn:
# TODO, assert tn is eventual parent of this task!
task: trio.Task = trio.lowlevel.current_task()
task_tn: trio.Nursery = task.parent_nursery
if not tn._cancel_status.encloses(
task_tn._cancel_status
):
raise RuntimeError(
f'Mis-nesting of task under provided {tn} !?\n'
f'Current task is NOT a child(-ish)!!\n'
f'\n'
f'task: {task}\n'
f'task_tn: {task_tn}\n'
)
service_tn = tn
else:
service_tn: trio.Nursery = current_actor()._service_tn
# TODO: is there any way to allocate
# a 'stays-open-till-last-task-finished' nursery?
# service_tn: trio.Nursery
# async with maybe_open_nursery(_Cache.service_tn) as service_tn:
# _Cache.service_tn = service_tn
cache_miss_ke: KeyError|None = None
maybe_taskc: trio.Cancelled|None = None
try:
# **critical section** that should prevent other tasks from
# checking the _Cache until complete otherwise the scheduler
# checking the _Cache until complete otherwise the scheduler
# may switch and by accident we create more than one resource.
yielded = _Cache.values[ctx_key]
# XXX^ should key-err if not-yet-allocated
except KeyError as _ke:
# XXX, stay mutexed up to cache-miss yield
try:
cache_miss_ke = _ke
log.debug(
f'Allocating new @acm-func entry\n'
f'ctx_key={ctx_key}\n'
f'acm_func={acm_func}\n'
)
# await tractor.pause()
mngr = acm_func(**kwargs)
resources = _Cache.resources
entry: tuple|None = resources.get(ctx_key)
if entry:
service_tn, ev = entry
# XXX, trace this.
# await tractor.pause(shield=True)
raise RuntimeError(
f'Caching resources ALREADY exist?!\n'
f'ctx_key={ctx_key!r}\n'
f'acm_func={acm_func}\n'
f'task: {task}\n'
)
resources[ctx_key] = (service_tn, trio.Event())
yielded: Any = await service_tn.start(
_Cache.run_ctx,
mngr,
ctx_key,
)
_Cache.users[ctx_key] += 1
finally:
# XXX, since this runs from an `except` it's a checkpoint
# which can be `trio.Cancelled`-masked.
#
# NOTE, in that case the mutex is never released by the
# (first and) caching task and **we can't** simply shield
# bc that will inf-block on the `await
# no_more_users.wait()`.
#
# SO just always unlock!
lock.release()
try:
yield (
False, # cache_hit = "no"
yielded,
)
except trio.Cancelled as taskc:
maybe_taskc = taskc
log.cancel(
f'Cancelled from cache-miss entry\n'
f'\n'
f'ctx_key: {ctx_key!r}\n'
f'mngr: {mngr!r}\n'
)
# XXX, always unset ke from cancelled context
# since we never consider it a masked exc case!
# - bc this can be called directly by `._rpc._invoke()`?
#
if maybe_taskc.__context__ is cache_miss_ke:
maybe_taskc.__context__ = None
raise taskc
else:
# XXX, cached-entry-path
_Cache.users[ctx_key] += 1
log.debug(
f'Re-using cached resource for user {_Cache.users}\n\n'
f'{ctx_key!r} -> {type(yielded)}\n'
# TODO: make this work with values but without
# `msgspec.Struct` causing frickin crashes on field-type
# lookups..
# f'{ctx_key!r} -> {yielded!r}\n'
)
lock.release()
yield (
True, # cache_hit = "yes"
yielded,
)
finally:
if lock.locked():
stats: trio.LockStatistics = lock.statistics()
owner: trio.Task|None = stats.owner
log.error(
f'Lock never released by last owner={owner!r} !?\n'
f'{stats}\n'
f'\n'
f'task={task!r}\n'
f'fid={fid!r}\n'
f'acm_func={acm_func}\n'
)
# XXX, trace it.
# await tractor.pause(shield=True)
_Cache.users[ctx_key] -= 1
if yielded is not _UnresolvedCtx:
# if no more consumers, teardown the client
if _Cache.users[ctx_key] <= 0:
log.debug(
f'De-allocating @acm-func entry\n'
f'ctx_key={ctx_key!r}\n'
f'acm_func={acm_func!r}\n'
)
# XXX: if we're cancelled the entry may have never
# been entered since the nursery task was killed.
# _, no_more_users = _Cache.resources[ctx_key]
entry = _Cache.resources.get(ctx_key)
if entry:
_, no_more_users = entry
no_more_users.set()
if lock_registered:
maybe_lock = _Cache.locks.pop(
ctx_key,
None,
)
if maybe_lock is None:
log.error(
f'Resource lock for {ctx_key} ALREADY POPPED?'
)