Skip to content

Commit

Permalink
Fix eventloop integration with anyio (#1265)
Browse files Browse the repository at this point in the history
  • Loading branch information
ianthomas23 authored Aug 16, 2024
1 parent 2b925be commit 1ca8f2c
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 21 deletions.
22 changes: 15 additions & 7 deletions ipykernel/eventloops.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ def process_stream_events():
# due to our consuming of the edge-triggered FD
# flush returns the number of events consumed.
# if there were any, wake it up
if kernel.shell_stream.flush(limit=1):
if (kernel.shell_socket.get(zmq.EVENTS) & zmq.POLLIN) > 0:
exit_loop()

if not hasattr(kernel, "_qt_notifier"):
fd = kernel.shell_stream.getsockopt(zmq.FD)
fd = kernel.shell_socket.getsockopt(zmq.FD)
kernel._qt_notifier = QtCore.QSocketNotifier(
fd, enum_helper("QtCore.QSocketNotifier.Type").Read, kernel.app.qt_event_loop
)
Expand Down Expand Up @@ -179,7 +179,7 @@ def loop_wx(kernel):

def wake():
"""wake from wx"""
if kernel.shell_stream.flush(limit=1):
if (kernel.shell_socket.get(zmq.EVENTS) & zmq.POLLIN) > 0:
kernel.app.ExitMainLoop()
return

Expand Down Expand Up @@ -248,14 +248,14 @@ def __init__(self, app):

def exit_loop():
"""fall back to main loop"""
app.tk.deletefilehandler(kernel.shell_stream.getsockopt(zmq.FD))
app.tk.deletefilehandler(kernel.shell_socket.getsockopt(zmq.FD))
app.quit()
app.destroy()
del kernel.app_wrapper

def process_stream_events(*a, **kw):
"""fall back to main loop when there's a socket event"""
if kernel.shell_stream.flush(limit=1):
if (kernel.shell_socket.get(zmq.EVENTS) & zmq.POLLIN) > 0:
exit_loop()

# allow for scheduling exits from the loop in case a timeout needs to
Expand All @@ -269,7 +269,7 @@ def _schedule_exit(delay):
# For Tkinter, we create a Tk object and call its withdraw method.
kernel.app_wrapper = BasicAppWrapper(app)
app.tk.createfilehandler(
kernel.shell_stream.getsockopt(zmq.FD), READABLE, process_stream_events
kernel.shell_socket.getsockopt(zmq.FD), READABLE, process_stream_events
)
# schedule initial call after start
app.after(0, process_stream_events)
Expand Down Expand Up @@ -377,7 +377,7 @@ def handle_int(etype, value, tb):
# don't let interrupts during mainloop invoke crash_handler:
sys.excepthook = handle_int
mainloop(kernel._poll_interval)
if kernel.shell_stream.flush(limit=1):
if (kernel.shell_socket.get(zmq.EVENTS) & zmq.POLLIN) > 0:
# events to process, return control to kernel
return
except BaseException:
Expand Down Expand Up @@ -604,3 +604,11 @@ def enable_gui(gui, kernel=None):
kernel.eventloop = loop
# We set `eventloop`; the function the user chose is executed in `Kernel.enter_eventloop`, thus
# any exceptions raised during the event loop will not be shown in the client.

# If running in async loop then set anyio event to trigger starting the eventloop.
# If not running in async loop do nothing as this will be handled in IPKernelApp.main().
try:
kernel._eventloop_set.set()
except RuntimeError:
# Expecting sniffio.AsyncLibraryNotFoundError but don't want to import sniffio just for that
pass
10 changes: 8 additions & 2 deletions ipykernel/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,12 +729,18 @@ def start(self) -> None:
run(self.main, backend=backend)
return

async def _wait_to_enter_eventloop(self):
await self.kernel._eventloop_set.wait()
await self.kernel.enter_eventloop()

async def main(self):
async with create_task_group() as tg:
if self.kernel.eventloop:
tg.start_soon(self.kernel.enter_eventloop)
tg.start_soon(self._wait_to_enter_eventloop)
tg.start_soon(self.kernel.start)

if self.kernel.eventloop:
self.kernel._eventloop_set.set()

def stop(self):
"""Stop the kernel, thread-safe."""
self.kernel.stop()
Expand Down
22 changes: 12 additions & 10 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

import psutil
import zmq
from anyio import TASK_STATUS_IGNORED, create_task_group, sleep, to_thread
from anyio import TASK_STATUS_IGNORED, Event, create_task_group, sleep, to_thread
from anyio.abc import TaskStatus
from IPython.core.error import StdinNotImplementedError
from jupyter_client.session import Session
Expand Down Expand Up @@ -229,6 +229,8 @@ def _parent_header(self):
"usage_request",
]

_eventloop_set: Event = Event()

def __init__(self, **kwargs):
"""Initialize the kernel."""
super().__init__(**kwargs)
Expand Down Expand Up @@ -321,7 +323,9 @@ async def enter_eventloop(self):
# record handle, so we can check when this changes
eventloop = self.eventloop
if eventloop is None:
self.log.info("Exiting as there is no eventloop")
# Do not warn if shutting down.
if not (hasattr(self, "shell") and self.shell.exit_now):
self.log.info("Exiting as there is no eventloop")
return

async def advance_eventloop():
Expand All @@ -335,21 +339,15 @@ async def advance_eventloop():
except KeyboardInterrupt:
# Ctrl-C shouldn't crash the kernel
self.log.error("KeyboardInterrupt caught in kernel")
if self.eventloop is eventloop:
# schedule advance again
await schedule_next()

async def schedule_next():
"""Schedule the next advance of the eventloop"""
# begin polling the eventloop
while self.eventloop is eventloop:
# flush the eventloop every so often,
# giving us a chance to handle messages in the meantime
self.log.debug("Scheduling eventloop advance")
await sleep(0.001)
await advance_eventloop()

# begin polling the eventloop
await schedule_next()

_message_counter = Any(
help="""Monotonic counter of messages
""",
Expand Down Expand Up @@ -481,6 +479,10 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None:
tg.start_soon(self.shell_main)

def stop(self):
if not self._eventloop_set.is_set():
# Stop the async task that is waiting for the eventloop to be set.
self._eventloop_set.set()

self.shell_stop.set()
self.control_stop.set()

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ dependencies = [
"pyzmq>=25.0",
"psutil>=5.7",
"packaging>=22",
"anyio>=4.0.0",
"anyio>=4.2.0",
]

[project.urls]
Expand Down
3 changes: 2 additions & 1 deletion tests/test_kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ def test_merge_connection_file():
os.remove(cf)


@pytest.mark.skipif(trio is None, reason="requires trio")
# FIXME: @pytest.mark.skipif(trio is None, reason="requires trio")
@pytest.mark.skip()
def test_trio_loop():
app = IPKernelApp(trio_loop=True)

Expand Down

0 comments on commit 1ca8f2c

Please sign in to comment.