In this article, we are going to learn how to pass multiple columns to a UDF using Pyspark in Python.
Pyspark offers numerous types of functions, such as string functions, sort functions, and window functions, but it also provides one of its most essential features: the User Defined Function, or UDF. UDFs are a crucial feature of Spark SQL and data frames, used to extend Pyspark's built-in capabilities. A UDF can accept not just one column but several. In this article, we will discuss how.
Methods to pass multiple columns in UDF:
- Simple Approach
- Approach using struct
- Approach using array
Method 1: Simple Approach
In this method, we are going to make a data frame with three columns, Roll_Number, Fees, and Fine. We then define a function that adds two numbers, wrap it with udf(), pass the Fees and Fine columns to it, and use withColumn() to add the resulting "Total Amount" column to the data frame.
Implementation:
Step 1: First of all, import the libraries SparkSession, IntegerType, and udf. SparkSession is used to create the session, IntegerType declares the return type of the UDF, and udf() turns a plain Python function into a reusable function in Pyspark.
Step 2: Now, create a spark session using the getOrCreate() function and define the function to be applied to the columns of the data frame.
Step 3: Wrap the function created above with udf(), passing IntegerType as the return type, so that it can be called with multiple columns.
Step 4: Create the data frame and call the UDF created before on the two columns to display the data frame with the new column.
Python3
```python
# Pyspark program to pass multiple
# columns in UDF: Simple Approach

# Import the libraries SparkSession, IntegerType and udf
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Create a function to add two numbers
# (named add_columns so it does not shadow Python's built-in sum)
def add_columns(x, y):
    return x + y

# Pass multiple columns in UDF
sum_cols = udf(add_columns, IntegerType())

# Create a data frame with three columns 'Roll_Number', 'Fees' and 'Fine'
data_frame = spark_session.createDataFrame(
    [(1, 10000, 400), (2, 14000, 500), (3, 12000, 800)],
    ['Roll_Number', 'Fees', 'Fine'])

# Display the data frame showing the new column formed
# by calling the UDF on columns 'Fees' and 'Fine'
data_frame.withColumn('Total Amount',
                      sum_cols('Fees', 'Fine')).show()
```
Output:
Method 2: Approach using struct
In this method, we are going to do the same thing as in the above method but in this method, we are going to use struct to pass multiple columns.
Implementation:
Step 1: First of all, import the libraries SparkSession, IntegerType, udf, and struct. SparkSession is used to create the session, IntegerType declares the return type of the UDF, udf() creates a reusable function in Pyspark, and struct() combines multiple columns into a single struct column.
Step 2: Create a spark session using the getOrCreate() function, and define the UDF by passing a lambda that adds the struct's two fields along with IntegerType as the return type.
Step 3: Create the data frame and call the function created before with the struct to present the data frame with the new column.
Python3
```python
# Pyspark program to pass multiple
# columns in UDF: Approach using struct

# Import the libraries SparkSession,
# IntegerType, struct and udf
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct

# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Pass multiple columns in UDF by adding the numbers
sum_cols = udf(lambda x: x[0] + x[1], IntegerType())

# Create a data frame with three columns 'Roll_Number', 'Fees' and 'Fine'
data_frame = spark_session.createDataFrame(
    [(1, 10000, 400), (2, 14000, 500), (3, 12000, 800)],
    ['Roll_Number', 'Fees', 'Fine'])

# Display the data frame showing the new column formed by
# calling the UDF on a struct of columns 'Fees' and 'Fine'
data_frame.withColumn('Total Amount',
                      sum_cols(struct('Fees', 'Fine'))).show()
```
Output:
Method 3: Approach using an array
In this method, the final output is the same as above, but here we use array() to pack the columns into a single array column, and the udf() function sums the values it receives.
Implementation:
Step 1: First of all, import the libraries SparkSession, IntegerType, udf, and array. SparkSession is used to create the session, IntegerType declares the return type of the UDF, udf() creates a reusable function in Pyspark, and array() combines multiple columns into a single array column.
Step 2: Create a spark session using the getOrCreate() function, and define the UDF by passing a lambda that applies Python's built-in sum() to the array along with IntegerType as the return type.
Step 3: Create the data frame and call the function created before with the array to present the data frame with the new column.
Python3
```python
# Pyspark program to pass multiple
# columns in UDF: Approach using array

# Import the libraries SparkSession, IntegerType, array and udf
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, array

# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Pass multiple columns in UDF by calling the built-in sum function
sum_cols = udf(lambda arr: sum(arr), IntegerType())

# Create a data frame with three columns 'Roll_Number', 'Fees' and 'Fine'
# and display it with the new column formed by calling
# the UDF on an array of columns 'Fees' and 'Fine'
spark_session.createDataFrame(
    [(1, 10000, 400), (2, 14000, 500), (3, 12000, 800)],
    ['Roll_Number', 'Fees', 'Fine']
).withColumn('Total Amount',
             sum_cols(array('Fees', 'Fine'))).show()
```
Output: