In this article, we will convert a PySpark Row List to Pandas Data Frame. A Row object is defined as a single Row in a PySpark DataFrame. Thus, a Data Frame can be easily represented as a Python List of Row objects.
Method 1 : Use createDataFrame() method and use toPandas() method
Here is the syntax of the createDataFrame() method :
Syntax : current_session.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
Parameters :
- data : a resilient distributed dataset or data in form of MySQL/SQL datatypes
- schema : string or list of columns names for the DataFrame.
- samplingRatio -> float: a sample ratio of the rows
- verifySchema -> bool: check if the datatypes of the rows is as specified in the schema
Returns : PySpark DataFrame object.
Example:
In this example, we will pass the Row list as data and create a PySpark DataFrame. We will then use the toPandas() method to get a Pandas DataFrame.
Python
# Importing PySpark and importantly # Row from pyspark.sql import pyspark from pyspark.sql import SparkSession from pyspark.sql import Row # PySpark Session row_pandas_session = SparkSession.builder.appName( 'row_pandas_session' ).getOrCreate() # List of Sample Row objects row_object_list = [Row(Topic = 'Dynamic Programming' , Difficulty = 10 ), Row(Topic = 'Arrays' , Difficulty = 5 ), Row(Topic = 'Sorting' , Difficulty = 6 ), Row(Topic = 'Binary Search' , Difficulty = 7 )] # creating PySpark DataFrame using createDataFrame() df = row_pandas_session.createDataFrame(row_object_list) # Printing the Spark DataFrame df.show() # Conversion to Pandas DataFrame pandas_df = df.toPandas() # Final Result print (pandas_df) |
Output :
Method 2 : Using parallelize()
We are going to use parallelize() to create an RDD. Parallelize means to copy the elements present in a pre-defined collection to a distributed dataset on which we can operate in parallel. Here is the syntax of parallelize() :
Syntax : sc.parallelize(data,numSlices)
sc : Spark Context Object
Parameters :
- data : data for which RDD is to be made.
- numSlices : number of partitions that need to be made. This is an optional parameter.
Example:
In this example, we will then use createDataFrame() to create a PySpark DataFrame and then use toPandas() to get a Pandas DataFrame.
Python
# Importing PySpark and importantly # Row from pyspark.sql import pyspark from pyspark.sql import SparkSession from pyspark.sql import Row # PySpark Session row_pandas_session = SparkSession.builder.appName( 'row_pandas_session' ).getOrCreate() # List of Sample Row objects row_object_list = [Row(Topic = 'Dynamic Programming' , Difficulty = 10 ), Row(Topic = 'Arrays' , Difficulty = 5 ), Row(Topic = 'Sorting' , Difficulty = 6 ), Row(Topic = 'Binary Search' , Difficulty = 7 )] # Creating an RDD rdd = row_pandas_session.sparkContext.parallelize(row_object_list) # DataFrame created using RDD df = row_pandas_session.createDataFrame(rdd) # Checking the DataFrame df.show() # Conversion of DataFrame df2 = df.toPandas() # Final DataFrame needed print (df2) |
Output :
Method 3: Iteration through Row list
In this method, we will traverse through the Row list, and convert each row object to a DataFrame using createDataFrame(). We will then append() this DataFrame to an accumulative final DataFrame which will be our final answer. The details of append() are given below :
Syntax: df.append(other, ignore_index=False, verify_integrity=False, sort=None)
df : Pandas DataFrame
Parameters :
- other : Pandas DataFrame, Numpy Array, Numpy Series etc.
- ignore_index : Checks if index labels are to be used or not.
- verify_integrity : If True, raise ValueError on creating index with duplicates.
- sort : Sort columns if the columns of df and other are unaligned.
Returns: A new appended DataFrame
Example:
In this example, we will then use createDataFrame() to create a PySpark DataFrame and then use append() to get a Pandas DataFrame.
Python
# Importing PySpark # Importing Pandas for append() import pyspark import pandas from pyspark.sql import SparkSession from pyspark.sql import Row # PySpark Session row_pandas_session = SparkSession.builder.appName( 'row_pandas_session' ).getOrCreate() # List of Sample Row objects row_object_list = [Row(Topic = 'Dynamic Programming' , Difficulty = 10 ), Row(Topic = 'Arrays' , Difficulty = 5 ), Row(Topic = 'Sorting' , Difficulty = 6 ), Row(Topic = 'Binary Search' , Difficulty = 7 )] # Our final DataFrame initialized mega_df = pandas.DataFrame() # Traversing through the list for i in range ( len (row_object_list)): # Creating a Spark DataFrame of a single row small_df = row_pandas_session.createDataFrame([row_object_list[i]]) # appending the Pandas version of small_df # to mega_df mega_df = mega_df.append(small_df.toPandas(), ignore_index = True ) # Printing our desired DataFrame print (mega_df) |
Output :