We can use the sort() or orderBy() function to sort a Spark array, but these functions may not give the order we want when the array holds a complex data type such as a struct. For such arrays, we need different techniques, which this article covers using Python (PySpark).
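To see why a plain sort is not enough, here is a plain-Python mental model (an illustration, not Spark code): Spark compares struct values field by field, the way Python compares tuples, so a generic ascending sort on an array of structs orders by the first field, which is rarely the key we actually want.

```python
# An array of (name, age) structs, modeled as Python tuples.
name_age = [("Ishita", 7), ("Ashish", 9), ("Ramesh", 10), ("Mathew", 8)]

# A plain sort compares field by field, so it orders by name
# (the first field), not by age:
by_first_field = sorted(name_age)

# To sort by age, we must first make age the leading field --
# exactly the trick the Spark examples below use:
by_age = sorted((age, name) for name, age in name_age)
```

Both techniques in this article reduce to this idea: rearrange the struct fields so the sort key comes first, then apply an ascending sort.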
Ways to Sort Array of Complex Data Types
Below are the ways by which we can sort an array of complex data types in PySpark:
- Using expr() function
- Using agg() function
Using expr() function:
The expr() function is a SQL function that evaluates a SQL-like expression, given as a string, against an existing data frame column; the result can be used like any other column, including as an argument to Spark built-in functions. In this section, we will see how to sort an array of a complex data type in Spark using the expr() function.
Syntax: expr(str)
Here,
- str: A string containing the SQL-like expression to evaluate.
Example: In this example, we have used three columns, class, section, and name_age, as shown in the image. Here, name_age is the column with a complex data type (an array of structs), which we will sort by age in descending order.
Python3
# Python program to sort an array of a complex
# data type in Spark using expr() function
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

# Create the data
data = [
    [3, 'A', [["Ishita", 7], ["Ashish", 9], ["Ramesh", 10], ["Mathew", 8]]],
    [7, 'D', [["Vinayak", 13], ["Pranjal", 14], ["Isha", 12], ["Adarsh", 11]]],
]

# Create a Spark data frame
df = spark.createDataFrame(
    data,
    "class:int, section:string, name_age:array<struct<name:string, age:int>>"
)

# Sort the name_age column by age in descending order:
# put age first in each struct, sort ascending, then reverse
df = df.withColumn(
    "name_age",
    expr("reverse(array_sort(transform(name_age, "
         "x -> struct(x['age'] as age, x['name'] as name))))"),
)

# Display the data frame
df.show(2, False)
Output
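The expression above chains three built-in functions. As a step-by-step sketch in plain Python (an illustration for clarity, not Spark code), the pipeline does the following for one row's name_age array:

```python
name_age = [("Ishita", 7), ("Ashish", 9), ("Ramesh", 10), ("Mathew", 8)]

# transform(name_age, x -> struct(x['age'], x['name'])):
# rearrange each struct so age is the first (leading) field
swapped = [(age, name) for name, age in name_age]

# array_sort(...): ascending sort, which now compares by age first
ascending = sorted(swapped)

# reverse(...): flip ascending order into descending order
descending = list(reversed(ascending))
```

Spark has no direct "sort descending by a struct field" array function in older versions, which is why the reverse-of-ascending-sort trick is used here.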
Using agg() function:
The agg() function aggregates over the entire data frame without groups, or, when used after groupBy(), within each group. In this section, we will see how to sort an array of a complex data type in Spark using the agg() function.
Syntax:
agg(*exprs)
Here,
- exprs: The aggregate expression(s) to be passed to the function.
Example: In this example, we have used three columns, section, name, and sequence, as shown in the image. After grouping by the section column, we collect the names into an array and sort that array by sequence.
Python3
# Python program to sort an array of a complex
# data type in Spark using agg() function

# Import the PySpark libraries
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import array_sort, col, collect_list, struct

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

# Create the Spark data frame
data_df = spark.createDataFrame([
    Row(section='A', name='Ashish', sequence=1),
    Row(section='B', name='Bharti', sequence=1),
    Row(section='B', name='Charlie', sequence=2),
    Row(section='A', name='Marie', sequence=2),
    Row(section='C', name='Prabhakar', sequence=1),
    Row(section='D', name='Shrey', sequence=1),
    Row(section='C', name='Rose', sequence=2),
    Row(section='B', name='Ishita', sequence=3),
    Row(section='C', name='Samarth', sequence=3),
    Row(section='A', name='Vinayak', sequence=4),
    Row(section='A', name='Pranjal', sequence=3),
])

# Select the columns of interest
views_df = data_df.select('section', 'name', 'sequence')

# Create a local temporary view
views_df.createOrReplaceTempView('views')

# Sort array of complex data type: collect (sequence, name) structs
# per section, sort them by sequence, then keep only the names
(views_df
 .groupBy('section')
 .agg(
     array_sort(
         collect_list(struct('sequence', 'name'))
     ).alias('sorted_names')
 )
 .select('section', col('sorted_names.name').alias('Sorted Names'))
 .show(20, False))
Output
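The groupBy/agg pipeline above can be sketched in plain Python (an illustration of the logic, not Spark code): collect_list gathers (sequence, name) structs per section, array_sort orders them by the leading sequence field, and the final select keeps only the names.

```python
from collections import defaultdict

# Sample rows: (section, name, sequence), a subset of the data above
rows = [
    ("A", "Ashish", 1), ("B", "Bharti", 1), ("B", "Charlie", 2),
    ("A", "Marie", 2), ("A", "Vinayak", 4), ("A", "Pranjal", 3),
]

# groupBy('section') + collect_list(struct('sequence', 'name'))
collected = defaultdict(list)
for section, name, sequence in rows:
    collected[section].append((sequence, name))

# array_sort(...) orders by the leading field (sequence);
# selecting 'sorted_names.name' keeps only the names
sorted_names = {
    section: [name for _, name in sorted(pairs)]
    for section, pairs in collected.items()
}
```

Putting sequence first in the struct is what makes the generic ascending array_sort produce the order we want, just as in the expr() example.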