As far as my knowledge goes (which is not very far), you have to call "future.result()" on the future returned by "executor.submit(testClass.testMethodToExecInParallel)" in order to wait for the task and see its outcome. I have tried what you said and it gives me an error; below is the code:
>>> import concurrent.futures as cf
>>> executor = cf.ThreadPoolExecutor(1)
>>> def a(x, y):
...     print(x + y)
...
>>> future = executor.submit(a, 2, 35, 45)
>>> future.result()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Users\username\AppData\Local\Programs\Python\Python37\lib\concurrent\futures\_base.py", line 425, in result
    return self.__get_result()
  File "C:\Users\username\AppData\Local\Programs\Python\Python37\lib\concurrent\futures\_base.py", line 384, in __get_result
    raise self._exception
  File "C:\Users\username\AppData\Local\Programs\Python\Python37\lib\concurrent\futures\thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
TypeError: a() takes 2 positional arguments but 3 were given
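For reference, that TypeError is simply an argument-count mismatch: "a" takes two parameters but three were submitted. A minimal sketch of the corrected call, continuing the same session:

>>> future = executor.submit(a, 2, 35)   # the worker prints 37; output may interleave with the prompt
37
>>> future.result() is None              # a() only prints and has no return statement
True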
I am trying to use the concurrent.futures.ThreadPoolExecutor module to run a class method in parallel; the simplified version of my code is pretty much the following:
import concurrent.futures
import threading
import time

class TestClass:
    def __init__(self, secondsToSleepFor):
        self.secondsToSleepFor = secondsToSleepFor

    def testMethodToExecInParallel(self):
        print("ThreadName: " + threading.currentThread().getName())
        print(threading.currentThread().getName() + " is sleeping for " + str(self.secondsToSleepFor) + " seconds")
        time.sleep(self.secondsToSleepFor)
        print(threading.currentThread().getName() + " has finished!!")

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futuresList = []
    print("before try")
    try:
        testClass = TestClass(3)
        future = executor.submit(testClass.testMethodToExecInParallel)
        futuresList.append(future)
    except Exception as exc:
        print('Exception generated: %s' % exc)
If I execute this code it seems to behave as intended. But if I make a mistake, like specifying a wrong number of parameters in "testMethodToExecInParallel":
def testMethodToExecInParallel(self, secondsToSleepFor):
and then still submitting the function as:
future = executor.submit(testClass.testMethodToExecInParallel)
the program doesn't return any error; it just prints "before try" and ends execution. Shouldn't concurrent.futures.ThreadPoolExecutor (for the first kind of mistake) check that the submitted function has a matching signature and eventually throw some sort of "noSuchFunction" exception? Or maybe the error is thrown inside the thread and for some reason I can't read it?
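The error is indeed raised inside the worker thread and captured on the Future object; "submit()" itself never checks the callable's signature, so it only surfaces when you ask the future for its outcome. A minimal sketch against the TestClass above (with the extra secondsToSleepFor parameter on the method):

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    testClass = TestClass(3)
    # The bad call happens inside the worker thread, so submit() succeeds.
    future = executor.submit(testClass.testMethodToExecInParallel)
    # The TypeError is stored on the Future and only surfaces here:
    print(future.exception())
    # future.result() would re-raise the same TypeError in this thread.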
The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or in separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class. The Future class encapsulates the asynchronous execution of a callable. Future instances are created by Executor.submit() and should not be created directly except for testing.
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
import shutil

with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
import time

def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6

executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
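One way to see that this single-worker deadlock is purely a sizing problem (my variation on the docs example, not part of it): with a second worker available, the inner future can actually run, and wait_on_future completes.

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)

def wait_on_future():
    # With two workers, pow() runs on the second worker while this
    # function blocks on f.result() in the first.
    f = executor.submit(pow, 5, 2)
    print(f.result())  # prints 25 instead of deadlocking

executor.submit(wait_on_future)
executor.shutdown(wait=True)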
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419
]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
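As a rough illustration of why ProcessPoolExecutor is the right tool here (a sketch of mine, not part of the original example): is_prime is CPU-bound, so a thread pool typically yields little or no speedup under the GIL, while a process pool can use multiple cores. Reusing is_prime and PRIMES from above:

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def timed(executor_cls):
    start = time.perf_counter()
    with executor_cls() as executor:
        list(executor.map(is_prime, PRIMES))
    return time.perf_counter() - start

if __name__ == '__main__':
    print('threads:   %.2fs' % timed(ThreadPoolExecutor))
    print('processes: %.2fs' % timed(ProcessPoolExecutor))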
from concurrent.futures import as_completed, ThreadPoolExecutor

def task(num):
    return f"I'm running in a thread: {num}"

futures = []
with ThreadPoolExecutor(max_workers=2) as executor:
    for i in range(1, 11):
        futures.append(executor.submit(task, i))

    for future in as_completed(futures):
        print(future.result())
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.shutdown(wait=True)
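Note that exiting the with block already calls shutdown(wait=True), so the explicit call above is redundant; it mainly matters when you manage the executor by hand. A rough sketch of the manual equivalent:

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
try:
    future = executor.submit(pow, 2, 10)
    print(future.result())  # 1024
finally:
    # This is what the with block does implicitly on exit.
    executor.shutdown(wait=True)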
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import time

def task(num):
    if num == 5:
        raise ValueError("Num is 5")
    else:
        # To ensure this task doesn't complete too fast
        time.sleep(1)
        return f"I'm running in a thread: {num}"

futures = []
with ThreadPoolExecutor(3) as pool:
    futures = [pool.submit(task, i) for i in range(1, 10)]
    for future in as_completed(futures):
        try:
            n = future.result()
            print(f"{datetime.now()} - {n}")
        except ValueError as e:
            print(f"{datetime.now()} - EXCEPTION! {e}")
    pool.shutdown(wait=True)

print(f"{datetime.now()} - Run complete")
2021-07-10 11:42:15.382637 - I'm running in a thread: 1
2021-07-10 11:42:15.383645 - I'm running in a thread: 2
2021-07-10 11:42:16.389649 - I'm running in a thread: 4
2021-07-10 11:42:16.389885 - EXCEPTION! Num is 5
2021-07-10 11:42:18.397034 - I'm running in a thread: 3
2021-07-10 11:42:18.397034 - I'm running in a thread: 7
2021-07-10 11:42:18.398046 - I'm running in a thread: 6
2021-07-10 11:42:18.398046 - I'm running in a thread: 9
2021-07-10 11:42:18.399038 - I'm running in a thread: 8
2021-07-10 11:42:18.400030 - Run complete

2021-07-10 11:46:00.282762 - I'm running in a thread: 1
2021-07-10 11:46:00.283760 - I'm running in a thread: 2
2021-07-10 11:46:01.283596 - I'm running in a thread: 4
2021-07-10 11:46:01.284606 - EXCEPTION! Num is 5
2021-07-10 11:46:01.286596 - I'm running in a thread: 3
2021-07-10 11:46:02.285767 - I'm running in a thread: 7
2021-07-10 11:46:02.287289 - I'm running in a thread: 6
2021-07-10 11:46:03.296363 - I'm running in a thread: 9
2021-07-10 11:46:03.297305 - I'm running in a thread: 8
2021-07-10 11:46:03.299306 - Run complete
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import time

def task(num):
    if num == 5:
        raise ValueError("Num is 5")
    else:
        # To ensure this task doesn't complete too fast
        time.sleep(1)
        return f"I'm running in a thread: {num}"

with ThreadPoolExecutor(2) as pool:
    futures = [pool.submit(task, i) for i in range(1, 10)]
    for future in as_completed(futures):
        print([f._state for f in futures])
        try:
            n = future.result()
            print(f"{datetime.now()} - {n}")
        except ValueError as e:
            print(f"{datetime.now()} - EXCEPTION! {e}")
            # cancel_futures requires Python 3.9+
            pool.shutdown(wait=False, cancel_futures=True)

print(f"{datetime.now()} - Run complete")
Executor (Abstract Base Class)
│
├── ThreadPoolExecutor
│       A concrete subclass of the Executor class to
│       manage I/O bound tasks with threading underneath
│
├── ProcessPoolExecutor
│       A concrete subclass of the Executor class to
│       manage CPU bound tasks with multiprocessing underneath
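Because both concrete classes implement the same Executor interface, swapping one for the other is usually a one-line change. A small sketch (the work function and names are mine):

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def work(n):
    return n * n

if __name__ == '__main__':  # required for ProcessPoolExecutor on spawn-based platforms
    for pool_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        with pool_cls(max_workers=2) as executor:
            print(pool_cls.__name__, executor.submit(work, 10).result())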
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
for task in get_tasks():
    perform(task)
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {
        executor.submit(perform, task) for task in get_tasks()
    }
    for fut in concurrent.futures.as_completed(futures):
        print(f"The outcome is {fut.result()}")
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {
        executor.submit(perform, task): task
        for task in get_tasks()
    }
    for fut in concurrent.futures.as_completed(futures):
        original_task = futures[fut]
        print(f"The result of {original_task} is {fut.result()}")
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    for arg, res in zip(get_tasks(), executor.map(perform, get_tasks())):
        print(f"The result of {arg} is {res}")
for task in get_tasks_to_do():
    perform(task)
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {
        executor.submit(perform, task) for task in get_tasks_to_do()
    }
    for fut in concurrent.futures.as_completed(futures):
        print(f"The outcome is {fut.result()}")
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {
        executor.submit(perform, task): task
        for task in get_tasks_to_do()
    }
    for fut in concurrent.futures.as_completed(futures):
        original_task = futures[fut]
        print(f"The outcome of {original_task} is {fut.result()}")
# chunked_iterable is a small helper, sketched below.
for task_set in chunked_iterable(get_tasks_to_do(), HOW_MANY_TASKS_AT_ONCE):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {
            executor.submit(perform, task)
            for task in task_set
        }
        for fut in concurrent.futures.as_completed(futures):
            print(f"The outcome is {fut.result()}")
finished, unfinished = concurrent.futures.wait(
    futures, return_when=concurrent.futures.FIRST_COMPLETED
)
import concurrent.futures
import itertools

tasks_to_do = get_tasks_to_do()

with concurrent.futures.ThreadPoolExecutor() as executor:
    # Schedule the first N futures. We don't want to schedule them all
    # at once, to avoid consuming excessive amounts of memory.
    futures = {
        executor.submit(perform, task)
        for task in itertools.islice(tasks_to_do, HOW_MANY_TASKS_AT_ONCE)
    }

    while futures:
        # Wait for the next future to complete.
        done, futures = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED
        )

        for fut in done:
            print(f"The outcome is {fut.result()}")

        # Schedule the next set of futures. We don't want more than N futures
        # in the pool at a time, to keep memory consumption down.
        for task in itertools.islice(tasks_to_do, len(done)):
            futures.add(executor.submit(perform, task))
from concurrent import futures

with futures.ThreadPoolExecutor() as executor:
    results = futures.wait([executor.submit(download, filename) for filename in filenames])
from concurrent import futures

with futures.ThreadPoolExecutor() as executor:
    results = futures.wait([executor.submit(download, filename) for filename in filenames])

    for result in results.done:
        result.result()
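Note that futures.wait returns a named tuple of two sets, done and not_done; with a timeout, anything still running lands in not_done. A quick sketch of mine, reusing the download and filenames placeholders from above:

from concurrent import futures

with futures.ThreadPoolExecutor() as executor:
    pending = [executor.submit(download, filename) for filename in filenames]
    done, not_done = futures.wait(pending, timeout=30)
    print(f"{len(done)} finished, {len(not_done)} still pending")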
from concurrent import futures

with futures.ThreadPoolExecutor() as executor:
    results = futures.wait([executor.submit(download, filename) for filename in filenames])

    for result in results.done:
        if result.exception() is not None:
            raise result.exception()
Multithreading (sometimes simply "threading") is when a program creates multiple threads with execution cycling among them, so one longer-running task doesn't block all the others. This works well for tasks that can be broken down into smaller subtasks, which can then each be given to a thread to be completed. The multiprocessing module is easier to drop in than the threading module, as we don't need to add a class like in the Python threading example; the only changes we need to make are in the main function. Something new since Python 3.2 that wasn't touched upon in the original article is the concurrent.futures package. This package provides yet another way to use concurrency and parallelism with Python.
This is what the helper script, download.py, looks like:
import json
import logging
import os
from pathlib import Path
from urllib.request import urlopen, Request

logger = logging.getLogger(__name__)

types = {'image/jpeg', 'image/png'}

def get_links(client_id):
    headers = {'Authorization': 'Client-ID {}'.format(client_id)}
    req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET')
    with urlopen(req) as resp:
        data = json.loads(resp.read().decode('utf-8'))
    return [item['link']
            for item in data['data']
            if 'type' in item and item['type'] in types]

def download_link(directory, link):
    download_path = directory / os.path.basename(link)
    with urlopen(link) as image, download_path.open('wb') as f:
        f.write(image.read())
    logger.info('Downloaded %s', link)

def setup_download_dir():
    download_dir = Path('images')
    if not download_dir.exists():
        download_dir.mkdir()
    return download_dir
Next, we will need to write a module that will use these functions to download the images, one by one. We will name this single.py. This will contain the main function of our first, naive version of the Imgur image downloader. The module will retrieve the Imgur client ID from the environment variable IMGUR_CLIENT_ID. It will invoke setup_download_dir to create the download destination directory. Finally, it will fetch a list of images using the get_links function, filter out all GIF and album URLs, and then use download_link to download and save each of those images to disk. Here is what single.py looks like:
import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()
This is almost the same as the previous one, with the exception that we now have a new class, DownloadWorker, which is a descendant of the Python Thread class. The run method has been overridden to run an infinite loop. On every iteration, it calls self.queue.get() to try to fetch a URL from a thread-safe queue. It blocks until there is an item in the queue for the worker to process. Once the worker receives an item from the queue, it calls the same download_link method that was used in the previous script to download the image to the images directory. After the download is finished, the worker signals the queue that the task is done. This is very important, because the Queue keeps track of how many tasks were enqueued. The call to queue.join() would block the main thread forever if the workers did not signal that they completed a task.
import logging
import os
from queue import Queue
from threading import Thread
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class DownloadWorker(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get()
            try:
                download_link(directory, link)
            finally:
                self.queue.task_done()

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    # Create a queue to communicate with the worker threads
    queue = Queue()
    # Create 8 worker threads
    for x in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        worker.daemon = True
        worker.start()
    # Put the tasks into the queue as a tuple
    for link in links:
        logger.info('Queueing {}'.format(link))
        queue.put((download_dir, link))
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    logging.info('Took %s', time() - ts)

if __name__ == '__main__':
    main()
One last step we need to do is to start up some workers. RQ provides a handy script to run workers on the default queue. Just run rqworker in a terminal window and it will start a worker listening on the default queue. Please make sure your current working directory is the same as where the scripts reside. If you want to listen to a different queue, you can run rqworker queue_name and it will listen to that named queue. The great thing about RQ is that as long as you can connect to Redis, you can run as many workers as you like on as many different machines as you like; therefore, it is very easy to scale up as your application grows. Here is the source for the RQ version:
import logging
import os

from redis import Redis
from rq import Queue

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)

def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()
Using a concurrent.futures.ThreadPoolExecutor makes the Python threading example code almost identical to the multiprocessing module.
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)

    # By placing the executor inside a with block, the executor's shutdown
    # method will be called, cleaning up threads.
    #
    # By default, the executor sets the number of workers to 5 times the
    # number of CPUs.
    with ThreadPoolExecutor() as executor:
        # Create a new partially applied function that stores the directory
        # argument.
        #
        # This allows the download_link function that normally takes two
        # arguments to work with the map function that expects a function of
        # a single argument.
        fn = partial(download_link, download_dir)

        # Executes fn concurrently using threads on the links iterable. The
        # timeout is for the entire process, not a single call, so downloading
        # all images must complete within 30 seconds.
        executor.map(fn, links, timeout=30)

if __name__ == '__main__':
    main()
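To see how literally the "almost identical" claim holds, the thread-pool main() above turns into a multiprocessing version mostly by swapping the executor class. A hedged sketch of the changed portion only:

from concurrent.futures import ProcessPoolExecutor

# Inside main(), only the executor class changes. The partial object is
# picklable because download_link is a module-level function, which is
# what ProcessPoolExecutor needs to ship work to child processes.
with ProcessPoolExecutor() as executor:
    fn = partial(download_link, download_dir)
    executor.map(fn, links, timeout=30)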