The most useful feature of Spark SQL used to create a reusable function in Pyspark is known as UDF or User defined function in Python. The column type of the Pyspark can be String, Integer, Array, etc. There occurs some situations in which you have got ArrayType column in Pyspark data frame and you need to sort that list in each Row of the column. This can be achieved in various ways but the easiest way is to do using UDF. In this article, we will discuss the same.
Example 1:
In this example, we have created a data frame with four columns ‘Full_Name‘, ‘Date_Of_Birth‘, ‘Gender‘, ‘Fees‘. The ‘Full_Name‘ column is further nested and contains a list with the list values ‘First_Name‘, ‘Middle_Name‘ and ‘Last_Name‘ as follows:
Then, we created a user-defined function to sort the ArrayType column, i.e., Full_Name in ascending order and put the sorted values in the new column of the data frame ‘Sorted_Full_Name‘ by calling that user-defined function.
Python3
# Python program to sort list using UDF in PySpark # Import the libraries SparkSession, StructType, # StructField, StringType, IntegerType, UDF from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType # Create a spark session using getOrCreate() function spark_session = SparkSession.builder.getOrCreate() # Define the data set data_set = [(( 'Ishita' , 'Rai' , 'Pundir' ), '2000-21-02' , 'Male' , 13000 ), (( 'Aia' , 'Singh' , 'Rajput' ), '2004-01-06' , 'Female' , 10000 )] # Define the structure for the # data frame by adding StructType columns schema = StructType([ StructField( 'Full_Name' , StructType([ StructField( 'First_Name' , StringType(), True ), StructField( 'Middle_Name' , StringType(), True ), StructField( 'Last_Name' , StringType(), True ) ])), StructField( 'Date_Of_Birth' , StringType(), True ), StructField( 'Gender' , StringType(), True ), StructField( 'Fees' , IntegerType(), True ) ]) # Create the Pyspark data frame using # createDataFrame function df = spark_session.createDataFrame(data = data_set, schema = schema) # Create a user defined function # to sort the ArrayType column udf_sort = udf( lambda x: sorted (x), ArrayType(StringType())) # Create a new column by calling # the user defined function created df.withColumn( 'Sorted_Full_Name' , udf_sort( df[ "Full_Name" ])).show(truncate = False ) |
Output:
Example 2:
In this example, we have created the data frame with two columns ‘name‘ and ‘marks‘. The ‘marks‘ column has the data in the form of a list as follows:
Then, we created a user-defined function to sort the ArrayType column, i.e., marks in descending order, and put the sorted values in the new column of the data frame ‘Sorted_Marks‘ by calling that user-defined function.
Python3
# Python program to sort list using UDF in PySpark # Import the SparkSession, Row, UDF, ArrayType, IntegerType from pyspark.sql import SparkSession, Row from pyspark.sql.functions import udf from pyspark.sql.types import ArrayType, IntegerType # Create a spark session using getOrCreate() function spark_session = SparkSession.builder.getOrCreate() # Create a spark context sc = spark_session.sparkContext # Create a data frame with # columns 'name' and 'marks' df = sc.parallelize([Row(name = 'Arun' , marks = [ 95 , 58 , 63 ]), Row( name = 'Ishita' , marks = [ 87 , 69 , 56 ]), Row(name = 'Vinayak' , marks = [ 49 , 75 , 98 ])]).toDF() # Create a user defined function # to sort the ArrayType column udf_sort = udf( lambda x: sorted (x, reverse = True ), ArrayType(IntegerType())) # Create a new column by calling the # user defined function created df.withColumn( 'Sorted_Marks' , udf_sort(df[ "marks" ])).show(truncate = False ) |
Output: