Saturday, November 16, 2024
Google search engine
HomeLanguagesAdd Multiple Columns Using UDF in PySpark

Add Multiple Columns Using UDF in PySpark

In this article, we are going to learn how to add multiple columns using UDF in Pyspark in Python.

Have you ever worked on a Pyspark data frame? If yes, then you might surely know how to add a column and you might have also done it. But have you ever thought about how you can add multiple columns using UDF in Pyspark? If not, then this article is meant for you. Continue reading this article further to know more about the way in which you can add multiple columns using UDF in Pyspark.

Stepwise implementation to add multiple columns using UDF in PySpark:

Step 1: First of all, import the required libraries, i.e., SparkSession, functions, StructType, StructField, IntegerType, and Row. The SparkSession library is used to create the session while the functions give access to all built-in functions in the Pyspark. The StructType and StructField classes are used to programmatically specify the schema to the DataFrame and create complex columns. The IntegerType is used to convert internal SQL objects to native Python objects while Row is used to create a new row in the data frame. 

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, Row

Step 2: Create a spark session using the getOrCreate function.

spark_session = SparkSession.builder.getOrCreate()

Step 3: Create a data frame with the column names and column headings respectively. 

data_frame=spark_session.createDataFrame([(column_data_1),
                                         (column_data_2 ), (column_data_3 )],
                                         ['column_name_1',
                                          'column_name_2','column_name_3'])

Step 4: Create a function to specify what actions to be performed on the new columns which need to be added to the data frame.

def function_to_be_performed(n):
   return Row('new_column_1',
              'new_column_2')(operation_to_be_performed, operation_to_be_performed)

Step 5: Assign the structure and naming for newly created columns. 

schema = StructType([StructField("new_column_1", column_type(), False),
                     StructField("new_column_2", column_type(), False)])

Step 6: Call the function created in the previous step as an argument for the UDF function and store it in a variable. 

data_frame_udf = F.udf(function_to_be_performed, schema)

Step 7: Call the variable created through UDF in the previous step to add the columns to the data frame.

updated_data_frame = data_frame.withColumn("Result",
         data_frame_udf(data_frame["column_on_which_operation_has_to_be_performed"]))

Step 8: Finally, display the data frame.

updated_data_frame.select("column_name_1",
                          "column_name_2",
                          "column_name_3", "Result.*").show()

Example 1:

In this program, we have created the data frame with four columns, i.e., ‘Roll_Number,’ ‘Fees,’ ‘Fine‘ and ‘Discount‘ as follows:

How to add multiple columns using UDF in PySpark ?

 

Then, we calculated the sum of fees and fine as well as the difference between fees and discount by calling the function and passing the value through UDF. Finally, we called the function and displayed the data in the new columns ‘Fees+Fine‘ and ‘Fees-Discount‘ respectively.

Python3




# Python program to add multiple columns using UDF in Pyspark
 
# Import the libraries SparkSession, functions, StructType,
# StructField, IntegerType, and Row libraries
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType,StructField,
                               IntegerType,Row
 
# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()
 
# Create a data frame with four columns 'Roll_Number,'
# 'Fees,' 'Fine' and 'Discount'
data_frame=spark_session.createDataFrame([(1, 10000, 400, 100),
                                          (2, 14000 , 500, 200),
                                          (3, 12000 , 800, 300)],
                   ['Roll_Number', 'Fees', 'Fine', 'Discount'])
 
# Create a function to calculate sum of fees and fine
# as well as difference of fine and discount
def student_function(fees,fine,discount):
    return Row('Out1', 'Out2')(fees + fine, fees -discount)
 
# Assign the structure and naming for newly created columns
schema = StructType([StructField("Fees+Fine",
                                 IntegerType(), False),
                     StructField("Fees-Discount",
                                 IntegerType(), False)])
 
# Call the user defined function inside UDF
# function to create new columns
data_frame_udf = F.udf(student_function, schema)
 
# Call the variable created through UDF in
# previous step to add new columns
updated_data_frame = data_frame.withColumn("Result",
             data_frame_udf(data_frame["Fees"],
                            data_frame["Fine"],
                            data_frame["Discount"]))
 
# Display the updated data frame with
# multiple columns added using UDF
updated_data_frame.select("Roll_Number", "Fees",
                          "Fine", "Discount",
                          "Result.*").show()


Output:

How to add multiple columns using UDF in PySpark ?

 

Example 2:

In this program, we have created the data frame with two columns ‘Serial_Number‘, ‘Item‘, and ‘Price‘ as follows: 

How to add multiple columns using UDF in PySpark ?

 

Then, we performed the multiplication and division by 2 on the column ‘Price‘ by calling the function to calculate the price 5 years before and after 5 years and passing the value through UDF. Finally, we have displayed the data in the new columns ‘Price after 5 years’ and ‘Price before 5 years‘ respectively.

Python3




# Python program to add multiple columns using UDF in Pyspark
 
# Import the libraries SparkSession, functions, StructType,
# StructField, IntegerType, and Row libraries
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType,StructField,
                              IntegerType,Row
 
# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()
 
# Create a data frame with three columns 'Serial_Number,'
# 'Item' and 'Current Price'
data_frame=spark_session.createDataFrame([(1, 'Milk', 50),
                                          (2, 'Fish', 250 ),
                                          (3, 'Chicken', 400 )],
                 ['Serial_Number', 'Item Name','Current Price'])
 
# Create a function to calculate product and division of the number
def arithmetic_operation(n):
    return Row('Out1', 'Out2')(n * 2, n // 2)
 
# Assign the structure and naming for newly created columns
schema = StructType([StructField("Price after 5 years",
                                 IntegerType(), False),
                     StructField("Price before 5 years",
                                 IntegerType(), False)])
 
# Call the user defined function inside UDF
# function to create new columns
data_frame_udf = F.udf(arithmetic_operation, schema)
 
# Call the variable created through UDF in
# previous step to add new columns
updated_data_frame = data_frame.withColumn("Result",
                data_frame_udf(data_frame["Current Price"]))
 
# Display the updated data frame with multiple columns added using UDF
updated_data_frame.select("Serial_Number", "Item Name",
                          "Current Price", "Result.*").show()


Output:

How to add multiple columns using UDF in PySpark ?

 

Dominic Rubhabha-Wardslaus
Dominic Rubhabha-Wardslaushttp://wardslaus.com
infosec,malicious & dos attacks generator, boot rom exploit philanthropist , wild hacker , game developer,
RELATED ARTICLES

Most Popular

Recent Comments