A distributed collection of data grouped into named columns is known as a Pyspark data frame in Python.
The columns on the Pyspark data frame can be of any type, IntegerType, StringType, ArrayType, etc. Do you know for an ArrayType column, you can apply a function to all the values in the array? This can be achieved by creating a user-defined function and calling that function to create a new column in the data frame. In this article, we have discussed the same.
Creating a data frame for demonstration:
In this example, we define the libraries, SparkSession and Row. Then, we created a data frame using spark context row-wise with four columns ‘Roll_Number‘, ‘Full_Name‘, ‘Marks‘, and ‘Subjects‘. The ‘Full_Name‘, ‘Marks‘, and ‘Subjects‘ columns are ArrayType columns that have three elements in them.
Python3
# Creating a Dataframe for demonstration # Import the SparkSession, Row from pyspark.sql import SparkSession, Row # Create a spark session using getOrCreate() function spark_session = SparkSession.builder.getOrCreate() # Create a spark context sc = spark_session.sparkContext # Create a data frame with columns 'Roll_Number', 'Full_Name', # 'Marks' and 'Subjects' df = sc.parallelize([Row(Roll_Number = 1 , Full_Name = [ 'Arun' , 'Kumar' , 'Chaudhary' ], Marks = [ 95 , 58 , 63 ], Subjects = [ 'Maths' , 'Physics' , 'Chemistry' ]), Row(Roll_Number = 2 , Full_Name = [ 'Aniket' , 'Singh' , 'Rajpoot' ], Marks = [ 87 , 69 , 56 ], Subjects = [ 'History' , 'Geography' , 'Arts' ]), Row(Roll_Number = 3 , Full_Name = [ 'Ishita' , 'Rai' , 'Pundir' ], Marks = [ 49 , 75 , 98 ], Subjects = [ 'Accounts' , 'Business Studies' , 'Maths' ])]).toDF() # Display the data frame df.show(truncate = False ) |
Output:
Example 1:
In this example, using UDF, we defined a function, i.e., subtract 3 from each mark, to perform an operation on each element of an array. Later on, we called that function to create the new column ‘Updated Marks‘ and displayed the data frame.
Python3
# Apply function to all values in array column in PySpark # Import the UDF, ArrayType, IntegerType from pyspark.sql.functions import udf from pyspark.sql.types import ArrayType, IntegerType # Reduce each marks by 3 reduce_marks = udf( lambda x: [i - 3 for i in x], ArrayType(IntegerType())) # Add a new column to display updated marks in data frame updated_df = df.withColumn( 'Updated Marks' , reduce_marks( 'Marks' )) # Display the updated data frame updated_df.show(truncate = False ) |
Output:
Example 2:
In this example, using UDF, we defined a function, i.e., convert string to upper case, to perform an operation on each element of an array. Later on, we called that function to create the new column ‘Updated_Full_Name‘ and displayed the data frame.
Python3
# Apply function to all values in array column in PySpark # Import the UDF, ArrayType, IntegerType from pyspark.sql.functions import udf from pyspark.sql.types import ArrayType, StringType # Create a UDF to convert each array element to upper case upper_case = udf( lambda x: [i.upper() for i in x], ArrayType(StringType())) # Add a new column to display updated full name in data frame updated_df = df.withColumn( 'Updated_Full_Name' , upper_case( 'Full_Name' )) # Display the updated data frame updated_df.show(truncate = False ) |
Output: