Prerequisite – Multiprocessing
It allows parallelism of code and the Python language has two ways to achieve its 1st is via multiprocessing module and 2nd is via multithreading module. From Python 3.2 onwards a new class called ProcessPoolExecutor was introduced in python in concurrent. The futures module to efficiently manage and create Process. But wait, if python already had a multiprocessing module inbuilt then why a new module was introduced. Let me answer this first.
- Spawning a new process on the fly is not a problem when the number of processes is less, but it becomes really cumbersome to manage processes if we are dealing with many processes. Apart from this, it is computationally inefficient to create so many processes which will lead to a decline in throughput. An approach to keep up the throughput is to create & instantiate a pool of idle Processes beforehand and reuse the Processes from this pool until all the Processes are exhausted. This way the overhead of creating new Processes is reduced.
- Also, the pool keeps track and manages the Process lifecycle and schedules them on the programmer’s behalf thus making the code much simpler and less buggy.
Syntax:
concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=”, initializer=None, initargs=())
Parameters:
- max_workers: It is number of Process aka size of pool. If the value is None, then on Windows by default 61 process are created even if number of cores available is more than that.
- mp_context: It is the multiprocessing context, If None or empty then the default multiprocessing context is used. It allows user to control starting method.
- initializer: initializer takes a callable which is invoked on start of each worker Process.
- initargs: It’s a tuple of arguments passed to initializer.
ProcessPoolExecutor Methods: ProcessPoolExecutor class exposes the following methods to execute Process asynchronously. A detailed explanation is given below.
- submit(fn, *args, **kwargs): It runs a callable or a method and returns a Future object representing the execution state of the method.
- map(fn, *iterables, timeout=None, chunksize=1): It maps the method and iterables together immediately and will raise an exception concurrent. futures.TimeoutError if it fails to do so within the timeout limit.
- If the iterables are very large, then having a chunk-size larger than 1 can improve performance when using ProcessPoolExecutor .
- shutdown(wait=True, *, cancel_futures=False):
- It signals the executor to free up all resources when the futures are done executing.
- It must be called before executor.submit() and executor.map() method else it would throw RuntimeError.
- wait=True makes the method not to return until execution of all threads is done and resources are freed up.
- cancel_futures=True then the executor will cancel all the future threads that are yet to start.
- Cancel(): It Attempts to cancel the call, if the call cannot be canceled then it returns False, else True.
- cancelled(): returns True if the call is canceled.
- running(): return True if the process is running and cannot be canceled.
- done(): returns True if the process has finished executing.
- result(timeout=None): returns the value which is returned by the process, if the process is still in execution then it waits for the timeout specified else raises a TimeoutError, if None is specified it will wait forever for the process to finish.
- add_done_callback(fn): Attaches a callback function which is called when the process finishes its execution.
Example 1
The below code demonstrates the use of ProcessPoolExecutor, notice unlike with the multiprocessing module we do not have to explicitly call using a loop, keeping a track of the process using a list or wait for the process using join for synchronization, or releasing the resources after the Process are finished everything is taken under the hood by the constructor itself making the code compact and bug-free.
Python3
from concurrent.futures import ProcessPoolExecutor from time import sleep values = [ 3 , 4 , 5 , 6 ] def cube(x): print (f 'Cube of {x}:{x*x*x}' ) if __name__ = = '__main__' : result = [] with ProcessPoolExecutor(max_workers = 5 ) as exe: exe.submit(cube, 2 ) # Maps the method 'cube' with a iterable result = exe. map (cube,values) for r in result: print (r) |
Output:
Cube of 2:8 Cube of 3:27 Cube of 6:216 Cube of 4:64 Cube of 5:125
Example 2
The below code is fetching images over the internet by making an HTTP request, I am using the request library for the same. The first section of the code makes a one-to-one call to the API and i.e the download is slow, whereas the second section of the code makes a parallel request using multiple Processes to fetch API.
You can try all various parameters discussed above to see how it tunes the speedup for example if I make a Process pool of 6 instead of 3 the speedup is more significant.
Python3
import requests import time import os import concurrent.futures img_urls = [ ] t1 = time.time() print ( "Downloading images with single process" ) def download_image(img_url): img_bytes = requests.get(img_url).content print ( "Downloading.." ) for img in img_urls: download_image(img) t2 = time.time() print (f 'Single Process Code Took :{t2-t1} seconds' ) print ( '*' * 50 ) t1 = time.time() print ( "Downloading images with Multiprocess" ) def download_image(img_url): img_bytes = requests.get(img_url).content print (f "[Process ID]:{os.getpid()} Downloading.." ) with concurrent.futures.ProcessPoolExecutor( 3 ) as exe: exe. map (download_image, img_urls) t2 = time.time() print (f 'Multiprocess Code Took:{t2-t1} seconds' ) |
Output:
Downloading images with single process Downloading.. Downloading.. Downloading.. Downloading.. Downloading.. Downloading.. Single Process Code Took :1.2382981777191162 seconds ************************************************** Downloading images with Multiprocess [Process ID]:118741 Downloading.. [Process ID]:118742 Downloading.. [Process ID]:118740 Downloading.. [Process ID]:118741 Downloading.. [Process ID]:118742 Downloading.. [Process ID]:118740 Downloading.. Multiprocess Code Took:0.8398590087890625 seconds