Why is Python multiprocessing running sequentially?

Think about what you're doing here:

for i in range(num_workers):
    p = pool.apply_async(f, args=(i,))
    p.get()   # get() blocks until this task finishes, so the tasks run one at a time

The usual way to do this is more like:

workers = [pool.apply_async(f, args=(i,)) for i in range(num_workers)]
for w in workers:
    w.get()
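
Neither snippet is complete on its own (f, num_workers and pool come from the asker's code). A minimal, self-contained sketch of the two patterns, using hypothetical stand-ins for those names, might look like this:

from multiprocessing import Pool
import time

def f(i):
    time.sleep(1)          # stand-in for real work
    return i * i

if __name__ == '__main__':
    num_workers = 4
    with Pool(num_workers) as pool:
        # Accidentally sequential: calling get() right after each submit blocks
        # before the next task is even handed to the pool.
        # results = [pool.apply_async(f, args=(i,)).get() for i in range(num_workers)]

        # Parallel: submit all tasks first, then collect the results.
        workers = [pool.apply_async(f, args=(i,)) for i in range(num_workers)]
        results = [w.get() for w in workers]
    print(results)   # [0, 1, 4, 9] after roughly 1 second instead of 4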

Suggestion : 2

last modified July 29, 2022

#!/usr/bin/python

from multiprocessing import Process

def fun(name):
    print(f'hello {name}')

def main():
    p = Process(target=fun, args=('Peter',))
    p.start()

if __name__ == '__main__':
    main()

We create a new process and pass a value to it.

def fun(name):
    print(f'hello {name}')

The function prints the passed parameter.

def main():
    p = Process(target=fun, args=('Peter',))
    p.start()

#!/usr/bin/python

from multiprocessing import Process
import time

def fun():
    print('starting fun')
    time.sleep(2)
    print('finishing fun')

def main():
    p = Process(target=fun)
    p.start()
    p.join()

if __name__ == '__main__':
    print('starting main')
    main()
    print('finishing main')

The example calls join() on the newly created process.

$ ./joining.py
starting main
starting fun
finishing fun
finishing main

Suggestion : 3

Published on May 2, 2021

import time

def useless_function(sec=1):
    print(f'Sleeping for {sec} second(s)')
    time.sleep(sec)
    print(f'Done sleeping')

start = time.perf_counter()
useless_function()
useless_function()
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
--------------- Output ---------------
Sleeping for 1 second(s)
Done sleeping
Sleeping for 1 second(s)
Done sleeping
Finished in 2.02 second(s)

Running the function twice sequentially took roughly two seconds as expected. Let’s create two processes, run them in parallel and see how that pans out. 

import multiprocessing

start = time.perf_counter()
process1 = multiprocessing.Process(target=useless_function)
process2 = multiprocessing.Process(target=useless_function)
process1.start()
process2.start()
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')

Something seems wrong with the output. Granted, we forgot to wait for the processes to finish, but according to the output the processes were started after the program finished execution. The output appears in this order because it takes a while to create the processes and get them running; this isn't the case for threads, which start instantly. Like threads, the join() method is used to wait for the processes to finish execution.

start = time.perf_counter()
# A Process object can only be started once, so the processes are created
# again here before being started and joined.
process1 = multiprocessing.Process(target=useless_function)
process2 = multiprocessing.Process(target=useless_function)
process1.start()
process2.start()
process1.join()
process2.join()
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
--------------- Output ---------------
Sleeping for 1 second(s)
Sleeping for 1 second(s)
Done sleeping
Done sleeping
Finished in 1.04 second(s)
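
The same pattern generalizes beyond two processes. Here is a short sketch (repeating the useless_function defined above so it is self-contained) that starts ten processes and waits for all of them before measuring the elapsed time:

import multiprocessing
import time

def useless_function(sec=1):
    print(f'Sleeping for {sec} second(s)')
    time.sleep(sec)
    print('Done sleeping')

if __name__ == '__main__':
    start = time.perf_counter()
    # Create, start and join ten processes; they all sleep in parallel.
    processes = [multiprocessing.Process(target=useless_function) for _ in range(10)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    end = time.perf_counter()
    print(f'Finished in {round(end - start, 2)} second(s)')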

Suggestion : 4

The second thing our code needs to have is a __main__ method defined. We'll add that into our code at the very bottom with:

if __name__ == '__main__':
   mp_handler()

With this, we make sure that the code in the body of the if-statement is only executed for the main process we start by running our script file in Python, not for the subprocesses we will create when using multiprocessing, which also load this file. Otherwise, this would result in an infinite creation of subprocesses, subsubprocesses, and so on. Next, we need to have that mp_handler() function we are calling defined. This is the function that will set up our pool of processors and also assign (map) each of our tasks onto a worker (usually a processor) in that pool.

The first line instantiates a pool with a number of workers (usually our number of processors or a number slightly less than that). There's a function to determine how many processors we have, multiprocessing.cpu_count(), so that our code can take full advantage of whichever machine it is running on. That first line is:

with multiprocessing.Pool(multiprocessing.cpu_count()) as myPool:
    ... # code for setting up the pool of jobs

You have probably already seen this notation from working with arcpy cursors. This with ... as ... statement creates an object of the Pool class defined in the multiprocessing module and assigns it to variable myPool. The parameter given to it is the number of processors on my machine (which is the value that multiprocessing.cpu_count() is returning), so here we are making sure that all processor cores will be used. All code that uses the variable myPool (e.g., for setting up the pool of multiprocessing jobs) now needs to be indented relative to the "with" and the construct makes sure that everything is cleaned up afterwards. The same could be achieved with the following lines of code:

myPool = multiprocessing.Pool(multiprocessing.cpu_count())
... # code for setting up the pool of jobs
myPool.close()
myPool.join()

Here the Pool variable is created without the with ... as ... statement. As a result, the statements in the last two lines are needed for telling Python that we are done adding jobs to the pool and for cleaning up all sub-processes when we are done, to free up resources. We prefer the version using the with ... as ... construct in this course.

To prepare for the multiprocessing version, we'll take our Cherry-O code from before and make a couple of small changes. We'll define a function cherryO() around this code (taking the game number as a parameter as explained above), and we'll remove the while loop that currently executes the code 10,000 times (our map range above will take care of that), so we'll need to "dedent" the code accordingly.

Here's what our revised function will look like:

def cherryO(game):
    spinnerChoices = [-1, -2, -3, -4, 2, 2, 10]
    turns = 0
    cherriesOnTree = 10

    # Take a turn as long as you have more than 0 cherries
    while cherriesOnTree > 0:
        # Spin the spinner
        spinIndex = random.randrange(0, 7)
        spinResult = spinnerChoices[spinIndex]
        # Print the spin result
        #print("You spun " + str(spinResult) + ".")
        # Add or remove cherries based on the result
        cherriesOnTree += spinResult
        # Make sure the number of cherries is between 0 and 10
        if cherriesOnTree > 10:
            cherriesOnTree = 10
        elif cherriesOnTree < 0:
            cherriesOnTree = 0
        # Print the number of cherries on the tree
        #print("You have " + str(cherriesOnTree) + " cherries on your tree.")
        turns += 1
    # Return the number of turns it took to win the game
    return turns
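
The mp_handler() function referred to above isn't included in this excerpt. A minimal sketch of what it could look like, assuming it simply maps cherryO() over the 10,000 game numbers mentioned earlier and collects the results (the averaging at the end is illustrative, not from the original):

import multiprocessing
import random   # needed by cherryO(), which is assumed to be defined in this module

def mp_handler():
    # Set up the pool of workers and map each game number onto a worker.
    with multiprocessing.Pool(multiprocessing.cpu_count()) as myPool:
        turns = myPool.map(cherryO, range(10000))
    # 'turns' now holds the number of turns each simulated game took.
    print('Average number of turns:', sum(turns) / len(turns))

if __name__ == '__main__':
    mp_handler()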

Suggestion : 5

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.

In multiprocessing, processes are spawned by creating a Process object and then calling its start() method. Process follows the API of threading.Thread. The examples below show a simple Pool, a trivial multiprocess program using Process (including one that prints the individual process IDs involved), and how to select the 'spawn' start method via set_start_method() or get_context():

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

which will print to standard output:

[1, 4, 9]

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
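
The excerpt also mentions multiprocessing.dummy, which keeps the same Pool API but backs it with threads instead of processes. A small illustrative sketch (not part of the quoted material):

from multiprocessing.dummy import Pool as ThreadPool

def f(x):
    return x * x

if __name__ == '__main__':
    # Same interface as multiprocessing.Pool, but the workers are threads.
    with ThreadPool(5) as p:
        print(p.map(f, [1, 2, 3]))  # [1, 4, 9]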

Suggestion : 6

In particular we are going to consider the Threading library and the Multiprocessing library.

The Multiprocessing library actually spawns multiple operating system processes for each parallel task. This nicely side-steps the GIL by giving each process its own Python interpreter and thus its own GIL. Hence each process can be fed to a separate processor core and then regrouped at the end once all processes have finished.

In order to actually make use of the extra cores present in nearly all modern consumer processors, we can instead use the Multiprocessing library. This works in a fundamentally different way to the Threading library, even though the syntax of the two is extremely similar.

Finally, the jobs are sequentially started and then sequentially "joined". The join() method blocks the calling thread (i.e. the main Python interpreter thread) until the thread has terminated. This ensures that all of the threads are complete before printing the completion message to the console:

# thread_test.py

import random
import threading

def list_append(count, id, out_list):
    """
    Creates an empty list and then appends a
    random number to the list 'count' number
    of times. A CPU-heavy operation!
    """
    for i in range(count):
        out_list.append(random.random())

if __name__ == "__main__":
    size = 10000000   # Number of random numbers to add
    threads = 2       # Number of threads to create

    # Create a list of jobs and then iterate through
    # the number of threads appending each thread to
    # the job list
    jobs = []
    for i in range(0, threads):
        out_list = list()
        # Pass the function and its arguments separately; calling
        # list_append(...) here would run it immediately and sequentially.
        thread = threading.Thread(target=list_append, args=(size, i, out_list))
        jobs.append(thread)

    # Start the threads (i.e. calculate the random number lists)
    for j in jobs:
        j.start()

    # Ensure all of the threads have finished
    for j in jobs:
        j.join()

    print("List processing complete.")

We can time this code using the following console call:

time python thread_test.py

It produces the following output:

List processing complete.

real    0m2.003s
user    0m1.838s
sys     0m0.161s

The only modifications needed for the Multiprocessing implementation include changing the import line and the functional form of the multiprocessing.Process line. In this case the arguments to the target function are passed separately. Beyond that the code is almost identical to the Threading implementation above:
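
The multiprocessing version itself isn't reproduced in this excerpt. A sketch of what a multiproc_test.py along those lines might look like, mirroring the threading code above (an assumption, not the original source):

# multiproc_test.py

import random
import multiprocessing

def list_append(count, id, out_list):
    """
    Appends a random number to out_list 'count' number
    of times. A CPU-heavy operation!
    """
    for i in range(count):
        out_list.append(random.random())

if __name__ == "__main__":
    size = 10000000   # Number of random numbers to add
    procs = 2         # Number of processes to create

    # Create a list of jobs, one process per job; note that each process
    # works on its own copy of out_list, so this is purely a timing benchmark.
    jobs = []
    for i in range(0, procs):
        out_list = list()
        process = multiprocessing.Process(target=list_append,
                                          args=(size, i, out_list))
        jobs.append(process)

    # Start the processes (i.e. calculate the random number lists)
    for j in jobs:
        j.start()

    # Ensure all of the processes have finished
    for j in jobs:
        j.join()

    print("List processing complete.")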

We can once again time this code using a similar console call:

time python multiproc_test.py

We receive the following output:

List processing complete.

real    0m1.045s
user    0m1.824s
sys     0m0.231s

Suggestion : 7

The multiprocessing module is easier to drop in than the threading module, as we don't need to add a class like in the Python threading example. The only changes we need to make are in the main function.

Something new since Python 3.2 that wasn't touched upon in the original article is the concurrent.futures package. This package provides yet another way to use concurrency and parallelism with Python.

Hopefully the Python threading examples in this article (and update) will point you in the right direction, so you have an idea of where to look in the Python standard library if you need to introduce concurrency into your programs.

The examples below are all built around a small helper module, download.py, containing the functions get_links, download_link, and setup_download_dir.

This is what the script looks like:

import json
import logging
import os
from pathlib import Path
from urllib.request import urlopen, Request

logger = logging.getLogger(__name__)

types = {'image/jpeg', 'image/png'}

def get_links(client_id):
    headers = {'Authorization': 'Client-ID {}'.format(client_id)}
    req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET')
    with urlopen(req) as resp:
        data = json.loads(resp.read().decode('utf-8'))
    return [item['link']
            for item in data['data']
            if 'type' in item and item['type'] in types]

def download_link(directory, link):
    download_path = directory / os.path.basename(link)
    with urlopen(link) as image, download_path.open('wb') as f:
        f.write(image.read())
    logger.info('Downloaded %s', link)

def setup_download_dir():
    download_dir = Path('images')
    if not download_dir.exists():
        download_dir.mkdir()
    return download_dir

Next, we will need to write a module that will use these functions to download the images, one by one. We will name this single.py. This will contain the main function of our first, naive version of the Imgur image downloader. The module will retrieve the Imgur client ID from the environment variable IMGUR_CLIENT_ID. It will invoke setup_download_dir to create the download destination directory. Finally, it will fetch a list of images using the get_links function, filter out all GIF and album URLs, and then use download_link to download and save each of those images to disk. Here is what single.py looks like:

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()

This is almost the same as the previous one, with the exception that we now have a new class, DownloadWorker, which is a descendant of the Python Thread class. The run method has been overridden to run an infinite loop. On every iteration, it calls self.queue.get() to try to fetch a URL from a thread-safe queue. It blocks until there is an item in the queue for the worker to process. Once the worker receives an item from the queue, it then calls the same download_link method that was used in the previous script to download the image to the images directory. After the download is finished, the worker signals the queue that the task is done. This is very important, because the Queue keeps track of how many tasks were enqueued. The call to queue.join() would block the main thread forever if the workers did not signal that they completed a task.

import logging
import os
from queue import Queue
from threading import Thread
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)

class DownloadWorker(Thread):

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get()
            try:
                download_link(directory, link)
            finally:
                self.queue.task_done()

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    # Create a queue to communicate with the worker threads
    queue = Queue()
    # Create 8 worker threads
    for x in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        worker.daemon = True
        worker.start()
    # Put the tasks into the queue as a tuple
    for link in links:
        logger.info('Queueing {}'.format(link))
        queue.put((download_dir, link))
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    logging.info('Took %s', time() - ts)

if __name__ == '__main__':
    main()
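
The introduction to this suggestion also mentions a multiprocessing-based version whose changes are confined to the main function, but that code isn't included in the excerpt. A rough sketch of what it might look like, reusing the same download.py helpers (this is an assumption, not the original article's code):

import logging
import os
from functools import partial
from multiprocessing import Pool
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    # Bind the directory argument so the pool only has to map over the links.
    download = partial(download_link, download_dir)
    # One worker process per CPU core; map blocks until every download is done.
    with Pool(os.cpu_count()) as p:
        p.map(download, links)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()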

One last step we need to do is to start up some workers. RQ provides a handy script to run workers on the default queue. Just run rqworker in a terminal window and it will start a worker listening on the default queue. Please make sure your current working directory is the same as the directory where the scripts reside. If you want to listen to a different queue, you can run rqworker queue_name and it will listen to that named queue. The great thing about RQ is that as long as you can connect to Redis, you can run as many workers as you like on as many different machines as you like; therefore, it is very easy to scale up as your application grows. Here is the source for the RQ version:

import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)

def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()

Using a concurrent.futures.ThreadPoolExecutor makes the Python threading example code almost identical to the multiprocessing module.

import logging
import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)

def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)

    # By placing the executor inside a with block, the executor's shutdown method
    # will be called, cleaning up threads.
    #
    # By default, the executor sets the number of workers to 5 times the number of CPUs.
    with ThreadPoolExecutor() as executor:

        # Create a new partially applied function that stores the directory argument.
        #
        # This allows the download_link function that normally takes two
        # arguments to work with the map function that expects a function of a
        # single argument.
        fn = partial(download_link, download_dir)

        # Executes fn concurrently using threads on the links iterable. The
        # timeout is for the entire process, not a single call, so downloading
        # all images must complete within 30 seconds.
        executor.map(fn, links, timeout=30)

if __name__ == '__main__':
    main()