In this article, we are going to convert multiple columns of a data frame into a single map column using PySpark in Python.
In Spark, a map column (MapType) stores key-value pairs, much like a Python dictionary. While working with a PySpark data frame, we may run into situations where several ordinary columns should be folded into one map column, with chosen labels serving as the map keys and the column values as the map values.
Syntax: create_map(lit("mapkey_1"), col("column_1"), lit("mapkey_2"), col("column_2"), ...)
Parameters:
- column_1, column_2, …: the columns whose values need to be converted into map values.
- mapkey_1, mapkey_2, …: the key names under which the corresponding column values are stored when the map is created (see the short sketch after this list).
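To make the signature concrete, here is a minimal, self-contained sketch; the toy column names, keys, and values below are invented purely for illustration:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, create_map

spark = SparkSession.builder.getOrCreate()

# Toy data frame with two plain columns (names are illustrative)
df = spark.createDataFrame([("a1", "b1")], ["column_1", "column_2"])

# Fold both columns into a single MapType column
df.withColumn("as_map",
              create_map(lit("mapkey_1"), col("column_1"),
                         lit("mapkey_2"), col("column_2"))).show(truncate=False)

# Each row of as_map now renders (in Spark 3.x) roughly as:
# {mapkey_1 -> a1, mapkey_2 -> b1}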
Stepwise Implementation:
Step 1: First of all, import the required libraries, i.e., SparkSession, col, lit, and create_map. SparkSession is used to create the session, while col returns a column based on the given column name. lit creates a column of a literal (constant) value, which is how the map key names are supplied, while create_map converts the selected data frame columns into a single MapType column.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, create_map
Step 2: Now, create a Spark session using the getOrCreate() function.
spark_session = SparkSession.builder.getOrCreate()
Step 3: Then, either read a CSV file into a data frame or create a new data frame using the createDataFrame() function.
data_frame = spark_session.read.csv('#Path of CSV file',
                                    sep=',', inferSchema=True, header=True)
or
data_frame = spark_session.createDataFrame(
    [(row_1_data), (row_2_data), (row_3_data)],
    ['column_name_1', 'column_name_2', 'column_name_3'])
Step 4: Further, call withColumn() with the new map column's name and the create_map() function as arguments. create_map() takes alternating pairs: a lit() key name followed by the col() whose values are stored under that key. Assign the result back to the data frame and drop the original columns that were folded into the map.
data_frame = data_frame.withColumn(
    "map_column_name",
    create_map(lit("mapkey_1"), col("column_name_1"),
               lit("mapkey_2"), col("column_name_2"),
               lit("mapkey_3"), col("column_name_3"))
).drop("column_name_1", "column_name_2", "column_name_3")
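Once the original columns are dropped, their values live only inside the map. An individual entry can still be read back by key with getItem(); the column and key names below are the placeholders from the step above:

data_frame.select(
    col("map_column_name").getItem("mapkey_1").alias("column_name_1")
).show()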
Step 5: Finally, display the updated data frame.
data_frame.show(truncate=False)
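To verify that the conversion actually produced a MapType column, the schema can be inspected as well. The commented output is only a sketch; the exact value type and nullability depend on the input columns:

data_frame.printSchema()

# root
#  |-- map_column_name: map (nullable = false)
#  |    |-- key: string
#  |    |-- value: string (valueContainsNull = true)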
Example 1:
In this example, we have used a data set (link), which is a 5×5 data frame as follows:
Then, we converted the columns 'name', 'class', and 'fees' to a map using the create_map() function, stored the result in the column 'student_details', and dropped the existing 'name', 'class', and 'fees' columns.
Python3
# PySpark convert multiple columns to map

# Import the libraries SparkSession, col, lit, create_map
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, create_map

# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Read the CSV file
data_frame = spark_session.read.csv('/content/class_data.csv',
                                    sep=',', inferSchema=True,
                                    header=True)

# Convert name, class and fees columns to map
data_frame = data_frame.withColumn(
    "student_details",
    create_map(lit("student_name"), col("name"),
               lit("student_class"), col("class"),
               lit("student_fees"), col("fees"))
).drop("name", "class", "fees")

# Display the data frame
data_frame.show(truncate=False)
Output:
Example 2:
In this example, we have created a data frame with columns emp_id, name, superior_emp_id, year_joined, emp_dept_id, gender, and salary as follows:
Then, we converted the columns name, superior_emp_id, year_joined, emp_dept_id, gender, and salary to a map using the create_map() function, stored the result in the column 'employee_details', and dropped the existing name, superior_emp_id, year_joined, emp_dept_id, gender, and salary columns.
Python3
# PySpark convert multiple columns to map

# Import the libraries SparkSession, col, lit, create_map
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, create_map

# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Define the data set
emp = [(1, "Smith", -1, "2018", "10", "M", 3000),
       (2, "Rose", 1, "2010", "20", "M", 4000),
       (3, "Williams", 1, "2010", "10", "M", 1000),
       (4, "Jones", 2, "2005", "10", "F", 2000),
       (5, "Brown", 2, "2010", "40", "F", 4000),
       (6, "Brown", 2, "2010", "50", "M", 2000)]

# Define the schema of the data set
empColumns = ["emp_id", "name", "superior_emp_id", "year_joined",
              "emp_dept_id", "gender", "salary"]

# Create the data frame through data set and schema
empDF = spark_session.createDataFrame(data=emp, schema=empColumns)

# Convert name, superior_emp_id, year_joined, emp_dept_id,
# gender, and salary columns to map
empDF = empDF.withColumn(
    "employee_details",
    create_map(lit("name"), col("name"),
               lit("superior_emp_id"), col("superior_emp_id"),
               lit("year_joined"), col("year_joined"),
               lit("emp_dept_id"), col("emp_dept_id"),
               lit("gender"), col("gender"),
               lit("salary"), col("salary"))
).drop("name", "superior_emp_id", "year_joined",
       "emp_dept_id", "gender", "salary")

# Display the data frame
empDF.show(truncate=False)
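As a quick follow-up check (a small addition, not part of the original listing), one entry can be read back out of the map by its key; the bracket syntax on a map column is equivalent to getItem():

# Look up the 'salary' entry inside the employee_details map
empDF.select("emp_id", col("employee_details")["salary"]).show()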
Output: