In this article, we are going to learn how to add a column to a nested struct using PySpark in Python.
Have you ever worked with a PySpark data frame? If yes, then you surely know how to add a column to it, but did you know that you can also create a struct column in PySpark? StructType is used to programmatically specify the schema of a DataFrame and to create complex, nested columns. Apart from creating a nested struct, you can also add a column to that nested struct later. The steps and examples below show how to do this.
Stepwise Implementation to add a column to a nested struct.
Step 1: First of all, import the required names, i.e., SparkSession, StructType, StructField, StringType, IntegerType, col, lit, and when. SparkSession is used to create the session, StructType defines the structure of the data frame, and StructField defines each column in it. StringType and IntegerType represent string and integer values respectively. The col function returns a column based on the given column name, lit creates a column that holds a constant value, and when builds a conditional expression that is completed with otherwise.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, lit, when
from pyspark.sql import SparkSession
Step 2: Now, create a spark session using the getOrCreate() function.
spark_session = SparkSession.builder.getOrCreate()
Step 3: Then, define the data set as a list of tuples, one tuple per row. The values of a nested struct are written as an inner tuple.
data_set = [((nested_values_1), column_value_1), ((nested_values_2), column_value_2), ((nested_values_3), column_value_3)]
Step 4: Later on, define the structure using the StructType and StructField classes. A nested struct is declared by passing another StructType as the data type of a StructField.
schema = StructType([
    StructField('column_1', StructType([
        StructField('nested_column_1', column_type(), True),
        StructField('nested_column_2', column_type(), True),
        StructField('nested_column_3', column_type(), True)
    ])),
    StructField('column_2', column_type(), True)
])
Step 5: Further, create the PySpark data frame from the data set and the specified structure.
df = spark_session.createDataFrame(data = data_set, schema = schema)
Step 6: Moreover, add a new field to the nested struct by calling withField on the struct column, passing the name of the nested field and its value wrapped in lit as arguments. If the struct already has a field with that name, withField replaces it. The withField function is available in Spark 3.1 and later.
updated_df = df.withColumn("column_name", col("column_name").withField("nested_column_name", lit("replace_value")))
Step 7: Finally, display the updated data frame.
updated_df.show()
Example 1:
In this example, we define the schema and the data set and create a PySpark data frame with two columns, ‘Date_Of_Birth’ and ‘Age’. The ‘Date_Of_Birth’ column is a nested struct with the fields ‘Date’ and ‘Month’. We then add the field ‘Year’ to the ‘Date_Of_Birth’ struct: its value is 2004 if ‘Age’ is equal to 18, and 2002 otherwise.
Python3
# PySpark program to add a column to a nested struct

# Import SparkSession, StructType, StructField,
# IntegerType, col, lit, and when
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import col, lit, when
from pyspark.sql import SparkSession

# Create a Spark session using the getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Define the data set; each inner tuple holds (Date, Month)
data_set = [((21, 2), 18), ((16, 4), 20),
            ((11, 1), 18), ((6, 3), 20)]

# Define the structure for the data frame
schema = StructType([
    StructField('Date_Of_Birth', StructType([
        StructField('Date', IntegerType(), True),
        StructField('Month', IntegerType(), True)])),
    StructField('Age', IntegerType(), True)])

# Create the PySpark data frame using the createDataFrame function
df = spark_session.createDataFrame(data = data_set, schema = schema)

# Add the field 'Year' to the nested struct, choosing its
# value with the when and otherwise functions
updated_df = df.withColumn(
    "Date_Of_Birth",
    col("Date_Of_Birth").withField(
        "Year",
        when(col("Age") == 18, lit(2004)).otherwise(lit(2002))))

# Display the updated data frame
updated_df.show()
Output:
Example 2:
In this example, we define the schema and the data set and create a PySpark data frame with four columns, ‘Full_Name’, ‘Date_Of_Birth’, ‘Gender’, and ‘Fees’. The ‘Full_Name’ column is a nested struct with the fields ‘First_Name’ and ‘Last_Name’.
We then add the field ‘Middle_Name’ to the ‘Full_Name’ struct: its value is ‘Singh’ if ‘Gender’ is equal to ‘Male’, and ‘Kaur’ otherwise.
Python3
# PySpark program to add a column to a nested struct

# Import SparkSession, StructType, StructField,
# StringType, IntegerType, col, lit, and when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, lit, when
from pyspark.sql import SparkSession

# Create a Spark session using the getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Define the data set; each inner tuple holds (First_Name, Last_Name)
data_set = [(('Vansh', 'Rai'), '2000-21-02', 'Male', 13000),
            (('Ria', 'Kapoor'), '2004-01-06', 'Female', 10000)]

# Define the structure for the data frame
schema = StructType([
    StructField('Full_Name', StructType([
        StructField('First_Name', StringType(), True),
        StructField('Last_Name', StringType(), True)])),
    StructField('Date_Of_Birth', StringType(), True),
    StructField('Gender', StringType(), True),
    StructField('Fees', IntegerType(), True)])

# Create the PySpark data frame using the createDataFrame function
df = spark_session.createDataFrame(data = data_set, schema = schema)

# Add the field 'Middle_Name' to the nested struct, choosing
# its value with the when and otherwise functions
updated_df = df.withColumn(
    "Full_Name",
    col("Full_Name").withField(
        "Middle_Name",
        when(col("Gender") == "Male", lit("Singh")).otherwise(lit("Kaur"))))

# Display the updated data frame
updated_df.show()
Output: