Saturday, November 16, 2024
Google search engine
Home > Languages > Convert PySpark RDD to DataFrame

Convert PySpark RDD to DataFrame

In this article, we will discuss how to convert an RDD to a DataFrame in PySpark. There are two approaches to converting an RDD to a DataFrame:

  1. Using createDataFrame(rdd, schema)
  2. Using toDF(schema)

Before converting an RDD to a DataFrame, let’s first create an RDD.

Example:

Python




# importing necessary libraries
from pyspark.sql import SparkSession
 
# function to create new SparkSession
def create_session():
    """Return the SparkSession for this example application.

    The session is obtained through ``SparkSession.builder`` with the
    application name "Corona_cases_statewise.com"; ``getOrCreate()`` reuses
    an existing session when there is one and otherwise starts a new one.
    """
    builder = SparkSession.builder.appName("Corona_cases_statewise.com")
    return builder.getOrCreate()
 
# function to create RDD
def create_RDD(sc_obj, data):
    """Distribute a local collection as an RDD.

    Args:
        sc_obj: SparkContext (or any object exposing ``parallelize``) that
            creates the RDD.
        data: Local collection of records to distribute.

    Returns:
        Whatever ``sc_obj.parallelize(data)`` returns (an RDD when ``sc_obj``
        is a SparkContext).
    """
    # Use the context that was passed in. Relying on a module-level ``sc``
    # instead would ignore the argument and raise NameError whenever no such
    # global exists (e.g. when this function is imported and reused).
    return sc_obj.parallelize(data)
 
 
# Script entry point: build an RDD from in-memory sample rows and print its type.
if __name__ == "__main__":
 
    # Sample rows: one tuple per state, a name followed by three integer counts
    # (presumably cases, recovered and deaths - confirm against the data source).
    input_data = [("Uttar Pradesh", 122000, 89600, 12238),
                  ("Maharashtra", 454000, 380000, 67985),
                  ("Tamil Nadu", 115000, 102000, 13933),
                  ("Karnataka", 147000, 111000, 15306),
                  ("Kerala", 153000, 124000, 5259)]
 
    # Obtain the SparkSession for this application.
    spark = create_session()
 
    # The SparkContext attached to the session; it is what creates RDDs.
    sc = spark.sparkContext
 
    # Turn the local list of tuples into an RDD.
    rd_df = create_RDD(sc, input_data)
 
    # Print the Python type of the result to confirm it is an RDD.
    print(type(rd_df))


Output:

<class 'pyspark.rdd.RDD'>

Method 1: Using the createDataFrame() function.

After creating the RDD, we convert it to a DataFrame using the createDataFrame() function, passing it the RDD and the schema for the DataFrame.

Syntax:

spark.createDataFrame(rdd, schema)

Python




# importing necessary libraries
from pyspark.sql import SparkSession
 
# function to create new SparkSession
def create_session():
  """Return the SparkSession for this example application.

  Uses ``SparkSession.builder`` with the application name
  "Corona_cases_statewise.com"; ``getOrCreate()`` reuses an existing
  session when there is one and otherwise starts a new one.
  """
  named_builder = SparkSession.builder.appName("Corona_cases_statewise.com")
  session = named_builder.getOrCreate()
  return session
 
# function to create RDD
def create_RDD(sc_obj,data):
  """Distribute a local collection as an RDD.

  Args:
    sc_obj: SparkContext (or any object exposing ``parallelize``) that
      creates the RDD.
    data: Local collection of records to distribute.

  Returns:
    Whatever ``sc_obj.parallelize(data)`` returns (an RDD when ``sc_obj``
    is a SparkContext).
  """
  # Call parallelize on the argument itself; reading a module-level ``sc``
  # here would ignore ``sc_obj`` and fail with NameError when no such
  # global has been defined.
  return sc_obj.parallelize(data)
 
# function to convert RDD to dataframe
def RDD_to_df(spark,df,schema):
  """Convert an RDD to a DataFrame with ``spark.createDataFrame``.

  Args:
    spark: Session object whose ``createDataFrame`` method is called.
    df: The RDD to convert (named ``df`` for historical reasons).
    schema: Schema passed straight through to ``createDataFrame``; the
      example script supplies a list of column names.

  Returns:
    The result of ``spark.createDataFrame(df, schema)``.
  """
  return spark.createDataFrame(df, schema)
 
# Script entry point: build an RDD from sample rows, convert it to a
# DataFrame with named columns, then print the schema and the rows.
if __name__ == "__main__":
     
  # Sample rows: one tuple per state, in the column order given by
  # schema_lst below.
  input_data = [("Uttar Pradesh",122000,89600,12238),
          ("Maharashtra",454000,380000,67985),
          ("Tamil Nadu",115000,102000,13933),
          ("Karnataka",147000,111000,15306),
          ("Kerala",153000,124000,5259)]
 
  # Obtain the SparkSession for this application.
  spark = create_session()
 
  # The SparkContext attached to the session; it is what creates RDDs.
  sc = spark.sparkContext
 
  # Turn the local list of tuples into an RDD.
  rd_df = create_RDD(sc,input_data)
 
  # Column names for the DataFrame, one per tuple field.
  schema_lst = ["State","Cases","Recovered","Deaths"]
 
  # Convert the RDD to a DataFrame using the session and the column names.
  converted_df = RDD_to_df(spark,rd_df,schema_lst)
   
  # Print the DataFrame's schema, then its rows.
  converted_df.printSchema()
  converted_df.show()


Output:

Method 2: Using toDF() function.

After creating the RDD, we convert it to a DataFrame using the RDD's toDF() function, passing it the schema for the DataFrame.

Syntax:

rdd.toDF(schema)

Python




# importing necessary libraries
from pyspark.sql import SparkSession
 
# function to create new SparkSession
def create_session():
  """Return the SparkSession for this example application.

  The builder is given the application name "Corona_cases_statewise.com";
  ``getOrCreate()`` reuses an existing session when there is one and
  otherwise starts a new one.
  """
  return (
      SparkSession.builder
      .appName("Corona_cases_statewise.com")
      .getOrCreate()
  )
 
# function to create RDD
def create_RDD(sc,data):
  """Distribute a local collection as an RDD.

  Args:
    sc: SparkContext (or any object exposing ``parallelize``).
    data: Local collection of records to distribute.

  Returns:
    The result of ``sc.parallelize(data)``.
  """
  return sc.parallelize(data)
 
# function to convert RDD to dataframe
def RDD_to_df(df,schema):
  """Convert an RDD to a DataFrame with the RDD's ``toDF`` method.

  Args:
    df: The RDD to convert (any object exposing ``toDF``).
    schema: Schema passed straight through to ``toDF``; the example
      script supplies a list of column names.

  Returns:
    The result of ``df.toDF(schema)``.
  """
  # Convert the RDD that was passed in. Reading a module-level ``rd_df``
  # here would ignore the ``df`` argument and raise NameError when no such
  # global exists.
  return df.toDF(schema)
 
# Script entry point: build an RDD from sample rows, convert it to a
# DataFrame with named columns, then print the schema and the rows.
if __name__ == "__main__":
     
  # Sample rows: one tuple per state, in the column order given by
  # schema_lst below.
  input_data = [("Uttar Pradesh",122000,89600,12238),
          ("Maharashtra",454000,380000,67985),
          ("Tamil Nadu",115000,102000,13933),
          ("Karnataka",147000,111000,15306),
          ("Kerala",153000,124000,5259)]
 
  # Obtain the SparkSession for this application.
  spark = create_session()
 
  # The SparkContext attached to the session; it is what creates RDDs.
  sc = spark.sparkContext
 
  # Turn the local list of tuples into an RDD.
  rd_df = create_RDD(sc,input_data)
 
  # Column names for the DataFrame, one per tuple field.
  schema_lst = ["State","Cases","Recovered","Deaths"]
 
  # Convert the RDD to a DataFrame using the column names.
  converted_df = RDD_to_df(rd_df,schema_lst)
   
  # Print the DataFrame's schema, then its rows.
  converted_df.printSchema()
  converted_df.show()


Output:

RELATED ARTICLES

Most Popular

Recent Comments