asyncio exception handler: not getting called until event loop thread stopped

Here is the fixed code (without exceptions and the exception handler):

import asyncio
from threading import Thread

async def coro():
    print("in coro")
    return 42

loop = asyncio.get_event_loop()
thread = Thread(target=loop.run_forever)
thread.start()

fut = asyncio.run_coroutine_threadsafe(coro(), loop)
print(fut.result())

loop.call_soon_threadsafe(loop.stop)
thread.join()

run_coroutine_threadsafe() returns a concurrent.futures.Future object which holds the exception, so the exception never reaches the default exception handler:

import asyncio
from pprint import pprint
from threading import Thread

def exception_handler(loop, context):
    print('Exception handler called')
    pprint(context)

loop = asyncio.get_event_loop()
loop.set_exception_handler(exception_handler)

thread = Thread(target=loop.run_forever)
thread.start()

async def coro():
    print("coro")
    raise RuntimeError("BOOM!")

fut = asyncio.run_coroutine_threadsafe(coro(), loop)
try:
    print("success:", fut.result())
except Exception:
    print("exception:", fut.exception())

loop.call_soon_threadsafe(loop.stop)
thread.join()

However, exceptions raised in coroutines that are scheduled with create_task() or ensure_future(), and never retrieved, are reported to the exception_handler:

async def coro2():
    print("coro2")
    raise RuntimeError("BOOM2!")

async def coro():
    loop.create_task(coro2())
    print("coro")
    raise RuntimeError("BOOM!")
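A minimal sketch of how the handler actually fires with this version of coro(), reusing the loop, thread and exception_handler set up above. The timing note in the comments is an assumption: asyncio reports an unretrieved task exception through the handler only when the failed task is destroyed, which often happens around loop shutdown.

fut = asyncio.run_coroutine_threadsafe(coro(), loop)
try:
    print("success:", fut.result())
except Exception:
    # BOOM! is delivered here through the concurrent.futures.Future
    print("exception:", fut.exception())

# BOOM2! from coro2() has no consumer, so the loop hands it to
# exception_handler once the failed task is garbage-collected.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()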

Suggestion : 2

From the asyncio documentation: because get_event_loop() has rather complex behavior (especially when custom event loop policies are in use), using get_running_loop() is preferred in coroutines and callbacks. get_running_loop() raises a RuntimeError if there is no running event loop and can only be called from a coroutine or a callback. Since Python 3.10, get_event_loop() emits a deprecation warning when there is no running event loop, and in future Python releases it will become an alias of get_running_loop(). Also be aware that scheduling work on a loop that has been closed raises RuntimeError; this can happen on a secondary thread while the main application is shutting down.
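A minimal sketch of the recommended pattern: call asyncio.get_running_loop() from inside a coroutine and let asyncio.run() own the loop at the top level (the coroutine name work() is just an illustration):

import asyncio

async def work():
    # There is always a running loop inside a coroutine, so this never
    # creates a loop implicitly; outside a running loop it raises RuntimeError.
    loop = asyncio.get_running_loop()
    print("running on", loop)

# asyncio.run() creates a fresh loop, runs work() and closes the loop.
asyncio.run(work())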

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

import functools

# will schedule print("Hello", flush=True)
loop.call_soon(
    functools.partial(print, "Hello", flush=True))
import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    # Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())
srv = await loop.create_server(...)

async with srv:
    # some code
    ...

# At this point, srv is closed and no longer accepts new connections.

async def client_connected(reader, writer):
    # Communicate with the client with
    # reader/writer streams. For example:
    await reader.readline()

async def main(host, port):
    srv = await asyncio.start_server(
        client_connected, host, port)
    await srv.serve_forever()

asyncio.run(main('127.0.0.1', 0))
import asyncio
import selectors

selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)

Suggestion : 3

From the asyncio documentation: run_in_executor() calls a function in an Executor (a pool of threads or a pool of processes); by default an event loop uses a thread pool executor (ThreadPoolExecutor), and this is the usual way of delegating costly function calls to a pool of threads. For connections, see AbstractEventLoop.create_connection() and asyncio.open_connection(). Note that parts of the snippets below come from older documentation (one of the methods was only added in Python 3.4.2, and the since-renamed async() function was recommended for supporting older Python versions).
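Since the quoted text also mentions asyncio.open_connection(), here is a minimal client sketch using it; the host, port and the HTTP request bytes are placeholders, not something from the original answer:

import asyncio

async def fetch_status(host="example.com", port=80):
    # open_connection() returns a (StreamReader, StreamWriter) pair.
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(b"HEAD / HTTP/1.0\r\nHost: example.com\r\n\r\n")
    await writer.drain()
    status = await reader.readline()
    print("status line:", status.decode().strip())
    writer.close()

asyncio.run(fetch_status())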

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
import asyncio

def hello_world(loop):
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()
import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()
import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

# Create a pair of connected file descriptors
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()

def reader():
    data = rsock.recv(100)
    print("Received:", data.decode())
    # We are done: unregister the file descriptor
    loop.remove_reader(rsock)
    # Stop the event loop
    loop.stop()

# Register the file descriptor for read event
loop.add_reader(rsock, reader)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

# Run the event loop
loop.run_forever()

# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()
import asyncio
import functools
import os
import signal

def ask_exit(signame):
    print("got signal %s: exit" % signame)
    loop.stop()

loop = asyncio.get_event_loop()
for signame in ('SIGINT', 'SIGTERM'):
    loop.add_signal_handler(getattr(signal, signame),
                            functools.partial(ask_exit, signame))

print("Event loop running forever, press Ctrl+C to interrupt.")
print("pid %s: send SIGINT or SIGTERM to exit." % os.getpid())
try:
    loop.run_forever()
finally:
    loop.close()

Suggestion : 4

From the Tornado documentation: add_timeout returns an opaque handle that may be passed to remove_timeout to cancel; note that, unlike the asyncio method of the same name, the returned object does not have a cancel() method. It is not safe to call add_timeout from other threads; instead, use add_callback to transfer control to the IOLoop's thread, and then call add_timeout from there (see add_timeout for comments on thread-safety and subclassing). Also, one of the methods mentioned there only accepts Future objects and not other awaitables, unlike most of Tornado where the two are interchangeable.
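A minimal sketch of that thread-safety rule, assuming the classic Tornado pattern of starting the IOLoop on the main thread; the callback names and the one-second delay are illustrative:

import threading
import tornado.ioloop

io_loop = tornado.ioloop.IOLoop.current()

def fire():
    print("timeout fired on the IOLoop thread")
    io_loop.stop()

def schedule_timeout():
    # Now running on the IOLoop's own thread, so add_timeout is safe here.
    io_loop.add_timeout(io_loop.time() + 1.0, fire)

def from_other_thread():
    # add_callback is the thread-safe way to hand control to the IOLoop.
    io_loop.add_callback(schedule_timeout)

threading.Thread(target=from_other_thread).start()
io_loop.start()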

import asyncio
import errno
import functools
import socket

import tornado.ioloop
from tornado.iostream import IOStream

async def handle_connection(connection, address):
    stream = IOStream(connection)
    message = await stream.read_until_close()
    print("message from client:", message.decode().strip())

def connection_ready(sock, fd, events):
    while True:
        try:
            connection, address = sock.accept()
        except BlockingIOError:
            return
        connection.setblocking(0)
        io_loop = tornado.ioloop.IOLoop.current()
        io_loop.spawn_callback(handle_connection, connection, address)

async def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setblocking(0)
    sock.bind(("", 8888))
    sock.listen(128)

    io_loop = tornado.ioloop.IOLoop.current()
    callback = functools.partial(connection_ready, sock)
    io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())
async def main():
    # do stuff...
    ...

if __name__ == '__main__':
    IOLoop.current().run_sync(main)

Suggestion : 5

Python 3 asyncio is a powerful asynchronous library, but its complexity results in a very steep learning curve. Compared with C# async/await, the asyncio interfaces are verbose and difficult to use, and the documentation is somewhat hard to understand (even Guido admitted the documentation is not clear). Here I summarize some of the common mistakes when using asyncio.

This error is especially surprising to people who are familiar with C# async/await. It happens because most of asyncio is not thread-safe, and neither are asyncio.Future and asyncio.Task. Also, don't confuse asyncio.Future with concurrent.futures.Future: they are not compatible (at least up to Python 3.6); the latter is thread-safe while the former is not. To await an asyncio.Future from a different thread, the asyncio.Future can be wrapped in a concurrent.futures.Future (see wrap_future() at the end of this suggestion).

The "coroutine was never awaited" RuntimeWarning can happen in many scenarios, but the cause is always the same: a coroutine object was created by invoking an async function but was never handed to an event loop.

async def foo():
    # a long async operation; no value is returned
    ...

prepare_for_foo()
foo()
# RuntimeWarning: coroutine 'foo' was never awaited
remaining_work_not_depends_on_foo()

# Fix: hand the coroutine to the event loop so it actually runs
prepare_for_foo()
task = loop.create_task(foo())
remaining_work_not_depends_on_foo()
loop.run_until_complete(task)
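On Python 3.7+ the same fix is usually spelled with asyncio.create_task() and asyncio.run(); this is only an alternative phrasing of the snippet above, with foo() reduced to a placeholder sleep:

import asyncio

async def foo():
    # stand-in for a long async operation
    await asyncio.sleep(1)

async def main():
    task = asyncio.create_task(foo())   # schedule foo() on the running loop
    # ... remaining work that does not depend on foo() ...
    await task                          # ensure foo() actually completes

asyncio.run(main())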
from asyncio import Task

def cancel_tasks():
    # get all tasks in the current loop
    # (Task.all_tasks() is the old spelling; newer Python uses asyncio.all_tasks())
    tasks = Task.all_tasks()
    for t in tasks:
        t.cancel()

cancel_tasks()
loop.stop()

try:
    # run_forever() returns after calling loop.stop()
    loop.run_forever()
    tasks = Task.all_tasks()
    for t in [t for t in tasks
              if not (t.done() or t.cancelled())]:
        # give cancelled tasks the last chance to run
        loop.run_until_complete(t)
finally:
    loop.close()
import concurrent.futures

def wrap_future(asyncio_future):
    def done_callback(af, cf):
        try:
            cf.set_result(af.result())
        except Exception as e:
            cf.set_exception(e)

    concur_future = concurrent.futures.Future()
    asyncio_future.add_done_callback(
        lambda f: done_callback(f, cf=concur_future))
    return concur_future
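A hypothetical usage of wrap_future(): the main thread blocks on the concurrent.futures.Future while a background event loop fulfils the wrapped asyncio.Future. The loop/thread setup mirrors Suggestion 1 and is an assumption, not part of the original answer.

import asyncio
from threading import Thread

loop = asyncio.new_event_loop()
Thread(target=loop.run_forever, daemon=True).start()

async def produce(fut):
    await asyncio.sleep(0.1)
    fut.set_result(42)

afut = loop.create_future()            # asyncio.Future bound to the background loop
cfut = wrap_future(afut)               # thread-safe view of it
asyncio.run_coroutine_threadsafe(produce(afut), loop)

print(cfut.result(timeout=5))          # safe to block on from this thread
loop.call_soon_threadsafe(loop.stop)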