Sunday, December 29, 2024
Google search engine
HomeLanguagesConvert PySpark Row List to Pandas DataFrame

Convert PySpark Row List to Pandas DataFrame

In this article, we will convert a PySpark Row List to Pandas Data Frame. A Row object is defined as a single Row in a PySpark DataFrame. Thus, a Data Frame can be easily represented as a Python List of Row objects.

Method 1 : Use createDataFrame() method and use toPandas() method

Here is the syntax of the createDataFrame() method : 

Syntax : current_session.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

Parameters

  • data : a resilient distributed dataset or data in form of MySQL/SQL datatypes
  • schema : string or list of columns names for the DataFrame.
  • samplingRatio -> float: a sample ratio of the rows
  • verifySchema -> bool: check if the datatypes of the rows is as specified in the schema

Returns : PySpark DataFrame object.

Example:

 In this example, we will pass the Row list as data and create a PySpark DataFrame. We will then use the toPandas() method to get a Pandas DataFrame.

Python




# Importing PySpark and importantly
# Row from pyspark.sql
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
 
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
    'row_pandas_session'
).getOrCreate()
 
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
 
# creating PySpark DataFrame using createDataFrame()
df = row_pandas_session.createDataFrame(row_object_list)
 
# Printing the Spark DataFrame
df.show()
 
# Conversion to Pandas DataFrame
pandas_df = df.toPandas()
 
# Final Result
print(pandas_df)


Output

Method 2 : Using parallelize()

We are going to use parallelize() to create an RDD. Parallelize means to copy the elements present in a pre-defined collection to a distributed dataset on which we can operate in parallel. Here is the syntax of parallelize()

Syntax : sc.parallelize(data,numSlices)

sc : Spark Context Object

Parameters

  • data : data for which RDD is to be made.
  • numSlices : number of partitions that need to be made. This is an optional parameter.

Example:

 In this example, we will then use createDataFrame() to create a PySpark DataFrame and then use toPandas() to get a Pandas DataFrame.

Python




# Importing PySpark and importantly
# Row from pyspark.sql
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
 
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
    'row_pandas_session'
).getOrCreate()
 
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
 
# Creating an RDD
rdd = row_pandas_session.sparkContext.parallelize(row_object_list)
 
# DataFrame created using RDD
df = row_pandas_session.createDataFrame(rdd)
 
# Checking the DataFrame
df.show()
 
# Conversion of DataFrame
df2 = df.toPandas()
 
# Final DataFrame needed
print(df2)


Output

Method 3: Iteration through Row list

In this method, we will traverse through the Row list, and convert each row object to a DataFrame using createDataFrame(). We will then append() this DataFrame to an accumulative final DataFrame which will be our final answer. The details of append() are given below : 

Syntax: df.append(other, ignore_index=False, verify_integrity=False, sort=None)

df : Pandas DataFrame

Parameters :

  • other : Pandas DataFrame, Numpy Array, Numpy Series etc.
  • ignore_index : Checks if index labels are to be used or not.
  • verify_integrity : If True, raise ValueError on creating index with duplicates.
  • sort : Sort columns if the columns of df and other are unaligned.

Returns: A new appended DataFrame

Example:

 In this example, we will then use createDataFrame() to create a PySpark DataFrame and then use append() to get a Pandas DataFrame.

Python




# Importing PySpark
# Importing Pandas for append()
import pyspark
import pandas
from pyspark.sql import SparkSession
from pyspark.sql import Row
 
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
    'row_pandas_session'
).getOrCreate()
 
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
 
# Our final DataFrame initialized
mega_df = pandas.DataFrame()
 
# Traversing through the list
for i in range(len(row_object_list)):
   
    # Creating a Spark DataFrame of a single row
    small_df = row_pandas_session.createDataFrame([row_object_list[i]])
 
    # appending the Pandas version of small_df
    # to mega_df
    mega_df = mega_df.append(small_df.toPandas(),
                             ignore_index=True)
 
# Printing our desired DataFrame
print(mega_df)


Output : 

Dominic Rubhabha-Wardslaus
Dominic Rubhabha-Wardslaushttp://wardslaus.com
infosec,malicious & dos attacks generator, boot rom exploit philanthropist , wild hacker , game developer,
RELATED ARTICLES

Most Popular

Recent Comments