Here is the fixed code (without exceptions and the exception handler):
import asyncio
from threading import Thread


async def coro():
    """Print a marker and return 42 so the caller can observe a result."""
    print("in coro")
    return 42


# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()

# Run the loop in a worker thread so this thread is free to block on results.
thread = Thread(target=loop.run_forever)
thread.start()

# Submit the coroutine from this thread and wait for its return value.
fut = asyncio.run_coroutine_threadsafe(coro(), loop)
print(fut.result())

# The loop lives in another thread, so stop it through the thread-safe API.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
run_coroutine_threadsafe() returns a concurrent.futures.Future object which holds the exception (it does not get to the default exception handler):
import asyncio
from pprint import pprint
from threading import Thread


def exception_handler(loop, context):
    """Loop-level exception handler: announce the call and dump the context.

    Args:
        loop: The event loop that invoked the handler.
        context: The dict describing the error, pretty-printed as-is.
    """
    print('Exception handler called')
    pprint(context)


# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()
loop.set_exception_handler(exception_handler)

# Run the loop in a worker thread so this thread is free to block on results.
thread = Thread(target=loop.run_forever)
thread.start()


async def coro():
    """Print a marker, then fail with RuntimeError("BOOM!")."""
    print("coro")
    raise RuntimeError("BOOM!")


fut = asyncio.run_coroutine_threadsafe(coro(), loop)
try:
    print("success:", fut.result())
except Exception:
    # The failure is stored on the returned future; report it from there.
    print("exception:", fut.exception())

# The loop lives in another thread, so stop it through the thread-safe API.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
However, coroutines that are scheduled using create_task() or ensure_future() will call the exception_handler:
async def coro2():
    """Print a marker, then fail with RuntimeError("BOOM2!")."""
    print("coro2")
    raise RuntimeError("BOOM2!")
async def coro():
    """Schedule coro2() as a background task, then fail with RuntimeError.

    Relies on the module-level ``loop`` and on ``coro2`` being defined.
    """
    # Fire-and-forget: the task is created but never awaited here.
    loop.create_task(coro2())
    print("coro")
    raise RuntimeError("BOOM!")
If there is no running event loop a RuntimeError is raised. This function can only be called from a coroutine or a callback. Because this function has rather complex behavior (especially when custom event loop policies are in use), using the get_running_loop() function is preferred to get_event_loop() in coroutines and callbacks. Deprecated since version 3.10: A deprecation warning is emitted if there is no running event loop. In future Python releases, this function will be an alias of get_running_loop(). Raises RuntimeError if called on a loop that’s been closed. This can happen on a secondary thread when the main application is shutting down.
try:
    loop.run_forever()
finally:
    # Let the loop finalize async generators before it is closed.
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
# will schedule "print("Hello", flush=True)"
# call_soon() is given a single callable here, so the keyword argument is
# bound up front with functools.partial.
loop.call_soon(
    functools.partial(print, "Hello", flush=True))
import asyncio
import concurrent.futures


def blocking_io():
    """Read and return 100 bytes from /dev/urandom."""
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)


def cpu_bound():
    """Return the sum of squares of the integers below 10 ** 7."""
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))


async def main():
    """Run the blocking helpers through three different executors."""
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)


# Guard the entry point: worker processes may re-import this module, and
# they must not start the program again.
if __name__ == '__main__':
    asyncio.run(main())
srv = await loop.create_server(...) async with srv: # some code # At this point, srv is closed and no longer accepts new connections.
async def client_connected(reader, writer):
    """Handle one client connection by reading a single line from it.

    Args:
        reader: Stream to read from; only ``readline()`` is awaited.
        writer: Stream to write to; unused in this example.
    """
    # Communicate with the client with
    # reader/writer streams.  For example:
    await reader.readline()
async def main(host, port):
    """Start a server on (host, port) that hands clients to client_connected.

    Awaits ``serve_forever()`` on the server returned by
    ``asyncio.start_server``.
    """
    srv = await asyncio.start_server(
        client_connected, host, port)
    await srv.serve_forever()


asyncio.run(main('127.0.0.1', 0))
import asyncio
import selectors
# Build a selector-based event loop around an explicitly chosen selector
# (select()-based), then install it as the current event loop.
selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector=selector)
asyncio.set_event_loop(loop)
Delegating costly function calls to a pool of threads. AbstractEventLoop.create_connection() and asyncio.open_connection(). Call a function in an Executor (pool of threads or pool of processes). By default, an event loop uses a thread pool executor (ThreadPoolExecutor). This method was added in Python 3.4.2. Use the async() function to also support older Python versions.
try:
    loop.run_forever()
finally:
    # Let the loop finalize async generators before it is closed.
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
import asyncio
def hello_world(loop):
    """Print 'Hello World' and stop the given event loop.

    Args:
        loop: Object exposing ``stop()``; called once after printing.
    """
    print('Hello World')
    loop.stop()
# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()
try:
    # Schedule a call to hello_world()
    loop.call_soon(hello_world, loop)

    # Blocking call interrupted by loop.stop()
    loop.run_forever()
finally:
    # Close the loop even if scheduling or running raised.
    loop.close()
import asyncio
import datetime
def display_date(end_time, loop):
    """Print the current date and reschedule itself until end_time.

    Args:
        end_time: Deadline, compared against ``loop.time()``.
        loop: Event loop used for its clock, for ``call_later`` and for
            ``stop()``.
    """
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        # Another run one second from now still fits before the deadline.
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()
# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()
try:
    # Schedule the first call to display_date()
    end_time = loop.time() + 5.0
    loop.call_soon(display_date, end_time, loop)

    # Blocking call interrupted by loop.stop()
    loop.run_forever()
finally:
    # Close the loop even if scheduling or running raised.
    loop.close()
import asyncio

try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

# Create a pair of connected file descriptors
rsock, wsock = socketpair()

# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()


def reader():
    """Read callback: print what arrived on rsock, then stop the loop."""
    data = rsock.recv(100)
    print("Received:", data.decode())
    # We are done: unregister the file descriptor
    loop.remove_reader(rsock)
    # Stop the event loop
    loop.stop()


try:
    # Register the file descriptor for read event
    loop.add_reader(rsock, reader)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Run the event loop
    loop.run_forever()
finally:
    # We are done, close sockets and the event loop
    rsock.close()
    wsock.close()
    loop.close()
import asyncio
import functools
import os
import signal
def ask_exit(signame):
    """Report the received signal and stop the module-level event loop.

    Args:
        signame: Name of the signal, interpolated into the printed message.
    """
    print("got signal %s: exit" % signame)
    loop.stop()
# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()

# Install one handler per signal; partial() binds the signal name so the
# handler can report which signal fired.
for signame in ('SIGINT', 'SIGTERM'):
    loop.add_signal_handler(getattr(signal, signame),
                            functools.partial(ask_exit, signame))

print("Event loop running forever, press Ctrl+C to interrupt.")
print("pid %s: send SIGINT or SIGTERM to exit." % os.getpid())
try:
    loop.run_forever()
finally:
    loop.close()
Returns an opaque handle that may be passed to remove_timeout to cancel. Note that unlike the asyncio method of the same name, the returned object does not have a cancel() method. See add_timeout for comments on thread-safety and subclassing. This method only accepts Future objects and not other awaitables (unlike most of Tornado where the two are interchangeable). Note that it is not safe to call add_timeout from other threads. Instead, you must use add_callback to transfer control to the IOLoop’s thread, and then call add_timeout from there.
import asyncio
import errno
import functools
import socket

import tornado.ioloop
from tornado.iostream import IOStream
async def handle_connection(connection, address):
    """Read from the connection until it is closed, then print the message.

    Args:
        connection: Accepted socket, wrapped in an IOStream.
        address: Peer address; unused in this example.
    """
    stream = IOStream(connection)
    message = await stream.read_until_close()
    print("message from client:", message.decode().strip())
def connection_ready(sock, fd, events):
    """Accept pending connections on sock and spawn a handler for each.

    Args:
        sock: Listening socket to accept from.
        fd: File descriptor reported by the IOLoop; unused.
        events: Event mask reported by the IOLoop; unused.
    """
    while True:
        try:
            connection, address = sock.accept()
        except BlockingIOError:
            # Nothing left to accept right now.
            return
        connection.setblocking(0)
        io_loop = tornado.ioloop.IOLoop.current()
        io_loop.spawn_callback(handle_connection, connection, address)
async def main():
    """Listen on port 8888 and hand readable events to connection_ready."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setblocking(0)
    sock.bind(("", 8888))
    sock.listen(128)

    io_loop = tornado.ioloop.IOLoop.current()
    # Bind the listening socket as the first argument of the handler.
    callback = functools.partial(connection_ready, sock)
    io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
    # Wait on an event that is never set, keeping the coroutine alive.
    await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(main())
async def main():
    """Placeholder coroutine for the application's work."""
    # do stuff...
    ...


if __name__ == '__main__':
    IOLoop.current().run_sync(main)
This error is especially surprising to people who are familiar with C# async/await. It is because most of asyncio is not thread-safe, nor is asyncio.Future or asyncio.Task. Also don't confuse asyncio.Future with concurrent.futures.Future because they are not compatible (at least until Python 3.6): the latter is thread-safe while the former is not. In order to await an asyncio.Future in a different thread, asyncio.Future can be wrapped in a concurrent.futures.Future: This runtime warning can happen in many scenarios, but the cause is the same: a coroutine object is created by the invocation of an async function, but is never inserted into an EventLoop. Python3 asyncio is a powerful asynchronous library. However, the complexity results in a very steep learning curve. [1] Compared to C# async/await, the interfaces of Python3 asyncio are verbose and difficult to use. And the documentation is somewhat difficult to understand. (Even Guido admitted the documentation is not clear! [2]) Here I summarize some of the common mistakes when using asyncio.
async def foo():
    """Stand-in for a long async operation; no value is returned."""
    # a long async operation
    # no value is returned
# Deliberately incorrect usage, kept to show the warning below: the bare
# foo() call is neither awaited nor handed to an event loop.
prepare_for_foo()
foo()
# RuntimeWarning: coroutine foo was never awaited
remaining_work_not_depends_on_foo()
prepare_for_foo()
# Hand the coroutine to the loop as a task instead of calling it bare.
task = loop.create_task(foo())
remaining_work_not_depends_on_foo()
# Drive the loop until the task has finished.
loop.run_until_complete(task)
def cancel_tasks(loop=None):
    """Request cancellation of every unfinished task on an event loop.

    ``Task.all_tasks()`` was removed in Python 3.9, so the tasks are
    collected with ``asyncio.all_tasks()`` instead.

    Args:
        loop: Event loop whose tasks are cancelled. Defaults to the current
            event loop, matching the old ``Task.all_tasks()`` default.
    """
    import asyncio

    if loop is None:
        loop = asyncio.get_event_loop()
    # get all tasks in the chosen loop
    for task in asyncio.all_tasks(loop):
        task.cancel()
# Cancel the tasks first, then ask the loop to stop.
cancel_tasks()
loop.stop()
import asyncio

try:
    # run_forever() returns after calling loop.stop()
    loop.run_forever()
    # Task.all_tasks() was removed in Python 3.9; asyncio.all_tasks()
    # returns only the tasks that are not done yet.
    pending = asyncio.all_tasks(loop)
    if pending:
        # give canceled tasks the last chance to run; return_exceptions
        # keeps one task's CancelledError from aborting the others.
        loop.run_until_complete(
            asyncio.gather(*pending, return_exceptions=True))
finally:
    loop.close()
def wrap_future(asyncio_future):
    """Mirror an asyncio future into a concurrent.futures.Future.

    The returned future can be waited on from another thread. It receives
    the result, exception or cancellation of ``asyncio_future`` once that
    future completes.

    Args:
        asyncio_future: Future or task exposing ``add_done_callback``.

    Returns:
        A new ``concurrent.futures.Future`` tracking ``asyncio_future``.
    """
    import concurrent.futures

    concur_future = concurrent.futures.Future()

    def done_callback(af):
        # Propagate cancellation: CancelledError is not an Exception
        # subclass on Python 3.8+, so result() cannot report it below.
        if af.cancelled():
            concur_future.cancel()
        # Returns False when the consumer (or the line above) cancelled the
        # concurrent future; there is nothing to deliver in that case.
        if not concur_future.set_running_or_notify_cancel():
            return
        try:
            result = af.result()
        except Exception as e:
            concur_future.set_exception(e)
        else:
            concur_future.set_result(result)

    asyncio_future.add_done_callback(done_callback)
    return concur_future