Here's my suggestion:
import multiprocessing
import threading
import time


def good_worker():
    print("[GoodWorker] Starting")
    time.sleep(4)
    print("[GoodWorker] all good")


def bad_worker():
    print("[BadWorker] Starting")
    time.sleep(2)
    raise Exception("ups!")


class MyProcManager:
    def __init__(self):
        self.procs = []
        self.errors_flag = False
        self._threads = []
        self._lock = threading.Lock()

    def terminate_all(self):
        with self._lock:
            for p in self.procs:
                if p.is_alive():
                    print("Terminating %s" % p)
                    p.terminate()

    def launch_proc(self, func, args=(), kwargs=None):
        # One watcher thread per child process keeps wait() simple.
        t = threading.Thread(target=self._proc_thread_runner,
                             args=(func, args, kwargs or {}))
        self._threads.append(t)
        t.start()

    def _proc_thread_runner(self, func, args, kwargs):
        p = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
        self.procs.append(p)
        p.start()
        while p.exitcode is None:
            p.join()
        if p.exitcode > 0:
            # A positive exit code means the child died with an error:
            # flag it and take the remaining processes down too.
            self.errors_flag = True
            self.terminate_all()

    def wait(self):
        for t in self._threads:
            t.join()


if __name__ == '__main__':
    proc_manager = MyProcManager()
    proc_manager.launch_proc(good_worker)
    proc_manager.launch_proc(good_worker)
    proc_manager.launch_proc(bad_worker)
    proc_manager.wait()
    if proc_manager.errors_flag:
        print("Errors flag is set: some process crashed")
    else:
        print("Everything closed cleanly")
If you want children to clean up after themselves, you could use a multiprocessing.Event() as a global flag:
import multiprocessing
import time


def event_func(event):
    print('\t%r is waiting' % multiprocessing.current_process())
    event.wait()
    print('\t%r has woken up' % multiprocessing.current_process())


if __name__ == '__main__':
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for i in range(5)]

    for p in processes:
        p.start()

    print('main is sleeping')
    time.sleep(2)

    print('main is setting event')
    event.set()

    for p in processes:
        p.join()
Roughly, a process object is alive from the moment the start() method returns until the child process terminates.

Note that the start(), join(), is_alive(), terminate() and exitcode methods should only be called by the process that created the process object.

On Unix, using the fork start method, a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process, as sketched below.

When a process exits, it attempts to terminate all of its daemonic child processes.
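For instance, here is a minimal sketch of that recommended pattern: the parent creates a lock and hands it to each child through the Process constructor rather than relying on a fork-inherited global (the worker function and its message are made up for illustration):

import multiprocessing

def worker(lock, n):
    # The lock arrives as a constructor argument, so this also works
    # with the spawn and forkserver start methods, not just fork.
    with lock:
        print('worker %d has the lock' % n)

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    procs = [multiprocessing.Process(target=worker, args=(lock, n))
             for n in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()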
from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

which prints:

[1, 4, 9]
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
Since the main process waits for the daemon to exit using join(), the “Exiting” message is printed this time.

By default, join() blocks indefinitely. It is also possible to pass a timeout argument (a float representing the number of seconds to wait for the process to become inactive). If the process does not complete within the timeout period, join() returns anyway (see the sketch below).

The output does not include the “Exiting” message from the daemon process, since all of the non-daemon processes (including the main program) exit before the daemon process wakes up from its 2 second sleep.

Since the timeout passed is less than the amount of time the daemon sleeps, the process is still “alive” after join() returns.
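These notes describe behaviour like the following sketch: a daemon that sleeps for 2 seconds, joined with a shorter timeout (the function name and messages are illustrative):

import multiprocessing
import time

def daemon_worker():
    print('Starting')
    time.sleep(2)
    print('Exiting')  # lost if all non-daemon processes exit first

if __name__ == '__main__':
    d = multiprocessing.Process(target=daemon_worker)
    d.daemon = True              # terminated along with the main program
    d.start()
    d.join(1)                    # timeout shorter than the 2 second sleep
    print('d.is_alive():', d.is_alive())  # True: join() timed out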
import multiprocessing

def worker():
    """worker function"""
    print('Worker')
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

$ python multiprocessing_simple.py
Worker
Worker
Worker
Worker
Worker
import multiprocessing

def worker(num):
    """thread worker function"""
    print('Worker:', num)
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

$ python multiprocessing_simpleargs.py
Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4
import multiprocessing
import multiprocessing_import_worker

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=multiprocessing_import_worker.worker)
        jobs.append(p)
        p.start()

where the worker function is defined in a separate module, multiprocessing_import_worker.py:

def worker():
    """worker function"""
    print('Worker')
    return
Depending on the platform, multiprocessing supports three ways to start a process. These start methods are spawn, fork and forkserver (see the snippet after these notes).

When a process exits, it attempts to terminate all of its daemonic child processes.

If multiple processes are enqueuing objects, it is possible for the objects to be received at the other end out-of-order. However, objects enqueued by the same process will always be in the expected order with respect to each other.

Queue() returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.
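As a quick check, multiprocessing can report which start methods your platform supports and which one is currently in use:

import multiprocessing as mp

if __name__ == '__main__':
    print(mp.get_all_start_methods())  # e.g. ['fork', 'spawn', 'forkserver'] on Linux
    print(mp.get_start_method())       # the method currently in effect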
Note that the Pool examples will not work in the interactive interpreter, because the worker processes cannot import a function defined there:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x * x
...
>>> p.map(f, [1, 2, 3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    if hasattr(os, 'getppid'):  # only available on Unix
        print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()