Course: Big Data - IU S25
Author: Firas Jolha
Spark DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.
PySpark DataFrames are mostly similar to pandas DataFrames, with the exception that PySpark DataFrames are distributed across the cluster (the data of a DataFrame is stored on different machines), and operations on them execute in parallel on all machines, whereas a pandas DataFrame is stored and processed on a single machine. Due to this parallel execution on all cores of multiple machines, PySpark runs operations faster than pandas. You need to create a Spark session (and its Spark context) at the beginning of the application.
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession\
.builder\
.appName("Spark SQL")\
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("OFF") # WARN, FATAL, INFO
Each record in the dataframe is of type pyspark.sql.Row whereas each column is of type pyspark.sql.Column. There are multiple ways to create a DataFrame in PySpark:
createDataFrame() function
data = [(1,2,3, 'a b c'), (4,5,6, 'd e f'), (7,8,9,'g h i')]
df = spark.createDataFrame(data) # is a dataframe from a list
df.rdd # Convert Spark DataFrame into RDD
rdd = sc.parallelize(data)
df = spark.createDataFrame(rdd) # is a dataframe from an rdd
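As a quick illustration of these two types on the dataframe just created:
first = df.first()           # a pyspark.sql.Row
print(type(first), first[0]) # fields of a Row can be accessed by position or by name
col0 = df[df.columns[0]]     # a pyspark.sql.Column
print(type(col0))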
toDF() function
rdd = sc.parallelize(data)
df = rdd.toDF() # From RDD to Dataframe
# from RDD to Dataframe with custom column names
df = rdd.toDF(["int1", "int2", "int3", "str"])
# from RDD to Dataframe without defining the schema (the schema is inferred from the data)
df = spark.createDataFrame(rdd)
from pyspark.sql.types import *
from pyspark.sql import Row
rdd = rdd.map(lambda t: Row(t[0], t[1], t[2], t[3]))
schema = StructType([
# StructField(<fieldname>, <fieldtype>, <nullability>)
StructField("int1", IntegerType(), True),
StructField("int2", IntegerType(), True),
StructField("int3", IntegerType(), True),
StructField("str", StringType(), True)
])
# From RDD to Dataframe with an explicit schema
df = spark.createDataFrame(rdd, schema)
path = "file:///sparkdata/movies.csv"
df1 = spark.read.format("csv") \
.option("sep", ",") \
.option("inferSchema", "true") \
.option("header", "true") \
.load(path)
df1 = spark.read.csv(path, sep=",", inferSchema=True, header=True) # a shorter equivalent of the above
df1.show() # Display the dataframe
df1.printSchema() # print the schema of the dataframe
You can find more csv options here.
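For instance, a few commonly used csv options (a sketch; the nullValue and dateFormat values below are only examples):
df1 = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("nullValue", "NA") \
    .option("dateFormat", "yyyy-MM-dd") \
    .load(path)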
You can put the file movies.csv in the moviesdb database inside the collection movies as follows.
mongoimport --db moviesdb --collection movies --type csv --headerline --file movies.csv
You can read it using pymongo.
import pymongo
MONGODB_HOST = "host.docker.internal"
MONGODB_PORT=27017
# The default configuration
# localhost:27017
client = pymongo.MongoClient(host=MONGODB_HOST, port=MONGODB_PORT)
db = client['moviesdb'] # client['<db_name>']
# A pymongo Cursor
# db.<collection_name>
movies_cur = db.movies.find() # Get all documents
print(movies_cur.to_list(10))
Now let’s try to read it from Spark using Mongo Spark Connector.
from pyspark.sql.types import *
from pyspark.sql import SparkSession
# We will use this port number for monitoring Spark Jobs
port=4040 # If this port is not free then use 4041 but do not forget to publish it in the master container
spark = SparkSession.builder\
.appName("Spark Mongodb")\
.config('spark.ui.port', str(port))\
.config("spark.mongodb.read.connection.uri", f"mongodb://{MONGODB_HOST}:{MONGODB_PORT}/moviesdb.movies") \
.config("spark.mongodb.write.connection.uri", f"mongodb://{MONGODB_HOST}:{MONGODB_PORT}/moviesdb.movies") \
.getOrCreate()
sc = spark.sparkContext
# Set log level to ERROR
sc.setLogLevel("ERROR")
schema = StructType([
# StructField(<fieldname>, <fieldtype>, <nullability>)
StructField("Film", StringType(), True),
StructField("Genre", StringType(), True),
StructField("Lead Studio", StringType(), True),
StructField("Audience score %", IntegerType(), True),
StructField("Profitability", StringType(), True),
StructField("Rotten Tomatoes %", IntegerType(), True),
StructField("Worldwide Gross", StringType(), True),
StructField("Year", IntegerType(), True),
StructField("_id", StringType(), True) # Special field for the documents in Mongodb
])
movies_cur = db.movies.find() # Get all documents
rdd = sc.parallelize(movies_cur)
rdd1 = rdd.map(lambda x: {k: (str(v) if k == '_id' else v) for k, v in x.items()}) # convert the ObjectId in _id to a string
# Convert to RDD then to Spark DataFrame
df1 = spark.createDataFrame(rdd1, schema) # Convert to Spark DataFrame
df1.show()
df2 = spark.read\
.format("mongodb")\
.load()
df2.show()
df2.write\
.format("mongodb") \
.option("database", "moviesdb")\
.option("collection", "movies_copy") \
.mode("overwrite")\
.save()
df3 = spark.read\
.format("mongodb") \
.option("database", "moviesdb")\
.option("collection", "movies_copy") \
.load()
df3.show()
Note: Here you need to set the following configuration:
For the Mongo Spark connector package: in spark-submit, you set --packages "org.mongodb.spark:mongo-spark-connector_2.12:10.4.1"; in the Spark session builder, you set .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.1").
For the read connection URI: in spark-submit, you set --conf "spark.mongodb.read.connection.uri=mongodb://host.docker.internal:27017/moviesdb.movies"; in the Spark session builder, you set .config("spark.mongodb.read.connection.uri", "mongodb://host.docker.internal:27017/moviesdb.movies").
For the write connection URI: in spark-submit, you set --conf "spark.mongodb.write.connection.uri=mongodb://host.docker.internal:27017/moviesdb.movies"; in the Spark session builder, you set .config("spark.mongodb.write.connection.uri", "mongodb://host.docker.internal:27017/moviesdb.movies").
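For example, a minimal sketch of a SparkSession builder that sets all three of these at once (note that spark.jars.packages takes effect only if it is set before the session is first created):
spark = SparkSession.builder \
    .appName("Spark Mongodb") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.1") \
    .config("spark.mongodb.read.connection.uri", "mongodb://host.docker.internal:27017/moviesdb.movies") \
    .config("spark.mongodb.write.connection.uri", "mongodb://host.docker.internal:27017/moviesdb.movies") \
    .getOrCreate()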
You can run the previous code snippets in a shell created as follows:
pyspark --master yarn \
--archives .venv.tar.gz#.venv \
--packages "org.mongodb.spark:mongo-spark-connector_2.12:10.4.1" \
--conf "spark.mongodb.read.connection.uri=mongodb://host.docker.internal:27017/moviesdb.movies" \
--conf "spark.mongodb.write.connection.uri=mongodb://host.docker.internal:27017/moviesdb.movies"
OR using spark-submit
as follows:
spark-submit --master yarn \
--archives .venv.tar.gz#.venv \
--packages "org.mongodb.spark:mongo-spark-connector_2.12:10.4.1" \
--conf "spark.mongodb.read.connection.uri=mongodb://host.docker.internal:27017/moviesdb.movies" \
--conf "spark.mongodb.write.connection.uri=mongodb://host.docker.internal:27017/moviesdb.movies" \
mongodbspark.py
path = "hdfs://localhost:9000/data/movies.csv"
df = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true")
# a spark dataframe
# OR
df = spark.read.csv(path, sep = ",", inferSchema = "true", header = "true")
# a spark dataframe
df.printSchema()
df.show(truncate=False)
The file movies.csv is uploaded to HDFS and stored in the folder /data.
The StructType and StructField classes are used to programmatically specify the schema of a DataFrame and to create complex columns like nested struct, array, and map columns. StructType is a collection of StructField's, each of which defines the column name, the column data type, a boolean specifying whether the field can be nullable, and metadata.
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
# A sample data
data = [ (("James","","Smith"),"36636","M",3100),
(("Michael","Rose",""),"40288","M",4300),
(("Robert","","Williams"),"42114","M",1400),
(("Maria","Anne","Jones"),"39192","F",5500),
(("Jen","Mary","Brown"),"","F",-1)
]
schema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])),
StructField('id', StringType(), True),
StructField('gender', StringType(), True),
StructField('salary', IntegerType(), True)
])
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
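The schema can also contain array and map columns; here is a minimal sketch (the column names and data are illustrative):
from pyspark.sql.types import ArrayType, MapType
schema2 = StructType([
    StructField("name", StringType(), True),
    StructField("languages", ArrayType(StringType()), True),              # array column
    StructField("properties", MapType(StringType(), StringType()), True)  # map column
])
data2 = [("James", ["Java", "Scala"], {"hair": "black", "eye": "brown"})]
spark.createDataFrame(data2, schema2).printSchema()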
show() is used to display the contents of the DataFrame in a table (row and column) format. By default, it shows only 20 rows, and the column values are truncated at 20 characters.
df.show()
df.show(5)
df.show(5, truncate=False)
df.show(10, truncate=False, vertical=True)
collect() is an action operation that is used to retrieve all the elements of the dataset (from all nodes) to the driver node. It returns all elements of a DataFrame as a list of Row objects on the driver. We should use collect() on smaller datasets, usually after filter(), groupBy(), etc. Retrieving larger datasets can result in an OutOfMemory error. You can also use the head() operation to get only the first rows/records.
df.collect() # all elements
df.collect()[0] # first row
df.collect()[0][0] # first cell at first row and first column
Notice that collect() on a big dataframe can cause performance and memory issues, since this action gathers the partitions of the dataframe from all cluster nodes into the memory of a single machine. More efficient alternatives are shown below:
df.take(10) # first 10 elements
df.take(1)[0] # first row
df.take(1)[0][0] # first cell at first row and first column
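The head() operation mentioned above behaves similarly and only brings the requested rows to the driver:
df.head(10) # first 10 rows as a list of Row objects
df.head()   # the first Row only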
The select() function is used to select a single column, multiple columns, columns by index, all columns, or nested columns from a DataFrame. It returns a new DataFrame with the selected columns.
from pyspark.sql.functions import col
df.select("name", \
"name.firstname", \
df.id, \
df['gender'], \
col("salary")) \
.show()
df.select("*").show()
df.select([c for c in df.columns]).show() # select all columns by name
df.select(df.columns[:2]).show()
withColumn() is a transformation function of DataFrame which is used to change the value, convert the datatype of an existing column, create a new column, and many more.
# Read the data
path = "hdfs://localhost:9000/data/movies.csv"
df = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true")
# Print Schema
df.printSchema()
# Convert the `Worldwide Gross` column to double
import pyspark.sql.functions as F
# 1. Remove the $ sign
df.withColumn("Worldwide Gross", F.translate('Worldwide Gross', '$', '')).show(5)
# 2. Cast the column to double
df.withColumn("Worldwide Gross", F.col("Worldwide Gross").cast("Double"))
# You can merge the previous operations into one operation as shown below
df2 = df.withColumn("Worldwide Gross", F.translate('Worldwide Gross', '$', '').cast("Double"))
df2.show(5)
# Scale the `Audience score %` column (the 4th column) to the range [0, 1]
col_name = df2.columns[3]
df2.withColumn(col_name, F.col(col_name)/100).show(5)
# Or keep the original column and store the scaled values in a new column `score`
df2.withColumn("score", F.col(col_name)/100).show(5)
# Add a constant (literal) column
df2.withColumn("Country", F.lit("Russia")).show()
# Rename a column
df2.withColumnRenamed(df2.columns[3], "score").show(5)
# Drop a column
df2.drop("Year").show(5)
The PySpark filter() function is used to filter the rows of an RDD/DataFrame based on a given condition or SQL expression. You can also use the where() clause instead of filter() if you are coming from an SQL background; both functions operate exactly the same.
df2.filter((df2.Year == 2008) & (df2['Film'].startswith("Wh"))).show(5)
df2.filter(~F.col('Genre').isin(['Comedy', 'Drama'])).show(5)
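The condition can also be given as an SQL expression string; a small sketch equivalent to the first filter above:
df2.where("Year = 2008 AND Film LIKE 'Wh%'").show(5)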
The PySpark distinct() function is used to drop/remove duplicate rows (considering all columns) from a DataFrame, and dropDuplicates() is used to drop duplicates based on selected (one or multiple) columns.
print(df2.count() - df2.distinct().count())
df2.dropDuplicates(['Genre', 'Lead Studio']).show(5)
Similar to the SQL GROUP BY clause, the PySpark groupBy() function is used to collect identical data into groups on a DataFrame and perform count, sum, avg, min, max functions on the grouped data. When we perform groupBy() on a PySpark DataFrame, it returns a GroupedData object which exposes aggregate functions such as avg, sum, min, and max.
Notice that the aggregate functions are transformations and will return a DataFrame. You need to call an action to see the output of the aggregation.
df2.groupby("Genre").sum("Worldwide Gross").show()
2. Calculate the average score for audience and max gross for each film genre every year. Exclude elements whose max gross is less than 50.
df2.groupby("Genre", 'Year') \
.agg(
F.avg("Audience score %").alias("avg_score"), \
F.max(df2.columns[6]).alias("max_gross")
).where(F.col("max_gross")>=50) \
.show(5)
# Equivalent SQL Query
# SELECT Genre,
#        Year,
#        avg(`Audience score %`) AS avg_score,
#        max(`Worldwide Gross`) AS max_gross
# FROM movies
# GROUP BY Genre, Year
# HAVING max_gross >= 50
You can use either the sort() or the orderBy() function of a PySpark DataFrame to sort it in ascending or descending order based on single or multiple columns; you can also sort using PySpark SQL sorting functions.
Example: Calculate the average audience score and the maximum gross for each film genre in every year. Exclude groups whose maximum gross is less than 50, then sort by maximum gross ascending and average score descending.
df2.groupby("Genre", 'Year') \
.agg(
F.avg("Audience score %").alias("avg_score"), \
F.max(df2.columns[6]).alias("max_gross")
).where(F.col("max_gross")>=50) \
.sort(F.col("max_gross").asc(), F.col("avg_score").desc()) \
.show(5)
# Equivalent SQL Query
# SELECT Genre,
#        Year,
#        avg(`Audience score %`) AS avg_score,
#        max(`Worldwide Gross`) AS max_gross
# FROM movies
# GROUP BY Genre, Year
# HAVING max_gross >= 50
# ORDER BY max_gross ASC, avg_score DESC
Join is used to combine two DataFrames, and by chaining joins you can combine multiple DataFrames. It supports all basic join types available in traditional SQL, such as INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS, and SELF JOIN.
df3 = df2.groupby("Genre", 'Year') \
.agg(
F.avg("Audience score %").alias("avg_score"), \
F.max(df2.columns[6]).alias("max_gross")
).where(F.col("max_gross")>=50)
df3.join(df2, (df2.Genre==df3.Genre) & (df2.Year==df3.Year), how="inner").show(5)
PySpark UDFs (user-defined functions) are one of the most useful features of Spark SQL & DataFrame, used to extend PySpark's built-in capabilities. I will show here the steps of creating a UDF that capitalizes the first character of each word:
def convertCase(s):
    resStr = ""
    arr = s.split(" ")
    for x in arr:
        if x:  # skip empty tokens produced by repeated spaces
            resStr = resStr + x[0].upper() + x[1:] + " "
    return resStr.strip()
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
capitalizeUDF = F.udf(lambda x: convertCase(x),StringType())
# Since the default return type of the udf() is StringType, you can write it as follows
capitalizeUDF = F.udf(lambda x: convertCase(x))
df2.select("Film", capitalizeUDF(F.col("Film")).alias("Capitalized_Film")).show(5, truncate = False)
Note: UDFs are treated as a black box by Spark, hence it cannot apply optimizations to them and you lose the optimizations PySpark performs on DataFrames/Datasets. We recommend using UDFs only when the required logic is not available as a built-in function.
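For example, the capitalization above can be done without a UDF by using the built-in initcap function (note that initcap also lower-cases the remaining characters of each word, unlike the UDF above):
df2.select("Film", F.initcap(F.col("Film")).alias("Capitalized_Film")).show(5, truncate=False)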
In Spark, you can save (write/extract) a DataFrame to a CSV file on disk by using dataframeObj.write.csv("path"). Using this you can also write the DataFrame to HDFS or any Spark-supported file system.
#Write DataFrame data to CSV file
df.write.csv("movies_df")
#You can also use below
df.write.format("csv").save("movies_df")
df.write.format("csv") \
.option("header", "true") \
.option("delimiter", ",") \
.save("movies_df")
df.write.option("header","true") \
.csv("hdfs://localhost:9000/movies_df")
PySpark partitionBy() is a function of the pyspark.sql.DataFrameWriter class which is used to partition a large dataset (DataFrame) into smaller files based on one or multiple columns while writing it to disk. Partitioning the data on the file system is a way to improve query performance when dealing with a large dataset in a data lake.
PySpark supports partitioning in two ways: partitioning in memory (DataFrame) and partitioning on disk (file system).
Partition in memory: You can partition or repartition the DataFrame by calling the repartition() or coalesce() transformations, as sketched below. This is discussed above in the RDD section.
Partition on disk: While writing the PySpark DataFrame back to disk, you can choose how to partition the data based on columns using partitionBy() of pyspark.sql.DataFrameWriter. This is similar to Hive's partitioning scheme.
Some advantages of partitions are: a) Fast access to the data, b) The ability to perform an operation on a smaller dataset.
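A minimal sketch of in-memory partitioning with repartition() and coalesce() (the partition counts here are arbitrary):
print(df.rdd.getNumPartitions())  # current number of partitions
df_repart = df.repartition(8)     # full shuffle into 8 partitions
df_coal = df_repart.coalesce(2)   # merge down to 2 partitions without a full shuffle
print(df_coal.rdd.getNumPartitions())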
# We can store the dataframe in the disk in partitions based on the values of Genre column.
df.write.option("header",True) \
.partitionBy("Genre") \
.csv("movies_df")
# Read only a specific partition `Genre=Animation` of the dataframe.
df = spark.read.csv("movies_df/Genre=Animation", header=True, sep=",")
Save operations can optionally take a SaveMode, which specifies how to handle existing data if present. It is important to realize that these save modes do not use any locking and are not atomic. Additionally, when performing an Overwrite, the existing data will be deleted before the new data is written out. The available modes are append, overwrite, ignore, and error / errorifexists (the default):
# Save modes
df.write.format("csv") \
.mode("append") \
.option("delimiter", "|") \
.save("/tmp/spark_output/datacsv")
df.write.format("csv") \
.option("mode","append") \
.option("delimiter", "|") \
.save("/tmp/spark_output/datacsv")
Spark Dataset is an interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, the ability to use powerful lambda functions) together with the benefits of Spark SQL's optimized execution engine. The Dataset API is supported in statically typed languages such as Java and Scala, since Datasets rely heavily on static typing. A DataFrame is a Dataset organized into named columns. Python is a dynamically typed language, so it does not expose the Dataset API, but it still has access to Spark's DataFrame API, which offers similar functionality.
Spark SQL is a module used for structured data processing. It allows you to query structured data using either SQL or the DataFrame API.
The pyspark.sql module is used to perform SQL-like operations on the data stored in memory. You can either leverage the programming API to query the data or use ANSI SQL queries similar to an RDBMS. You can also mix both, for example, use the API on the result of an SQL query.
Spark SQL is one of the most used Spark modules for processing structured, columnar data formats. Once you have a DataFrame created, you can interact with the data by using SQL syntax. In other words, Spark SQL brings native raw SQL queries to Spark, meaning you can run traditional ANSI SQL on a Spark DataFrame.
In order to use SQL, first register a temporary table/view on the DataFrame using the createOrReplaceTempView() function. Once created, this table can be accessed throughout the SparkSession using sql(), and it will be dropped when the Spark session terminates. Use the sql() method of the SparkSession object to run the query; this method returns a new DataFrame.
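For example, a minimal sketch that mixes both styles, assuming the movies dataframe df2 from the earlier withColumn examples is still available (the view name movies is chosen here for illustration):
import pyspark.sql.functions as F
df2.createOrReplaceTempView("movies")
# Run a raw SQL query, then continue with the DataFrame API on its result
spark.sql("SELECT Film, Genre, Year FROM movies") \
    .where(F.col("Year") >= 2010) \
    .show(5)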
This dataset includes results of international football matches, from the very first official match up to recent years. The matches range from the FIFA World Cup to the FIFI Wild Cup to regular friendly matches. The matches are strictly men's full internationals, and the data does not include Olympic Games or matches where at least one of the teams was the nation's B-team, U-23 side, or a league select team.
results.csv includes the columns described by the schema below.
For the datasets of scorers and shootouts, you can check the corresponding Kaggle data card.
Here we will use the results.csv dataset.
Create SQL View
We create a SQL view on the results dataframe.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType
schema = StructType([
StructField("date", DateType(), False),
StructField("home_team", StringType(), False),
StructField("away_team", StringType(), False),
StructField("home_score", IntegerType(), False),
StructField("away_score", IntegerType(), False),
StructField("tournament", StringType(), False),
StructField("city", StringType(), False),
StructField("country", StringType(), False),
StructField("neutral", BooleanType(), False),
])
# You can also use spark.read.csv function
df = spark.read.format("csv").load("results.csv", header = True, schema = schema)
df # evaluating the dataframe in the shell shows its columns and types
df.createOrReplaceTempView("results_table")
Spark SQL to Select Columns
# DataFrame API Select query
df.select("home_team","city","country","tournament") \
.show(5)
# SQL Select query
spark.sql("SELECT home_team, city, country, tournament FROM RESULTS_TABLE") \
.show(5)
Filter Rows
To filter the rows of the data, you can use the where() function from the DataFrame API.
# DataFrame API where()
df.select("country","city","home_team","tournament")
.where("city == 'Moscow'")
.show(5)
Similarly, in SQL you can use WHERE clause as follows.
# SQL where
spark.sql(""" SELECT country, city, home_team, tournament FROM RESULTS_TABLE
WHERE city = 'Moscow' """) \
.show(5)
Sorting
# sorting
df.select("country","city","home_team","tournament")
.where("city in ('London','Paris','Moscow')")
.orderBy("city")
.show(10)
# SQL ORDER BY
spark.sql(""" SELECT country, city, home_team, tournament FROM RESULTS_TABLE
WHERE city in ('London','Paris','Moscow') order by city """) \
.show(10)
Grouping
# grouping
df.groupBy("city").count()
.show()
# SQL GROUP BY clause
spark.sql(""" SELECT city, count(*) as count FROM RESULTS_TABLE
GROUP BY city""") \
.show()
PySpark SQL join has the syntax below, and it can be called directly on a DataFrame.
join(self, other, on=None, how=None)
The join() operation takes the following parameters and returns a DataFrame: other (the right side of the join), on (the join column name, a list of names, or a join expression), and how (the join type, which defaults to inner).
You can also write the join expression by chaining the where() and filter() methods on the DataFrame, and you can join on multiple columns.
emp = [(1,"Smith",-1,"2018","10","M",3000), \
(2,"Rose",1,"2010","20","M",4000), \
(3,"Williams",1,"2010","10","M",1000), \
(4,"Jones",2,"2005","10","F",2000), \
(5,"Brown",2,"2010","40","",-1), \
(6,"Brown",2,"2010","50","",-1) \
]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
"emp_dept_id","gender","salary"]
empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)
dept = [("Finance",10), \
("Marketing",20), \
("Sales",30), \
("IT",40) \
]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
# Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"inner") \
.show(truncate=False)
# SQL INNER JOIN
joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id") \
.show(truncate=False)
# SQL INNER JOIN
spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
.show(truncate=False)
# Left Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "left") \
.show(truncate=False)
# SQL LEFT JOIN
joinDF = spark.sql("select * from EMP e LEFT OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
.show(truncate=False)
# Right Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "right") \
.show(truncate=False)
# SQL RIGHT JOIN
joinDF = spark.sql("select * from EMP e RIGHT OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
.show(truncate=False)
# Full Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "full") \
.show(truncate=False)
# SQL FULL JOIN
joinDF = spark.sql("select * from EMP e FULL OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
.show(truncate=False)
You can read more about anti-joins, semi-joins, and unions here.
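A minimal sketch of these operations on the EMP and DEPT dataframes defined above:
# Semi join: rows of empDF that have a matching department; only empDF's columns are kept
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left_semi").show(truncate=False)
# Anti join: rows of empDF that have no matching department
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left_anti").show(truncate=False)
# Union: stack two dataframes that share the same schema
empDF.union(empDF).distinct().show(truncate=False)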