In this article, we are going to learn how to add multiple columns using UDF in Pyspark in Python.
Have you ever worked with a PySpark data frame? If so, you probably already know how to add a single column. But have you ever wondered how to add multiple columns at once using a UDF in PySpark? If not, this article is for you: read on to learn how to add multiple columns using a UDF in PySpark.
Stepwise implementation to add multiple columns using UDF in PySpark:
Step 1: First of all, import the required libraries, i.e., SparkSession, functions, StructType, StructField, IntegerType, and Row. The SparkSession library is used to create the session, while functions gives access to all built-in functions in PySpark. The StructType and StructField classes are used to programmatically specify the schema of the DataFrame and create complex columns. The IntegerType class represents integer values in the DataFrame schema, while Row is used to create a new row object.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, Row
Step 2: Create a spark session using the getOrCreate function.
spark_session = SparkSession.builder.getOrCreate()
Step 3: Create a data frame with the row data and the column names.
data_frame = spark_session.createDataFrame(
    [(column_data_1), (column_data_2), (column_data_3)],
    ['column_name_1', 'column_name_2', 'column_name_3'])
Step 4: Create a function that specifies the operations to be performed to produce the new columns that will be added to the data frame.
def function_to_be_performed(n):
    return Row('new_column_1', 'new_column_2')(operation_to_be_performed,
                                               operation_to_be_performed)
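The key idea in this step is that the function packs several derived values into one tuple-like Row whose fields line up positionally with the schema defined next. The same pattern can be sketched without Spark installed by using `collections.namedtuple` as a stand-in for `pyspark.sql.Row` (the field names and operations below are illustrative placeholders, not part of the original example):

```python
# Stand-in sketch of the Row-returning pattern, using a plain named
# tuple instead of pyspark.sql.Row so it runs without Spark installed
from collections import namedtuple

# Tuple-like container with one field per new column
OutRow = namedtuple('OutRow', ['new_column_1', 'new_column_2'])

def function_to_be_performed(n):
    # Two derived values packed into one row-like object; Spark would
    # later unpack them positionally against the declared schema
    return OutRow(n * 2, n + 1)

result = function_to_be_performed(10)
print(result.new_column_1, result.new_column_2)  # → 20 11
```

Because `Row` (like `namedtuple`) is a tuple subclass, Spark matches its values to the StructType fields by position, so the names inside the function do not have to match the schema's field names.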
Step 5: Assign the structure and naming for newly created columns.
schema = StructType([StructField("new_column_1", column_type(), False),
                     StructField("new_column_2", column_type(), False)])
Step 6: Pass the function created in Step 4, together with the schema from Step 5, to the udf() function and store the result in a variable.
data_frame_udf = F.udf(function_to_be_performed, schema)
Step 7: Call the UDF created in the previous step to add the new columns to the data frame.
updated_data_frame = data_frame.withColumn(
    "Result",
    data_frame_udf(data_frame["column_on_which_operation_has_to_be_performed"]))
Step 8: Finally, display the data frame.
updated_data_frame.select("column_name_1", "column_name_2", "column_name_3", "Result.*").show()
Example 1:
In this program, we have created a data frame with four columns, i.e., 'Roll_Number', 'Fees', 'Fine', and 'Discount', as follows:
Then, we calculated the sum of fees and fine as well as the difference between fees and discount by passing the function through UDF. Finally, we displayed the data in the new columns 'Fees+Fine' and 'Fees-Discount' respectively.
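The per-row arithmetic the UDF performs can be sanity-checked in plain Python before running the Spark job; the input data below mirrors the example's data frame:

```python
# Plain-Python check of the per-row logic the UDF applies in Example 1
def student_function(fees, fine, discount):
    return (fees + fine, fees - discount)

# Same rows as the example's data frame
rows = [(1, 10000, 400, 100),
        (2, 14000, 500, 200),
        (3, 12000, 800, 300)]

for roll_number, fees, fine, discount in rows:
    total, net = student_function(fees, fine, discount)
    print(roll_number, total, net)
# → 1 10400 9900
# → 2 14500 13800
# → 3 12800 11700
```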
Python3
# Python program to add multiple columns using UDF in PySpark

# Import the libraries SparkSession, functions, StructType,
# StructField, IntegerType, and Row
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, Row

# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Create a data frame with four columns 'Roll_Number',
# 'Fees', 'Fine' and 'Discount'
data_frame = spark_session.createDataFrame(
    [(1, 10000, 400, 100),
     (2, 14000, 500, 200),
     (3, 12000, 800, 300)],
    ['Roll_Number', 'Fees', 'Fine', 'Discount'])

# Create a function to calculate the sum of fees and fine
# as well as the difference of fees and discount
def student_function(fees, fine, discount):
    return Row('Out1', 'Out2')(fees + fine, fees - discount)

# Assign the structure and naming for the newly created columns
schema = StructType([StructField("Fees+Fine", IntegerType(), False),
                     StructField("Fees-Discount", IntegerType(), False)])

# Wrap the user-defined function in udf() to create the new columns
data_frame_udf = F.udf(student_function, schema)

# Call the UDF created in the previous step to add the new columns
updated_data_frame = data_frame.withColumn(
    "Result",
    data_frame_udf(data_frame["Fees"], data_frame["Fine"],
                   data_frame["Discount"]))

# Display the updated data frame with
# multiple columns added using UDF
updated_data_frame.select("Roll_Number", "Fees", "Fine",
                          "Discount", "Result.*").show()
Output:
Example 2:
In this program, we have created a data frame with three columns, i.e., 'Serial_Number', 'Item Name', and 'Current Price', as follows:
Then, we multiplied and floor-divided the 'Current Price' column by 2 by passing the function through UDF, to estimate the price after 5 years and the price 5 years before. Finally, we displayed the data in the new columns 'Price after 5 years' and 'Price before 5 years' respectively.
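As in the first example, the row-level arithmetic can be checked in plain Python; note that floor division (//) keeps the result an integer, which matches the IntegerType declared in the schema:

```python
# Plain-Python check of the per-row logic the UDF applies in Example 2;
# floor division (//) returns an int, matching IntegerType in the schema
def arithmetic_operation(n):
    return (n * 2, n // 2)

# Same items and prices as the example's data frame
prices = {'Milk': 50, 'Fish': 250, 'Chicken': 400}

for item, price in prices.items():
    after, before = arithmetic_operation(price)
    print(item, after, before)
# → Milk 100 25
# → Fish 500 125
# → Chicken 800 200
```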
Python3
# Python program to add multiple columns using UDF in PySpark

# Import the libraries SparkSession, functions, StructType,
# StructField, IntegerType, and Row
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, Row

# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Create a data frame with three columns 'Serial_Number',
# 'Item Name' and 'Current Price'
data_frame = spark_session.createDataFrame(
    [(1, 'Milk', 50),
     (2, 'Fish', 250),
     (3, 'Chicken', 400)],
    ['Serial_Number', 'Item Name', 'Current Price'])

# Create a function to calculate the product and the
# floor division of the number by 2
def arithmetic_operation(n):
    return Row('Out1', 'Out2')(n * 2, n // 2)

# Assign the structure and naming for the newly created columns
schema = StructType([StructField("Price after 5 years", IntegerType(), False),
                     StructField("Price before 5 years", IntegerType(), False)])

# Wrap the user-defined function in udf() to create the new columns
data_frame_udf = F.udf(arithmetic_operation, schema)

# Call the UDF created in the previous step to add the new columns
updated_data_frame = data_frame.withColumn(
    "Result", data_frame_udf(data_frame["Current Price"]))

# Display the updated data frame with multiple columns added using UDF
updated_data_frame.select("Serial_Number", "Item Name",
                          "Current Price", "Result.*").show()
Output: