How do I run cleanup code in a Python multiprocessing pool?

Suggestion : 1

If that is what you want to do, you can do it by monkeypatching multiprocessing.pool.worker:

import multiprocessing as mp
import multiprocessing.pool as mpool
from multiprocessing.util import debug

def cleanup():
    print('{n} CLEANUP'.format(n=mp.current_process().name))

# This code comes from /usr/lib/python2.6/multiprocessing/pool.py,
# except for the single line at the end which calls cleanup().
def myworker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            result = (False, e)
        put((job, i, result))
    cleanup()

# Here we monkeypatch mpool.worker
mpool.worker = myworker

def foo(i):
    return i * i

def main():
    pool = mp.Pool(8)
    results = pool.map_async(foo, range(40)).get()
    print(results)

if __name__ == '__main__':
    main()

yields:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]
PoolWorker-8 CLEANUP
PoolWorker-3 CLEANUP
PoolWorker-7 CLEANUP
PoolWorker-1 CLEANUP
PoolWorker-6 CLEANUP
PoolWorker-2 CLEANUP
PoolWorker-4 CLEANUP
PoolWorker-5 CLEANUP
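If you would rather not monkeypatch pool internals, a lighter-weight sketch of the same per-worker cleanup (not part of the answer above) is to register the cleanup function from a pool initializer using multiprocessing.util.Finalize, a semi-private but long-standing helper. The finalizer runs when a worker exits normally after pool.close() and pool.join(); if the pool is terminated instead, workers may be killed before it gets a chance to run.

import multiprocessing as mp
from multiprocessing.util import Finalize

def cleanup():
    print('{n} CLEANUP'.format(n=mp.current_process().name))

def init_worker():
    # Register cleanup() to run when this worker process shuts down.
    # An integer exitpriority is required for it to fire at process exit.
    Finalize(None, cleanup, exitpriority=16)

def foo(i):
    return i * i

def main():
    pool = mp.Pool(8, initializer=init_worker)
    results = pool.map_async(foo, range(40)).get()
    pool.close()
    pool.join()   # workers exit here, so their finalizers run
    print(results)

if __name__ == '__main__':
    main()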

That being said, if you have a bunch of functions that you need to run cleanup on, you can save duplication of effort by designing a decorator. Here's an example of what I mean:

import functools
import multiprocessing

def cleanup(f):
    """Decorator for shared cleanup mechanism"""
    @functools.wraps(f)
    def wrapped(arg):
        result = f(arg)
        print("Cleaning up after f({0})".format(arg))
        return result
    return wrapped

@cleanup
def task1(arg):
    print("Hello from task1({0})".format(arg))
    return arg * 2

@cleanup
def task2(arg):
    print("Bonjour from task2({0})".format(arg))
    return arg ** 2

def main():
    p = multiprocessing.Pool(processes=3)
    print(p.map(task1, [1, 2, 3]))
    print(p.map(task2, [1, 2, 3]))

if __name__ == "__main__":
    main()

When you execute this (barring stdout being jumbled because I'm not locking it here for brevity), the order you get things out should indicate that your cleanup task is running at the end of each task:

Hello from task1(1)
Cleaning up after f(1)
Hello from task1(2)
Cleaning up after f(2)
Hello from task1(3)
Cleaning up after f(3)
[2, 4, 6]

Bonjour from task2(1)
Cleaning up after f(1)
Bonjour from task2(2)
Cleaning up after f(2)
Bonjour from task2(3)
Cleaning up after f(3)
[1, 4, 9]
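One caveat with the decorator above: if the wrapped task raises, the cleanup line never runs. A small variant (a sketch, not part of the original answer; the wrapper is just rewritten with try/finally and uses f.__name__ in the message) makes the cleanup unconditional:

import functools

def cleanup(f):
    """Decorator for shared cleanup mechanism; cleanup runs even on errors."""
    @functools.wraps(f)
    def wrapped(arg):
        try:
            return f(arg)
        finally:
            # Runs whether f(arg) returned normally or raised.
            print("Cleaning up after {0}({1})".format(f.__name__, arg))
    return wrapped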

Suggestion : 2

Note that the start(), join(), is_alive(), terminate() and exitcode methods should only be called by the process that created the process object.

Note that the methods of a pool should only ever be used by the process which created it.

If the child terminated due to an exception not caught within run(), the exit code will be 1. If it was terminated by signal N, the exit code will be the negative value -N.
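As a small illustration of the exitcode rules quoted above (a sketch, not from the docs excerpts; it assumes a POSIX platform since it sends SIGKILL to one child):

from multiprocessing import Process
import os
import signal
import time

def crash():
    raise RuntimeError('boom')        # uncaught exception in run() -> exitcode 1

def hang():
    time.sleep(60)

if __name__ == '__main__':
    p1 = Process(target=crash)
    p1.start()
    p1.join()
    print(p1.exitcode)                # 1

    p2 = Process(target=hang)
    p2.start()
    os.kill(p2.pid, signal.SIGKILL)   # terminated by signal 9
    p2.join()
    print(p2.exitcode)                # -9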

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))   # prints [1, 4, 9]

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Suggestion : 3

I am running a multiprocessing pool, mapped over a number of inputs. My worker processes have an initialization step that spins up a connection to selenium and a database. When the pool finishes its job, what is the graceful way to close these connections rather than just relying on python's memory management and __del__ definitions?

Because some_args is large, I only want to call shutdown when the worker processes have no other jobs to do. I don't want to close / reopen connections to my database until everything is done. As of right now, I would expect the memory manager to call __del__ if the worker process shuts down, but I don't know if it does occur. I've gotten strange scenarios where it hasn't been called. I'm hoping to better understand how to manage shutdown.

I think you have a good chance of closing your drivers if you first wait for your pool processes to terminate and then force a garbage collection, as in the second snippet below:

import multiprocessing

class WebDriver():
    def close(self):
        # close logic
        ...

    def __del__(self):
        self.driver.close()

def init():
    global DRIVER
    DRIVER = WebDriver()

def shutdown():
    DRIVER.close()

if __name__ == '__main__':
    with multiprocessing.Pool(initializer=init) as pool:
        pool.map(some_function, some_args)

if __name__ == '__main__':
    with multiprocessing.Pool(initializer=init) as pool:
        try:
            pool.map(some_function, some_args)
        finally:
            # Wait for all tasks to complete and all processes to terminate:
            pool.close()
            pool.join()
            # Processes should be done now:
            import gc
            gc.collect()  # ensure garbage collection
import multiprocessing

class WebDriver():

    def close(self):
        ...
        print('driver is now closed')

    def do_something(self, i):
        import time
        time.sleep(.1)
        print(i, flush=True)

    def __enter__(self):
        self.driver = []  # this would be an actual driver
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

def some_function(i):
    # Do something with DRIVER:
    ...
    DRIVER.do_something(i)

def worker(in_q):
    global DRIVER

    with WebDriver() as DRIVER:
        # Iterate until we get special None record and then cleanup:
        for i in iter(in_q.get, None):
            try:
                some_function(i)
            except BaseException as e:
                pass

if __name__ == '__main__':
    POOL_SIZE = multiprocessing.cpu_count()
    # Create pool:
    # Assumption is that we don't need an output queue for output
    in_q = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=worker, args=(in_q,))
                 for _ in range(POOL_SIZE)]
    for p in processes:
        p.start()
    # Write arguments to input_queue:
    some_args = range(16)
    for arg in some_args:
        in_q.put(arg)
    # Now write POOL_SIZE "quit" messages:
    for _ in range(POOL_SIZE):
        in_q.put(None)
    # Wait for processes to terminate:
    for p in processes:
        p.join()
0
1
2
3
4
5
6
7
8
driver is now closed
9
driver is now closed
10
driver is now closed
11
driver is now closed
12
driver is now closed
14
13
driver is now closed
driver is now closed
15
driver is now closed

Suggestion : 4

This is pytest-cov's guidance on subprocess support. If you use multiprocessing.Pool.terminate or the context manager API (__exit__ will just call terminate), then the workers can get SIGTERM and the finalizers won't run or complete in time. Thus you need to make sure your multiprocessing.Pool gets a nice and clean exit:

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    p = Pool(5)
    try:
        print(p.map(f, [1, 2, 3]))
    finally:
        p.close()  # Marks the pool as closed.
        p.join()   # Waits for workers to exit.

Alternatively, with pytest-cov's cleanup_on_sigterm helper registered, the context manager form can be used:

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    try:
        from pytest_cov.embed import cleanup_on_sigterm
    except ImportError:
        pass
    else:
        cleanup_on_sigterm()

    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

If you use multiprocessing.Process:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    try:
        from pytest_cov.embed import cleanup_on_sigterm
    except ImportError:
        pass
    else:
        cleanup_on_sigterm()

    p = Process(target=f, args=('bob',))
    try:
        p.start()
    finally:
        p.join()  # necessary so that the Process exits before the test suite exits (thus coverage is collected)

If you got custom signal handling:

import os
import signal

def restart_service(frame, signum):
    os.exec(...)  # or whatever your custom signal would do

signal.signal(signal.SIGHUP, restart_service)

try:
    from pytest_cov.embed import cleanup_on_signal
except ImportError:
    pass
else:
    cleanup_on_signal(signal.SIGHUP)

import os
import signal

try:
    from pytest_cov.embed import cleanup
except ImportError:
    cleanup = None

def restart_service(frame, signum):
    if cleanup is not None:
        cleanup()

    os.exec(...)  # or whatever your custom signal would do

signal.signal(signal.SIGHUP, restart_service)

If you use Windows:

import os
import signal

def shutdown(frame, signum):
    # your app's shutdown or whatever
    ...

signal.signal(signal.SIGBREAK, shutdown)

try:
    from pytest_cov.embed import cleanup_on_signal
except ImportError:
    pass
else:
    cleanup_on_signal(signal.SIGBREAK)
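Outside of pytest-cov, the same SIGTERM problem applies to any per-worker cleanup: if the pool is torn down via terminate(), finalizers may never run. A best-effort sketch (not from the text above, POSIX only, and inherently racy since terminate() may kill a worker before the handler finishes) is to install a SIGTERM handler in each worker via the pool initializer:

import multiprocessing
import signal
import sys

def cleanup():
    print('%s cleaning up' % multiprocessing.current_process().name)

def init_worker():
    # Run cleanup() when this worker receives SIGTERM (e.g. from pool.terminate()).
    def on_sigterm(signum, frame):
        cleanup()
        sys.exit(0)
    signal.signal(signal.SIGTERM, on_sigterm)

def f(x):
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(2, initializer=init_worker) as pool:
        print(pool.map(f, [1, 2, 3]))
    # __exit__ calls terminate(), which sends SIGTERM to the workers here.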

Suggestion : 5

I've found that, even using the with statement, if you don't close and join the pool, the processes continue to exist. To clean up resources, I always close and join my pools.

The simple answer, when asking how to use threads in Python, is: "Don't. Use processes, instead." The multiprocessing module lets you create processes with similar syntax to creating threads, but I prefer using their convenient Pool object.

Using the code that David Beazley first used to show the dangers of threads against the GIL, we'll rewrite it using multiprocessing.Pool:

David Beazley's code that showed GIL threading problems

from threading import Thread
import time

def countdown(n):
    while n > 0:
        n -= 1

COUNT = 10000000

t1 = Thread(target=countdown, args=(COUNT // 2,))
t2 = Thread(target=countdown, args=(COUNT // 2,))
start = time.time()
t1.start(); t2.start()
t1.join(); t2.join()
end = time.time()
print(end - start)
import multiprocessing
import time

def countdown(n):
    while n > 0:
        n -= 1

COUNT = 10000000

if __name__ == '__main__':
    start = time.time()
    with multiprocessing.Pool() as pool:
        pool.map(countdown, [COUNT // 2, COUNT // 2])
        pool.close()
        pool.join()
    end = time.time()
    print(end - start)
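To see concretely what the first paragraph of this suggestion claims, the sketch below checks the pool's worker processes before and after close()/join(). Note that pool._pool is a private attribute, peeked at here purely for illustration:

import multiprocessing

def noop(x):
    return x

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    pool.map(noop, range(4))
    workers = list(pool._pool)                 # private attribute, illustration only
    print([p.is_alive() for p in workers])     # [True, True] -- workers still exist
    pool.close()
    pool.join()
    print([p.is_alive() for p in workers])     # [False, False] -- cleaned up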

Suggestion : 6

This is because Python's Pool class creates worker processes which are daemonic. It does this for a number of reasons, one being to disallow child processes from spawning their own children, preventing an "army of zombie grandchildren".

Python's multiprocessing.Pool allows you to create a number of "workers" which run in child processes. The parent process can then give the Pool tasks, and the pool will distribute the tasks as evenly as possible across the workers. A Pool is a great way of distributing work across multiple processes without you having to manage the process creation/teardown and work distribution yourself.

Child processes do not crash the main process if they throw an exception/seg fault etc., resulting in a more resilient application than when using multithreading.

The Python multiprocessing library allows you to spawn multiple child processes from the main Python process. This allows you to take advantage of multiple cores inside of a processor to perform work in a parallel fashion, improving performance.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
                      [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()
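For contrast, here is a minimal sketch (the names nested_job and worker are made up for illustration, and it is not part of the original example) of what happens if you try to nest pools with the stock daemonic workers instead of MyPool: creating the inner pool fails, typically with an error along the lines of "daemonic processes are not allowed to have children".

import multiprocessing

def nested_job(x):
    return x * x

def worker(n):
    # Each (daemon) pool worker tries to create its own pool here.
    with multiprocessing.Pool(2) as inner:
        return inner.map(nested_job, range(n))

if __name__ == '__main__':
    with multiprocessing.Pool(2) as outer:
        try:
            print(outer.map(worker, [2, 3]))
        except Exception as e:
            print('nested pool failed:', e)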