Saturday, December 28, 2024
Google search engine
HomeLanguagesCleaning Data with PySpark Python

Cleaning Data with PySpark Python

In this article, we are going to know how to cleaning of data with PySpark in Python.

Pyspark is an interface for Apache Spark. Apache Spark is an Open Source Analytics Engine for Big Data Processing. Today we will be focusing on how to perform Data Cleaning using PySpark. We will perform Null Values Handing, Value Replacement & Outliers removal on our Dummy data given below. Save the below data in a notepad with the “.csv” extension.

Employee_Table.csv file:

First Name,Last Name,Age,Department,Joining Year
Donald,Biden,30,,    
,,25,,2015
Barack,Trump,40,IT,2012
George,Pence,32,,2020
Mike,,25,Consulting,
Mitt,Bush,200,Information Tech,

Prerequisites

Step-by-step implementation of code

Reading & Displaying Data

First, we import and create a Spark session which acts as an entry point to PySpark functionalities to create Dataframes, etc. 

Python3




# import necessary libraries
from pyspark.sql import SparkSession
  
# Create Spark Session
sparkSession = SparkSession.builder.appName('g1').getOrCreate()


The Spark Session appName sets a name for the application which will be displayed on Spark UI.

Python3




# Create Spark DataFrame
df_pyspark = sparkSession.read.csv(
    'Employee_Table.csv',
    header=True,
    inferSchema=True
)


The CSV method can be replaced by JDBC, JSON, etc depending on the file format. The header flag decides whether the first row should be considered as column headers or not. If the InferSchema flag is set True, the column datatypes are determined based on the content inside otherwise all are set to String Datatype. 

Python3




# Print Schema
df_pyspark.printSchema()
  
# Print Dataframe
df_pyspark.show()


Printing the schema and data table.

Output:

Cleaning Data with PySpark

 

Null Values Dropping

A null value in a database represents missing data. We can perform multiple activities as listed below to handle Null values.

Dropping Rows containing Null values

dataframe.na.drop() function drops rows containing even a single null value. A parameter “how” can be set to “any” or “all“. 
ANY -> Drops rows containing any number of Null values
ALL->  Drops rows only if a row contains all Null values

Note: The changes done to the Dataframe are not in place.

Python3




# Dropping Entire rows containing Null 
null_dropped=df_pyspark.na.drop()
null_dropped.show()


Output:

Cleaning Data with PySpark

 

Subset in dataframe.na.drop()

The subset parameter inside the drop method accepts a list of column names (List[String]) such that the Null check happens only in the mentioned subset of columns.

Python3




# Dropping Rows where Joining Year is missing
non_null_year = df_pyspark.na.drop(subset=['Joining Year'])
non_null_year.show()


Output:
 

Cleaning Data with PySpark

 

Thresh in dataframe.na.drop()

Thresh parameter inside drop method takes an integer which acts as a threshold such that all rows containing Non-Null values lesser than the threshold are dropped. 

Python3




# Drop rows containing non-null values less than thresh
df_pyspark.na.drop(thresh=4).show()


Output:
 

Cleaning Data with PySpark

 

Fill Null Values

Instead of dropping rows, Null Values can be replaced by any value you need. We use the fill method for this purpose
In our given example, we have Null Values in the Department column. Let’s say we assume employees having no specific department are generalists who hop from department to department. 

Python3




# Fill Null values inside Department column with the word 'Generalist'
df_pyspark.na.fill('Generalist',subset=['Department']).show()


Here, the subset parameter acts the same role as in the drop method.

Output:

Cleaning Data with PySpark

 

Replace Values

If you glance at the dataset, you may find the IT & Information Tech Department which means the same. So to reduce any further complexity, we can replace Information Tech with IT.
We can insert a dictionary inside this method. The dictionary keys represent the initial value while the dictionary values represent the corresponding replacement. 

Python3




# Replace Information Tech with IT
df_pyspark.replace({'Information Tech':'IT'},subset=['Department']).show()


 Output:
 

Cleaning Data with PySpark

 

Outlier Removal

An outlier is an observation that lies an abnormal distance from other values in a random sample of a population.

In our dataset one employee aged around 200. This can be safely assumed as an outlier and can be either removed or rectified. We will remove this outlier now.

For this purpose, we can use the Filter method. We can execute conditions (just like the ones you use in the SQL WHERE statement) and combine multiple conditions using AND, OR, IN, etc. operators.

Python3




# Remove Outlier -- We assume 59 to be maximum working age in the company
df_pyspark.filter('Age<60').show()


Output:
 

Cleaning Data with PySpark

 

Below is the complete implementation:

Python3




# import necessary libraries
from pyspark.sql import SparkSession
  
# Create Spark Session
sparkSession = SparkSession.builder.appName('g1').getOrCreate()
  
# Create Spark DataFrame
df_pyspark = sparkSession.read.csv(
    'Employee_Table.csv',
    header=True,
    inferSchema=True
)
  
# Print Schema
df_pyspark.printSchema()
print("Before Data Cleaning")
# Print Dataframe
df_pyspark.show()
  
# Fill Null values inside Department column with the word 'Generalist'
df_pyspark = df_pyspark.na.fill('Generalist', subset=['Department'])
  
# Assumed Null Value means Employee joined during Company Founding i.e. 2010
df_pyspark = df_pyspark.na.fill(2010, subset=['Joining Year'])
  
# Replace Information Tech with IT
df_pyspark = df_pyspark.replace(
    {'Information Tech': 'IT'}, subset=['Department'])
  
# Remove Outlier -- We assume 59 to be maximum working age in the company
df_pyspark = df_pyspark.filter('Age<60')
  
# Remove Rows not containing First as well as Last Name
df_pyspark = df_pyspark.filter(
    df_pyspark['First Name'].isNotNull() | df_pyspark['Last Name'].isNotNull())
  
print("After Data Cleaning")
df_pyspark.show()


Output:

Cleaning Data with PySpark

 

Dominic Rubhabha-Wardslaus
Dominic Rubhabha-Wardslaushttp://wardslaus.com
infosec,malicious & dos attacks generator, boot rom exploit philanthropist , wild hacker , game developer,
RELATED ARTICLES

Most Popular

Recent Comments