A Spark DataFrame is a distributed collection of data organized into named columns. DataFrames are similar to tables in a traditional relational database, but because their data is partitioned across a cluster they can process datasets that are too large for a single machine. DataFrames can be created from a variety of sources such as structured data files, tables in Hive, external databases, or existing RDDs (Resilient Distributed Datasets).
A column with a comma-separated list
Imagine we have a Spark DataFrame with a column called “items” that contains a list of items separated by commas.
To extract the individual items from this column, we can use the split() function.
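Using the split() function
The split() function is a built-in function in pyspark.sql.functions that splits a string column into an array of substrings around matches of a pattern. The pattern is interpreted as a regular expression, so a plain comma works as a delimiter, while characters that have a special meaning in regular expressions (such as . or |) must be escaped.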
Syntax:
split(str: Column, pattern: str, limit: int = -1) -> Column
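To make the limit argument in this signature concrete, here is a minimal plain-Python sketch of the rule it follows. The helper split_like_spark is hypothetical (it is not part of PySpark) and only approximates split() for simple delimiter patterns: a positive limit caps the number of array elements and leaves the rest of the string in the last element, while a limit of 0 or less splits at every match.

```python
import re


def split_like_spark(text, pattern, limit=-1):
    """Hypothetical helper: approximates split() for simple delimiter patterns."""
    if limit == 1:
        # A limit of 1 leaves no room for a split: the whole string is the only element.
        return [text]
    if limit > 1:
        # At most `limit` elements; the last one keeps the unsplit remainder.
        return re.split(pattern, text, maxsplit=limit - 1)
    # limit <= 0: split at every match of the pattern.
    return re.split(pattern, text)


row = "milk,bread,eggs"
unlimited = split_like_spark(row, ",")
capped = split_like_spark(row, ",", limit=2)

print(unlimited)  # ['milk', 'bread', 'eggs']
print(capped)     # ['milk', 'bread,eggs']
```

In PySpark itself the corresponding call would be split(col("items"), ",", 2); the limit argument is available from Spark 3.0 onward.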
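The str parameter is the string column that you want to split. The pattern parameter is the delimiter, interpreted as a regular expression. The limit parameter is optional: a positive value caps the length of the resulting array, with the last element holding the rest of the string, while the default of -1 (or any value less than or equal to 0) splits the string as many times as possible.
Example 1:
In this example, the items column is split on commas and the resulting array is stored in a new column named item_list.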
Python3
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

# create sample data
data = [("1", "milk,bread"), ("2", "eggs,meat"), ("3", "milk,eggs")]
columns = ["user_id", "items"]

# create Spark session and DataFrame
spark = SparkSession.builder.appName("Comma Separated List Example").getOrCreate()
df = spark.createDataFrame(data, columns)

# extract list items into an array
df = df.withColumn("item_list", split(col("items"), ","))

# show list
df.show()

# stop SparkSession
spark.stop()
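Output :
+-------+----------+-------------+
|user_id|     items|    item_list|
+-------+----------+-------------+
|      1|milk,bread|[milk, bread]|
|      2| eggs,meat| [eggs, meat]|
|      3| milk,eggs| [milk, eggs]|
+-------+----------+-------------+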
Example 2:
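In this example, the comma-separated values in the items column are split into an array, which is stored in a new column named item_list. Note that the second row, "python, ruby", has a space after the comma; splitting on "," alone keeps that space, so the second element of its array is " ruby".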
Python3
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

# create sample data
data = [("1", "java,swift"), ("2", "python, ruby"), ("3", "java,python")]
columns = ["user_id", "items"]

# create Spark session and DataFrame
spark = SparkSession.builder.appName("Comma Separated List Example").getOrCreate()
df = spark.createDataFrame(data, columns)

# extract list items into an array
df = df.withColumn("item_list", split(col("items"), ","))

# show list
df.show()

# stop SparkSession
spark.stop()
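Output :
+-------+------------+---------------+
|user_id|       items|      item_list|
+-------+------------+---------------+
|      1|  java,swift|  [java, swift]|
|      2|python, ruby|[python,  ruby]|
|      3| java,python| [java, python]|
+-------+------------+---------------+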
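One detail in Example 2's sample data is worth a closer look: the value "python, ruby" has a space after the comma. Because the pattern is a regular expression, one possible way to handle this is to let the pattern absorb the surrounding whitespace. The sketch below shows the effect with Python's re module on the same three strings; it is a local illustration of the pattern, not Spark code, and the variable names are only for this example.

```python
import re

raw_items = ["java,swift", "python, ruby", "java,python"]

# Splitting on a bare comma keeps the space that follows it.
plain = [re.split(",", value) for value in raw_items]

# Letting the pattern match optional whitespace around the comma removes it.
trimmed = [re.split(r"\s*,\s*", value) for value in raw_items]

print(plain[1])    # ['python', ' ruby']
print(trimmed[1])  # ['python', 'ruby']
```

In PySpark the same pattern can be passed to split(), for example split(col("items"), r"\s*,\s*"). Whitespace at the very start or end of the whole string is not touched by this pattern and would need a separate trim.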
Example 3: Transform an input table in which each record has an ID, a list of users, and a department into an output table that groups the records by user and combines each user's departments into a single comma-separated string.
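The transformation in this example boils down to three steps: emit one row per user, group those rows by user, and join each user's departments with commas. The following is a minimal plain-Python sketch of those steps, not the Spark code itself. It assumes hypothetical rows in which USER_LIST is a comma-separated string, so a split is needed first; the PySpark sample for this example stores USER_LIST as an array and therefore skips that step.

```python
from collections import defaultdict

# Hypothetical input rows: (ID, USER_LIST as a comma-separated string, DEPT_LIST)
rows = [
    ("1", "user1,user2", "Admin"),
    ("2", "user1", "Accounts"),
    ("3", "user2,user3", "finance"),
]

# Step 1 ("split" + "explode"): one (user, department) pair per user named in a row.
pairs = [
    (user.strip(), dept)
    for _record_id, users, dept in rows
    for user in users.split(",")
]

# Step 2 ("groupBy" + "collect_list"): gather the departments for each user.
depts_by_user = defaultdict(list)
for user, dept in pairs:
    depts_by_user[user].append(dept)

# Step 3 ("concat_ws"): join each user's departments into one string.
result = {user: ",".join(depts) for user, depts in depts_by_user.items()}

print(result)
# {'user1': 'Admin,Accounts', 'user2': 'Admin,finance', 'user3': 'finance'}
```

This sketch preserves input order because it runs in a single process. In Spark, collect_list() does not guarantee the order of the collected values after a shuffle, so the departments may appear in a different order there.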
Input :
Python3
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, collect_list, concat_ws
# import Spark's min under another name so it is not confused with Python's built-in min
from pyspark.sql.functions import min as spark_min

# create sample data; USER_LIST is already an array column
data = [("1", ["user1"], "Admin"),
        ("2", ["user1"], "Accounts"),
        ("3", ["user2"], "finance"),
        ("4", ["user3"], "sales"),
        ("5", ["user3"], "finance")]
columns = ["ID", "USER_LIST", "DEPT_LIST"]

# create Spark session and DataFrame
spark = SparkSession.builder.appName("Comma Separated List Example").getOrCreate()
df = spark.createDataFrame(data, columns)

# explode the USER_LIST column into one row per user
df = df.select("ID", explode("USER_LIST").alias("USER"), "DEPT_LIST")

# group the dataframe by USER, keep the smallest ID, and collect the departments
# (the order of values returned by collect_list is not guaranteed)
df = df.groupBy("USER").agg(spark_min("ID").alias("ID"),
                            collect_list("DEPT_LIST").alias("DEPT_LIST"))

# concatenate the collected departments into one comma-separated string
df = df.withColumn("DEPT_LIST", concat_ws(",", col("DEPT_LIST")))

# show the output table
df.show()

# stop SparkSession
spark.stop()
Output :