Prerequisite: Multithreading
Threading allows parts of a program to run concurrently, and Python offers two standard ways to achieve this: the multiprocessing module and the threading module. Multithreading is well suited to speeding up I/O-bound tasks such as making web requests, performing database operations, or reading from and writing to files. In contrast, CPU-intensive tasks such as heavy mathematical computation benefit most from multiprocessing. This is because of the GIL (Global Interpreter Lock), which allows only one thread to execute Python bytecode at a time.
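A minimal sketch makes the I/O-bound case concrete (the io_task name is illustrative, and time.sleep stands in for a blocking I/O call such as a web request):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_task(_):
    # Simulates a blocking I/O call (e.g. a network request).
    time.sleep(0.2)


# Run four tasks one after another.
start = time.perf_counter()
for i in range(4):
    io_task(i)
sequential = time.perf_counter() - start

# Run the same four tasks on a pool of four threads.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as exe:
    exe.map(io_task, range(4))
threaded = time.perf_counter() - start

print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

A CPU-bound loop in place of the sleep would show little or no improvement, because the GIL lets only one thread run Python bytecode at a time.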
From Python 3.2 onwards, a new class called ThreadPoolExecutor is available in the concurrent.futures module to create and manage threads efficiently. But if Python already had a built-in threading module, why was a new module introduced? Let me answer that first.
- Spawning new threads on the fly is not a problem when the number of threads is small, but managing them becomes cumbersome as the count grows. Creating many threads is also computationally expensive and leads to a decline in throughput. An approach that keeps throughput up is to create a pool of idle threads beforehand and reuse them for successive tasks, which avoids the overhead of creating a new thread each time.
- Also, the pool tracks and manages each thread's lifecycle and schedules tasks on the programmer's behalf, making the code simpler and less error-prone.
Syntax: concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
Parameters:
- max_workers: the number of threads in the pool, i.e. the pool size. From Python 3.8 onwards the default value is min(32, os.cpu_count() + 4); this formula preserves at least 5 workers for I/O-bound tasks.
- thread_name_prefix: added in Python 3.6; it gives the pool's threads a name prefix for easier debugging.
- initializer: a callable that is invoked at the start of each worker thread.
- initargs: a tuple of arguments passed to the initializer.
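A minimal sketch tying these parameters together (the init_worker function and the "demo" prefix are illustrative names, not part of the API):

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def init_worker(tag):
    # Runs once in each worker thread before it starts accepting tasks.
    print(f"{tag}: {threading.current_thread().name} starting")


with ThreadPoolExecutor(max_workers=2,
                        thread_name_prefix="demo",
                        initializer=init_worker,
                        initargs=("init",)) as exe:
    # Each worker thread is named '<prefix>_<number>'.
    worker_name = exe.submit(lambda: threading.current_thread().name).result()

print(worker_name)  # e.g. demo_0
```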
ThreadPoolExecutor Methods:
The ThreadPoolExecutor class exposes three methods to execute tasks asynchronously. A detailed explanation of each is given below.
- submit(fn, *args, **kwargs): It schedules a callable to run and returns a Future object representing the execution state of the call.
- map(fn, *iterables, timeout=None, chunksize=1):
- It applies the method to the items of the iterables concurrently and returns an iterator over the results. Iterating the results raises concurrent.futures.TimeoutError if a result is not available within the timeout limit.
- If the iterables are very large, a chunksize larger than 1 can improve performance when using ProcessPoolExecutor, but with ThreadPoolExecutor it has no such advantage, i.e. it can be left at its default value.
- shutdown(wait=True, *, cancel_futures=False):
- It signals the executor to free up all resources once the pending futures are done executing.
- executor.submit() and executor.map() must not be called after shutdown(); doing so raises a RuntimeError.
- wait=True makes the call block until all running tasks are done and resources are freed up.
- cancel_futures=True makes the executor cancel all pending futures that have not yet started running (added in Python 3.9).
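The three methods can be seen together in a short sketch (the square helper is an illustrative name):

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


executor = ThreadPoolExecutor(max_workers=2)

# submit() returns a Future; result() blocks until the call finishes.
future = executor.submit(square, 4)
print(future.result())  # 16

# map() returns results in the order of the input iterable.
squares = list(executor.map(square, [1, 2, 3]))
print(squares)  # [1, 4, 9]

# Free resources; any further submit()/map() raises RuntimeError.
executor.shutdown(wait=True)
try:
    executor.submit(square, 5)
except RuntimeError as err:
    print("submit after shutdown:", err)
```

Note that using the executor as a context manager (`with ThreadPoolExecutor(...) as exe:`) calls shutdown(wait=True) automatically on exit.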
Example 1:
The code below demonstrates the use of ThreadPoolExecutor. Unlike with the threading module, we do not have to start each thread explicitly in a loop, keep track of threads in a list, call join() for synchronization, or release resources after the threads finish; the executor handles all of this under the hood, making the code compact and less error-prone.
Python3
```python
from concurrent.futures import ThreadPoolExecutor

values = [3, 4, 5, 6]


def cube(x):
    print(f'Cube of {x}:{x * x * x}')


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=5) as exe:
        # Submit a single task.
        exe.submit(cube, 2)
        # Map the method 'cube' over the list of values.
        exe.map(cube, values)
    # Exiting the 'with' block waits for all tasks to finish.
```
Output:
Cube of 2:8
Cube of 3:27
Cube of 4:64
Cube of 5:125
Cube of 6:216
Example 2:
The code below fetches images over the internet by making HTTP requests with the requests library. The first section of the code calls the API one request at a time, so the download is slow, whereas the second section uses a thread pool to make the requests in parallel.
You can experiment with the parameters discussed above to see how they affect the speedup; for example, a thread pool of 6 instead of 3 gives a more significant improvement.
Python3
```python
import concurrent.futures
import time

import requests

# Add the URLs of the images to download here.
img_urls = []


def download_image(img_url):
    img_bytes = requests.get(img_url).content
    print("Downloading..")


# Download images one by one => slow.
t1 = time.perf_counter()
for img in img_urls:
    download_image(img)
t2 = time.perf_counter()
print(f'Single Threaded Code Took :{t2 - t1} seconds')

print('*' * 50)

# Fetching images concurrently speeds up the download.
t1 = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(3) as executor:
    executor.map(download_image, img_urls)
t2 = time.perf_counter()
print(f'MultiThreaded Code Took:{t2 - t1} seconds')
```
Output:
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Single Threaded Code Took :2.5529379630024778 seconds
**************************************************
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
MultiThreaded Code Took:0.5221083430078579 seconds