In this article, we are going to learn how to distinguish columns with duplicate names in a PySpark data frame in Python.
A PySpark data frame is a distributed collection of data grouped into named columns. While working in PySpark, we often encounter a data frame that has several columns with duplicate names, or a user may even create one by mistake. What does this result in? Because of the duplicate names, such columns cannot be referenced unambiguously, so the user cannot apply functions to the data frame properly. In this article, we will discuss how to rename each duplicate column so that functions can be applied to it later.
Steps to distinguish columns with duplicate names in a PySpark data frame:
Step 1: First of all, we need to import the required libraries, i.e., SparkSession, which is used to create the session.
from pyspark.sql import SparkSession
Step 2: Now, create a spark session using the getOrCreate() function.
spark_session = SparkSession.builder.getOrCreate()
Step 3: Then, either create the data frame using the createDataFrame() function or read the CSV file having some columns with duplicate names in it.
df = spark_session.read.csv('#Path of CSV file', sep=',', inferSchema=True, header=True)
or
df = spark_session.createDataFrame([(row_1_data), (row_2_data), (row_3_data)], ['column_name_1', 'column_name_2', 'column_name_3'])
Step 4: Moreover, create a list storing all the column names.
df_cols = df.columns
Step 5: Later on, using this list, find the index of all the columns which are repeating in that list.
duplicate_col_index = [idx for idx, val in enumerate(df_cols) if val in df_cols[:idx]]
Step 6: Further, rename all the repeated column names by adding a suffix to each of them.
for i in duplicate_col_index:
    df_cols[i] = df_cols[i] + '_duplicate_' + str(i)
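Steps 5 and 6 operate on an ordinary Python list, so they can be tried out without a Spark session; the column names below are illustrative stand-ins:

# Sample column names with 'temperature' repeated (illustrative data)
df_cols = ['day', 'temperature', 'temperature', 'temperature']

# Step 5: indices of names already seen earlier in the list
duplicate_col_index = [idx for idx, val in enumerate(df_cols)
                       if val in df_cols[:idx]]
print(duplicate_col_index)  # [2, 3]

# Step 6: rename each duplicate by appending '_duplicate_' + index
for i in duplicate_col_index:
    df_cols[i] = df_cols[i] + '_duplicate_' + str(i)
print(df_cols)
# ['day', 'temperature', 'temperature_duplicate_2', 'temperature_duplicate_3']

Note that the first occurrence of a name is never in `df_cols[:idx]` at its own index, so it keeps its original name.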
Step 7: In this step, we rename the columns of the data frame using the updated list of column names.
df = df.toDF(*df_cols)
Step 8: Finally, display the updated data frame with new column names.
df.show()
Example 1:
In this example, we have created a data frame that has several columns with the same name, i.e., 'temperature', as follows:
Thus, we have found all the duplicate column names in the data frame and renamed each duplicate column differently, i.e., 'temperature_duplicate_2', 'temperature_duplicate_3', and 'temperature_duplicate_4', keeping the first column name the same.
Python3
# Import the library
from pyspark.sql import SparkSession

# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Create a data frame with duplicate column names
df = spark_session.createDataFrame(
    [('Monday', 25, 27, 29, 30), ('Tuesday', 40, 38, 36, 34),
     ('Wednesday', 18, 20, 22, 17), ('Thursday', 25, 27, 29, 19)],
    ['day', 'temperature', 'temperature',
     'temperature', 'temperature'])

# Store all the column names in a list
df_cols = df.columns

# Get the indices of the duplicate columns
duplicate_col_index = [idx for idx, val in enumerate(df_cols)
                       if val in df_cols[:idx]]

# Rename the duplicate columns by adding
# the suffix '_duplicate_' + index
for i in duplicate_col_index:
    df_cols[i] = df_cols[i] + '_duplicate_' + str(i)

# Apply the new column names to the data frame
df = df.toDF(*df_cols)

# Display the data frame
df.show()
Output:
Example 2:
In this example, we have created a data frame that has several columns with the same name, i.e., 'marks', as follows:
Thus, we have found all the duplicate column names in the data frame and renamed each duplicate column differently, i.e., 'marks_duplicate_2' and 'marks_duplicate_3', keeping the first column name the same. Later, we created a new column 'Sum of Marks' by applying a sum function to all the marks columns and displayed the data frame.
Python3
# PySpark program to distinguish columns with duplicated names

# Import the library
from pyspark.sql import SparkSession

# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Create a data frame with duplicate column names
df = spark_session.createDataFrame(
    [('Arun', 1, 2, 3), ('Aniket', 4, 5, 6),
     ('Ishita', 7, 8, 9)],
    ['name', 'marks', 'marks', 'marks'])

# Store all the column names in a list
df_cols = df.columns

# Get the indices of the duplicate columns
duplicate_col_index = [idx for idx, val in enumerate(df_cols)
                       if val in df_cols[:idx]]

# Rename the duplicate columns by adding
# the suffix '_duplicate_' + index
for i in duplicate_col_index:
    df_cols[i] = df_cols[i] + '_duplicate_' + str(i)

# Apply the new column names to the data frame
df = df.toDF(*df_cols)

# Define a function to sum the marks
sum_marks = lambda a, b, c: a + b + c

# Display the data frame with a new column, calling
# the sum_marks function with the marks columns as arguments
df.withColumn('Sum of Marks',
              sum_marks(df.marks, df.marks_duplicate_2,
                        df.marks_duplicate_3)).show()
Output:
Example 3:
In this example, we have read the CSV file (link), which has the columns 'currency', 'price', 'price', 'price', and 'price', as follows:
When we import the CSV file, we need to follow one extra step: removing the digit that gets appended to the end of the repeated column names during the import. Then we find all the duplicate column names in the data frame and rename each duplicate column differently, i.e., 'price_duplicate_2', 'price_duplicate_3', and 'price_duplicate_4', keeping the first column name the same. Later, we created a new column 'Average Price' by applying the average_price function to all the price columns and displayed the data frame.
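The digit-stripping loop used in this example can be checked on plain Python strings; the names below are hypothetical stand-ins for what the CSV reader might produce when it deduplicates repeated headers:

# Hypothetical column names as a CSV reader might deduplicate them,
# e.g. by appending an index digit to the repeats
df_cols = ['currency', 'price', 'price2', 'price3', 'price4']

# Strip a trailing digit so the repeated names match again
for i in range(len(df_cols)):
    if df_cols[i][-1].isdigit():
        df_cols[i] = df_cols[i][:-1]

print(df_cols)  # ['currency', 'price', 'price', 'price', 'price']

Note that, like the loop in the full example below, this removes only a single trailing character, so it assumes no appended index has more than one digit.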
Python3
# Import the library
from pyspark.sql import SparkSession

# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Read the CSV file
df = spark_session.read.csv('/content/currency_prices.csv',
                            sep=',', inferSchema=True, header=True)

# Store all the column names in a list
df_cols = df.columns

# Remove the extra digit appended to the repeated
# column names while importing the CSV file
for i in range(len(df_cols)):
    if df_cols[i][-1].isdigit():
        df_cols[i] = df_cols[i][:-1]

# Apply the cleaned column names to the data frame
df = df.toDF(*df_cols)

# Get the indices of the duplicate columns
duplicate_col_index = [idx for idx, val in enumerate(df_cols)
                       if val in df_cols[:idx]]

# Rename the duplicate columns by adding
# the suffix '_duplicate_' + index
for i in duplicate_col_index:
    df_cols[i] = df_cols[i] + '_duplicate_' + str(i)

# Apply the new column names to the data frame
df = df.toDF(*df_cols)

# Define a function to compute the average price
average_price = lambda a, b, c, d: (a + b + c + d) / 4

# Display the data frame with a new column, calling
# the average_price function with the price columns as arguments
df.withColumn('Average Price',
              average_price(df.price, df.price_duplicate_2,
                            df.price_duplicate_3,
                            df.price_duplicate_4)).show()
Output: