In this article, we are going to learn data partitioning using PySpark in Python.
In PySpark, data partitioning refers to the process of dividing a large dataset into smaller chunks or partitions, which can be processed concurrently. This is an important aspect of distributed computing, as it allows large datasets to be processed more efficiently by dividing the workload among multiple machines or processors.
Advantages of Data Partitioning:
- Improved performance: By dividing data into smaller partitions, it can be processed in parallel across multiple machines, leading to faster processing times and improved performance.
- Scalability: Partitioning allows for horizontal scalability, meaning that as the amount of data grows, more machines can be added to the cluster to handle the increased load, without having to make changes to the data processing code.
- Improved fault tolerance: Partitioning also allows for data to be distributed across multiple machines, which can help to prevent data loss in the event of a single machine failure.
- Data organization: Partitioning allows for data to be organized in a more meaningful way, such as by time period or geographic location, which can make it easier to analyze and query the data.
In this article, we will look at the different methods PySpark provides for partitioning data.
Methods of data partitioning in PySpark
- Hash Partitioning
- Range Partitioning
- Using partitionBy()
Using Hash Partitioning
Hash partitioning is the default method used when you repartition a DataFrame by column in PySpark. It works by computing a hash value for each record from one or more specified columns and placing the record in the partition that corresponds to that hash. This guarantees that records with the same value in the specified columns always end up in the same partition.
Steps to implement hash partitioning:
Step 1: First we will import all necessary libraries and create a sample DataFrame with three columns id, name, and age.
Step 2: Use the repartition() function to perform hash partitioning on the DataFrame based on the id column, specifying that we want four partitions.
Step 3: We can verify the partitioning by using the rdd attribute to access the underlying RDD and then calling the glom() method, which groups the elements of each partition into a list, so collecting the result shows exactly which rows ended up in which partition.
Here is the complete code:
Python3
# Import required modules
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("hash_partitioning").getOrCreate()

# Create a sample DataFrame
df = spark.createDataFrame([
    (1, "Alice", 25),
    (2, "Bob", 30),
    (3, "Charlie", 35),
    (4, "Dave", 40),
    (5, "Eve", 45),
    (6, "Frank", 50)
], ["id", "name", "age"])

# Print the DataFrame
df.show()

# Perform hash partitioning on the
# DataFrame based on the "id" column
df = df.repartition(4, "id")

# Print the elements in each partition
print(df.rdd.glom().collect())
Output: The output first shows the DataFrame before partitioning; below it, a nested list shows the rows contained in each of the four partitions.
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 35|
|  4|   Dave| 40|
|  5|    Eve| 45|
|  6|  Frank| 50|
+---+-------+---+

[[Row(id=2, name='Bob', age=30), Row(id=4, name='Dave', age=40), Row(id=5, name='Eve', age=45)], [Row(id=1, name='Alice', age=25), Row(id=6, name='Frank', age=50)], [], [Row(id=3, name='Charlie', age=35)]]
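As a quick check on the result, the minimal sketch below (not part of the example above, and assuming the repartitioned df from it is still in scope) prints the number of partitions with getNumPartitions() and tags each row with the partition it landed in using the spark_partition_id() function.
Python3
# A minimal sketch: inspect the result of the hash partitioning above.
# Assumes `df` is the repartitioned DataFrame from the previous example.
from pyspark.sql.functions import spark_partition_id

# Number of partitions after repartition(4, "id") -- should print 4
print(df.rdd.getNumPartitions())

# Tag each row with the id of the partition it was placed in
df.withColumn("partition_id", spark_partition_id()).show()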
Using Range Partitioning
This method divides the data into partitions based on ranges of values in a specified column. For example, we could partition a dataset on a date column, with each partition containing records from a specific time period. Here, we will use the repartitionByRange() function to perform range partitioning on the DataFrame based on the age column.
Python3
# Import required modules
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("range_partitioning").getOrCreate()

# Create a sample DataFrame
df = spark.createDataFrame([
    (1, "Alice", 25),
    (2, "Bob", 30),
    (3, "Charlie", 35),
    (4, "Dave", 40),
    (5, "Eve", 45),
    (6, "Frank", 50)
], ["id", "name", "age"])

# Perform range partitioning on the
# DataFrame based on the "age" column
df = df.repartitionByRange(3, "age")

# Print the elements in each partition
print(df.rdd.glom().collect())
Output: The DataFrame is divided into the three partitions requested in the repartitionByRange() call, with each partition holding a contiguous range of age values.
[[Row(id=1, name='Alice', age=25), Row(id=2, name='Bob', age=30)], [Row(id=3, name='Charlie', age=35), Row(id=4, name='Dave', age=40)], [Row(id=5, name='Eve', age=45), Row(id=6, name='Frank', age=50)]]
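One common follow-up, shown as a rough sketch below rather than as part of the example above, is to combine repartitionByRange() with sortWithinPartitions(): because the age ranges assigned to the partitions do not overlap, sorting each partition locally yields data that is globally ordered by age without a full shuffle-based sort.
Python3
# A sketch building on the range-partitioning example above.
# Assumes `df` is the sample DataFrame with id, name and age columns.

# Non-overlapping age ranges per partition, followed by a local sort in
# each partition, so the data ends up globally ordered by age.
sorted_df = df.repartitionByRange(3, "age").sortWithinPartitions("age")

# Inspect the rows in each partition
print(sorted_df.rdd.glom().collect())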
Using the partitionBy() Method
The partitionBy() method of PySpark's DataFrameWriter splits a DataFrame into smaller, more manageable pieces when writing it to disk, based on the values in one or more columns. It takes one or more column names as arguments, and the output is written into a separate subdirectory for each distinct combination of values in those columns. In this example, we will use a cricket dataset, which can be downloaded from this link: Cricket_data_set_odi.csv. Let's see the steps to partition the data using the partitionBy() function.
Step 1: Import the required modules and read the CSV file and then print its schema.
Python3
# Import required modules
from pyspark.sql import SparkSession

# Create a SparkSession and give the app a name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# Create a DataFrame from the CSV file
df = spark.read.option("header", True).csv("Cricket_data_set_odi.csv")

# Display the schema
df.printSchema()
Output: The schema of the DataFrame is printed, listing each column name and its data type.
Step 2: In this step, we use the "Team" and "Speciality" columns as the partition keys. The partitioned data is written to the "Team-Speciality" folder, with write.option() keeping the header row and partitionBy() performing the partitioning.
Python3
# From the above DataFrame, we use Team and
# Speciality as the partition keys.

# partitionBy()
df.write.option("header", True) \
    .partitionBy("Team", "Speciality") \
    .mode("overwrite") \
    .csv("Team-Speciality")
Output: The output directory Team-Speciality contains one subfolder per Team value, each with nested subfolders for the Speciality values.
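A natural next step, sketched below under the assumption that the write above succeeded, is to read the partitioned output back: Spark recovers the Team and Speciality columns from the directory names, and filtering on a partition column lets it skip the folders it does not need (partition pruning). The team name "India" used here is a hypothetical value for illustration only; substitute a team that actually appears in the dataset.
Python3
# A sketch: read the partitioned output back and filter on a partition column.
# "India" is a hypothetical team value used only for illustration.
from pyspark.sql.functions import col

partitioned_df = spark.read.option("header", True).csv("Team-Speciality")

# Filtering on the partition column lets Spark read only the matching
# Team=.../Speciality=... subdirectories instead of the whole dataset.
partitioned_df.filter(col("Team") == "India").show()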
Conclusion
It’s important to note that data partitioning can have a significant impact on the performance of a PySpark application. Proper partitioning can greatly improve the speed and efficiency of the code, while improper partitioning can lead to poor performance and inefficient use of resources.