PySpark offers a very useful feature, the Window function, which operates over a group of rows and returns a value for every input row. Did you know that you can even find the difference between two columns using a Window function? In this article, we will discuss applying a Window function to calculate differences in PySpark in Python. Window functions in PySpark are used to perform statistical operations over an ordered or partitioned group of rows.
Applying a Window function to calculate differences in PySpark
First of all, import the required libraries, i.e., SparkSession, Window, and functions. The SparkSession class is used to create the session, the Window class defines the window of rows over which a function returns a value for every input row, and pyspark.sql.functions provides column-based functions such as lag.
Now, create a Spark session using the getOrCreate function. Then, read the CSV file and display it to check that it was loaded correctly. Next, define a window specification that orders the data by a column of your choice. Then, calculate the lag of a column by 1 over that window, i.e., the previous row's value, and store it in a new column. Further, calculate the difference between a data frame column and the lagged column created in the previous step. Finally, display the updated data frame with the difference column.
Example:
In this example, we have used a data set of 5×5, on which we applied the window function lag to the column 'discount' and stored the result in the new column 'Updated Discount'. Then, we subtracted the 'Updated Discount' from the column 'Fees'.
Python3
# Python program for applying a Window function
# to calculate differences in PySpark

# Import the SparkSession, Window and functions libraries
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as func

# Create a spark session using the getOrCreate() function
spark_session = SparkSession.builder.getOrCreate()

# Read the CSV file
data_frame = spark_session.read.csv('/content/class_data.csv',
                                    sep=',', inferSchema=True,
                                    header=True)

# Sort data through class in the Window specification
Windowspec = Window.orderBy("class")

# Calculate the lag of discount using the Window function by
# 1 for each row and put it in the new column 'Updated Discount'
updated_discount = data_frame.withColumn(
    'Updated Discount',
    func.lag(data_frame['discount']).over(Windowspec))

# Calculate the difference of fees and the lagged discount
# and store it in the new column 'Updated Fees'
updated_fees = updated_discount.withColumn(
    'Updated Fees',
    updated_discount['fees'] - updated_discount['Updated Discount'])

# Display the column obtained
# by subtracting the lagged discount from fees
updated_fees.show()
Output:
+-------+--------------+-----+-----+--------+----------------+------------+
|   name|       subject|class| fees|discount|Updated Discount|Updated Fees|
+-------+--------------+-----+-----+--------+----------------+------------+
| Ishita|       English|    9| 9000|       0|            null|        null|
|   Arun|         Maths|   10|12000|     400|               0|       12000|
| Aniket|Social Science|   11|15000|     600|             400|       14600|
|Pranjal|       Science|   12|18000|    1000|             600|       17400|
|Vinayak|      Computer|   12|18000|     500|            1000|       17000|
+-------+--------------+-----+-----+--------+----------------+------------+