Nevertheless, if that is what you want to do, you could do it by monkeypatching multiprocessing.pool.worker:
import multiprocessing as mp
import multiprocessing.pool as mpool
from multiprocessing.util import debug

def cleanup():
    print('{n} CLEANUP'.format(n=mp.current_process().name))

# This code comes from /usr/lib/python2.6/multiprocessing/pool.py,
# except for the single line at the end which calls cleanup().
def myworker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()
    if initializer is not None:
        initializer(*initargs)
    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break
        if task is None:
            debug('worker got sentinel -- exiting')
            break
        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            result = (False, e)
        put((job, i, result))
    cleanup()

# Here we monkeypatch mpool.worker
mpool.worker = myworker

def foo(i):
    return i * i

def main():
    pool = mp.Pool(8)
    results = pool.map_async(foo, range(40)).get()
    print(results)

if __name__ == '__main__':
    main()
yields:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]
PoolWorker-8 CLEANUP
PoolWorker-3 CLEANUP
PoolWorker-7 CLEANUP
PoolWorker-1 CLEANUP
PoolWorker-6 CLEANUP
PoolWorker-2 CLEANUP
PoolWorker-4 CLEANUP
PoolWorker-5 CLEANUP
That being said, if you have a bunch of functions that you need to run cleanup on, you can save duplication of effort by designing a decorator. Here's an example of what I mean:
import functools
import multiprocessing

def cleanup(f):
    """Decorator for shared cleanup mechanism"""
    @functools.wraps(f)
    def wrapped(arg):
        result = f(arg)
        print("Cleaning up after f({0})".format(arg))
        return result
    return wrapped

@cleanup
def task1(arg):
    print("Hello from task1({0})".format(arg))
    return arg * 2

@cleanup
def task2(arg):
    print("Bonjour from task2({0})".format(arg))
    return arg ** 2

def main():
    p = multiprocessing.Pool(processes=3)
    print(p.map(task1, [1, 2, 3]))
    print(p.map(task2, [1, 2, 3]))

if __name__ == "__main__":
    main()
When you execute this (barring stdout being jumbled because I'm not locking it here for brevity), the order you get things out should indicate that your cleanup task is running at the end of each task:
Hello from task1(1)
Cleaning up after f(1)
Hello from task1(2)
Cleaning up after f(2)
Hello from task1(3)
Cleaning up after f(3)
[2, 4, 6]
Bonjour from task2(1)
Cleaning up after f(1)
Bonjour from task2(2)
Cleaning up after f(2)
Bonjour from task2(3)
Cleaning up after f(3)
[1, 4, 9]
Note that the start(), join(), is_alive(), terminate() and exitcode methods should only be called by the process that created the process object. Likewise, the methods of a Pool object should only ever be used by the process which created the pool. If a child terminated due to an exception not caught within run(), the exit code will be 1; if it was terminated by signal N, the exit code will be the negative value -N.
from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
[1, 4, 9]
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
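The exit-code rule quoted above can be checked directly on a Process object after join(). Here is a minimal sketch (the target functions are illustrative, not from the docs): an uncaught exception in the child gives exitcode 1, while terminating the child with a signal gives the negative signal number.

from multiprocessing import Process
import time

def crash():
    raise RuntimeError('uncaught')  # child exits with code 1

def linger():
    time.sleep(60)  # will be terminated by the parent

if __name__ == '__main__':
    p1 = Process(target=crash)
    p1.start()
    p1.join()
    print(p1.exitcode)  # 1

    p2 = Process(target=linger)
    p2.start()
    p2.terminate()      # sends SIGTERM on POSIX
    p2.join()
    print(p2.exitcode)  # -15 on POSIX (i.e. -signal.SIGTERM)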
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
I am running a multiprocessing pool, mapped over a number of inputs. My worker processes have an initialization step that spins up a connection to Selenium and a database. When the pool finishes its job, what is the graceful way to close these connections rather than just relying on Python's memory management and __del__ definitions? Because some_args is large, I only want to call shutdown when the worker processes have no other jobs to do; I don't want to close and reopen connections to my database until everything is done. Right now I would expect the memory manager to call __del__ when a worker process shuts down, but I don't know whether that actually happens; I've seen strange scenarios where it hasn't been called, and I'm hoping to better understand how to manage shutdown.

I think you have a good chance of closing your drivers if you first wait for your pool processes to terminate and then force a garbage collection:
import multiprocessing

class WebDriver():
    def close(self):
        ...  # close logic

    def __del__(self):
        self.driver.close()

def init():
    global DRIVER
    DRIVER = WebDriver()

def shutdown():
    DRIVER.close()

if __name__ == '__main__':
    with multiprocessing.Pool(initializer=init) as pool:
        pool.map(some_function, some_args)
if __name__ == '__main__':
    with multiprocessing.Pool(initializer=init) as pool:
        try:
            pool.map(some_function, some_args)
        finally:
            # Wait for all tasks to complete and all processes to terminate:
            pool.close()
            pool.join()
    # Processes should be done now:
    import gc
    gc.collect()  # ensure garbage collection
import multiprocessing

class WebDriver():
    def close(self):
        ...
        print('driver is now closed')

    def do_something(self, i):
        import time
        time.sleep(.1)
        print(i, flush=True)

    def __enter__(self):
        self.driver = []  # this would be an actual driver
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

def some_function(i):
    # Do something with DRIVER:
    ...
    DRIVER.do_something(i)

def worker(in_q):
    global DRIVER
    with WebDriver() as DRIVER:
        # Iterate until we get special None record and then cleanup:
        for i in iter(in_q.get, None):
            try:
                some_function(i)
            except BaseException as e:
                pass

if __name__ == '__main__':
    POOL_SIZE = multiprocessing.cpu_count()
    # Create pool:
    # Assumption is that we don't need an output queue for output
    in_q = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=worker, args=(in_q,))
                 for _ in range(POOL_SIZE)]
    for p in processes:
        p.start()
    # Write arguments to input_queue:
    some_args = range(16)
    for arg in some_args:
        in_q.put(arg)
    # Now write POOL_SIZE "quit" messages:
    for _ in range(POOL_SIZE):
        in_q.put(None)
    # Wait for processes to terminate:
    for p in processes:
        p.join()
0 1 2 3 4 5 6 7 8 driver is now closed 9 driver is now closed 10 driver is now closed 11 driver is now closed 12 driver is now closed 14 13 driver is now closed driver is now closed 15 driver is now closed
If you use multiprocessing.Pool.terminate or the context manager API (__exit__ will just call terminate), then the workers can get SIGTERM and the finalizers won't run or complete in time. Thus you need to make sure your multiprocessing.Pool gets a nice and clean exit:
from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    p = Pool(5)
    try:
        print(p.map(f, [1, 2, 3]))
    finally:
        p.close()  # Marks the pool as closed.
        p.join()   # Waits for workers to exit.
from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    try:
        from pytest_cov.embed import cleanup_on_sigterm
    except ImportError:
        pass
    else:
        cleanup_on_sigterm()

    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    try:
        from pytest_cov.embed import cleanup_on_sigterm
    except ImportError:
        pass
    else:
        cleanup_on_sigterm()

    p = Process(target=f, args=('bob',))
    try:
        p.start()
    finally:
        p.join()  # necessary so that the Process exists before the test suite exits (thus coverage is collected)
import os
import signal

def restart_service(signum, frame):
    os.exec(...)  # or whatever your custom signal would do

signal.signal(signal.SIGHUP, restart_service)

try:
    from pytest_cov.embed import cleanup_on_signal
except ImportError:
    pass
else:
    cleanup_on_signal(signal.SIGHUP)
import os
import signal

try:
    from pytest_cov.embed import cleanup
except ImportError:
    cleanup = None

def restart_service(signum, frame):
    if cleanup is not None:
        cleanup()
    os.exec(...)  # or whatever your custom signal would do

signal.signal(signal.SIGHUP, restart_service)
import os
import signal

def shutdown(signum, frame):
    ...  # your app's shutdown or whatever

signal.signal(signal.SIGBREAK, shutdown)

try:
    from pytest_cov.embed import cleanup_on_signal
except ImportError:
    pass
else:
    cleanup_on_signal(signal.SIGBREAK)
I've found that, even using the with statement, if you don't close and join the pool, the processes continue to exist. To clean up resources, I always close and join my pools.

The simple answer, when asking how to use threads in Python, is: "Don't. Use processes, instead." The multiprocessing module lets you create processes with syntax similar to creating threads, but I prefer using its convenient Pool object. Using the code that David Beazley first used to show the dangers of threads against the GIL, we'll rewrite it using multiprocessing.Pool:
David Beazley's code that showed GIL threading problems
from threading import Thread
import time

def countdown(n):
    while n > 0:
        n -= 1

COUNT = 10000000

t1 = Thread(target=countdown, args=(COUNT / 2,))
t2 = Thread(target=countdown, args=(COUNT / 2,))
start = time.time()
t1.start(); t2.start()
t1.join(); t2.join()
end = time.time()
print(end - start)
import multiprocessing
import time

def countdown(n):
    while n > 0:
        n -= 1

COUNT = 10000000

start = time.time()
with multiprocessing.Pool() as pool:
    pool.map(countdown, [COUNT / 2, COUNT / 2])
    pool.close()
    pool.join()
end = time.time()
print(end - start)
This is because Python's Pool class creates worker processes which are daemonic. It does this for a number of reasons, one being to disallow child processes from spawning children of their own, to prevent an "army of zombie grandchildren."

Python's multiprocessing.Pool allows you to create a number of "workers" which run in child processes. The parent process can then give the Pool tasks, and the pool will distribute the tasks as evenly as possible across the workers. A Pool is a great way of distributing work across multiple processes without you having to manage process creation/teardown and work distribution yourself.

Child processes do not crash the main process if they throw an exception or seg fault etc., resulting in a more resilient application than when using multithreading.

The Python multiprocessing library allows you to spawn multiple child processes from the main Python process. This allows you to take advantage of multiple cores inside of a processor to perform work in parallel, improving performance.
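Before the workaround below, the daemon restriction itself is easy to reproduce. Here is a minimal sketch (the function names are illustrative, not from the original post): a Pool worker that tries to start its own child process hits the "daemonic processes are not allowed to have children" error.

import multiprocessing

def grandchild():
    print('hello from the grandchild')

def child(_):
    # Pool workers are daemonic, so starting a child process here is
    # expected to fail with "daemonic processes are not allowed to have children".
    try:
        p = multiprocessing.Process(target=grandchild)
        p.start()
        p.join()
        return 'ok'
    except Exception as e:
        return repr(e)

if __name__ == '__main__':
    with multiprocessing.Pool(2) as pool:
        print(pool.map(child, range(2)))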
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time
from random import randint

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time
from random import randint

# (NoDaemonProcess and MyPool are defined as in the previous listing.)

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)
    result = pool.map(sleepawhile,
                      [randint(1, 5) for x in range(num_procs)])
    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)
    result = pool.map(work, [randint(1, 5) for x in range(5)])
    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()