Here is my existing code for parallelizing and getting the return value using a queue:
import multiprocessing as mpc
...
def Wrapper(self, ...):
    jobs = []
    q = mpc.Queue()

    p1 = mpc.Process(target=self.function1, args=(timestep,))
    jobs.append(p1)

    p2 = mpc.Process(target=self.function2, args=(timestep, arg1, arg2, arg3, ..., q))
    jobs.append(p2)

    for j in jobs:
        j.start()

    result = q.get()

    for j in jobs:
        j.join()
My goal with this code is to show how parallelizing a section of code can reduce the time required for execution in embarrassingly parallel functions. For reference, I am using the cProfile package, generating a profile of my code, and looking at the time required for Wrapper to run.

If I look at the overall execution time of the program, the parallelized code runs much faster. However, when I dig a bit deeper, my parallelized code begins to appear to take longer:

Parallelized code without using a Queue: 8 seconds
Parallelized code with the Queue: 60 seconds
From the multiprocessing.Queue documentation:

get([block[, timeout]])
    Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).

empty()
    Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.

put(obj[, block[, timeout]])
    Put obj into the queue. If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available, else raise the queue.Full exception (timeout is ignored in that case).

full()
    Return True if the queue is full, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.
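To make the block/timeout behavior described above concrete, here is a small standalone sketch using a bounded queue with a single slot (illustrative only, not part of the original code):

```python
import multiprocessing as mp
import queue  # queue.Empty and queue.Full are the exceptions raised

q = mp.Queue(maxsize=1)     # bounded queue with a single slot

q.put('a')                  # succeeds immediately: a free slot is available

try:
    # The queue is full, so this blocks for up to 0.1 s, then raises.
    q.put('b', block=True, timeout=0.1)
    put_timed_out = False
except queue.Full:
    put_timed_out = True

item = q.get()              # removes 'a'; blocks until available (default)

try:
    q.get(block=False)      # queue is now empty; raises immediately
    get_failed = False
except queue.Empty:
    get_failed = True

print(put_timed_out, item, get_failed)
```

Note that the exceptions come from the standard queue module even though the queue itself is a multiprocessing.Queue.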
from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

[1, 4, 9]
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
last modified July 29, 2022
#!/usr/bin/python

from multiprocessing import Process

def fun(name):
    print(f'hello {name}')

def main():
    p = Process(target=fun, args=('Peter',))
    p.start()

if __name__ == '__main__':
    main()
We create a new process and pass a value to it.
def fun(name):
    print(f'hello {name}')
The function prints the passed parameter.
def main():
    p = Process(target=fun, args=('Peter',))
    p.start()
#!/usr/bin/python

from multiprocessing import Process
import time

def fun():
    print('starting fun')
    time.sleep(2)
    print('finishing fun')

def main():
    p = Process(target=fun)
    p.start()
    p.join()

if __name__ == '__main__':
    print('starting main')
    main()
    print('finishing main')
The example calls the join method on the newly created process.
$ ./joining.py
starting main
starting fun
finishing fun
finishing main