how to reset an asyncio eventloop by a worker?

  • Last Update :
  • Techknowledgy :

The recursive call to main() and the new event loop adds unnecessary complication. Here is a simpler prototype to play with - it monitors an external source (the file system) and, when a file is created, it just stops the loop. main() contains a loop that takes care of both (re-)creating and cancelling the tasks:

import os, asyncio, random

async def monitor():
   loop = asyncio.get_event_loop()
while True:
   if os.path.exists('reset'):
   print('reset!')
os.unlink('reset')
loop.stop()
await asyncio.sleep(1)

async def work(workid):
   while True:
   t = random.random()
print(workid, 'sleeping for', t)
await asyncio.sleep(t)

def main():
   loop = asyncio.get_event_loop()
loop.create_task(monitor())
offset = 0
while True:
   workers = []
workers.append(loop.create_task(work(offset + 1)))
workers.append(loop.create_task(work(offset + 2)))
workers.append(loop.create_task(work(offset + 3)))
loop.run_forever()
for t in workers:
   t.cancel()
offset += 3

if __name__ == '__main__':
   main()

Another option would be to never even stop the event loop, but to simply trigger a reset event:

async def monitor(evt):
   while True:
   if os.path.exists('reset'):
   print('reset!')
os.unlink('reset')
evt.set()
await asyncio.sleep(1)

In this design main() can be a coroutine:

async def main():
   loop = asyncio.get_event_loop()
reset_evt = asyncio.Event()
loop.create_task(monitor(reset_evt))
offset = 0
while True:
   workers = []
workers.append(loop.create_task(work(offset + 1)))
workers.append(loop.create_task(work(offset + 2)))
workers.append(loop.create_task(work(offset + 3)))
await reset_evt.wait()
reset_evt.clear()
for t in workers:
   t.cancel()
offset += 3

if __name__ == '__main__':
   asyncio.run(main())
# or asyncio.get_event_loop().run_until_complete(main())

Suggestion : 2

I'm working with an asyncio forever() eventloop. Now I want to restart the loop (stop the loop and recreate a new loop) after a process or a signal or a change in a file, but I have some problems to do that:,The recursive call to main() and the new event loop adds unnecessary complication. Here is a simpler prototype to play with - it monitors an external source (the file system) and, when a file is created, it just stops the loop. main() contains a loop that takes care of both (re-)creating and cancelling the tasks:,In the #3rd try, apparently I've done it, but print('Cancel the tasks') increases up after each restarting, what's the reason?!,In the #3rd try, apparently I've done it, but print('Cancel the tasks') increases up after each restarting, what's the reason?!


import asyncio async def coro_worker(proc): print(f 'Worker: {proc} started.') while True: print(f 'Worker: {proc} process.') await asyncio.sleep(proc) async def reset_loop(loop): # Some process
for i in range(5): # Like a process.print(f '{i} counting for reset the eventloop.') await asyncio.sleep(1) main(loop) # Expected close the current loop and start a new loop!def main(previous_loop = None): offset = 0
if previous_loop is not None: # Trying
for close the last loop
if exist.offset = 1 # An offset to change the process name.for task in asyncio.Task.all_tasks(): print('Cancel the tasks') # Why it increase up ? task.cancel() # task.clear() # task.close() # task.stop() print("Done cancelling tasks") asyncio.get_event_loop().stop() process = [1 + offset, 2 + offset] loop = asyncio.get_event_loop() futures = [loop.create_task(coro_worker(proc)) for proc in process] futures.append(loop.create_task(reset_loop(loop))) try : loop.run_forever() except KeyboardInterrupt: pass except asyncio.CancelledError: print('Tasks has been canceled') main() # Recursively
finally: print("Closing Loop") loop.close() main()

import os, asyncio, random async def monitor(): loop = asyncio.get_event_loop() while True: if os.path.exists('reset'): print('reset!') os.unlink('reset') loop.stop() await asyncio.sleep(1) async def work(workid): while True: t = random.random() print(workid, 'sleeping for', t) await asyncio.sleep(t) def main(): loop = asyncio.get_event_loop() loop.create_task(monitor()) offset = 0
while True: workers = [] workers.append(loop.create_task(work(offset + 1))) workers.append(loop.create_task(work(offset + 2))) workers.append(loop.create_task(work(offset + 3))) loop.run_forever() for t in workers: t.cancel() offset += 3
if __name__ == '__main__': main()

Suggestion : 3

The event loop is the core of every asyncio application. Event loops run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses.,If there is no current event loop set in the current OS thread, the OS thread is main, and set_event_loop() has not yet been called, asyncio will create a new event loop and set it as the current one.,Return the running event loop in the current OS thread.,Because this function has rather complex behavior (especially when custom event loop policies are in use), using the get_running_loop() function is preferred to get_event_loop() in coroutines and callbacks.

try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
# will schedule "print("
Hello ", flush=True)"
loop.call_soon(
   functools.partial(print, "Hello", flush = True))
import asyncio
import concurrent.futures

def blocking_io():
   # File operations(such as logging) can block the
# event loop: run them in a thread pool.
with open('/dev/urandom', 'rb') as f:
   return f.read(100)

def cpu_bound():
   # CPU - bound operations will block the event loop:
   # in general it is preferable to run them in a
# process pool.
return sum(i * i
   for i in range(10 ** 7))

async def main():
   loop = asyncio.get_running_loop()

# # Options:

   # 1. Run in the
default loop 's executor:
result = await loop.run_in_executor(
   None, blocking_io)
print('default thread pool', result)

# 2. Run in a custom thread pool:
   with concurrent.futures.ThreadPoolExecutor() as pool:
   result = await loop.run_in_executor(
      pool, blocking_io)
print('custom thread pool', result)

# 3. Run in a custom process pool:
   with concurrent.futures.ProcessPoolExecutor() as pool:
   result = await loop.run_in_executor(
      pool, cpu_bound)
print('custom process pool', result)

asyncio.run(main())
srv = await loop.create_server(...)

async with srv:
   # some code

# At this point, srv is closed and no longer accepts new connections.
async def client_connected(reader, writer):
   # Communicate with the client with
# reader / writer streams.For example:
   await reader.readline()

async def main(host, port):
   srv = await asyncio.start_server(
      client_connected, host, port)
await srv.serve_forever()

asyncio.run(main('127.0.0.1', 0))
import asyncio
import selectors

selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)

Suggestion : 4

Client.retire_workers([workers, close_workers]),Retire certain workers on the scheduler,Restart all workers. Reset local state. Optionally wait for workers to return.,Used with workers. Indicates whether or not the computations may be performed on workers that are not in the workers set(s).

# blocking
client = Client()
future = client.submit(func, * args) # immediate, no blocking / async difference
result = client.gather(future) # blocking

# asynchronous Python 2 / 3
client = yield Client(asynchronous = True)
future = client.submit(func, * args) # immediate, no blocking / async difference
result = yield client.gather(future) # non - blocking / asynchronous

# asynchronous Python 3
client = await Client(asynchronous = True)
future = client.submit(func, * args) # immediate, no blocking / async difference
result = await client.gather(future) # non - blocking / asynchronous
>>> client = Client('127.0.0.1:8786')
>>> a = client.submit(add, 1, 2) >>>
   b = client.submit(add, 10, 20)
>>> c = client.submit(add, a, b)
>>> client.gather(c)
33
>>> client = Client() # makes your own local "cluster"

Suggestion : 5

Async functions require an event loop to run. Flask, as a WSGI application, uses one worker to handle one request/response cycle. When a request comes in to an async view, Flask will start an event loop in a thread, run the view function there, then return the result.,Each request still ties up one worker, even for async views. The upside is that you can run async code within a view, for example to make multiple concurrent database queries, HTTP requests to an external API, etc. However, the number of requests your application can handle at one time will remain the same.,At the moment Flask only supports asyncio. It’s possible to override flask.Flask.ensure_sync() to change how async functions are wrapped to use a different library.,When using gevent or eventlet to serve an application or patch the runtime, greenlet>=1.0 is required. When using PyPy, PyPy>=7.3.7 is required.

@app.route("/get-data")
async def get_data():
   data = await async_db_query(...)
return jsonify(data)
def extension(func):
   @wraps(func)
def wrapper( * args, ** kwargs):
   ...# Extension logic
return current_app.ensure_sync(func)( * args, ** kwargs)

return wrapper