In this article, we will learn how to clean data with PySpark in Python.
PySpark is the Python interface for Apache Spark, an open-source analytics engine for big data processing. Here we focus on data cleaning with PySpark: handling null values, replacing values, and removing outliers in the dummy data given below. Save the data below in a text file named Employee_Table.csv.
Employee_Table.csv file:
First Name,Last Name,Age,Department,Joining Year
Donald,Biden,30,,
,,25,,2015
Barack,Trump,40,IT,2012
George,Pence,32,,2020
Mike,,25,Consulting,
Mitt,Bush,200,Information Tech,
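If you would rather create the file from Python than from a text editor, the sketch below builds the same rows with the standard csv module. The names employee_csv_text and write_employee_csv are illustrative helpers, not part of PySpark.

```python
import csv
import io

HEADER = ["First Name", "Last Name", "Age", "Department", "Joining Year"]

# Empty strings stand for the missing values in the sample data.
ROWS = [
    ["Donald", "Biden", "30", "", ""],
    ["", "", "25", "", "2015"],
    ["Barack", "Trump", "40", "IT", "2012"],
    ["George", "Pence", "32", "", "2020"],
    ["Mike", "", "25", "Consulting", ""],
    ["Mitt", "Bush", "200", "Information Tech", ""],
]


def employee_csv_text():
    """Return the sample table as CSV text, one record per line."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(HEADER)
    writer.writerows(ROWS)
    return buffer.getvalue()


def write_employee_csv(path="Employee_Table.csv"):
    """Write the sample table to `path` (hypothetical helper)."""
    with open(path, "w", newline="") as handle:
        handle.write(employee_csv_text())
```

Calling write_employee_csv() from the working directory creates the file that the rest of the article reads.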
Prerequisites
- Basic Knowledge of Python.
- Java JDK should be installed.
- Install PySpark using pip (pip install pyspark).
Step-by-step implementation of code
Reading & Displaying Data
First, we import SparkSession and create a Spark session, which acts as the entry point to PySpark functionality such as creating DataFrames.
Python3
# Import the necessary libraries
from pyspark.sql import SparkSession

# Create the Spark session
sparkSession = SparkSession.builder.appName('g1').getOrCreate()
The appName method sets the name of the application, which is displayed in the Spark UI.
Python3
# Create the Spark DataFrame
df_pyspark = sparkSession.read.csv(
    'Employee_Table.csv',
    header=True,
    inferSchema=True
)
The csv method can be swapped for json, jdbc, and other reader methods, depending on the data source. The header flag decides whether the first row is treated as column names. If the inferSchema flag is set to True, the column data types are inferred from the contents; otherwise every column is read as a string.
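To make the effect of the two flags concrete without starting Spark, here is a rough plain-Python model: the first line supplies the column names, empty fields count as null, and a column is typed as int only if every non-empty value parses as an integer. This is an illustration only. Spark's real inference recognises more types (doubles, booleans, timestamps and so on), and infer_types is a made-up helper name.

```python
import csv
import io

CSV_TEXT = """First Name,Last Name,Age,Department,Joining Year
Donald,Biden,30,,
,,25,,2015
Barack,Trump,40,IT,2012
George,Pence,32,,2020
Mike,,25,Consulting,
Mitt,Bush,200,Information Tech,
"""


def is_int(text):
    """Return True if `text` parses as a whole number."""
    try:
        int(text)
        return True
    except ValueError:
        return False


def infer_types(csv_text):
    """Map each column name to 'int' or 'string' (simplified model)."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)  # header=True: the first row names the columns
    columns = {name: [] for name in header}
    for record in reader:
        for name, value in zip(header, record):
            if value != "":  # empty fields are treated as null and skipped
                columns[name].append(value)
    return {
        name: "int" if values and all(is_int(v) for v in values) else "string"
        for name, values in columns.items()
    }


schema = infer_types(CSV_TEXT)
for name, type_name in schema.items():
    print(f"{name}: {type_name}")
```

In this model, leaving inference off would simply mean reporting every column as a string.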
Python3
# Print the schema
df_pyspark.printSchema()

# Print the DataFrame
df_pyspark.show()
This prints the schema and the contents of the DataFrame.
Output:
Null Values Dropping
A null value in a database represents missing data. We can handle null values in several ways, listed below.
Dropping Rows containing Null values
The dataframe.na.drop() function drops rows containing null values. Its “how” parameter can be set to “any” (the default) or “all”.
ANY -> Drops a row if it contains at least one Null value
ALL -> Drops a row only if all of its values are Null
Note: The changes are not made in place. Each call returns a new DataFrame, so assign the result to a variable if you want to keep it.
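The difference between the two settings is easy to check by hand on the sample table. The sketch below is a plain-Python model of the rule, not PySpark code, and drop_nulls is a hypothetical helper name; None marks the empty CSV fields.

```python
# The sample table, with None wherever the CSV has an empty field.
ROWS = [
    ("Donald", "Biden", 30, None, None),
    (None, None, 25, None, 2015),
    ("Barack", "Trump", 40, "IT", 2012),
    ("George", "Pence", 32, None, 2020),
    ("Mike", None, 25, "Consulting", None),
    ("Mitt", "Bush", 200, "Information Tech", None),
]


def drop_nulls(rows, how="any"):
    """Model of the 'how' rule: return the rows that survive the drop."""
    if how == "any":
        # Drop a row as soon as one of its values is null.
        return [row for row in rows if all(v is not None for v in row)]
    if how == "all":
        # Drop a row only when every one of its values is null.
        return [row for row in rows if any(v is not None for v in row)]
    raise ValueError("how must be 'any' or 'all'")


kept_any = drop_nulls(ROWS, how="any")
kept_all = drop_nulls(ROWS, how="all")
print(len(kept_any), len(kept_all))
```

On this data only the Barack row has no nulls, so “any” keeps a single row, while “all” keeps all six because no row is entirely empty. Like the real method, the model returns new lists and leaves ROWS untouched.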
Python3
# Drop every row that contains a null value
null_dropped = df_pyspark.na.drop()
null_dropped.show()
Output:
Subset in dataframe.na.drop()
The subset parameter of the drop method accepts a list of column names, so that the null check is applied only to those columns.
Python3
# Drop rows where Joining Year is missing
non_null_year = df_pyspark.na.drop(subset=['Joining Year'])
non_null_year.show()
Output:
Thresh in dataframe.na.drop()
The thresh parameter of the drop method takes an integer threshold: every row with fewer non-null values than the threshold is dropped. When thresh is given, it takes precedence over the “how” parameter.
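Counting non-null values per row shows which rows a given threshold keeps. The following is a plain-Python sketch of that rule on the sample table, with a hypothetical helper drop_below_threshold; it also accepts a subset so the count can be limited to chosen columns.

```python
COLUMNS = ["First Name", "Last Name", "Age", "Department", "Joining Year"]

# The sample table, with None wherever the CSV has an empty field.
ROWS = [
    ("Donald", "Biden", 30, None, None),
    (None, None, 25, None, 2015),
    ("Barack", "Trump", 40, "IT", 2012),
    ("George", "Pence", 32, None, 2020),
    ("Mike", None, 25, "Consulting", None),
    ("Mitt", "Bush", 200, "Information Tech", None),
]


def drop_below_threshold(rows, thresh, subset=None):
    """Keep rows with at least `thresh` non-null values in `subset`."""
    names = subset if subset is not None else COLUMNS
    indexes = [COLUMNS.index(name) for name in names]
    kept = []
    for row in rows:
        non_null = sum(1 for i in indexes if row[i] is not None)
        if non_null >= thresh:
            kept.append(row)
    return kept


kept = drop_below_threshold(ROWS, thresh=4)
print([row[0] for row in kept])  # ['Barack', 'George', 'Mitt']
```

The Donald and Mike rows have three non-null values each and the nameless row has two, so all three fall below a threshold of 4.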
Python3
# Drop rows that have fewer than 4 non-null values
df_pyspark.na.drop(thresh=4).show()
Output:
Fill Null Values
Instead of dropping rows, null values can be replaced with a value of your choice. We use the fill method for this purpose.
In our given example, we have Null Values in the Department column. Let’s say we assume employees having no specific department are generalists who hop from department to department.
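One detail worth knowing is that fill only touches columns whose type is compatible with the fill value, so a string value leaves the numeric columns alone even when no subset is given. The sketch below models that behaviour in plain Python on three of the sample rows. It is a simplification (it requires an exact type match and only knows str and int), and fill_nulls is a hypothetical helper name.

```python
COLUMNS = ["First Name", "Last Name", "Age", "Department", "Joining Year"]
TYPES = [str, str, int, str, int]  # the column types assumed for this model

# Three of the sample rows, with None for the empty CSV fields.
ROWS = [
    ("Donald", "Biden", 30, None, None),
    (None, None, 25, None, 2015),
    ("Mike", None, 25, "Consulting", None),
]


def fill_nulls(rows, value, subset=None):
    """Replace None with `value` in type-compatible columns of `subset`."""
    names = subset if subset is not None else COLUMNS
    targets = {
        i for i, name in enumerate(COLUMNS)
        if name in names and TYPES[i] is type(value)
    }
    return [
        tuple(value if (i in targets and v is None) else v
              for i, v in enumerate(row))
        for row in rows
    ]


with_department = fill_nulls(ROWS, "Generalist", subset=["Department"])
everywhere = fill_nulls(ROWS, "Generalist")
for row in with_department:
    print(row)
```

Without a subset, the model also fills the missing names with 'Generalist', which is rarely what you want; restricting the fill to the Department column avoids that.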
Python3
# Fill null values in the Department column with the word 'Generalist'
df_pyspark.na.fill('Generalist', subset=['Department']).show()
Here, the subset parameter plays the same role as in the drop method.
Output:
Replace Values
If you glance at the dataset, you will find both an IT and an Information Tech department, which mean the same thing. To avoid treating them as two separate departments, we can replace Information Tech with IT.
We can pass a dictionary to the replace method. The dictionary keys are the values to look for, and the dictionary values are the corresponding replacements.
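The dictionary works as a lookup on whole values: a cell is swapped only when it equals a key exactly, and everything else, nulls included, passes through unchanged. A plain-Python sketch of that lookup on the Department column follows; replace_values is a hypothetical helper, not a PySpark function.

```python
def replace_values(values, mapping):
    """Swap each value that exactly equals a key of `mapping`."""
    return [mapping.get(value, value) for value in values]


# The Department column of the sample table, top to bottom.
departments = [None, None, "IT", None, "Consulting", "Information Tech"]

cleaned = replace_values(departments, {"Information Tech": "IT"})
print(cleaned)  # [None, None, 'IT', None, 'Consulting', 'IT']
```

A longer string such as 'Information Technology' would not be touched, because the match is on the entire value and not on a substring.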
Python3
# Replace Information Tech with IT
df_pyspark.replace({'Information Tech': 'IT'}, subset=['Department']).show()
Output:
Outlier Removal
An outlier is an observation that lies an abnormal distance from other values in a random sample of a population.
In our dataset, one employee has an age of 200. This can safely be treated as an outlier, which can be either removed or corrected. We will remove it.
For this purpose, we can use the filter method. It accepts conditions like the ones you would write in a SQL WHERE clause, and multiple conditions can be combined using the AND, OR, IN, etc. operators.
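Because the condition follows SQL semantics, a comparison against a null is neither true nor false, and filter keeps only the rows where the condition is true. The sample data has no missing ages, so the sketch below adds one hypothetical row to show the effect. It is a plain-Python model, and less_than and filter_rows are illustrative names.

```python
def less_than(value, limit):
    """SQL-style comparison: the result is unknown (None) for a null."""
    if value is None:
        return None
    return value < limit


def filter_rows(rows, predicate):
    """Keep only rows whose predicate is exactly True, as WHERE does."""
    return [row for row in rows if predicate(row) is True]


# (First Name, Age) pairs; the last row is a hypothetical extra with no age.
people = [("Donald", 30), ("Barack", 40), ("Mitt", 200), ("Unknown", None)]

working_age = filter_rows(people, lambda row: less_than(row[1], 60))
print(working_age)  # [('Donald', 30), ('Barack', 40)]
```

So an age filter removes the outlier and would also remove any row whose age is missing, which is worth keeping in mind if you intend to fill those ages later.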
Python3
# Remove the outlier: we assume 59 to be the maximum working age in the company
df_pyspark.filter('Age<60').show()
Output:
Below is the complete implementation:
Python3
# Import the necessary libraries
from pyspark.sql import SparkSession

# Create the Spark session
sparkSession = SparkSession.builder.appName('g1').getOrCreate()

# Create the Spark DataFrame
df_pyspark = sparkSession.read.csv(
    'Employee_Table.csv',
    header=True,
    inferSchema=True
)

# Print the schema
df_pyspark.printSchema()

print("Before Data Cleaning")

# Print the DataFrame
df_pyspark.show()

# Fill null values in the Department column with the word 'Generalist'
df_pyspark = df_pyspark.na.fill('Generalist', subset=['Department'])

# Assume a null Joining Year means the employee joined
# when the company was founded, i.e. in 2010
df_pyspark = df_pyspark.na.fill(2010, subset=['Joining Year'])

# Replace Information Tech with IT
df_pyspark = df_pyspark.replace(
    {'Information Tech': 'IT'},
    subset=['Department']
)

# Remove the outlier: we assume 59 to be the maximum working age in the company
df_pyspark = df_pyspark.filter('Age<60')

# Remove rows where both First Name and Last Name are missing
# (a row is kept if at least one of the two is present)
df_pyspark = df_pyspark.filter(
    df_pyspark['First Name'].isNotNull() | df_pyspark['Last Name'].isNotNull()
)

print("After Data Cleaning")
df_pyspark.show()
Output:
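As a cross-check that needs no Spark session, the same five cleaning steps can be traced in plain Python on the sample rows. This is a sketch of the logic only, not PySpark code, and the variable names are illustrative.

```python
# The sample table, with None wherever the CSV has an empty field.
ROWS = [
    ("Donald", "Biden", 30, None, None),
    (None, None, 25, None, 2015),
    ("Barack", "Trump", 40, "IT", 2012),
    ("George", "Pence", 32, None, 2020),
    ("Mike", None, 25, "Consulting", None),
    ("Mitt", "Bush", 200, "Information Tech", None),
]

cleaned = []
for first, last, age, department, year in ROWS:
    # Fill missing departments and joining years.
    department = "Generalist" if department is None else department
    year = 2010 if year is None else year
    # Merge the two spellings of the IT department.
    if department == "Information Tech":
        department = "IT"
    # Drop the age outlier (a null age would fail 'Age<60' as well).
    if age is None or age >= 60:
        continue
    # Drop rows where both names are missing.
    if first is None and last is None:
        continue
    cleaned.append((first, last, age, department, year))

for row in cleaned:
    print(row)
```

Four rows survive in this model: Donald, Barack, George and Mike. The Mitt row is removed as the age outlier and the nameless row is removed by the name check, while Mike is kept because only his last name is missing.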