Multiprocessing is a technique in which a computer runs multiple processes at the same time, typically on a multi-core CPU, multiple CPUs, or multiple GPUs. It is a form of parallel processing: a program is divided into smaller jobs, each job is handed to a separate process that can run on its own core or processor, and the partial results are combined to produce the final output. Because the work is spread across the available hardware, a job can finish sooner than it would on a single core, and no individual core becomes a bottleneck. The speedup is not automatic, however: it depends on how evenly the work can be divided and on the cost of starting processes and exchanging data between them.
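As a minimal sketch of the divide, execute, and combine pattern described above, the following uses Python's standard multiprocessing module rather than PyTorch. The task (a sum of squares) and the helper names split_range, sum_of_squares, and parallel_sum_of_squares are illustrative assumptions, not part of this article's example.

```python
import multiprocessing as mp


def split_range(n, parts):
    """Split range(n) into `parts` contiguous (start, stop) chunks."""
    step, extra = divmod(n, parts)
    chunks, start = [], 0
    for i in range(parts):
        # The first `extra` chunks get one additional element.
        stop = start + step + (1 if i < extra else 0)
        chunks.append((start, stop))
        start = stop
    return chunks


def sum_of_squares(bounds):
    """Worker job: sum i*i over one chunk."""
    start, stop = bounds
    return sum(i * i for i in range(start, stop))


def parallel_sum_of_squares(n, workers):
    """Hand each chunk to a worker process, then combine the partial sums."""
    with mp.Pool(processes=workers) as pool:
        partial_sums = pool.map(sum_of_squares, split_range(n, workers))
    return sum(partial_sums)


if __name__ == "__main__":
    print(parallel_sum_of_squares(1_000_000, workers=4))
```

For a job this small, the cost of starting the worker processes may outweigh the gain; the pattern pays off when each chunk carries a substantial amount of work.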
In this article, we look at how to use PyTorch's torch.multiprocessing module to spread time-consuming work, such as training a model, across multiple processes.
Training Neural Networks using PyTorch
A typical training pipeline in PyTorch has three steps:
- First, we design the model: we set the number of inputs and outputs and define the forward pass with all of its operations and layers.
- Then, we construct the loss and the optimizer.
- In the last step, we iterate the training loop a number of times. Each iteration includes:
  - The forward pass, where we compute the prediction
  - The backward pass, where we compute the gradients
  - The weight update
Of these steps, the training loop in Step 3 usually takes the most time when the training data is large. To shorten it, we can split the work across several processes using PyTorch's multiprocessing support. This is generally used to reduce the time needed to train a neural network and, in some cases, to reduce the load on a single GPU.
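Before adding multiprocessing, the three-step pipeline above can be sketched in a single process as follows. This is only an illustration: the function name fit_line, the fixed seed, and the default of 200 iterations are assumptions chosen for the sketch, and the data is the f(x) = 2 * x example used later in this article.

```python
import torch
import torch.nn as nn


def fit_line(n_iters=200, lr=0.01, seed=0):
    """Train a one-input, one-output linear model on y = 2 * x."""
    torch.manual_seed(seed)
    X = torch.tensor([[1.0], [2.0], [3.0], [4.0]])
    Y = 2 * X

    # Step 1: design the model (its forward pass is a single linear layer).
    model = nn.Linear(1, 1)

    # Step 2: construct the loss and the optimizer.
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    # Step 3: training loop.
    losses = []
    for _ in range(n_iters):
        y_pred = model(X)            # forward pass
        loss = loss_fn(y_pred, Y)
        loss.backward()              # backward pass: compute gradients
        optimizer.step()             # update the weights
        optimizer.zero_grad()        # reset gradients for the next iteration
        losses.append(loss.item())
    return model, losses


if __name__ == "__main__":
    model, losses = fit_line()
    print(f"first loss = {losses[0]:.3f}, last loss = {losses[-1]:.3f}")
```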
Installation:
Make sure all the requirements are installed. If not, you can install them using the commands below:
Installing Python (on Debian/Ubuntu):
sudo apt install python3 python3-pip
Installing PyTorch:
pip install torch
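To confirm that the installation worked, a short check such as the one below may help. The helper name describe_environment is hypothetical; it only reports the installed PyTorch version and the number of CPU cores that Python can see.

```python
import os

import torch


def describe_environment():
    """Collect the PyTorch version and the visible CPU core count."""
    return {
        "torch_version": str(torch.__version__),
        "cpu_count": os.cpu_count(),
    }


if __name__ == "__main__":
    for key, value in describe_environment().items():
        print(f"{key}: {value}")
```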
Explanation with Example:
In the example below, we build a model that learns the simple mathematical function shown here, and we use PyTorch multiprocessing to train it.
f(x) = 2 * x
Train X: 1, 2, 3, 4
Train Y: 2, 4, 6, 8
Test X: 5
Estimate Y: 10
Multiprocessing in PyTorch
Step 1: Import the necessary libraries
First, we import the required libraries: torch, torch.nn, and torch.multiprocessing.
Python3
import torch
import torch.nn as nn
import torch.multiprocessing as mp
Step 2: Training function
After that, we define a function called train that takes a model, input data, and output data as inputs. This function trains the model on the input and output data by performing the following steps:
- A learning rate and the number of iterations are defined.
- The loss function is defined as the mean squared error (MSE), and the optimizer as stochastic gradient descent (SGD) with the specified learning rate.
- The function then loops over the specified number of iterations and performs the following steps in each one:
  - The predicted output of the model is computed on the input data.
  - The loss between the predicted output and the actual output is calculated, followed by the gradients of the loss with respect to the model parameters.
  - The model parameters are updated using the optimizer, and the gradients are reset to zero.
  - Every 10 epochs, the current epoch number, weight, and loss are printed.
Python3
# Define the training function
def train(model, X, Y):
    # Define the learning rate, number of iterations, loss function, and optimizer
    learning_rate = 0.01
    n_iters = 100
    loss = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

    # Loop through the specified number of iterations
    for epoch in range(n_iters):
        # Make predictions using the model
        y_predicted = model(X)

        # Calculate the loss (prediction first, then target)
        l = loss(y_predicted, Y)

        # Backpropagate to get the gradients, update the parameters, then reset the gradients
        l.backward()
        optimizer.step()
        optimizer.zero_grad()

        # Print the current weight and loss every 10 epochs
        if epoch % 10 == 0:
            [w, b] = model.parameters()
            print(f'Rank {mp.current_process().name}: epoch {epoch+1}: w = {w[0][0].item():.3f}, loss = {l.item():.3f}')
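For reference, the quantities that the loop above computes for this one-weight linear model can be written out as follows. The symbols are a notational assumption: n is the number of samples (4 here), w and b are the weight and bias of the linear model, and η stands for learning_rate (0.01).

```latex
\begin{aligned}
\hat{y}_i &= w\,x_i + b \\
L(w, b) &= \frac{1}{n}\sum_{i=1}^{n}\left(\hat{y}_i - y_i\right)^2 \\
\frac{\partial L}{\partial w} &= \frac{2}{n}\sum_{i=1}^{n}\left(\hat{y}_i - y_i\right)x_i,
\qquad
\frac{\partial L}{\partial b} = \frac{2}{n}\sum_{i=1}^{n}\left(\hat{y}_i - y_i\right) \\
w &\leftarrow w - \eta\,\frac{\partial L}{\partial w},
\qquad
b \leftarrow b - \eta\,\frac{\partial L}{\partial b}
\end{aligned}
```

For the training data in this article, the loss reaches zero at w = 2 and b = 0, which is why the printed weight approaches 2.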
Step 3: Define the model and multiprocessing
Main function
Next, the main function is defined. It:
- sets up the input and output data,
- creates a linear model,
- prints the model's prediction on a test input before training.
# Input and output data
X = torch.tensor([[1], [2], [3], [4]], dtype=torch.float32)
Y = torch.tensor([[2], [4], [6], [8]], dtype=torch.float32)
n_samples, n_features = X.shape

# Print the number of samples and features
print(f'#samples: {n_samples}, #features: {n_features}')

# Define the test input and the model input/output sizes
X_test = torch.tensor([5], dtype=torch.float32)
input_size = n_features
output_size = n_features

# Define the linear model and print its prediction on the test input before training
model = nn.Linear(input_size, output_size)
print(f'Prediction before training: f(5) = {model(X_test).item():.3f}')
Multiprocessing
- Multiprocessing is set up by defining the number of processes to use and creating a list to hold them.
- The model.share_memory() method moves the model parameters into shared memory so that every process reads and updates the same tensors.
- For each rank, a new process is created with mp.Process(), using the arguments target=train and args=(model, X, Y,), so that the process runs the train function on the same model and input/output data as before.
- p.start() starts a process, and p.join() waits for it to finish before continuing. We start each process and add it to the list, then wait for every process in the list to finish.
# Number of processes
num_processes = 4

# Share the model's memory to allow it to be accessed by multiple processes
model.share_memory()

# Create a list of processes and start each process with the train function
processes = []
for rank in range(num_processes):
    p = mp.Process(target=train, args=(model, X, Y,), name=f'Process-{rank}')
    p.start()
    processes.append(p)
    print(f'Started {p.name}')

# Wait for all processes to finish
for p in processes:
    p.join()
    print(f'Finished {p.name}')
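To see what model.share_memory() changes, the sketch below inspects the parameters with Tensor.is_shared() before and after the call. The helper name sharing_status is hypothetical and exists only for this illustration.

```python
import torch.nn as nn


def sharing_status(model):
    """Report, for each parameter, whether its tensor lives in shared memory."""
    return [p.is_shared() for p in model.parameters()]


if __name__ == "__main__":
    model = nn.Linear(1, 1)
    print("before share_memory():", sharing_status(model))

    # Move the weight and bias into shared memory (in place).
    model.share_memory()
    print("after share_memory(): ", sharing_status(model))
```

Once the parameters are shared, an update made by one training process is visible to the others and to the parent process, which is why the parent can print a trained prediction after the workers finish.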
Print the final predicted output
- Finally, the prediction of the trained model on the test input is displayed. After training in parallel across the processes, it should now be close to the expected value of 10.
- Thus, by using multiprocessing, we can train the model with several processes at once, which can speed up training on multi-core CPUs or multi-GPU systems. Because all processes update the same shared parameters concurrently and the model starts from random weights, the exact numbers differ from run to run.
Python3
# Main function
if __name__ == '__main__':
    # Define the input and output data
    X = torch.tensor([[1], [2], [3], [4]], dtype=torch.float32)
    Y = torch.tensor([[2], [4], [6], [8]], dtype=torch.float32)
    n_samples, n_features = X.shape

    # Print the number of samples and features
    print(f'#samples: {n_samples}, #features: {n_features}')

    # Define the test input and the model input/output sizes
    X_test = torch.tensor([5], dtype=torch.float32)
    input_size = n_features
    output_size = n_features

    # Define the linear model and print its prediction on the test input before training
    model = nn.Linear(input_size, output_size)
    print(f'Prediction before training: f(5) = {model(X_test).item():.3f}')

    # Number of processes
    num_processes = 4

    # Share the model's memory to allow it to be accessed by multiple processes
    model.share_memory()

    # Create a list of processes and start each process with the train function
    processes = []
    for rank in range(num_processes):
        p = mp.Process(target=train, args=(model, X, Y,), name=f'Process-{rank}')
        p.start()
        processes.append(p)
        print(f'Started {p.name}')

    # Wait for all processes to finish
    for p in processes:
        p.join()
        print(f'Finished {p.name}')

    # Print the model's prediction on the test input after training
    print(f'Prediction after training: f(5) = {model(X_test).item():.3f}')
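In the code above, every process trains on the full data set. One possible variation, shown here only as a sketch, is to give each process its own slice of the data so that the work itself is divided. The helpers shard and train_on_shard, and the choice of two processes, are assumptions made for this illustration and are not part of the article's example.

```python
import torch
import torch.nn as nn
import torch.multiprocessing as mp


def shard(tensor, rank, num_processes):
    """Return every num_processes-th row of `tensor`, starting at row `rank`."""
    return tensor[rank::num_processes]


def train_on_shard(model, X, Y, rank, num_processes, n_iters=100, lr=0.01):
    """Train the shared model using only this process's slice of the data."""
    X_part = shard(X, rank, num_processes)
    Y_part = shard(Y, rank, num_processes)
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    for _ in range(n_iters):
        loss = loss_fn(model(X_part), Y_part)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()


if __name__ == "__main__":
    X = torch.tensor([[1.0], [2.0], [3.0], [4.0]])
    Y = 2 * X
    model = nn.Linear(1, 1)
    model.share_memory()

    num_processes = 2
    processes = [
        mp.Process(target=train_on_shard, args=(model, X, Y, rank, num_processes))
        for rank in range(num_processes)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    x_test = torch.tensor([5.0])
    print(f"Prediction after training: f(5) = {model(x_test).item():.3f}")
```

With only four samples the slices are tiny; the same slicing idea is more meaningful on a data set large enough that each process has real work to do.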
Output:
#samples: 4, #features: 1
Prediction before training: f(5) = 1.969
Started Process-0
Rank Process-0: epoch 1: w = 0.787, loss = 21.710
Started Process-1
Started Process-2
Rank Process-0: epoch 11: w = 1.852, loss = 0.566
Rank Process-1: epoch 1: w = 1.886, loss = 0.394
Rank Process-0: epoch 21: w = 2.038, loss = 0.010
Rank Process-1: epoch 11: w = 2.049, loss = 0.004
Rank Process-2: epoch 1: w = 2.052, loss = 0.005
Rank Process-0: epoch 31: w = 2.051, loss = 0.004
Rank Process-2: epoch 11: w = 2.050, loss = 0.004
Rank Process-3: epoch 1: w = 2.050, loss = 0.004
Rank Process-1: epoch 21: w = 2.047, loss = 0.003
Rank Process-3: epoch 11: w = 2.045, loss = 0.003
Rank Process-2: epoch 21: w = 2.044, loss = 0.003
Rank Process-0: epoch 41: w = 2.047, loss = 0.003
Rank Process-1: epoch 31: w = 2.041, loss = 0.003
Rank Process-3: epoch 21: w = 2.041, loss = 0.002
Rank Process-0: epoch 51: w = 2.040, loss = 0.002
Rank Process-2: epoch 31: w = 2.039, loss = 0.002
Rank Process-1: epoch 41: w = 2.037, loss = 0.002
Rank Process-0: epoch 61: w = 2.035, loss = 0.002
Rank Process-3: epoch 31: w = 2.036, loss = 0.002
Rank Process-0: epoch 71: w = 2.032, loss = 0.001
Rank Process-1: epoch 51: w = 2.035, loss = 0.002
Rank Process-3: epoch 41: w = 2.032, loss = 0.001
Rank Process-0: epoch 81: w = 2.031, loss = 0.001
Rank Process-2: epoch 41: w = 2.032, loss = 0.002
Rank Process-1: epoch 61: w = 2.029, loss = 0.001
Rank Process-3: epoch 51: w = 2.029, loss = 0.001
Rank Process-3: epoch 61: w = 2.028, loss = 0.001
Rank Process-2: epoch 51: w = 2.026, loss = 0.001
Rank Process-3: epoch 71: w = 2.025, loss = 0.001
Rank Process-0: epoch 91: w = 2.026, loss = 0.001
Rank Process-1: epoch 71: w = 2.025, loss = 0.001
Rank Process-2: epoch 61: w = 2.024, loss = 0.001
Rank Process-3: epoch 81: w = 2.022, loss = 0.001
Rank Process-1: epoch 81: w = 2.023, loss = 0.001
Rank Process-2: epoch 71: w = 2.023, loss = 0.001
Started Process-3
Rank Process-1: epoch 91: w = 2.021, loss = 0.001
Rank Process-2: epoch 81: w = 2.020, loss = 0.001
Finished Process-0
Rank Process-3: epoch 91: w = 2.020, loss = 0.001
Rank Process-2: epoch 91: w = 2.018, loss = 0.000
Finished Process-1
Finished Process-2
Finished Process-3
Prediction after training: f(5) = 10.036
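A "Finished" line in the output above only means that p.join() returned; join also returns when a process has crashed. As a hedged sketch using Python's standard multiprocessing module, the exit code of each process can be checked after joining. The worker function and the failed_workers helper are hypothetical names introduced for this illustration.

```python
import multiprocessing as mp
import sys


def worker(should_fail):
    """Hypothetical job: exit with a non-zero code when asked to fail."""
    if should_fail:
        sys.exit(1)


def failed_workers(exitcodes):
    """Return the indices of the workers whose exit code is not 0."""
    return [i for i, code in enumerate(exitcodes) if code != 0]


if __name__ == "__main__":
    processes = [
        mp.Process(target=worker, args=(flag,)) for flag in (False, True, False)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    # exitcode is 0 for a clean exit and non-zero otherwise.
    print("failed workers:", failed_workers([p.exitcode for p in processes]))
```

The same exitcode attribute is available on processes created through torch.multiprocessing, since that module builds on the standard one.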
Full implementation code:
Python3
# Import the necessary libraries
import torch
import torch.nn as nn
import torch.multiprocessing as mp


# Define the training function
def train(model, X, Y):
    # Define the learning rate, number of iterations, loss function, and optimizer
    learning_rate = 0.01
    n_iters = 100
    loss = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

    # Loop through the specified number of iterations
    for epoch in range(n_iters):
        # Make predictions using the model
        y_predicted = model(X)

        # Calculate the loss (prediction first, then target)
        l = loss(y_predicted, Y)

        # Backpropagate to get the gradients, update the parameters, then reset the gradients
        l.backward()
        optimizer.step()
        optimizer.zero_grad()

        # Print the current weight and loss every 10 epochs
        if epoch % 10 == 0:
            [w, b] = model.parameters()
            print(
                f"Rank {mp.current_process().name}: epoch {epoch+1}: w = {w[0][0].item():.3f}, loss = {l.item():.3f}"
            )


# Main function
if __name__ == "__main__":
    # Set the number of processes and define the input and output data
    num_processes = 4
    X = torch.tensor([[1], [2], [3], [4]], dtype=torch.float32)
    Y = torch.tensor([[2], [4], [6], [8]], dtype=torch.float32)
    n_samples, n_features = X.shape

    # Print the number of samples and features
    print(f"#samples: {n_samples}, #features: {n_features}")

    # Define the test input and the model input/output sizes
    X_test = torch.tensor([5], dtype=torch.float32)
    input_size = n_features
    output_size = n_features

    # Define the linear model and print its prediction on the test input before training
    model = nn.Linear(input_size, output_size)
    print(f"Prediction before training: f(5) = {model(X_test).item():.3f}")

    # Share the model's memory to allow it to be accessed by multiple processes
    model.share_memory()

    # Create a list of processes and start each process with the train function
    processes = []
    for rank in range(num_processes):
        p = mp.Process(
            target=train,
            args=(model, X, Y),
            name=f"Process-{rank}",
        )
        p.start()
        processes.append(p)
        print(f"Started {p.name}")

    # Wait for all processes to finish
    for p in processes:
        p.join()
        print(f"Finished {p.name}")

    # Print the model's prediction on the test input after training
    print(f"Prediction after training: f(5) = {model(X_test).item():.3f}")
Output:
#samples: 4, #features: 1
Prediction before training: f(5) = 3.966
Started Process-0
Started Process-1
Started Process-2
Rank Process-0: epoch 1: w = 0.813, loss = 9.314
Rank Process-0: epoch 11: w = 1.515, loss = 0.416
Started Process-3
Rank Process-0: epoch 21: w = 1.637, loss = 0.176
Rank Process-0: epoch 31: w = 1.665, loss = 0.160
Rank Process-0: epoch 41: w = 1.678, loss = 0.150
Rank Process-0: epoch 51: w = 1.688, loss = 0.142
Rank Process-0: epoch 61: w = 1.697, loss = 0.133
Rank Process-0: epoch 71: w = 1.706, loss = 0.126
Rank Process-1: epoch 1: w = 1.707, loss = 0.157
Rank Process-2: epoch 1: w = 1.782, loss = 1.700
Rank Process-1: epoch 11: w = 1.728, loss = 0.112
Rank Process-0: epoch 81: w = 1.730, loss = 0.107
Rank Process-2: epoch 11: w = 1.743, loss = 0.097
Rank Process-1: epoch 21: w = 1.745, loss = 0.095
Rank Process-0: epoch 91: w = 1.762, loss = 0.083
Rank Process-2: epoch 21: w = 1.763, loss = 0.083
Rank Process-1: epoch 31: w = 1.764, loss = 0.081
Rank Process-2: epoch 31: w = 1.781, loss = 0.070
Rank Process-1: epoch 41: w = 1.781, loss = 0.070
Rank Process-2: epoch 41: w = 1.790, loss = 0.064
Rank Process-2: epoch 51: w = 1.797, loss = 0.060
Finished Process-0
Rank Process-3: epoch 1: w = 1.800, loss = 0.133
Rank Process-2: epoch 61: w = 1.811, loss = 0.053
Rank Process-1: epoch 51: w = 1.813, loss = 0.052
Rank Process-3: epoch 11: w = 1.819, loss = 0.048
Rank Process-1: epoch 61: w = 1.826, loss = 0.044
Rank Process-2: epoch 71: w = 1.828, loss = 0.043
Rank Process-3: epoch 21: w = 1.836, loss = 0.040
Rank Process-1: epoch 71: w = 1.839, loss = 0.038
Rank Process-2: epoch 81: w = 1.849, loss = 0.034
Rank Process-3: epoch 31: w = 1.849, loss = 0.033
Rank Process-1: epoch 81: w = 1.851, loss = 0.033
Rank Process-2: epoch 91: w = 1.861, loss = 0.028
Rank Process-3: epoch 41: w = 1.861, loss = 0.028
Rank Process-1: epoch 91: w = 1.865, loss = 0.027
Rank Process-3: epoch 51: w = 1.874, loss = 0.023
Rank Process-3: epoch 61: w = 1.877, loss = 0.022
Finished Process-1
Rank Process-3: epoch 71: w = 1.881, loss = 0.021
Rank Process-3: epoch 81: w = 1.885, loss = 0.019
Rank Process-3: epoch 91: w = 1.888, loss = 0.018
Finished Process-2
Finished Process-3
Prediction after training: f(5) = 9.775
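As an alternative to building the process list by hand, torch.multiprocessing also provides a spawn helper that starts a given number of processes and joins them. The sketch below is one way the same training could be arranged with it, not the method used in this article. Note the assumption it relies on: mp.spawn calls the target with the process index as its first argument, so train takes an extra rank parameter here; the defaults n_iters=100 and lr=0.01 mirror the values used above.

```python
import torch
import torch.nn as nn
import torch.multiprocessing as mp


def train(rank, model, X, Y, n_iters=100, lr=0.01):
    """Training job for mp.spawn, which passes the process index as `rank`."""
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    for _ in range(n_iters):
        loss = loss_fn(model(X), Y)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()


if __name__ == "__main__":
    X = torch.tensor([[1.0], [2.0], [3.0], [4.0]])
    Y = 2 * X
    model = nn.Linear(1, 1)
    model.share_memory()

    # Start 4 processes running train(rank, model, X, Y) and wait for all of them.
    mp.spawn(train, args=(model, X, Y), nprocs=4, join=True)

    x_test = torch.tensor([5.0])
    print(f"Prediction after training: f(5) = {model(x_test).item():.3f}")
```

By default mp.spawn uses the "spawn" start method, which is also the kind of start method PyTorch requires when subprocesses use CUDA, so this form may be the more convenient starting point for GPU work.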