Monday, January 6, 2025

Apply same function to all fields of PySpark dataframe row

Data scientists and analysts who work with large datasets often need to apply the same function, such as converting to uppercase or lowercase, adding, or subtracting, to every field of a data frame row. PySpark offers several ways to do this. In this article, we will discuss three of them: the reduce function, a for loop, and a list comprehension.

Modules Required

PySpark: PySpark is the Python API for Apache Spark, the distributed data processing engine. This module can be installed through the following command:

pip install pyspark

Methods to apply the same function to all fields of PySpark data frame row:

Method 1: Using reduce function

Syntax:

updated_data_frame = (reduce( lambda traverse_df, col_name: traverse_df.withColumn(col_name, function_to_perform(col(col_name))), data_frame.columns, data_frame ))

Here,

  • function_to_perform: The column function to apply to every column of the data frame, such as upper, lower, etc.
  • data_frame: The input data frame whose columns are to be transformed.
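To see what reduce is doing in the syntax above, here is a minimal plain-Python sketch that does not use Spark at all. The dictionary frame and the helper with_upper_column are hypothetical stand-ins for a data frame and for withColumn; they are assumptions made for illustration only. The sketch shows that reduce starts from the initial value (the third argument) and passes the running result through the function once per column name, which is the same as an explicit loop.

```python
from functools import reduce

# Plain-Python stand-in for a data frame: column name -> list of values.
# This is NOT a PySpark object; it only illustrates how reduce folds.
frame = {"name": ["amit", "bela"], "subject": ["maths", "physics"]}


def with_upper_column(current_frame, col_name):
    """Return a new dict with one column upper-cased (mimics withColumn)."""
    updated = dict(current_frame)
    updated[col_name] = [value.upper() for value in current_frame[col_name]]
    return updated


# reduce(function, iterable, initial): the accumulator starts as `frame`
# and is passed through with_upper_column once for each column name.
folded = reduce(with_upper_column, list(frame), frame)

# The equivalent explicit loop.
looped = frame
for col_name in frame:
    looped = with_upper_column(looped, col_name)

print(folded)
```

Because each step returns a new object instead of modifying the old one, the original frame is left unchanged, just as withColumn returns a new data frame rather than altering the existing one.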

student_data.csv file:

The examples below read a 5×5 dataset from a file named student_data.csv (its contents are not reproduced here).

Stepwise Implementation:

Step 1: First, import SparkSession, reduce, col, and upper. SparkSession is used to create the session, while reduce applies a two-argument function cumulatively to the items of a sequence, carrying the result forward at each step. The col function returns a Column for the given column name, while upper converts text to upper case. Instead of upper, you can use any other column function that you want to apply to each field of the data frame.

from pyspark.sql import SparkSession
from functools import reduce
from pyspark.sql.functions import col, upper

Step 2: Now, create a spark session using the getOrCreate function.

spark_session = SparkSession.builder.getOrCreate()

Step 3: Then, read the CSV file into a data frame.

data_frame = spark_session.read.csv('#Path of CSV file', sep=',', inferSchema=True, header=True)

Step 4: Next, use the reduce function to apply upper to every column of the data frame. Starting from data_frame, each step replaces one column with its transformed version.

updated_data_frame = (reduce(lambda traverse_df, col_name: traverse_df.withColumn(col_name, upper(col(col_name))), data_frame.columns, data_frame))

Step 5: Finally, display the data frame updated in the previous step.

updated_data_frame.show()

Example:

In this example, we use the reduce function with upper to convert every field of a 5×5 dataset to upper case.

# Python program to apply same function to all 
# fields of PySpark dataframe row using reduce function
  
# Import SparkSession, reduce, col and upper
from pyspark.sql import SparkSession
from functools import reduce
from pyspark.sql.functions import col, upper
  
# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()
  
# Read the CSV file
data_frame = spark_session.read.csv('/content/student_data.csv',
                                    sep=',', inferSchema=True, header=True)
  
# Apply upper to every column of the data frame using reduce
updated_data_frame = reduce(
    lambda traverse_df, col_name: traverse_df.withColumn(col_name, upper(col(col_name))),
    data_frame.columns,
    data_frame)
  
# Show the updated data frame
updated_data_frame.show()



Method 2: Using for loop

Syntax:

for col_name in data_frame.columns:
    data_frame = data_frame.withColumn(col_name, function_to_perform(col(col_name)))

Here,

  • function_to_perform: The column function to apply to every column of the data frame, such as upper, lower, etc.
  • data_frame: The input data frame whose columns are to be transformed.

Stepwise Implementation:

Step 1: First, import SparkSession, col, and upper. SparkSession is used to create the session. The col function returns a Column for the given column name, while upper converts text to upper case. Instead of upper, you can use any other column function that you want to apply to each field of the data frame.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper

Step 2: Now, create a spark session using the getOrCreate function.

spark_session = SparkSession.builder.getOrCreate()

Step 3: Then, read the CSV file into a data frame.

data_frame = spark_session.read.csv('#Path of CSV file', sep=',', inferSchema=True, header=True)

Step 4: Next, create a for loop that traverses the column names and converts each column to upper case.

for col_name in data_frame.columns:
    data_frame = data_frame.withColumn(col_name, upper(col(col_name)))

Step 5: Finally, display the data frame updated in the previous step.

data_frame.show()

Example:

In this example, we use a for loop with upper to convert every field of a 5×5 dataset to upper case.

# Python program to apply same function to all
# fields of PySpark dataframe row using for loop
  
# Import SparkSession, col and upper
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper
  
# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()
  
# Read the CSV file
data_frame = spark_session.read.csv('/content/student_data.csv',
                                    sep=',', inferSchema=True, header=True)
  
# Apply upper to every column of the data frame using a for loop
for col_name in data_frame.columns:
    data_frame = data_frame.withColumn(col_name, upper(col(col_name)))
  
# Show the updated data frame
data_frame.show()



Method 3: Using list comprehension

Syntax:

updated_data_frame = data_frame.select(*[function_to_perform(col(col_name)).name(col_name) for col_name in data_frame.columns])

Here,

  • function_to_perform: The column function to apply to every column of the data frame, such as upper, lower, etc.
  • data_frame: The input data frame whose columns are to be transformed.

Stepwise Implementation:

Step 1: First, import SparkSession, col, and upper. SparkSession is used to create the session. The col function returns a Column for the given column name, while upper converts text to upper case. Instead of upper, you can use any other column function that you want to apply to each field of the data frame.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper

Step 2: Now, create a spark session using the getOrCreate function.

spark_session = SparkSession.builder.getOrCreate()

Step 3: Then, read the CSV file into a data frame.

data_frame = spark_session.read.csv('#Path of CSV file', sep=',', inferSchema=True, header=True)

Step 4: Next, create a list comprehension that builds an upper-case expression for each column, keeping the original column name, and pass the resulting list to select.

updated_data_frame = data_frame.select(*[upper(col(col_name)).name(col_name) for col_name in data_frame.columns])

Step 5: Finally, display the data frame updated in the previous step.

updated_data_frame.show()

Example:

In this example, we use a list comprehension with upper to convert every field of a 5×5 dataset to upper case.

# Python program to apply same function to all
# fields of PySpark dataframe row using list comprehension
  
# Import SparkSession, col and upper
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper
  
# Create a spark session using getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()
  
# Read the CSV file
data_frame = spark_session.read.csv('/content/student_data.csv',
                                    sep=',', inferSchema=True, header=True)
  
# Apply upper to every column of the data frame using a list comprehension
updated_data_frame = data_frame.select(
    *[upper(col(col_name)).name(col_name) for col_name in data_frame.columns])
  
# Show the updated data frame
updated_data_frame.show()



Dominic Rubhabha-Wardslaus