Saturday, January 4, 2025
Google search engine
HomeLanguagesMultiprocessing in Python and PyTorch

Multiprocessing in Python and PyTorch

Multiprocessing is a technique in computer science by which a computer can perform multiple tasks or processes simultaneously using a multi-core CPU or multiple GPUs. It is a type of parallel processing in which a program is divided into smaller jobs that can be carried out simultaneously. The program is able to utilize all of the hardware resources because each task is given to a distinct process that can operate on a different core or processor. The performance can be increased by multiprocessing by speeding up the process of completing a job. This is achieved by breaking down the task into smaller parts, which can be executed simultaneously, and then combining the results to produce the final output. By utilizing multiple cores or CPUs, multiprocessing can also help to reduce the load on each individual core, preventing bottlenecks and improving overall system performance.

In this Article, we try to understand how to do multiprocessing using PyTorch torch.multiprocessing importing which helps to do high time-consuming work through multiple processes.

Training Neural Networks using Pytorch

The general training pipeline in Pytorch generally includes 3 steps:

  1. First, we design the model with the number of inputs and number of outputs and also, we design the forward pass with all different operations or all the different layers.
  2. Then, we construct the loss and optimizer
  3. In the last step, we iterate the training loop a couple of times which basically includes:
    • We start with our forward pass, i.e., here we compute our prediction
    • Then, we do the backward pass, i.e, where we get the gradients
    • Finally, we update our weights

Out of all the steps, Step 2 and Step 3 may take a lot of time, if the training data is large. To avoid this, we can split the task into different processes, using the multiprocessing feature of Pytorch. We generally use this feature to reduce the time to train neural networks and sometimes, also to reduce workload over one GPU.

Installation:

Make sure all the requirements are installed. If not, you can install them using the commands below:

Installing Python:

sudo apt install python

Installing PyTorch:

pip install torch

Explanation with Example: 

In the example below, we develop a model that works over mathematical functions as shown below and we use Pytorch multiprocessing to train the model.

f(x) = 2 * x
Train X: 1, 2, 3, 4
Train Y: 2, 4, 6, 8

Test X : 5
Estimate Y : 10

Multiprocessing in PyTorch

Step 1: Import the necessary libraries

Firstly, We import all required libraries:  torch, torch.nn, and torch.multiprocessing.

Python3




import torch
import torch.nn as nn
import torch.multiprocessing as mp


Step 2: Training function

After that, we define a function called train that takes a model, input data, and output data as inputs. This function trains the model on the input and output data by performing the following steps:

  • A learning rate and the number of iterations are defined.
  • After this, a loss function is defined as the mean squared error (MSE) and an optimizer as stochastic gradient descent (SGD) with the specified learning rate.
  • Then a Looping is done through the specified number of iterations and  the following steps are performed:
    • The predicted output of the model is computed on the input data.
    • The loss between the predicted output and the actual output is also calculated along with the gradients of the loss with respect to the model parameters.
    • Then, the model parameters are updated using the optimizer, and gradients are reset to zero.
    • The current epoch number and the current loss are printed every 10 epochs.

Python3




# Define the training function
def train(model, X, Y):
    # Define the learning rate, number of iterations, and loss function
    learning_rate = 0.01
    n_iters = 100
    loss = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
      
    # Loop through the specified number of iterations
    for epoch in range(n_iters):
        # Make predictions using the model
        y_predicted = model(X)
          
        # Calculate the loss
        l = loss(Y, y_predicted)
          
        # Backpropagate the loss to update the model parameters
        l.backward()
        optimizer.step()
        optimizer.zero_grad()
          
        # Print the current loss and weights every 10 epochs
        if epoch % 10 == 0:
            [w, b] = model.parameters() 
            print(f'Rank {mp.current_process().name}: epoch {epoch+1}: w = {w[0][0].item():.3f}, loss = {l:.3f}')


Step 3: Define the model and multiprocessing

Main function

After this, the main function is defined :

  • sets up the input and output data, 
  • creates a linear model, 
  • Print the predicted output before training the model on a test input.
# input and output data
X = torch.tensor([[1], [2], [3], [4]], dtype=torch.float32)
Y = torch.tensor([[2], [4], [6], [8]], dtype=torch.float32)
n_samples, n_features = X.shape

# Print the number of samples and features
print(f'#samples: {n_samples}, #features: {n_features}')

# Define the test input and the model input/output sizes
X_test = torch.tensor([5], dtype=torch.float32)
input_size = n_features
output_size = n_features

# Define the linear model and print its prediction on the test input before training
model = nn.Linear(input_size, output_size)
print(f'Prediction before training: f(5) = {model(X_test).item():.3f}')

Multiprocessing

  • Multiprocessing is set up by defining the number of processes to use and creating a list of processes.
  • model.share_memory() method helps to allocate shared memory for the model parameters so that they can be accessed by multiple processes.
  • For each, a new process is created that calls the train function with the same model and input/output data as before. This is done using mp.Process() method with arguments target=train, args=(model, X, Y,).
    • Here train function is called to train the model.
  •  p.start() starts the process and p.join() waits for the process to finish before continuing.  Using this, we start each process and add it to the list of processes and wait for each process to finish.
# Number of processes
num_processes = 4
# Share the model's memory to allow it to be accessed by multiple processes
model.share_memory()

# Create a list of processes and start each process with the train function
processes = []
for rank in range(num_processes):
    p = mp.Process(target=train, args=(model, X, Y,), name=f'Process-{rank}')
    p.start()
    processes.append(p)
    print(f'Started {p.name}')

# Wait for all processes to finish
for p in processes:
    p.join()
    print(f'Finished {p.name}')

Print the final predicted output

  • Finally, the predicted output of the trained model on the test input is displayed, which should now be different after the training is performed in parallel by the multiple processes.
  • Thus, by using multiprocessing, we can train the model in parallel using multiple processes, which can speed up the training process on multi-core CPUs or multi-GPU systems.

Python3




# Main function
if __name__=='__main__':
    # Set the number of processes and define the input and output data
      
    X = torch.tensor([[1], [2], [3], [4]], dtype=torch.float32)
    Y = torch.tensor([[2], [4], [6], [8]], dtype=torch.float32)
    n_samples, n_features = X.shape
      
    # Print the number of samples and features
    print(f'#samples: {n_samples}, #features: {n_features}')
      
    # Define the test input and the model input/output sizes
    X_test = torch.tensor([5], dtype=torch.float32)
    input_size = n_features
    output_size = n_features
      
    # Define the linear model and print its prediction on the test input before training
    model = nn.Linear(input_size, output_size)
    print(f'Prediction before training: f(5) = {model(X_test).item():.3f}')
      
    # Number of processes
    num_processes = 4
    # Share the model's memory to allow it to be accessed by multiple processes
    model.share_memory()
  
    # Create a list of processes and start each process with the train function
    processes = []
    for rank in range(num_processes):
        p = mp.Process(target=train, args=(model, X, Y,), name=f'Process-{rank}')
        p.start()
        processes.append(p)
        print(f'Started {p.name}')
      
    # Wait for all processes to finish
    for p in processes:
        p.join()
        print(f'Finished {p.name}')
      
    # Print the model's prediction on the test input after training
    print(f'Prediction after training: f(5) = {model(X_test).item():.3f}')


Output:

#samples: 4, #features: 1
Prediction before training: f(5) = 1.969
Started Process-0
RankProcess-0:epoch 1:w = 0.787,loss = 21.710Started Process-1
Started Process-2
RankProcess-0:epoch 11:w = 1.852,loss = 0.566
RankProcess-1:epoch 1:w = 1.886,loss = 0.394
RankProcess-0:epoch 21:w = 2.038,loss = 0.010
RankProcess-1:epoch 11:w = 2.049,loss = 0.004
RankProcess-2:epoch 1:w = 2.052,loss = 0.005
RankProcess-0:epoch 31:w = 2.051,loss = 0.004
RankProcess-2:epoch 11:w = 2.050,loss = 0.004
RankProcess-3:epoch 1:w = 2.050,loss = 0.004
RankProcess-1:epoch 21:w = 2.047,loss = 0.003
RankProcess-3:epoch 11:w = 2.045,loss = 0.003
RankProcess-2:epoch 21:w = 2.044,loss = 0.003
RankProcess-0:epoch 41:w = 2.047,loss = 0.003
RankProcess-1:epoch 31:w = 2.041,loss = 0.003
RankProcess-3:epoch 21:w = 2.041,loss = 0.002
RankProcess-0:epoch 51:w = 2.040,loss = 0.002
RankProcess-2:epoch 31:w = 2.039,loss = 0.002
RankProcess-1:epoch 41:w = 2.037,loss = 0.002
RankProcess-0:epoch 61:w = 2.035,loss = 0.002
RankProcess-3:epoch 31:w = 2.036,loss = 0.002
RankProcess-0:epoch 71:w = 2.032,loss = 0.001
RankProcess-1:epoch 51:w = 2.035,loss = 0.002
RankProcess-3:epoch 41:w = 2.032,loss = 0.001
RankProcess-0:epoch 81:w = 2.031,loss = 0.001
RankProcess-2:epoch 41:w = 2.032,loss = 0.002
RankProcess-1:epoch 61:w = 2.029,loss = 0.001
RankProcess-3:epoch 51:w = 2.029,loss = 0.001
RankProcess-3:epoch 61:w = 2.028,loss = 0.001
RankProcess-2:epoch 51:w = 2.026,loss = 0.001
RankProcess-3:epoch 71:w = 2.025,loss = 0.001
RankProcess-0:epoch 91:w = 2.026,loss = 0.001
RankProcess-1:epoch 71:w = 2.025,loss = 0.001
RankProcess-2:epoch 61:w = 2.024,loss = 0.001
RankProcess-3:epoch 81:w = 2.022,loss = 0.001
RankProcess-1:epoch 81:w = 2.023,loss = 0.001
RankProcess-2:epoch 71:w = 2.023,loss = 0.001
Started Process-3
RankProcess-1:epoch 91:w = 2.021,loss = 0.001
RankProcess-2:epoch 81:w = 2.020,loss = 0.001
Finished Process-0
RankProcess-3:epoch 91:w = 2.020,loss = 0.001
RankProcess-2:epoch 91:w = 2.018,loss = 0.000
Finished Process-1
Finished Process-2
Finished Process-3
Prediction after training: f(5) = 10.036

Full implementations code:

Python3




# Import the necessary libraries
import torch
import torch.nn as nn
import torch.multiprocessing as mp
  
  
# Define the training function
def train(model, X, Y):
    # Define the learning rate, number of iterations, and loss function
    learning_rate = 0.01
    n_iters = 100
    loss = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
  
    # Loop through the specified number of iterations
    for epoch in range(n_iters):
        # Make predictions using the model
        y_predicted = model(X)
  
        # Calculate the loss
        l = loss(Y, y_predicted)
  
        # Backpropagate the loss to update the model parameters
        l.backward()
        optimizer.step()
        optimizer.zero_grad()
  
        # Print the current loss and weights every 10 epochs
        if epoch % 10 == 0:
            [w, b] = model.parameters()
            print(
                f"Rank {mp.current_process().name}: epoch {epoch+1}: w = {w[0][0].item():.3f}, loss = {l:.3f}"
            )
  
  
# Main function
if __name__ == "__main__":
    # Set the number of processes and define the input and output data
    num_processes = 4
    X = torch.tensor([[1], [2], [3], [4]], dtype=torch.float32)
    Y = torch.tensor([[2], [4], [6], [8]], dtype=torch.float32)
    n_samples, n_features = X.shape
  
    # Print the number of samples and features
    print(f"#samples: {n_samples}, #features: {n_features}")
  
    # Define the test input and the model input/output sizes
    X_test = torch.tensor([5], dtype=torch.float32)
    input_size = n_features
    output_size = n_features
  
    # Define the linear model and print its prediction on the test input before training
    model = nn.Linear(input_size, output_size)
    print(f"Prediction before training: f(5) = {model(X_test).item():.3f}")
  
    # Share the model's memory to allow it to be accessed by multiple processes
    model.share_memory()
  
    # Create a list of processes and start each process with the train function
    processes = []
    for rank in range(num_processes):
        p = mp.Process(
            target=train,
            args=(
                model,
                X,
                Y,
            ),
            name=f"Process-{rank}",
        )
        p.start()
        processes.append(p)
        print(f"Started {p.name}")
  
    # Wait for all processes to finish
    for p in processes:
        p.join()
        print(f"Finished {p.name}")
  
    # Print the model's prediction on the test input after training
    print(f"Prediction after training: f(5) = {model(X_test).item():.3f}")


Output:

#samples: 4, #features: 1
Prediction before training: f(5) = 3.966
Started Process-0
Started Process-1
Started Process-2
Rank Process-0: epoch 1: w = 0.813, loss = 9.314
Rank Process-0: epoch 11: w = 1.515, loss = 0.416
Started Process-3
Rank Process-0: epoch 21: w = 1.637, loss = 0.176
Rank Process-0: epoch 31: w = 1.665, loss = 0.160
Rank Process-0: epoch 41: w = 1.678, loss = 0.150
Rank Process-0: epoch 51: w = 1.688, loss = 0.142
Rank Process-0: epoch 61: w = 1.697, loss = 0.133
Rank Process-0: epoch 71: w = 1.706, loss = 0.126
Rank Process-1: epoch 1: w = 1.707, loss = 0.157
Rank Process-2: epoch 1: w = 1.782, loss = 1.700
Rank Process-1: epoch 11: w = 1.728, loss = 0.112
Rank Process-0: epoch 81: w = 1.730, loss = 0.107
Rank Process-2: epoch 11: w = 1.743, loss = 0.097
Rank Process-1: epoch 21: w = 1.745, loss = 0.095
Rank Process-0: epoch 91: w = 1.762, loss = 0.083
Rank Process-2: epoch 21: w = 1.763, loss = 0.083
Rank Process-1: epoch 31: w = 1.764, loss = 0.081
Rank Process-2: epoch 31: w = 1.781, loss = 0.070
Rank Process-1: epoch 41: w = 1.781, loss = 0.070
Rank Process-2: epoch 41: w = 1.790, loss = 0.064
Rank Process-2: epoch 51: w = 1.797, loss = 0.060
Finished Process-0
Rank Process-3: epoch 1: w = 1.800, loss = 0.133
Rank Process-2: epoch 61: w = 1.811, loss = 0.053
Rank Process-1: epoch 51: w = 1.813, loss = 0.052
Rank Process-3: epoch 11: w = 1.819, loss = 0.048
Rank Process-1: epoch 61: w = 1.826, loss = 0.044
Rank Process-2: epoch 71: w = 1.828, loss = 0.043
Rank Process-3: epoch 21: w = 1.836, loss = 0.040
Rank Process-1: epoch 71: w = 1.839, loss = 0.038
Rank Process-2: epoch 81: w = 1.849, loss = 0.034
Rank Process-3: epoch 31: w = 1.849, loss = 0.033
Rank Process-1: epoch 81: w = 1.851, loss = 0.033
Rank Process-2: epoch 91: w = 1.861, loss = 0.028
Rank Process-3: epoch 41: w = 1.861, loss = 0.028
Rank Process-1: epoch 91: w = 1.865, loss = 0.027
Rank Process-3: epoch 51: w = 1.874, loss = 0.023
Rank Process-3: epoch 61: w = 1.877, loss = 0.022
Finished Process-1
Rank Process-3: epoch 71: w = 1.881, loss = 0.021
Rank Process-3: epoch 81: w = 1.885, loss = 0.019
Rank Process-3: epoch 91: w = 1.888, loss = 0.018
Finished Process-2
Finished Process-3
Prediction after training: f(5) = 9.775

You may also refer to this video for output:

Dominic Rubhabha-Wardslaus
Dominic Rubhabha-Wardslaushttp://wardslaus.com
infosec,malicious & dos attacks generator, boot rom exploit philanthropist , wild hacker , game developer,
RELATED ARTICLES

Most Popular

Recent Comments