2022-10-16 22:16:58 +00:00
|
|
|
"""
|
|
|
|
|
Tests for the shared-memory list primitives and APIs (`open_shm_list()` / `attach_shm_list()`).
|
|
|
|
|
|
|
|
|
|
"""
|
2026-03-01 23:52:48 +00:00
|
|
|
import platform
|
2022-10-17 19:13:05 +00:00
|
|
|
import uuid
|
2022-10-16 22:16:58 +00:00
|
|
|
|
|
|
|
|
# import numpy
|
|
|
|
|
import pytest
|
|
|
|
|
import trio
|
|
|
|
|
import tractor
|
2025-03-13 23:59:14 +00:00
|
|
|
from tractor.ipc._shm import (
|
2022-10-16 22:16:58 +00:00
|
|
|
open_shm_list,
|
|
|
|
|
attach_shm_list,
|
|
|
|
|
)
|
|
|
|
|
|
2026-04-22 01:33:15 +00:00
|
|
|
# Module-wide skip on spawn backends where these shm tests are known to
# misbehave (marker presumably provided by the project's pytest
# plugin/conftest):
# - `subint`: GIL-contention hanging class.
# - `subint_forkserver`: `multiprocessing.SharedMemory`'s
#   `resource_tracker` + internals assume fresh-process state and don't
#   survive fork-without-exec cleanly.
pytestmark = pytest.mark.skipon_spawn_backend(
    'subint',
    'subint_forkserver',
    reason=(
        'subint: GIL-contention hanging class.\n'
        'subint_forkserver: `multiprocessing.SharedMemory` '
        'has known issues with fork-without-exec (mp\'s '
        'resource_tracker and SharedMemory internals assume '
        'fresh-process state). RemoteActorError surfaces from '
        'the shm-attach path. TODO, put issue link!\n'
    )
)
|
2022-10-16 22:16:58 +00:00
|
|
|
|
2022-10-17 19:13:05 +00:00
|
|
|
@tractor.context
async def child_attach_shml_alot(
    ctx: tractor.Context,
    shm_key: str,
) -> None:
    '''
    Child-side ctx endpoint: echo `shm_key` back via `ctx.started()`,
    then attach to the shm list at that key 1000 times in a loop,
    checking that every attached handle maps the expected segment name.

    '''
    await ctx.started(shm_key)

    # now try to attach a boatload of times in a loop..
    for _ in range(1000):
        shml = attach_shm_list(
            key=shm_key,
            readonly=False,
        )
        try:
            assert shml.shm.name == shm_key
        finally:
            # release this attachment's mapping (and fd on POSIX)
            # deterministically instead of relying on GC of the
            # rebound handle; `SharedMemory.close()` only detaches
            # this instance, it does NOT unlink the parent-owned
            # segment.
            shml.shm.close()

        await trio.sleep(0.001)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_child_attaches_alot():
    '''
    Allocate a writeable shm list in the parent, then have a subactor
    repeatedly attach to it, verifying the key the child reports back
    agrees with the parent's view of it.

    '''
    async def main():
        async with tractor.open_nursery() as an:

            # allocate writeable list in parent
            key: str = f'shml_{uuid.uuid4()}'
            parent_shml = open_shm_list(key=key)

            portal = await an.start_actor(
                'shm_attacher',
                enable_modules=[__name__],
            )

            async with portal.open_context(
                child_attach_shml_alot,
                shm_key=parent_shml.key,
            ) as (ctx, first):
                parent_key = parent_shml.key
                assert parent_key == first

                # XXX, macOS has a char limit on shm names so the key
                # may be rewritten there and can't be compared with the
                # originally requested one,
                # see `ipc._shm._shorten_key_for_macos`
                on_macos: bool = platform.system() == 'Darwin'
                if not on_macos:
                    assert first == key == parent_key

                await ctx.result()

            await portal.cancel_actor()

    trio.run(main)
|
|
|
|
|
|
|
|
|
|
|
2022-10-16 22:16:58 +00:00
|
|
|
@tractor.context
async def child_read_shm_list(
    ctx: tractor.Context,
    shm_key: str,
    use_str: bool,
    frame_size: int,
) -> None:
    '''
    Child-side ctx endpoint: attach to the parent's shm list at
    `shm_key`, report the attached key via `ctx.started()`, then for
    every index `i` streamed from the parent verify the shm contents:

    - `frame_size == 1`: the single value stored at `i`,
    - otherwise: the frame `shml[i - frame_size:i]`.

    Expected values mirror the parent writer: `float(j)`, or
    `str(float(j))` when `use_str` is set.

    '''
    def expected(j: int) -> float | str:
        val = float(j)
        return str(val) if use_str else val

    # attach in child
    shml = attach_shm_list(
        key=shm_key,
        # dtype=str if use_str else float,
    )
    await ctx.started(shml.key)

    async with ctx.open_stream() as stream:
        async for i in stream:
            print(f'(child): reading shm list index: {i}')

            if frame_size == 1:
                val = shml[i]
                assert expected(i) == val
                print(f'(child): reading value: {val}')

            else:
                frame = shml[i - frame_size:i]
                print(f'(child): reading frame: {frame}')

                # actually validate the frame (previously only printed).
                # Apply the very same slice to `range(len(shml))` so the
                # expected indices follow identical slice semantics,
                # e.g. the empty frame on the parent's first `i == 0`
                # signal. Every index below `i` has already been written
                # by the parent before it sent `i`.
                idxs = range(len(shml))[i - frame_size:i]
                assert list(frame) == [expected(j) for j in idxs]
|
2022-10-16 22:16:58 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
    'use_str',
    [False, True],
    ids=lambda i: f'use_str_values={i}',
)
@pytest.mark.parametrize(
    'frame_size',
    [1, 2**6, 2**10],
    ids=lambda i: f'frame_size={i}',
)
def test_parent_writer_child_reader(
    use_str: bool,
    frame_size: int,
):
    '''
    Parent allocates and writes a shm list while a subactor attaches
    and reads it back. The parent signals the child (over a ctx stream)
    each time a frame of `frame_size` entries is ready, plus once more
    for the trailing frame after the last write.

    '''
    async def main():
        async with tractor.open_nursery(
            # debug_mode=True,
        ) as an:

            portal = await an.start_actor(
                'shm_reader',
                enable_modules=[__name__],
                # NOTE(review): looks like a leftover debugging aid
                # (the nursery-level flag above is commented out);
                # confirm it's intended for CI runs.
                debug_mode=True,
            )

            # allocate writeable list in parent.
            #
            # NOTE, use a per-run unique key: a fixed name (previously
            # `'shm_list'`, shared by every parametrization) can collide
            # with a stale segment left behind by a crashed run or by a
            # concurrent session, since `open_shm_list()` creates the
            # segment. Keep it short (13 chars) so it presumably stays
            # under macOS's shm-name length limit and isn't rewritten
            # (see `ipc._shm._shorten_key_for_macos`), keeping the
            # `sent == key` check below valid.
            key = f'shml_{uuid.uuid4().hex[:8]}'
            seq_size = int(2 * 2 ** 10)
            shml = open_shm_list(
                key=key,
                size=seq_size,
                dtype=str if use_str else float,
                readonly=False,
            )

            async with (
                portal.open_context(
                    child_read_shm_list,
                    shm_key=key,
                    use_str=use_str,
                    frame_size=frame_size,
                ) as (ctx, sent),

                ctx.open_stream() as stream,
            ):
                assert sent == key

                for i in range(seq_size):
                    val = float(i)
                    if use_str:
                        val = str(val)

                    # print(f'(parent): writing {val}')
                    shml[i] = val

                    # only on frame fills do we
                    # signal to the child that a frame's
                    # worth is ready.
                    if (i % frame_size) == 0:
                        print(f'(parent): signalling frame full on {val}')
                        await stream.send(i)

                else:
                    # `for`/`else`: runs once the loop completes;
                    # signal the trailing frame ending at the last
                    # written index.
                    print(f'(parent): signalling final frame on {val}')
                    await stream.send(i)

            await portal.cancel_actor()

    trio.run(main)
|