In PySpark, a map is an RDD transformation that applies a function to every element of the dataset. There are many situations in which a data frame has numerous columns that you need to convert into a single map-type column. This can be done easily with the create_map() function, passing alternating map-key literals and column references as arguments. Continue reading the article further to learn about it in detail.
Syntax: df.withColumn("map_column_name", create_map(lit("mapkey_1"), col("column_1"), lit("mapkey_2"), col("column_2"))).drop("column_1", "column_2").show(truncate=False)
Here,
- column_1, column_2, …: the names of the existing columns that need to be converted to the map.
- mapkey_1, mapkey_2, …: the map keys under which the corresponding column values are stored when the map is created.
- map_column_name: the name given to the new column in which the map is stored.
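As a minimal, self-contained sketch of this pattern, consider the snippet below. The DataFrame, column names, and map keys here are illustrative and not taken from the examples that follow:

Python3

# Minimal sketch: convert two columns into a single map-type column
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, create_map

spark = SparkSession.builder.getOrCreate()

# A small illustrative DataFrame with two ordinary columns
df = spark.createDataFrame([("Alice", 1), ("Bob", 2)], ["name", "id"])

# Combine both columns into one map column, then drop the originals
df = df.withColumn(
    "details",
    create_map(lit("person_name"), col("name"),
               lit("person_id"), col("id"))
).drop("name", "id")

df.show(truncate=False)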
Example 1:
In this example, we use a data set (link), which is a 5×5 data frame, as follows:
Then, we convert the columns 'name', 'class', and 'fees' to a map using the create_map function, store it in the column 'student_details', and drop the existing 'name', 'class', and 'fees' columns.
Python3
# PySpark - Create MapType column from existing columns

# Import the libraries SparkSession, col, lit, create_map
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, create_map

# Create a Spark session using the getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Read the CSV file into a data frame
data_frame = spark_session.read.csv('/content/class_data.csv',
                                    sep=',',
                                    inferSchema=True,
                                    header=True)

# Convert the name, class and fees columns to a map column
data_frame = data_frame.withColumn(
    "student_details",
    create_map(lit("student_name"), col("name"),
               lit("student_class"), col("class"),
               lit("student_fees"), col("fees"))
).drop("name", "class", "fees")

# Display the data frame
data_frame.show(truncate=False)
Output:
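Once the map column exists, individual values can be looked up by key with the standard Column method getItem(). The following snippet is an illustrative follow-up, not part of the original example:

Python3

# Look up individual values in the map column by key
data_frame.select(
    data_frame.student_details.getItem("student_name").alias("name"),
    data_frame.student_details.getItem("student_fees").alias("fees")
).show(truncate=False)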
Example 2:
In this example, we have created a data frame with columns emp_id, name, superior_emp_id, year_joined, emp_dept_id, gender, and salary as follows:
Then, we convert the columns name, superior_emp_id, year_joined, emp_dept_id, gender, and salary to a map using the create_map function, store it in the column 'employee_details', and drop the existing name, superior_emp_id, year_joined, emp_dept_id, gender, and salary columns.
Python3
# PySpark - Create MapType column from existing columns

# Import the libraries SparkSession, col, lit, create_map
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, create_map

# Create a spark session using the getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Define the data set
emp = [(1, "Smith", -1, "2018", "10", "M", 3000),
       (2, "Rose", 1, "2010", "20", "M", 4000),
       (3, "Williams", 1, "2010", "10", "M", 1000),
       (4, "Jones", 2, "2005", "10", "F", 2000),
       (5, "Brown", 2, "2010", "40", "F", 4000),
       (6, "Brown", 2, "2010", "50", "M", 2000)]

# Define the schema of the data set
empColumns = ["emp_id", "name", "superior_emp_id", "year_joined",
              "emp_dept_id", "gender", "salary"]

# Create the data frame from the data set and schema
empDF = spark_session.createDataFrame(data=emp, schema=empColumns)

# Convert name, superior_emp_id, year_joined, emp_dept_id,
# gender, and salary columns to a map-type column
empDF = empDF.withColumn(
    "employee_details",
    create_map(lit("name"), col("name"),
               lit("superior_emp_id"), col("superior_emp_id"),
               lit("year_joined"), col("year_joined"),
               lit("emp_dept_id"), col("emp_dept_id"),
               lit("gender"), col("gender"),
               lit("salary"), col("salary"))
).drop("name", "superior_emp_id", "year_joined",
       "emp_dept_id", "gender", "salary")

# Display the data frame
empDF.show(truncate=False)
Output:
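To confirm that employee_details is indeed a map-type column, you can inspect the schema with printSchema(). This is an illustrative follow-up, not part of the original example; note that since the source columns mix integer and string types, Spark coerces the map values to a common type (typically string here):

Python3

# Inspect the schema; employee_details should appear as a map column
empDF.printSchema()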