In this article, we will learn how to convert Python functions into PySpark User-Defined Functions (UDFs). PySpark UDFs are a powerful tool for data processing and analysis, as they allow Python functions to be used within the Spark ecosystem. By converting Python functions into UDFs, we can leverage the distributed processing capabilities of Spark to perform complex transformations and operations on large datasets.
PySpark
PySpark is the Python library for Spark programming. It provides a Python API for interacting with the Spark ecosystem, including support for data frames, SQL operations, and machine learning.
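As a minimal sketch of what working with PySpark looks like (the application name "example" and the sample rows are our own illustration, not part of the examples below), the snippet below starts a SparkSession and builds a small data frame:

Python

# Start (or reuse) a SparkSession, the entry point to the DataFrame API
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

# Build a small data frame and inspect its schema and contents
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "letter"])
df.printSchema()
df.show()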
User Defined Function (UDF)
A User Defined Function (UDF) is a function that is defined and written by the user, rather than being provided by the system. UDFs in PySpark are used to perform specific operations or calculations on data within the Spark ecosystem.
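Besides the udf() call used in the examples below, PySpark also accepts udf as a decorator. The sketch below shows this equivalent form (the function upper_case is our own illustration):

Python

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# The @udf decorator wraps the Python function into a UDF in one step
@udf(returnType=StringType())
def upper_case(s):
    return s.upper()

A function decorated this way can then be used directly in a data frame operation, for example df.select(upper_case('letter')).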
Distributed Processing
Spark is a distributed computing framework, meaning it can process data in parallel across a cluster of machines. This lets Spark handle very large datasets and perform operations quickly by breaking the work into smaller tasks.
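As a rough illustration (the row count of 1000 and the partition count of 8 are arbitrary choices), you can ask Spark how a data frame is split into partitions; when a UDF is applied, the Python function runs on the rows of each partition in parallel:

Python

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitions").getOrCreate()

# 1000 rows split into 8 partitions; each partition can be
# processed by a separate task, potentially on a different machine
df = spark.range(0, 1000, 1, 8)
print(df.rdd.getNumPartitions())  # prints 8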
Example 1:
In this example, we define a Python function square() that takes a single argument and returns its square. We then create a UDF from this function using the udf() function, specifying IntegerType() as the return type. Finally, we use the UDF in a data frame operation to square the values in the "id" column. Note that spark.range(1, 10) generates the integers 1 through 9 (the upper bound is exclusive), so the output is a one-column data frame containing their squares.
Python
# Import required modules
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("gfg").getOrCreate()

# Define the Python function
def square(x):
    return x * x

# Create the UDF
square_udf = udf(square, IntegerType())

# Use the UDF in a DataFrame operation
df = spark.range(1, 10)
df.select(square_udf("id").alias("squared")).show()
Output: The output below is the result of running the UDF square_udf on the 'id' column of the data frame df. The UDF applies the Python function square to each value of the 'id' column and returns the results in a new column 'squared'.
+-------+
|squared|
+-------+
|      1|
|      4|
|      9|
|     16|
|     25|
|     36|
|     49|
|     64|
|     81|
+-------+
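A closely related pattern, sketched here assuming the spark session and square function from Example 1 are still in scope, is registering the UDF under a name (here "square_sql", our own choice) so it can also be called from Spark SQL:

Python

# Register the same Python function as a SQL-callable UDF
spark.udf.register("square_sql", square, IntegerType())

# Expose the data frame as a temporary view and query it with SQL
spark.range(1, 10).createOrReplaceTempView("numbers")
spark.sql("SELECT square_sql(id) AS squared FROM numbers").show()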
Example 2:
In this example, we define a Python function concat_strings(x, y) that takes two string arguments and returns them joined by a space. We then create a UDF from this function using the udf() function, specifying StringType() as the return type. We then create an example data frame df for demonstration purposes and run the UDF on its columns as shown in the example.
Python
# Importing required modules
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("gfg").getOrCreate()

# Define the Python function
def concat_strings(x, y):
    return x + ' ' + y

# Create the UDF
concat_strings_udf = udf(concat_strings, StringType())

# Create the data frame
df = spark.createDataFrame([('John', 'Doe'),
                            ('Adam', 'Smith'),
                            ('Jane', 'Doe')],
                           ['first_name', 'last_name'])
df.show()

print("after applying udf function")
df.select(concat_strings_udf('first_name', 'last_name').alias('full_name')).show()
Output: As we can see, the UDF concatenates the values of the ‘first_name’ and ‘last_name’ columns and returns the concatenated string in a new column ‘full_name’.
+----------+---------+
|first_name|last_name|
+----------+---------+
|      John|      Doe|
|      Adam|    Smith|
|      Jane|      Doe|
+----------+---------+

after applying udf function
+----------+
| full_name|
+----------+
|  John Doe|
|Adam Smith|
|  Jane Doe|
+----------+
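One caveat worth knowing: a plain Python UDF like concat_strings will fail with a TypeError if either column contains a null, because None + ' ' is not valid in Python. A null-safe variant might look like the sketch below (the guard and the name concat_strings_safe are our own addition, not part of the example above):

Python

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Null-safe version: return None when either input is missing
def concat_strings_safe(x, y):
    if x is None or y is None:
        return None
    return x + ' ' + y

concat_strings_safe_udf = udf(concat_strings_safe, StringType())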