pytest-anyio and crashed background task in taskgroup fixture #805

Open
2 tasks done
jakkdl opened this issue Oct 11, 2024 · 18 comments
Labels
bug Something isn't working

Comments

@jakkdl

jakkdl commented Oct 11, 2024

Things to check first

  • I have searched the existing issues and didn't find my bug already reported there

  • I have checked that my bug is still present in the latest release

AnyIO version

4.6.0

Python version

3.12.4

What happened?

I'm seeing several odd behaviors: depending on the setup, the test run either hangs in unexpected places or crashes.

I ran into this while trying to rewrite pytest-trio, specifically with the test that was added in python-trio/pytest-trio#83 in response to python-trio/pytest-trio#77.

How can we reproduce the bug?

import anyio
import pytest
from contextlib import asynccontextmanager

my_event = anyio.Event()
async def die_soon(task_status):
    task_status.started()
    await my_event.wait()
    raise RuntimeError('OOPS')


@asynccontextmanager
async def my_simple_fixture():
    async with anyio.create_task_group() as tg:
        await tg.start(die_soon)
        yield

@pytest.mark.anyio
async def test_try():
    async with my_simple_fixture():
        my_event.set()

Running this with trio as the backend gives:

[...]
  |   File "/tmp/anyio_pytest/bar.py", line 14, in my_simple_fixture
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_backends/_trio.py", line 187, in __aexit__
  |     return await self._nursery_manager.__aexit__(exc_type, exc_val, exc_tb)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/trio/_core/_run.py", line 959, in __aexit__
  |     raise combined_error_from_nursery
  | ExceptionGroup: Exceptions from Trio nursery (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/anyio_pytest/bar.py", line 8, in die_soon
    |     await my_event.wait()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_core/_synchronization.py", line 130, in wait
    |     await self._event.wait()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 1716, in wait
    |     await AsyncIOBackend.checkpoint()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2264, in checkpoint
    |     await sleep(0)
    |   File "/usr/lib/python3.12/asyncio/tasks.py", line 656, in sleep
    |     await __sleep0()
    |   File "/usr/lib/python3.12/asyncio/tasks.py", line 650, in __sleep0
    |     yield
    | TypeError: trio.run received unrecognized yield message None. Are you trying to use a library written for some other framework like asyncio? That won't work without some kind of compatibility shim.

If I remove the decorator and run anyio.run(test_try, backend="trio") directly, it correctly raises an exception group containing our "OOPS" RuntimeError. The same happens when running under the anyio pytest plugin with asyncio as the backend.

2

This gives a teardown error and a messy traceback:

import anyio
import pytest

@pytest.fixture
def anyio_backend():
    return 'asyncio'

async def die_soon():
    raise RuntimeError('OOPS')


@pytest.fixture
async def my_simple_fixture():
    async with anyio.create_task_group() as tg:
        tg.start_soon(die_soon)
        yield

async def test_try(my_simple_fixture, anyio_backend):
    ...

Error:

$ pytest bar.py -sv
===================================== test session starts =====================================
platform linux -- Python 3.12.4, pytest-8.3.3, pluggy-1.5.0 -- /tmp/anyio_pytest/.venv/bin/python
cachedir: .pytest_cache
rootdir: /tmp/anyio_pytest
plugins: anyio-4.6.0, trio-0.8.0
collected 1 item                                                                              

bar.py::test_try FAILED
bar.py::test_try ERROR

=========================================== ERRORS ============================================
________________________________ ERROR at teardown of test_try ________________________________

anyio_backend = 'asyncio', args = (), kwargs = {}, backend_name = 'asyncio'
backend_options = {}, runner = <anyio._backends._asyncio.TestRunner object at 0x71c2350cf260>

    def wrapper(*args, anyio_backend, **kwargs):  # type: ignore[no-untyped-def]
        backend_name, backend_options = extract_backend_and_options(anyio_backend)
        if has_backend_arg:
            kwargs["anyio_backend"] = anyio_backend
    
        with get_runner(backend_name, backend_options) as runner:
            if isasyncgenfunction(func):
>               yield from runner.run_asyncgen_fixture(func, kwargs)

.venv/lib/python3.12/site-packages/anyio/pytest_plugin.py:81: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:2187: in run_asyncgen_fixture
    self.get_loop().run_until_complete(
/usr/lib/python3.12/asyncio/base_events.py:687: in run_until_complete
    return future.result()
.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:2170: in _call_in_runner_task
    self._send_stream.send_nowait((coro, future))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = MemoryObjectSendStream(_state=MemoryObjectStreamState(max_buffer_size=1, buffer=deque([]), open_send_channels=0, open_receive_channels=0, waiting_receivers=OrderedDict(), waiting_senders=OrderedDict()), _closed=True)
item = (<async_generator_asend object at 0x71c23487d2c0>, <Future pending>)

    def send_nowait(self, item: T_contra) -> None:
        """
        Send an item immediately if it can be done without waiting.
    
        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end
        :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting
            to receive
    
        """
        if self._closed:
>           raise ClosedResourceError
E           anyio.ClosedResourceError

.venv/lib/python3.12/site-packages/anyio/streams/memory.py:211: ClosedResourceError
========================================== FAILURES ===========================================
__________________________________________ test_try ___________________________________________

pyfuncitem = <Function test_try>

    @pytest.hookimpl(tryfirst=True)
    def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
        def run_with_hypothesis(**kwargs: Any) -> None:
            with get_runner(backend_name, backend_options) as runner:
                runner.run_test(original_func, kwargs)
    
        backend = pyfuncitem.funcargs.get("anyio_backend")
        if backend:
            backend_name, backend_options = extract_backend_and_options(backend)
    
            if hasattr(pyfuncitem.obj, "hypothesis"):
                # Wrap the inner test function unless it's already wrapped
                original_func = pyfuncitem.obj.hypothesis.inner_test
                if original_func.__qualname__ != run_with_hypothesis.__qualname__:
                    if iscoroutinefunction(original_func):
                        pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis
    
                return None
    
            if iscoroutinefunction(pyfuncitem.obj):
                funcargs = pyfuncitem.funcargs
                testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
                with get_runner(backend_name, backend_options) as runner:
                    try:
>                       runner.run_test(pyfuncitem.obj, testargs)

.venv/lib/python3.12/site-packages/anyio/pytest_plugin.py:131: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:2217: in run_test
    self._raise_async_exceptions()
.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:2121: in _raise_async_exceptions
    raise exceptions[0]
.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:2211: in run_test
    self.get_loop().run_until_complete(
/usr/lib/python3.12/asyncio/base_events.py:687: in run_until_complete
    return future.result()
.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:2170: in _call_in_runner_task
    self._send_stream.send_nowait((coro, future))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = MemoryObjectSendStream(_state=MemoryObjectStreamState(max_buffer_size=1, buffer=deque([]), open_send_channels=0, open_receive_channels=0, waiting_receivers=OrderedDict(), waiting_senders=OrderedDict()), _closed=True)
item = (<coroutine object test_try at 0x71c2350a3280>, <Future pending>)

    def send_nowait(self, item: T_contra) -> None:
        """
        Send an item immediately if it can be done without waiting.
    
        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end
        :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting
            to receive
    
        """
        if self._closed:
>           raise ClosedResourceError
E           anyio.ClosedResourceError

.venv/lib/python3.12/site-packages/anyio/streams/memory.py:211: ClosedResourceError
=================================== short test summary info ===================================
FAILED bar.py::test_try - anyio.ClosedResourceError
ERROR bar.py::test_try - anyio.ClosedResourceError
================================= 1 failed, 1 error in 0.30s ==================================

3

But if we make anyio_backend return "trio", we get a hang instead. The KeyboardInterrupt traceback ends with:


self = <Condition(<unlocked _thread.lock object at 0x7951089ac5c0>, 0)>, timeout = None

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.
    
        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.
    
        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.
    
        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).
    
        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
    
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
>               waiter.acquire()
E               KeyboardInterrupt

/usr/lib/python3.12/threading.py:355: KeyboardInterrupt
====================================== 1 passed in 1.16s ======================================
Exception ignored in: <async_generator object my_simple_fixture at 0x7951089bd000>
Traceback (most recent call last):
  File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/trio/_core/_asyncgens.py", line 123, in finalizer
    raise RuntimeError(
RuntimeError: Non-Trio async generator 'bar.my_simple_fixture' awaited something during finalization; install a finalization hook to support this, or wrap it in 'async with aclosing(...):'
jakkdl added the bug label Oct 11, 2024
@agronholm
Owner

agronholm commented Oct 12, 2024

How did you run the first snippet with trio as backend? I put it in a single test module, and ran pytest with the module as argument plus -k trio. I didn't see any issues there. I can repro the issue with snippet 2, but as for snippet 3, I don't get that RuntimeError.

@jakkdl
Author

jakkdl commented Oct 16, 2024

> How did you run the first snippet with trio as backend? I put it in a single test module, and ran pytest with the module as argument plus -k trio. I didn't see any issues there. I can repro the issue with snippet 2, but as for snippet 3, I don't get that RuntimeError.

Ah, hmm. For snippet 1 I only get it when I run both asyncio and trio, so I suppose the fixture isn't being torn down and set up again properly? I just tested it with pytest-random-order: when trio runs first and asyncio second, I get a similar error:

  |   File "/tmp/anyio_pytest/foo_1.py", line 14, in my_simple_fixture
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 736, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/anyio_pytest/foo_1.py", line 8, in die_soon
    |     await my_event.wait()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_core/_synchronization.py", line 130, in wait
    |     await self._event.wait()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/anyio/_backends/_trio.py", line 647, in wait
    |     return await self.__original.wait()
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/trio/_sync.py", line 86, in wait
    |     await trio.lowlevel.checkpoint()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/trio/_core/_run.py", line 2788, in checkpoint
    |     await cancel_shielded_checkpoint()
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/trio/_core/_traps.py", line 51, in cancel_shielded_checkpoint
    |     (await _async_yield(CancelShieldedCheckpoint)).unwrap()
    |      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/tmp/anyio_pytest/.venv/lib/python3.12/site-packages/trio/_core/_traps.py", line 29, in _async_yield
    |     return (yield obj)
    |             ^^^^^^^^^
    | RuntimeError: Task got bad yield: <class 'trio._core._traps.CancelShieldedCheckpoint'>
    +------------------------------------

Not sure how to help you repro the 2nd snippet; I can reliably reproduce it in fresh venvs.

@agronholm
Owner

I noticed that you had a global Event object, but it will bind to the first back-end it's used with, so that won't work in a test suite involving two back-ends. Make it a fixture instead.

@jakkdl
Author

jakkdl commented Oct 16, 2024

> I noticed that you had a global Event object, but it will bind to the first back-end it's used with, so that won't work in a test suite involving two back-ends. Make it a fixture instead.

Ah, thanks. Making it a fixture resolved the issue when my_simple_fixture is used as a context manager, but turning my_simple_fixture back into a fixture brings the problems back:

import anyio
import pytest

@pytest.fixture
def my_event():
    return anyio.Event()

async def die_soon(my_event, task_status):
    print("entering die_soon")
    task_status.started()
    await my_event.wait()
    print("crashing")
    raise RuntimeError('OOPS')

@pytest.fixture
async def my_simple_fixture(my_event):
    async with anyio.create_task_group() as tg:
        await tg.start(die_soon, my_event)
        print("yielding")
        yield
        print("continuing")

@pytest.mark.anyio
async def test_try(my_simple_fixture, my_event):
    print("in test")
    my_event.set()
    print("end of test")

I'm now getting anyio.ClosedResourceError on asyncio and a hang on trio. But if I use random ordering so that trio runs first, the trio test passes and then the run hangs.

@agronholm
Owner

This randomness is due to Trio's non-deterministic scheduling. If you know how to turn that off, please let me know! It's supremely annoying when running tests.

@jakkdl
Author

jakkdl commented Oct 16, 2024

> This randomness is due to Trio's non-deterministic scheduling. If you know how to turn that off, please let me know! It's supremely annoying when running tests.

Sorry, by "random-order" I meant running with https://pypi.org/project/pytest-random-order/ to see whether running trio before asyncio behaves differently from running asyncio before trio.

@smurfix
Collaborator

smurfix commented Oct 17, 2024

> If you know how to turn that off, please let me know! It's supremely annoying when running tests.

Well, that's easy.

env PYTHONHASHSEED=123 pytest …

Also, in conftest.py:

from trio._core import _run
_run._ALLOW_DETERMINISTIC_SCHEDULING = True

@agronholm
Owner

I think I'll ask around about this particular variable and its future.

@smurfix
Collaborator

smurfix commented Oct 17, 2024

On second look a better solution is to call _run._r.seed(123) instead, but the same caveat applies.

Trio should probably export a way to explicitly set that seed. I'd recommend submitting a PR.

@agronholm
Owner

It's not just the random scheduling I'd like to tackle; I'd really like to make trio schedule tasks in FIFO order. Does _ALLOW_DETERMINISTIC_SCHEDULING = True do that?

@smurfix
Collaborator

smurfix commented Oct 18, 2024

No. If you want that, you need to monkeypatch _run._r.random() to always return >= 0.5.
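To make the difference between seeding and patching concrete, here is a standalone toy model. It does not import trio and is not trio's scheduler: it only assumes, as the comment above implies, that a batch of runnable tasks is reversed whenever random() returns less than 0.5. The names order_batch, NeverReverse and orders_seen are hypothetical.

```python
import random


def order_batch(queued: list[str], rng: random.Random) -> list[str]:
    """Toy model: reverse the batch with 50% probability."""
    batch = list(queued)
    if rng.random() < 0.5:
        batch.reverse()
    return batch


class NeverReverse(random.Random):
    """Stand-in RNG whose random() never falls below 0.5."""

    def random(self) -> float:
        return 0.5


def orders_seen(rng: random.Random, rounds: int = 20) -> set[tuple[str, ...]]:
    """Collect the distinct orderings produced over several batches."""
    tasks = ["a", "b", "c"]
    return {tuple(order_batch(tasks, rng)) for _ in range(rounds)}
```

In this model, a fixed seed such as random.Random(123) makes the sequence of orderings repeatable from run to run, but batches can still be reversed. Only the patched random() keeps every batch in FIFO order.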

@agronholm
Owner

I think this and #614 should probably be fixed together as part of a larger overhaul of the pytest plugin.

@agronholm
Owner

I started experimenting, but I'm not sure what the desired end result is with code like this:

class TestFailingFixtureTaskGroup:
    @pytest.fixture
    async def failing_task(self) -> AsyncGenerator[Event]:
        event = Event()

        async def die_soon(task_status: TaskStatus[None]) -> NoReturn:
            task_status.started()
            await event.wait()
            raise RuntimeError("OOPS")

        async with create_task_group() as tg:
            await tg.start(die_soon)
            yield event

    async def test_tg_crash(self, failing_task: Event) -> None:
        failing_task.set()
        await checkpoint()

Is the expectation that the teardown phase errors with an exception group containing the RuntimeError("OOPS")? As I see it, the exception from the async fixture isn't happening soon enough for the test itself to fail.

@agronholm
Owner

I think we should visualize the above test and fixture like this:

async def test_tg_crash() -> None:
    event = Event()

    async def die_soon(task_status: TaskStatus[None]) -> NoReturn:
        task_status.started()
        await event.wait()
        raise RuntimeError("OOPS")

    async with create_task_group() as tg:
        await tg.start(die_soon)
        event.set()
        await checkpoint()

What do we want to happen when a task group task is cancelled? Since everything runs in the single "runner" task, a failing task group will cause that task to be cancelled. If, on the other hand, we run asyncgen fixtures in separate tasks, they won't be able to set contextvars for the test and the other fixtures.
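The contextvars limitation mentioned above can be shown with the standard library alone. This is a minimal sketch using plain asyncio rather than the AnyIO test runner; request_id, fixture_setup and observe are hypothetical names.

```python
import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id", default="unset")


async def fixture_setup() -> None:
    # Stand-in for a fixture that sets a context variable for the test.
    request_id.set("from-fixture")


async def observe() -> tuple[str, str]:
    # Separate task: it runs in a copy of the context, so the change is lost.
    await asyncio.create_task(fixture_setup())
    after_separate_task = request_id.get()

    # Same task: the change is visible to the code that follows.
    await fixture_setup()
    after_same_task = request_id.get()
    return after_separate_task, after_same_task
```

Running asyncio.run(observe()) returns ("unset", "from-fixture"), which is why moving each async generator fixture into its own task would hide its context variables from the test.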

@jakkdl
Author

jakkdl commented Jan 6, 2025

> I started experimenting, but I'm not sure what the desired end result is with code like this:
>
> class TestFailingFixtureTaskGroup:
>     @pytest.fixture
>     async def failing_task(self) -> AsyncGenerator[Event]:
>         event = Event()
>
>         async def die_soon(task_status: TaskStatus[None]) -> NoReturn:
>             task_status.started()
>             await event.wait()
>             raise RuntimeError("OOPS")
>
>         async with create_task_group() as tg:
>             await tg.start(die_soon)
>             yield event
>
>     async def test_tg_crash(self, failing_task: Event) -> None:
>         failing_task.set()
>         await checkpoint()
>
> Is the expectation that the teardown phase errors with an exception group containing the RuntimeError("OOPS")? As I see it, the exception from the async fixture isn't happening soon enough for the test itself to fail.

I'm not sure it matters a ton, since a teardown error will still cause the pytest run to fail and you can see which test caused it. It may be a bit nicer conceptually to have it as a teardown error.

@jakkdl
Author

jakkdl commented Jan 6, 2025

> I think we should visualize the above test and fixture like this:
>
> async def test_tg_crash() -> None:
>     event = Event()
>
>     async def die_soon(task_status: TaskStatus[None]) -> NoReturn:
>         task_status.started()
>         await event.wait()
>         raise RuntimeError("OOPS")
>
>     async with create_task_group() as tg:
>         await tg.start(die_soon)
>         event.set()
>         await checkpoint()
>
> What do we want to happen when a task group task is cancelled? Since everything runs in the single "runner" task, a failing task group will cause that task to be cancelled. If, on the other hand, we run asyncgen fixtures in separate tasks, they won't be able to set contextvars for the test and the other fixtures.

What are the downsides of cancelling the runner task?

@agronholm
Owner

> > I think we should visualize the above test and fixture like this:
> >
> > async def test_tg_crash() -> None:
> >     event = Event()
> >
> >     async def die_soon(task_status: TaskStatus[None]) -> NoReturn:
> >         task_status.started()
> >         await event.wait()
> >         raise RuntimeError("OOPS")
> >
> >     async with create_task_group() as tg:
> >         await tg.start(die_soon)
> >         event.set()
> >         await checkpoint()
> >
> > What do we want to happen when a task group task is cancelled? Since everything runs in the single "runner" task, a failing task group will cause that task to be cancelled. If, on the other hand, we run asyncgen fixtures in separate tasks, they won't be able to set contextvars for the test and the other fixtures.
>
> What are the downsides of cancelling the runner task?

"Confusion" is the first answer that comes to mind. It's the same thing that prompted this issue. I, at least, would be confused by a CancelledError coming out of a test run or teardown. If a task somewhere down the line raises an exception, that exception should bubble up to either the test run or the teardown, depending on the timing.

Also, what will happen to task groups and cancel scopes created in other async fixtures, if one of them cancels the runner task? I haven't really given it much thought until now.

@agronholm
Owner

I think what I'm trying to say is that we should try to accomplish the following here:

  1. Ensure that the actual exception bubbles up to the test runner
  2. Make sure that tests not related to the failing async fixture can still run successfully
