Thursday, December 26, 2024

How to duplicate a row N times in a PySpark DataFrame?

In this article, we are going to learn how to duplicate a row N times in a PySpark DataFrame.

Method 1: Repeating rows based on column value

In this method, we first create a PySpark DataFrame using createDataFrame(). In our example, the column “Y” holds an integer, and each row is repeated as many times as its own “Y” value says. We apply the withColumn() function and pass it a column built with expr(), as explained below.

Syntax :

DataFrame.withColumn(colName,col)

Parameters

  • colName : str, the name of the new column (an existing column with this name is replaced)
  • col : Column, the expression that produces the values of the new column

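The colName here is “Y”, so the existing “Y” column is replaced. The col expression, passed to expr() as a SQL string, is :

explode(array_repeat(Y,int(Y)))
  • array_repeat(col, count) is an expression that creates an array containing col repeated count times; here it builds an array holding Y copies of the value Y.
  • explode is an expression that returns a new row for each element in the given array or map, so a row whose array has Y elements becomes Y rows.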
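To make the combined effect of the two expressions concrete, here is a plain-Python model of what they do to each row. It does not use Spark at all: the helper names array_repeat and explode_y are stand-ins written for this illustration, and tuples stand in for DataFrame rows.

```python
# Plain-Python model of explode(array_repeat(Y, int(Y))); no Spark required.
rows = [("a", 1, "@"), ("b", 3, "_"), ("c", 2, "!"), ("d", 6, "(")]


def array_repeat(value, count):
    """Model of array_repeat: a list holding `value` repeated `count` times."""
    return [value] * count


def explode_y(rows):
    """Model of explode on column Y: one output row per array element."""
    out = []
    for x, y, z in rows:
        for element in array_repeat(y, int(y)):
            out.append((x, element, z))
    return out


result = explode_y(rows)
print(len(result))  # 1 + 3 + 2 + 6 = 12 rows
print(result[:4])   # the 'a' row once, then the 'b' row three times
```

As in Spark, a row whose array is empty (Y equal to 0) produces no output rows, so it disappears from the result.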

Example:

Python




# Importing PySpark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
 
# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()
 
# No user input is needed for this method:
# each row is repeated Y times
 
# Data filled in our DataFrame
rows = [['a',1,'@'],
        ['b',3,'_'],
        ['c',2,'!'],
        ['d',6,'(']]
 
# Columns of our DataFrame
columns = ['X','Y','Z']
 
# DataFrame is created
df = Spark_Session.createDataFrame(rows,columns)
 
# Printing the DataFrame
df.show()
 
# Creating a new DataFrame in which each
# row appears as many times as its Y value
new_df = df.withColumn(
  "Y", expr("explode(array_repeat(Y,int(Y)))"))
 
# Printing the new DataFrame
new_df.show()


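Output : new_df contains 12 rows in total: the 'a' row once, the 'b' row three times, the 'c' row twice and the 'd' row six times.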

Method 2: Using collect() and appending a random row to the list

In this method, we first accept N from the user and create a PySpark DataFrame using createDataFrame(). We then call the collect() method, which returns the contents of the DataFrame as a list of Row objects, and store that list in a variable. The syntax is :

DataFrame.collect()

We then pick one row at random and use the Python list append() method in a loop of N iterations to add that Row object to the list N times. Finally, the list of Row objects is converted back to a PySpark DataFrame. Because collect() brings every row to the driver, this method is only suitable for small DataFrames.
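The list-building step can be tried out without Spark. In this minimal sketch, plain tuples stand in for the Row objects that collect() returns, the value of n is assumed, a single list.extend() call replaces the loop of append() calls, and a seeded random generator makes the choice reproducible.

```python
import random

# Tuples stand in for the Row objects returned by DataFrame.collect()
row_list = [("a", 1, "@"), ("b", 3, "_"), ("c", 2, "!"), ("d", 6, "(")]
n = 3  # number of extra copies (assumed value for this sketch)

rng = random.Random(0)           # seeded so repeated runs pick the same row
repeated = rng.choice(row_list)  # the row that will be duplicated

# One extend() call adds n references to the chosen row
row_list.extend([repeated] * n)

print(len(row_list))             # 4 original rows + 3 copies = 7
print(row_list.count(repeated))  # the chosen row now appears n + 1 times
```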

Example:

Python




# Importing PySpark and random
import pyspark
from pyspark.sql import SparkSession
import random
 
# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()
 
# Accepting n from the user.
n = int(input('Enter n : '))
 
# Data filled in our DataFrame
rows = [['a',1,'@'],
        ['b',3,'_'],
        ['c',2,'!'],
        ['d',6,'(']]
 
# Columns of our DataFrame
columns = ['X','Y','Z']
 
# DataFrame is created
df = Spark_Session.createDataFrame(rows,columns)
 
# Showing the DataFrame
df.show()
 
# Creating a list of rows and
# getting a random row from the list
row_list = df.collect()
repeated = random.choice(row_list)
 
# adding a row object to the list
# n times
for _ in range(n):
  row_list.append(repeated)
 
# Final DataFrame
df = Spark_Session.createDataFrame(row_list)
 
# Result
df.show()


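Output : the final DataFrame has 4 + n rows: the four original rows plus n copies of the randomly chosen row.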

Method 3: Convert the PySpark DataFrame to a Pandas DataFrame

In this method, we first accept N from the user and create a PySpark DataFrame using createDataFrame(). We then convert the PySpark DataFrame to a pandas DataFrame using toPandas() and take its first row by slicing with DataFrame[:1]. Finally, we add that row to the pandas DataFrame N times. Older pandas versions provide the append() function for this; it was deprecated in pandas 1.4 and removed in pandas 2.0, where pandas.concat() takes its place. The syntax of append() is :

Syntax : DataFrame.append(other, ignore_index=False, verify_integrity=False, sort=False)

Parameters

  • other : DataFrame, Series or dict-like object, or a list of these. The data to be appended
  • ignore_index : bool, default : False. If True, the original index labels are discarded and the result is labelled 0, 1, …, n - 1
  • verify_integrity : bool, default : False. If True, a ValueError is raised when the result would contain duplicate index labels
  • sort : bool, default : False. Sort the columns if the columns of the two DataFrames are not aligned
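pandas.concat(), which is available in every pandas version, accepts the same ignore_index flag as append(), so the effect of that parameter can be seen with it. A minimal sketch on a two-row frame made up for illustration:

```python
import pandas as pd

df_pandas = pd.DataFrame({"X": ["a", "b"], "Y": [1, 3]})
first_row = df_pandas[:1]

# Default: the appended row keeps its original index label 0
kept = pd.concat([df_pandas, first_row])

# ignore_index=True: the result is relabelled 0, 1, 2
renumbered = pd.concat([df_pandas, first_row], ignore_index=True)

print(list(kept.index))        # [0, 1, 0]
print(list(renumbered.index))  # [0, 1, 2]
```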

Example:

Python




# Importing PySpark and Pandas
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
 
# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()
 
# Accepting n from the user.
n = int(input('Enter n : '))
 
# Data filled in our DataFrame
rows = [['a',1,'@'],
        ['b',3,'_'],
        ['c',2,'!'],
        ['d',6,'(']]
 
# Columns of our DataFrame
columns = ['X','Y','Z']
 
# DataFrame is created
df = Spark_Session.createDataFrame(rows,columns)
 
# Converting to a Pandas DataFrame
df_pandas = df.toPandas()
 
# The initial DataFrame
print('First DF')
print(df_pandas)
 
# the first row
first_row = df_pandas[:1]
 
# Appending the row n times.
# DataFrame.append() was removed in pandas 2.0,
# so pd.concat() is used instead
df_pandas = pd.concat(
  [df_pandas] + [first_row] * n, ignore_index = True)
 
# Final DataFrame
print('New DF')
print(df_pandas)


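Output : the second printout shows the four original rows followed by n copies of the first row ('a', 1, '@'), with the index renumbered from 0.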
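Method 3 duplicates only the first row. If every row should appear N times, one option in pandas is to repeat the index labels and select by them. This is a sketch under the assumption that the data already sits in a pandas DataFrame with a unique index (as toPandas() produces); the frame below is rebuilt by hand so the snippet runs without Spark.

```python
import pandas as pd

df_pandas = pd.DataFrame(
    {"X": ["a", "b", "c", "d"], "Y": [1, 3, 2, 6], "Z": ["@", "_", "!", "("]}
)
n = 2  # number of times each row should appear (assumed value)

# Index.repeat(n) yields labels 0, 0, 1, 1, ...; .loc selects a row per label
repeated = df_pandas.loc[df_pandas.index.repeat(n)].reset_index(drop=True)

print(repeated)
```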
