In this article, we will discuss how to convert an RDD to a DataFrame in PySpark. There are two approaches:
- Using createDataFrame(rdd, schema)
- Using toDF(schema)
But before converting an RDD to a DataFrame, let's first create an RDD.
Example:
Python
# importing necessary libraries
from pyspark.sql import SparkSession

# function to create new SparkSession
def create_session():
    spk = SparkSession.builder \
        .appName("Corona_cases_statewise.com") \
        .getOrCreate()
    return spk

# function to create RDD
def create_RDD(sc_obj, data):
    df = sc_obj.parallelize(data)
    return df

if __name__ == "__main__":
    input_data = [("Uttar Pradesh", 122000, 89600, 12238),
                  ("Maharashtra", 454000, 380000, 67985),
                  ("Tamil Nadu", 115000, 102000, 13933),
                  ("Karnataka", 147000, 111000, 15306),
                  ("Kerala", 153000, 124000, 5259)]

    # calling function to create SparkSession
    spark = create_session()

    # creating spark context object
    sc = spark.sparkContext

    # calling function to create RDD
    rd_df = create_RDD(sc, input_data)

    # printing the type
    print(type(rd_df))
Output:
<class 'pyspark.rdd.RDD'>
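An RDD carries no schema, so DataFrame methods such as show() are not available on it yet. To peek at the raw rows, take() can be used; a quick sketch, reusing rd_df from the example above:
Python
# take(n) returns the first n records of the RDD as plain Python tuples
print(rd_df.take(2))
# [('Uttar Pradesh', 122000, 89600, 12238), ('Maharashtra', 454000, 380000, 67985)]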
Method 1: Using the createDataFrame() function.
After creating the RDD, we convert it to a DataFrame with the createDataFrame() function, passing the RDD along with the schema (here, a list of column names) for the DataFrame.
Syntax:
spark.createDataFrame(rdd, schema)
Python
# importing necessary libraries
from pyspark.sql import SparkSession

# function to create new SparkSession
def create_session():
    spk = SparkSession.builder \
        .appName("Corona_cases_statewise.com") \
        .getOrCreate()
    return spk

# function to create RDD
def create_RDD(sc_obj, data):
    df = sc_obj.parallelize(data)
    return df

# function to convert RDD to dataframe
def RDD_to_df(spark, df, schema):
    # converting RDD to df using createDataFrame()
    # in which we are passing RDD and schema of df
    df1 = spark.createDataFrame(df, schema)
    return df1

if __name__ == "__main__":
    input_data = [("Uttar Pradesh", 122000, 89600, 12238),
                  ("Maharashtra", 454000, 380000, 67985),
                  ("Tamil Nadu", 115000, 102000, 13933),
                  ("Karnataka", 147000, 111000, 15306),
                  ("Kerala", 153000, 124000, 5259)]

    # calling function to create SparkSession
    spark = create_session()

    # creating spark context object
    sc = spark.sparkContext

    # calling function to create RDD
    rd_df = create_RDD(sc, input_data)

    schema_lst = ["State", "Cases", "Recovered", "Deaths"]

    # calling function to convert RDD to dataframe
    converted_df = RDD_to_df(spark, rd_df, schema_lst)

    # visualizing the schema and dataframe
    converted_df.printSchema()
    converted_df.show()
Output:
root
 |-- State: string (nullable = true)
 |-- Cases: long (nullable = true)
 |-- Recovered: long (nullable = true)
 |-- Deaths: long (nullable = true)

+-------------+------+---------+------+
|        State| Cases|Recovered|Deaths|
+-------------+------+---------+------+
|Uttar Pradesh|122000|    89600| 12238|
|  Maharashtra|454000|   380000| 67985|
|   Tamil Nadu|115000|   102000| 13933|
|    Karnataka|147000|   111000| 15306|
|       Kerala|153000|   124000|  5259|
+-------------+------+---------+------+
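Note that when the schema is just a list of column names, as above, Spark infers each column's type by sampling the data. If you want the types pinned down explicitly, createDataFrame() also accepts a StructType; a minimal sketch, reusing spark and rd_df from the example above (the nullable flags here are illustrative):
Python
# importing the type classes used to build an explicit schema
from pyspark.sql.types import StructType, StructField, StringType, LongType

# each StructField takes: column name, data type, nullable flag
explicit_schema = StructType([
    StructField("State", StringType(), True),
    StructField("Cases", LongType(), True),
    StructField("Recovered", LongType(), True),
    StructField("Deaths", LongType(), True),
])

# same conversion as above, but the column types are declared, not inferred
typed_df = spark.createDataFrame(rd_df, explicit_schema)
typed_df.printSchema()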
Method 2: Using the toDF() function.
After creating the RDD, we convert it to a DataFrame with the toDF() function, passing the schema (a list of column names) for the DataFrame.
Syntax:
df.toDF(schema)
Python
# importing necessary libraries
from pyspark.sql import SparkSession

# function to create new SparkSession
def create_session():
    spk = SparkSession.builder \
        .appName("Corona_cases_statewise.com") \
        .getOrCreate()
    return spk

# function to create RDD
def create_RDD(sc_obj, data):
    df = sc_obj.parallelize(data)
    return df

# function to convert RDD to dataframe
def RDD_to_df(df, schema):
    # converting RDD to dataframe using toDF()
    # in which we are passing schema of df
    df = df.toDF(schema)
    return df

if __name__ == "__main__":
    input_data = [("Uttar Pradesh", 122000, 89600, 12238),
                  ("Maharashtra", 454000, 380000, 67985),
                  ("Tamil Nadu", 115000, 102000, 13933),
                  ("Karnataka", 147000, 111000, 15306),
                  ("Kerala", 153000, 124000, 5259)]

    # calling function to create SparkSession
    spark = create_session()

    # creating spark context object
    sc = spark.sparkContext

    # calling function to create RDD
    rd_df = create_RDD(sc, input_data)

    schema_lst = ["State", "Cases", "Recovered", "Deaths"]

    # calling function to convert RDD to dataframe
    converted_df = RDD_to_df(rd_df, schema_lst)

    # visualizing the schema and dataframe
    converted_df.printSchema()
    converted_df.show()
Output:
root
 |-- State: string (nullable = true)
 |-- Cases: long (nullable = true)
 |-- Recovered: long (nullable = true)
 |-- Deaths: long (nullable = true)

+-------------+------+---------+------+
|        State| Cases|Recovered|Deaths|
+-------------+------+---------+------+
|Uttar Pradesh|122000|    89600| 12238|
|  Maharashtra|454000|   380000| 67985|
|   Tamil Nadu|115000|   102000| 13933|
|    Karnataka|147000|   111000| 15306|
|       Kerala|153000|   124000|  5259|
+-------------+------+---------+------+
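toDF() can also be called with no schema at all, in which case Spark falls back to positional column names _1, _2, and so on; the DataFrame's own toDF() can then rename them. A short sketch, reusing rd_df from the example above (toDF() only becomes available on an RDD once a SparkSession exists, since the session attaches it):
Python
# with no schema, the columns are named positionally: _1, _2, _3, _4
df_default = rd_df.toDF()
df_default.printSchema()

# DataFrame.toDF() renames all the columns in one shot
df_named = df_default.toDF("State", "Cases", "Recovered", "Deaths")
df_named.show()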