Submit code for execution to all processes in a concurrent.futures ProcessPoolExecutor


You can get the necessary synchronization with a multiprocessing.Barrier(parties[, action[, timeout]]). The barrier holds back each party that calls barrier.wait() until every party has done so, and then releases them all at once.

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def foo(x):
    for _ in range(int(42e4)):
        pass
    return x


def reload(something):
    print(f"{mp.current_process().name} --- reloading {something} and waiting.")
    barrier.wait()
    print(f"{mp.current_process().name} --- released.")


def init_barrier(barrier):
    # Make the barrier available as a global in every worker process.
    globals()['barrier'] = barrier


if __name__ == '__main__':

    MAX_WORKERS = 4
    barrier = mp.Barrier(MAX_WORKERS)

    with ProcessPoolExecutor(
        MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
    ) as executor:
        print(list(executor.map(foo, range(10))))

        # then run something once in every worker process
        futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
        for f in futures:
            f.result()

        print(list(executor.map(foo, range(10))))

Example Output:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ForkProcess-3 --- reloading something and waiting.
ForkProcess-2 --- reloading something and waiting.
ForkProcess-1 --- reloading something and waiting.
ForkProcess-4 --- reloading something and waiting.
ForkProcess-1 --- released.
ForkProcess-4 --- released.
ForkProcess-3 --- released.
ForkProcess-2 --- released.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Process finished with exit code 0

Suggestion : 2

Executor is an abstract class that provides methods to execute calls asynchronously; it should not be used directly, but through its concrete subclasses. The Future class encapsulates the asynchronous execution of a callable; Future instances are created by Executor.submit() and should not be created directly except for testing. ThreadPoolExecutor is an Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

import time

def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6

executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Suggestion : 3

Executor is an abstract class in the concurrent.futures module. It cannot be used directly; we need to use one of its concrete subclasses. ProcessPoolExecutor is one of those concrete subclasses: it uses multiprocessing underneath and gives us a pool of processes to which tasks can be submitted; the pool assigns tasks to the available processes and schedules them to run. To create a process pool, construct a ProcessPoolExecutor with the number of processes you want in the pool (if omitted, it defaults to the number of processors on the machine), then submit tasks to it.

We will now consider the same example that we used while creating the thread pool; the only difference is that we now use ProcessPoolExecutor instead of ThreadPoolExecutor.

from concurrent.futures import ProcessPoolExecutor
from time import sleep

def task(message):
    sleep(2)
    return message

def main():
    executor = ProcessPoolExecutor(5)
    future = executor.submit(task, "Completed")
    print(future.done())
    sleep(2)
    print(future.done())
    print(future.result())

if __name__ == '__main__':
    main()

Output

False
False
Completed

Another way to instantiate ProcessPoolExecutor is with a context manager. It works like the method used in the above example; the main advantages are that the pool is shut down automatically when the with block exits and the code reads more cleanly. The instantiation can be done with the following code −

with ProcessPoolExecutor(max_workers=5) as executor:
    ...
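
For instance, the earlier task() example can be recast with the context-manager form; a minimal sketch, assuming the same task() function as above:

from concurrent.futures import ProcessPoolExecutor
from time import sleep

def task(message):
    sleep(2)
    return message

if __name__ == '__main__':
    # The pool is created here and shut down automatically when the block exits.
    with ProcessPoolExecutor(max_workers=5) as executor:
        future = executor.submit(task, "Completed")
        print(future.result())  # blocks until the task finishes, prints "Completed"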

Running the standard URL-fetching example from the concurrent.futures documentation (shown under Suggestion 2 above) produces output like the following −

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

We will consider the same example that we used while creating a thread pool with the Executor.map() function. In the example given below, the map function is used to apply the square() function to every value in the values list.

from concurrent.futures import ProcessPoolExecutor, as_completed

values = [2, 3, 4, 5]

def square(n):
    return n * n

def main():
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = executor.map(square, values)
        for result in results:
            print(result)

if __name__ == '__main__':
    main()

Suggestion : 4


Output:

Cube of 2: 8
Cube of 3: 27
Cube of 6: 216
Cube of 4: 64
Cube of 5: 125
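
The script that produced this output was not captured in this suggestion. A minimal sketch that would produce output in this style, assuming a cube() helper submitted to a process pool and collected with as_completed() (a hypothetical reconstruction; the completion order varies between runs):

from concurrent.futures import ProcessPoolExecutor, as_completed

def cube(n):
    return n * n * n

if __name__ == '__main__':
    values = [2, 3, 4, 5, 6]
    with ProcessPoolExecutor() as executor:
        # Map each future back to its input so the result can be labelled.
        futures = {executor.submit(cube, n): n for n in values}
        for fut in as_completed(futures):
            print(f'Cube of {futures[fut]}: {fut.result()}')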

Suggestion : 5


Executor (Abstract Base Class)
│
├── ThreadPoolExecutor
│       A concrete subclass of the Executor class to
│       manage I/O bound tasks with threading underneath
│
├── ProcessPoolExecutor
│       A concrete subclass of the Executor class to
│       manage CPU bound tasks with multiprocessing underneath
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

# Running tasks sequentially; get_tasks() and perform() are placeholders
# for your own task source and worker function:
for task in get_tasks():
    perform(task)

# Running the tasks concurrently and collecting results as they complete
# (Executor here stands for ThreadPoolExecutor or ProcessPoolExecutor):
import concurrent.futures

with concurrent.futures.Executor() as executor:
    futures = {executor.submit(perform, task) for task in get_tasks()}

    for fut in concurrent.futures.as_completed(futures):
        print(f"The outcome is {fut.result()}")

# Mapping each future back to its original task:
import concurrent.futures

with concurrent.futures.Executor() as executor:
    futures = {
        executor.submit(perform, task): task
        for task in get_tasks()
    }

    for fut in concurrent.futures.as_completed(futures):
        original_task = futures[fut]
        print(f"The result of {original_task} is {fut.result()}")

# Using Executor.map() when results are wanted in submission order:
import concurrent.futures

with concurrent.futures.Executor() as executor:
    for arg, res in zip(get_tasks(), executor.map(perform, get_tasks())):
        print(f"The result of {arg} is {res}")

Suggestion : 6

The ProcessPoolExecutor class from the concurrent.futures module allows you to create and manage a process pool. Its submit() method dispatches a function to be executed by a worker process and returns a Future object.
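
Since the thumbnail program below only uses map(), here is first a minimal sketch of submit() with a process pool (a hypothetical example, not part of the original tutorial):

from concurrent.futures import ProcessPoolExecutor

def add(a, b):
    return a + b

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        future = executor.submit(add, 2, 3)   # dispatch add(2, 3) to a worker process
        print(future.result())                # waits for the result, prints 5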

The following program uses a process pool to create thumbnails for pictures in the images folder and save them to the thumbs folder.

import time
import os
from concurrent.futures import ProcessPoolExecutor
from PIL import Image, ImageFilter

filenames = [
    'images/1.jpg',
    'images/2.jpg',
    'images/3.jpg',
    'images/4.jpg',
    'images/5.jpg',
]

def create_thumbnail(filename, size=(50, 50), thumb_dir='thumbs'):
    img = Image.open(filename)
    img = img.filter(ImageFilter.GaussianBlur())
    img.thumbnail(size)
    img.save(f'{thumb_dir}/{os.path.basename(filename)}')
    print(f'{filename} was processed...')

if __name__ == '__main__':
    start = time.perf_counter()

    with ProcessPoolExecutor() as executor:
        executor.map(create_thumbnail, filenames)

    finish = time.perf_counter()

    print(f'It took {finish-start: .2f} second(s) to finish')

Output:

images/5.jpg was processed...
images/4.jpg was processed...
images/3.jpg was processed...
images/2.jpg was processed...
images/1.jpg was processed...
It took 0.79 second(s) to finish

First, declare a list of files for creating thumbnails:

filenames = [
    'images/1.jpg',
    'images/2.jpg',
    'images/3.jpg',
    'images/4.jpg',
    'images/5.jpg',
]
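
Second, define the create_thumbnail() function that blurs an image, resizes it to a thumbnail, and saves it to the thumbs folder (this step is taken from the full listing above):

def create_thumbnail(filename, size=(50, 50), thumb_dir='thumbs'):
    img = Image.open(filename)
    img = img.filter(ImageFilter.GaussianBlur())
    img.thumbnail(size)
    img.save(f'{thumb_dir}/{os.path.basename(filename)}')
    print(f'{filename} was processed...')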

Third, create a process pool and call the create_thumbnail() function for each picture specified in the filenames:

with ProcessPoolExecutor() as executor:
    executor.map(create_thumbnail, filenames)

Suggestion : 7

submit returns a special Future object that represents the execution of the function. submit returns a Future so that the call to submit does not block; once submit has returned the Future, the code can continue executing. A Future represents a delayed computation: it can be queried for its status (completed or not), and results or exceptions can be obtained from it. Futures do not need to be created manually; they are created by submit. Once all the functions are running in threads, you can start asking the Future objects whether their results are ready, or use the special as_completed function, which yields Future objects only when they have finished or been cancelled — a Future is returned as soon as its work is completed, not in the order it was added to future_list.
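
The netmiko-based examples below need real network devices, so here is a minimal standard-library sketch of the same submit()/as_completed() pattern (illustrative only, with a made-up slow_echo() function):

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_echo(msg, delay):
    time.sleep(delay)
    return msg

with ThreadPoolExecutor(max_workers=2) as executor:
    future_list = [executor.submit(slow_echo, f'task {i}', delay)
                   for i, delay in enumerate([3, 1, 2])]
    for f in as_completed(future_list):
        # Futures are yielded as they finish, not in submission order.
        print(f.result())

The full netmiko-based version follows.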

from concurrent.futures import ThreadPoolExecutor, as_completed
from pprint import pprint
from datetime import datetime
import time
import logging

import yaml
from netmiko import ConnectHandler, NetMikoAuthenticationException

logging.getLogger("paramiko").setLevel(logging.WARNING)

logging.basicConfig(
    format='%(threadName)s %(name)s %(levelname)s: %(message)s',
    level=logging.INFO)


def send_show(device_dict, command):
    start_msg = '===> {} Connection: {}'
    received_msg = '<=== {} Received: {}'
    ip = device_dict['host']
    logging.info(start_msg.format(datetime.now().time(), ip))
    if ip == '192.168.100.1':
        time.sleep(5)

    with ConnectHandler(**device_dict) as ssh:
        ssh.enable()
        result = ssh.send_command(command)
    logging.info(received_msg.format(datetime.now().time(), ip))
    return {ip: result}


with open('devices.yaml') as f:
    devices = yaml.safe_load(f)

with ThreadPoolExecutor(max_workers=2) as executor:
    future_list = []
    for device in devices:
        future = executor.submit(send_show, device, 'sh clock')
        future_list.append(future)
    # the same in the form of a list comprehension:
    # future_list = [executor.submit(send_show, device, 'sh clock') for device in devices]
    for f in as_completed(future_list):
        print(f.result())
$ python netmiko_threads_submit_basics.py
ThreadPoolExecutor-0_0 root INFO: ===> 17:32:59.088025 Connection: 192.168.100.1
ThreadPoolExecutor-0_1 root INFO: ===> 17:32:59.094103 Connection: 192.168.100.2
ThreadPoolExecutor-0_1 root INFO: <=== 17:33:11.639672 Received: 192.168.100.2
{'192.168.100.2': '*17:33:11.429 UTC Thu Jul 4 2019'}
ThreadPoolExecutor-0_1 root INFO: ===> 17:33:11.849132 Connection: 192.168.100.3
ThreadPoolExecutor-0_0 root INFO: <=== 17:33:17.735761 Received: 192.168.100.1
{'192.168.100.1': '*17:33:17.694 UTC Thu Jul 4 2019'}
ThreadPoolExecutor-0_1 root INFO: <=== 17:33:23.230123 Received: 192.168.100.3
{'192.168.100.3': '*17:33:23.188 UTC Thu Jul 4 2019'}
In [1]: from concurrent.futures import ThreadPoolExecutor

In [2]: from netmiko_threads_submit_futures import send_show

In [3]: executor = ThreadPoolExecutor(max_workers=2)

In [4]: f1 = executor.submit(send_show, r1, 'sh clock')
   ...: f2 = executor.submit(send_show, r2, 'sh clock')
   ...: f3 = executor.submit(send_show, r3, 'sh clock')
   ...:
ThreadPoolExecutor-0_0 root INFO: ===> 17:53:19.656867 Connection: 192.168.100.1
ThreadPoolExecutor-0_1 root INFO: ===> 17:53:19.657252 Connection: 192.168.100.2

In [5]: print(f1, f2, f3, sep='\n')
<Future at 0xb488e2ac state=running>
<Future at 0xb488ef2c state=running>
<Future at 0xb488e72c state=pending>

ThreadPoolExecutor-0_1 root INFO: <=== 17:53:25.757704 Received: 192.168.100.2
ThreadPoolExecutor-0_1 root INFO: ===> 17:53:25.869368 Connection: 192.168.100.3

In [6]: print(f1, f2, f3, sep='\n')
<Future at 0xb488e2ac state=running>
<Future at 0xb488ef2c state=finished returned dict>
<Future at 0xb488e72c state=running>

ThreadPoolExecutor-0_0 root INFO: <=== 17:53:30.431207 Received: 192.168.100.1
ThreadPoolExecutor-0_1 root INFO: <=== 17:53:31.636523 Received: 192.168.100.3

In [7]: print(f1, f2, f3, sep='\n')
<Future at 0xb488e2ac state=finished returned dict>
<Future at 0xb488ef2c state=finished returned dict>
<Future at 0xb488e72c state=finished returned dict>
from concurrent.futures import ThreadPoolExecutor, as_completed
from pprint import pprint
from datetime import datetime
import time
import logging

import yaml
from netmiko import ConnectHandler, NetMikoAuthenticationException

logging.getLogger("paramiko").setLevel(logging.WARNING)

logging.basicConfig(
    format='%(threadName)s %(name)s %(levelname)s: %(message)s',
    level=logging.INFO)


def send_show(device_dict, command):
    start_msg = '===> {} Connection: {}'
    received_msg = '<=== {} Received: {}'
    ip = device_dict['host']
    logging.info(start_msg.format(datetime.now().time(), ip))
    if ip == '192.168.100.1':
        time.sleep(5)

    with ConnectHandler(**device_dict) as ssh:
        ssh.enable()
        result = ssh.send_command(command)
    logging.info(received_msg.format(datetime.now().time(), ip))
    return {ip: result}


def send_command_to_devices(devices, command):
    data = {}
    with ThreadPoolExecutor(max_workers=2) as executor:
        future_list = []
        for device in devices:
            future = executor.submit(send_show, device, command)
            future_list.append(future)
            print('Future: {} for device {}'.format(future, device['host']))
        for f in as_completed(future_list):
            result = f.result()
            print('Future done {}'.format(f))
            data.update(result)
    return data


if __name__ == '__main__':
    with open('devices.yaml') as f:
        devices = yaml.safe_load(f)
    pprint(send_command_to_devices(devices, 'sh clock'))
$ python netmiko_threads_submit_futures.py
Future: <Future at 0xb5ed938c state=running> for device 192.168.100.1
ThreadPoolExecutor-0_0 root INFO: ===> 07:14:26.298007 Connection: 192.168.100.1
Future: <Future at 0xb5ed96cc state=running> for device 192.168.100.2
Future: <Future at 0xb5ed986c state=pending> for device 192.168.100.3
ThreadPoolExecutor-0_1 root INFO: ===> 07:14:26.299095 Connection: 192.168.100.2
ThreadPoolExecutor-0_1 root INFO: <=== 07:14:32.056003 Received: 192.168.100.2
ThreadPoolExecutor-0_1 root INFO: ===> 07:14:32.164774 Connection: 192.168.100.3
Future done <Future at 0xb5ed96cc state=finished returned dict>
ThreadPoolExecutor-0_0 root INFO: <=== 07:14:36.714923 Received: 192.168.100.1
Future done <Future at 0xb5ed938c state=finished returned dict>
ThreadPoolExecutor-0_1 root INFO: <=== 07:14:37.577327 Received: 192.168.100.3
Future done <Future at 0xb5ed986c state=finished returned dict>
{'192.168.100.1': '*07:14:36.546 UTC Fri Jul 26 2019',
 '192.168.100.2': '*07:14:31.865 UTC Fri Jul 26 2019',
 '192.168.100.3': '*07:14:37.413 UTC Fri Jul 26 2019'}

Suggestion : 8

Executor is an abstract class that provides methods to execute calls asynchronously. It should not be used directly, but through its two subclasses: ThreadPoolExecutor and ProcessPoolExecutor. The Future class encapsulates the asynchronous execution of a callable; Future instances are created by Executor.submit(). Executor.shutdown() signals the executor that it should free any resources it is using when the currently pending futures are done executing; calls to Executor.submit() and Executor.map() made after shutdown will raise RuntimeError. Some Future methods, such as set_running_or_notify_cancel(), should only be called by Executor implementations before executing the work associated with the Future, and by unit tests.
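
The shutdown() behavior described above can be seen with a short sketch (an illustrative example, not from the original page):

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(pow, 2, 10)
executor.shutdown(wait=True)       # wait for pending futures, then free resources
print(future.result())             # 1024 -- already-submitted work still completes

try:
    executor.submit(pow, 3, 3)     # submitting after shutdown is not allowed
except RuntimeError as exc:
    print('submit after shutdown raised:', exc)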

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest4.txt')

import time

def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6

executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
from concurrent import futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

with futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = dict((executor.submit(load_url, url, 60), url) for url in URLS)

    for future in futures.as_completed(future_to_url):
        url = future_to_url[future]
        if future.exception() is not None:
            print('%r generated an exception: %s' % (url, future.exception()))
        else:
            print('%r page is %d bytes' % (url, len(future.result())))
from concurrent import futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    # Guard small and even numbers before trial division.
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()