asyncio.async(mongo...) just schedules the mongo query, and run_until_complete() doesn't wait for it. Here's a code example that shows this using the asyncio.sleep() coroutine:
#!/usr/bin/env python3
# Written for Python 3.4: asyncio.async() was later renamed asyncio.ensure_future().
import asyncio
from contextlib import closing
from timeit import default_timer as timer

@asyncio.coroutine
def sleep_BROKEN(n):
    # schedule coroutine; it runs on the next yield
    asyncio.async(asyncio.sleep(n))

@asyncio.coroutine
def sleep(n):
    yield from asyncio.sleep(n)

@asyncio.coroutine
def double_sleep(n):
    f = asyncio.async(asyncio.sleep(n))
    yield from asyncio.sleep(n)  # the first sleep is also started
    yield from f

n = 2
with closing(asyncio.get_event_loop()) as loop:
    start = timer()
    loop.run_until_complete(sleep_BROKEN(n))
    print(timer() - start)
    loop.run_until_complete(sleep(n))
    print(timer() - start)
    loop.run_until_complete(double_sleep(n))
    print(timer() - start)
Output:
0.0001221800921484828
2.002586881048046
4.005100341048092
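On current Python versions asyncio.async() no longer exists, so the script above will not run as written. The following is a minimal sketch of the same comparison, assuming asyncio.ensure_future() as the replacement and a shorter delay to keep it quick. The names sleep_broken, sleep_fixed, and timed are made up for illustration and are not part of the original answer.

```python
import asyncio
from timeit import default_timer as timer


async def sleep_broken(n):
    # Schedules the sleep as a task but returns without awaiting it.
    asyncio.ensure_future(asyncio.sleep(n))


async def sleep_fixed(n):
    # Awaiting the sleep makes the caller wait for it to finish.
    await asyncio.sleep(n)


async def timed(coro):
    # Hypothetical helper: await a coroutine and return the elapsed seconds.
    start = timer()
    await coro
    return timer() - start


async def main(n=0.2):
    broken = await timed(sleep_broken(n))
    fixed = await timed(sleep_fixed(n))
    return broken, fixed


broken_elapsed, fixed_elapsed = asyncio.run(main())
print(f"not awaited: {broken_elapsed:.3f}s, awaited: {fixed_elapsed:.3f}s")
```

The first timing should be close to zero because nothing waits for the scheduled sleep, while the second should be roughly the requested delay.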
This issue has been migrated to GitHub: https://github.com/python/cpython/issues/65362
Some tasks created via asyncio are vanishing because there is no reference to their resultant futures.
This behaviour does not occur in Python 3.3.3 with asyncio-0.4.1.
Also, doing a gc.collect() immediately after creating the tasks seems to fix the problem.
Attachment also available at https://gist.github.com/richardkiss/9988156
Ouch. That example is very obfuscated -- I fail to understand what it is trying to accomplish. Running it, I see that it always prints 100 for the count with 3.3 or with DO_GC on; for me it prints 87 with 3.4 and DO_GC off. But I wouldn't be surprised if the reason calling gc.collect() "fixes" whatever issue you have is that it causes there not to be any further collections until the next cycle through that main loop, and everything simply runs before being collected. But that's just a theory based on minimal understanding of the example.
I'm not sure that tasks are *supposed* to stay alive when there are no references to them. It seems that once they make it past a certain point they keep themselves alive.
One more thing: using a try/except I see that the "lost" consumers all get a GeneratorExit on their first iteration. You might want to look into this. (Sorry, gotta run, wanted to dump this.)
I agree it's confusing and I apologize for that.
Background:
This multiplexing pattern is used in pycoinnet, a bitcoin client I'm developing at <https://github.com/richardkiss/pycoinnet>. The BitcoinPeerProtocol class multiplexes protocol messages into multiple asyncio.Queue objects so each interested listener can react. An example listener is in pycoinnet.helpers.standards.install_pong_manager, which looks for "ping" messages and sends "pong" responses. When the peer disconnects, the pong manager sees a None (to indicate EOF), and it exits. The return value is uninteresting, so no reference to the Task is kept.
My client is in late alpha, and mostly works, but when I tried it on Python 3.4.0, it stopped working and I narrowed it down to this.
I'm not certain this behaviour is incorrect, but it's definitely different from 3.3.3, and it seems odd that a GC cycle BEFORE additional references can be made would allow it to work.
Most likely your program is simply relying on undefined behavior and the right way to fix it is to keep strong references to all tasks until they self-destruct.
I'll investigate further.
You were right: adding a strong reference to each Task seems to have solved the original problem in pycoinnet. I see that the global list of tasks in asyncio.tasks is a weak set, so it's necessary to keep a strong reference myself.
This does seem a little surprising. It can make it trickier to create a task that is only important for its side effect. Compare to threaded programming: unreferenced threads are never collected.
For example:
f = asyncio.Task(some_coroutine())
f.add_done_callback(some_completion_callback)
f = None
In theory, the "some_coroutine" task can be collected, preventing "some_completion_callback" from ever being called. While technically correct, it does seem surprising. (I couldn't get this to work in a simple example, although I did get it to work in a complicated example.)
Some change between 3.3 and 3.4 means garbage collection is much more aggressive at collecting unreferenced tasks, which means broken code, like mine, that worked in 3.3 fails in 3.4. This may trip up other early adopters of tulip.
Maybe adding a "do_not_collect=True" flag to asyncio.async or asyncio.Task, which would keep a strong reference by throwing it into a singleton set (removing it as a future callback), would bring attention to this subtle issue. Or displaying a warning in debug mode when a Task is garbage-collected before finishing.
Thanks for your help.
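The "keep a strong reference until the task finishes" advice from this thread can be sketched roughly as follows. This is an illustration, not the pycoinnet fix: background_tasks, spawn, and worker are hypothetical names, and it assumes a Python version where asyncio.create_task() and asyncio.run() are available.

```python
import asyncio

# Hypothetical registry holding strong references to running tasks.
background_tasks = set()
results = []


def spawn(coro):
    # Create the task and keep it alive until it completes.
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference once the task is done so the set does not grow.
    task.add_done_callback(background_tasks.discard)
    return task


async def worker(i):
    # Stand-in for a listener that matters only for its side effect.
    await asyncio.sleep(0.01)
    results.append(i)


async def main():
    for i in range(5):
        spawn(worker(i))
    # Wait for a snapshot of the registry; the set shrinks as tasks finish.
    await asyncio.gather(*list(background_tasks))


asyncio.run(main())
print(sorted(results))
```

The done callback plays the role of the "self-destruct" step mentioned above: each task removes itself from the set when it finishes.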
asyncio.run() runs the passed coroutine, taking care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool.
This function cannot be called when another asyncio event loop is running in the same thread.
This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once.
Directly calling blocking_io() in any coroutine would block the event loop for its duration, resulting in an additional 1 second of run time. Instead, by using asyncio.to_thread(), we can run it in a separate thread without blocking the event loop.
>>> import asyncio
>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')
>>> asyncio.run(main())
hello
world
>>> main()
<coroutine object main at 0x1053bb7c8>
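The remark above about blocking_io() and asyncio.to_thread() can be illustrated with a small sketch. The body of blocking_io() is not shown in this text, so the version here is an assumption that simply sleeps; asyncio.to_thread() requires Python 3.9 or later.

```python
import asyncio
import time


def blocking_io():
    # Assumed stand-in for a blocking call such as file or network I/O.
    time.sleep(0.3)
    return "io done"


async def main():
    start = time.perf_counter()
    # Run the blocking function in a worker thread while the event loop
    # stays free to run the asyncio.sleep() concurrently.
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(0.3),
    )
    return result, time.perf_counter() - start


result, elapsed = asyncio.run(main())
print(result, f"{elapsed:.2f}s")
```

Because the two waits overlap, the total should be close to the longer of the two rather than their sum.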
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")
    await say_after(1, 'hello')
    await say_after(2, 'world')
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

started at 17:13:52
hello
world
finished at 17:13:55
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")

started at 17:14:32
hello
world
finished at 17:14:34
Generator-based coroutines should be decorated with @asyncio.coroutine, although this is not strictly enforced. The decorator enables compatibility with async def coroutines, and also serves as documentation. Generator-based coroutines use the yield from syntax introduced in PEP 380, instead of the original yield syntax.
Decorator to mark generator-based coroutines. This enables the generator to use yield from to call async def coroutines, and also enables the generator to be called by async def coroutines, for instance using an await expression.
If the coroutine is not done, this returns the stack where it is suspended. If the coroutine has completed successfully or was cancelled, this returns an empty list. If the coroutine was terminated by an exception, this returns the list of traceback frames.
The function that defines a coroutine (a function definition using async def or decorated with @asyncio.coroutine). If disambiguation is needed we will call this a coroutine function (iscoroutinefunction() returns True).
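The distinction between a coroutine function and the coroutine object it returns can be checked directly. This sketch assumes an async def coroutine and uses inspect.iscoroutinefunction() and inspect.iscoroutine() from the standard library rather than the asyncio-level helper the text refers to; the name answer is made up for illustration.

```python
import asyncio
import inspect


async def answer():
    # A coroutine function: calling it creates a coroutine object.
    return 42


is_function = inspect.iscoroutinefunction(answer)

coro = answer()
is_object = inspect.iscoroutine(coro)
coro.close()  # close the unused object to avoid a "never awaited" warning

# Running a fresh coroutine object to completion yields its return value.
value = asyncio.run(answer())
print(is_function, is_object, value)
```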
import asyncio

async def hello_world():
    print("Hello World!")

loop = asyncio.get_event_loop()
# Blocking call which returns when the hello_world() coroutine is done
loop.run_until_complete(hello_world())
loop.close()
import asyncio
import datetime

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
loop.run_until_complete(future)
print(future.result())
loop.close()
import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')

def got_result(future):
    print(future.result())
    loop.stop()

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)
try:
    loop.run_forever()
finally:
    loop.close()
import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4),
))
loop.close()
The function that defines a coroutine (a function definition decorated with @asyncio.coroutine). If disambiguation is needed we will call this a coroutine function (iscoroutinefunction() returns True).
A coroutine is a generator that follows certain conventions. For documentation purposes, all coroutines should be decorated with @asyncio.coroutine, but this cannot be strictly enforced.
Return True if func is a decorated coroutine function.
raise exception – raise an exception in the coroutine that is waiting for this one using yield from.
import asyncio

@asyncio.coroutine
def greet_every_two_seconds():
    while True:
        print('Hello World')
        yield from asyncio.sleep(2)

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(greet_every_two_seconds())
finally:
    loop.close()
import asyncio

@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return x + y

@asyncio.coroutine
def print_sum(x, y):
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
import asyncio

@asyncio.coroutine
def slow_operation(future):
    yield from asyncio.sleep(1)
    future.set_result('Future is done!')

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.async(slow_operation(future))
loop.run_until_complete(future)
print(future.result())
loop.close()
import asyncio

@asyncio.coroutine
def slow_operation(future):
    yield from asyncio.sleep(1)
    future.set_result('Future is done!')

def got_result(future):
    print(future.result())
    loop.stop()

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.async(slow_operation(future))
future.add_done_callback(got_result)
try:
    loop.run_forever()
finally:
    loop.close()
import asyncio

@asyncio.coroutine
def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        yield from asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
tasks = [
    asyncio.async(factorial("A", 2)),
    asyncio.async(factorial("B", 3)),
    asyncio.async(factorial("C", 4))
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24