Lab 7 - Apache Spark DataFrame & SQL

Course: Big Data - IU S25
Author: Firas Jolha

Datasets

PySpark on Colab

Readings

Agenda

Prerequisites

Spark DataFrame

Spark DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.

A PySpark DataFrame is similar to a pandas DataFrame, with the difference that PySpark DataFrames are distributed across the cluster (the data is stored on different machines) and operations on them execute in parallel on all machines, whereas a pandas DataFrame is stored and processed on a single machine. Thanks to this parallel execution across the cores of multiple machines, PySpark often runs operations faster than pandas. You need to create a SparkSession (and, from it, the Spark context) at the beginning of the application.

from pyspark.sql import SparkSession
import pandas as pd


spark = SparkSession\
    .builder\
    .appName("Spark SQL")\
    .getOrCreate()    

sc = spark.sparkContext
sc.setLogLevel("OFF") # WARN, FATAL, INFO

Each record in the dataframe is of type pyspark.sql.Row whereas each column is of type pyspark.sql.Column. There are multiple ways to create DataFrame in PySpark:

1. using createDataFrame() function

data = [(1,2,3, 'a b c'), (4,5,6, 'd e f'), (7,8,9,'g h i')]
df = spark.createDataFrame(data) # is a dataframe from a list

df.rdd # Convert Spark DataFrame into RDD
    
    
rdd = sc.parallelize(data)
df = spark.createDataFrame(rdd) # is a dataframe from an rdd

2. using toDF() function

rdd = sc.parallelize(data)

    
df = rdd.toDF() # From RDD to Dataframe

# from RDD to Dataframe with custom column names
df = rdd.toDF(["int1", "int2", "int3", "str"])


# from RDD to Dataframe without defining the schema (the schema is inferred from the data)
df = spark.createDataFrame(rdd) 
    
    


from pyspark.sql import Row
from pyspark.sql.types import *

# Wrap each tuple in a Row (tuple elements are accessed by index in Python)
rdd = rdd.map(lambda t: Row(t[0], t[1], t[2], t[3]))

schema = StructType([
    # StructField(<fieldname>, <fieldtype>, <nullability>)
    StructField("int1", IntegerType(), True),
    StructField("int2", IntegerType(), True),
    StructField("int3", IntegerType(), True),
    StructField("str", StringType(), True)
    ])

# From RDD to Dataframe with an explicit schema
df = spark.createDataFrame(rdd, schema) 

3. Read from a local file

path = "file:///sparkdata/movies.csv"

df1 = spark.read.format("csv") \
  .option("sep", ",") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .load(path)

df1 = spark.read.csv(path)

df1.show() # Display the dataframe
df1.printSchema() # print the schema of the dataframe

You can find more CSV options in the Spark CSV data source documentation.
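For example, a short sketch combining a few of these options (the option names come from the Spark CSV data source; the specific values, such as treating "NA" as null, are assumptions for illustration):

# A few more commonly used CSV options:
#   nullValue  - string to be interpreted as null (here assumed to be "NA")
#   dateFormat - pattern used when parsing date columns
#   mode       - PERMISSIVE (default), DROPMALFORMED or FAILFAST for malformed rows
df1 = spark.read.format("csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .option("nullValue", "NA") \
  .option("dateFormat", "yyyy-MM-dd") \
  .option("mode", "DROPMALFORMED") \
  .load(path)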

4. Read from MongoDB

You can put the file movies.csv in the moviesdb database inside the collection movies as follows.

mongoimport --db moviesdb --collection movies --type csv --headerline --file movies.csv

You can read it using pymongo.

import pymongo


MONGODB_HOST = "host.docker.internal"
MONGODB_PORT=27017

# The default configuration
# localhost:27017
client = pymongo.MongoClient(host=MONGODB_HOST, port=MONGODB_PORT)

db = client['moviesdb'] # client['<db_name>']

# A pymongo Cursor 
# db.<collection_name>
movies_cur = db.movies.find() # Get all documents

print(movies_cur.to_list(10))

Now let’s try to read it from Spark using Mongo Spark Connector.


from pyspark.sql.types import *
from pyspark.sql import SparkSession

# We will use this port number for monitoring Spark Jobs
port=4040 # If this port is not free then use 4041 but do not forget to publish it in the master container


spark = SparkSession.builder\
        .appName("Spark Mongodb")\
        .config('spark.ui.port', str(port))\
        .config("spark.mongodb.read.connection.uri", f"mongodb://{MONGODB_HOST}:{MONGODB_PORT}/moviesdb.movies") \
        .config("spark.mongodb.write.connection.uri", f"mongodb://{MONGODB_HOST}:{MONGODB_PORT}/moviesdb.movies") \
        .getOrCreate()


sc = spark.sparkContext
# Set log level to ERROR
sc.setLogLevel("ERROR")


schema = StructType([
    # StructField(<fieldname>, <fieldtype>, <nullability>)
    StructField("Film", StringType(), True),
    StructField("Genre", StringType(), True),
    StructField("Lead Studio", StringType(), True),
    StructField("Audience score %", IntegerType(), True),
    StructField("Profitability", StringType(), True),
    StructField("Rotten Tomatoes %", IntegerType(), True),
    StructField("Worldwide Gross", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("_id", StringType(), True) # Special field for the documents in Mongodb
    ])


    
movies_cur = db.movies.find() # Get all documents
    
rdd = sc.parallelize(movies_cur)

rdd1 = rdd.map(lambda x: {k: (str(v) if k == '_id' else v) for k, v in x.items()}) # convert the ObjectId to a string

# Convert to RDD then to Spark DataFrame
df1 = spark.createDataFrame(rdd1, schema) # Convert to Spark DataFrame
df1.show()

df2 = spark.read\
        .format("mongodb")\
        .load()
df2.show()


df2.write\
    .format("mongodb") \
    .option("database", "moviesdb")\
    .option("collection", "movies_copy") \
    .mode("overwrite")\
    .save()



df3 = spark.read\
    .format("mongodb") \
    .option("database", "moviesdb")\
    .option("collection", "movies_copy") \
    .load()

df3.show()

Note: Here you need to set the following configuration:

  1. Mongo Spark connector
    • in spark-submit, you set --packages "org.mongodb.spark:mongo-spark-connector_2.12:10.4.1"
    • OR in the Spark session, you set .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.1")
  2. MongoDB read connection URI
    • in spark-submit, you set --conf "spark.mongodb.read.connection.uri=mongodb://host.docker.internal:27017/moviesdb.movies"
    • OR in the Spark session, you set .config("spark.mongodb.read.connection.uri", "mongodb://host.docker.internal:27017/moviesdb.movies")
  3. MongoDB write connection URI
    • in spark-submit, you set --conf "spark.mongodb.write.connection.uri=mongodb://host.docker.internal:27017/moviesdb.movies"
    • OR in the Spark session, you set .config("spark.mongodb.write.connection.uri", "mongodb://host.docker.internal:27017/moviesdb.movies")
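For reference, a minimal sketch that sets all three of these options in the SparkSession builder (the connector version and host are the ones used above; adjust them to your setup):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark Mongodb") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.1") \
    .config("spark.mongodb.read.connection.uri", "mongodb://host.docker.internal:27017/moviesdb.movies") \
    .config("spark.mongodb.write.connection.uri", "mongodb://host.docker.internal:27017/moviesdb.movies") \
    .getOrCreate()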

You can run the previous code snippets in a shell created as follows:

pyspark --master yarn \
        --archives .venv.tar.gz#.venv \
        --packages "org.mongodb.spark:mongo-spark-connector_2.12:10.4.1" \
        --conf "spark.mongodb.read.connection.uri=mongodb://host.docker.internal:27017/moviesdb.movies" \
        --conf "spark.mongodb.write.connection.uri=mongodb://host.docker.internal:27017/moviesdb.movies"

OR using spark-submit as follows:

spark-submit --master yarn \
        --archives .venv.tar.gz#.venv \
        --packages "org.mongodb.spark:mongo-spark-connector_2.12:10.4.1" \
        --conf "spark.mongodb.read.connection.uri=mongodb://host.docker.internal:27017/moviesdb.movies" \
        --conf "spark.mongodb.write.connection.uri=mongodb://host.docker.internal:27017/moviesdb.movies" \
        mongodbspark.py

5. Read from HDFS

path = "hdfs://localhost:9000/data/movies.csv"
df = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true") 
# a spark dataframe

# OR
df = spark.read.csv(path, sep = ",", inferSchema = "true", header = "true") 
# a spark dataframe

df.printSchema()
df.show(truncate=False)

The file movies.csv is uploaded to HDFS and stored in the folder /data.

StructType & StructField

The StructType and StructField classes are used to programmatically specify the schema of a DataFrame and to create complex columns such as nested struct, array, and map columns. StructType is a collection of StructFields, each of which defines a column name, a column data type, a boolean that specifies whether the field is nullable, and optional metadata.


from pyspark.sql.types import StructField, StructType, StringType, IntegerType

# A sample data
data = [ (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
]


schema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
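The same classes can also describe array and map columns. A small sketch (the hobbies and properties fields here are made up for illustration):

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType

# Hypothetical schema with an array column and a map column
complex_schema = StructType([
    StructField("name", StringType(), True),
    StructField("hobbies", ArrayType(StringType()), True),                # array of strings
    StructField("properties", MapType(StringType(), StringType()), True)  # string -> string map
])

complex_data = [("James", ["cricket", "chess"], {"eye": "brown", "hair": "black"})]
spark.createDataFrame(complex_data, complex_schema).printSchema()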

Spark DataFrame Operations

show [Action]

show() is used to display the contents of the DataFrame in a tabular (row and column) format. By default, it shows only 20 rows and truncates column values at 20 characters.


df.show()
df.show(5)
df.show(5, truncate=False)
df.show(10, truncate=False, vertical=True)

collect [Action]

collect() is an action that retrieves all elements of the dataset (from all nodes) to the driver node as a list of Row objects. Use collect() only on small DataFrames, usually after filter(), groupBy(), etc.; collecting a large dataset can cause an OutOfMemory error. You can use the head() or take() operations to get only the first rows/records.

df.collect() # all elements
df.collect()[0] # first row
df.collect()[0][0] # first cell at first row and first column

Notice that collect() on a big DataFrame can cause performance and memory issues, since this action gathers the partitions of the DataFrame from all cluster nodes into the memory of a single machine. More memory-friendly alternatives are shown below:

df.take(10) # first 10 elements
df.take(1)[0] # first row
df.take(1)[0][0] # first cell at first row and first column

select [Transformation]

The select() function is used to select a single column, multiple columns, columns by index, all columns, or nested columns from a DataFrame. It returns a new DataFrame with the selected columns.

from pyspark.sql.functions import col
df.select("name", \
          "name.firstname", \
          df.id, \
          df['gender'], \
          col("salary")) \
.show()

df.select("*").show()

df.select([col for col in df.columns]).show()

df.select(df.columns[:2]).show()

withColumn, withColumnRenamed, drop [Transformation]

withColumn() is a transformation function of DataFrame which is used to change the value, convert the datatype of an existing column, create a new column, and many more.

# Read the data
path = "hdfs://localhost:9000/data/movies.csv"
df = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true") 

# Print Schema
df.printSchema()
  1. Change the datatype of the column.
# Convert the `Worldwide Gross` column to double
# 1. Remove the $ sign
import pyspark.sql.functions as F
df.withColumn("Worldwide Gross", F.translate('Worldwide Gross', '$', '').cast("Double")).show(5)

df.withColumn("Worldwide Gross", F.col("Worldwide Gross").cast("Double"))

# You can merge the previous operations into one operation as shown below

  1. Update the values in a column
col_name = df.columns[3]
df2.withColumn(col_name, F.col(col_name)/100).show(5)

  3. Create a column from an existing one
col_name = df2.columns[3]
df2.withColumn("score", F.col(col_name)/100).show(5)

  4. Add a new column with a fixed value
df2.withColumn("Country", F.lit("Russia")).show()
  5. Rename a column
df2.withColumnRenamed(df2.columns[3], "score").show(5)

  6. Drop a column
df2.drop("Year").show(5)

filter, where [Transformation]

The PySpark filter() function is used to filter rows from an RDD/DataFrame based on a given condition or SQL expression. If you come from an SQL background, you can use where() instead of filter(); both functions operate exactly the same way.

df2.filter((df2.Year == 2008) & (df2['Film'].startswith("Wh"))).show(5)

df2.filter(~F.col('Genre').isin(['Comedy', 'Drama'])).show(5)

distinct, dropDuplicates [Transformation]

The PySpark distinct() function is used to drop duplicate rows (considering all columns) from a DataFrame, while dropDuplicates() drops duplicates based on one or more selected columns.

print(df2.count() - df2.distinct().count())

df2.dropDuplicates(['Genre', 'Lead Studio']).show(5)

groupby [Transformation]

Similar to the SQL GROUP BY clause, the PySpark groupBy() function is used to collect identical data into groups on a DataFrame and perform aggregate functions such as count, sum, avg, min, and max on the grouped data. groupBy() returns a GroupedData object that exposes these aggregate functions.

Notice that the aggregate functions are transformations and will return a DataFrame. You need to call an action to see the output of the aggregation.

  1. Total gross for each film genre.
df2.groupby("Genre").sum("Worldwide Gross").show()


2. Calculate the average audience score and the maximum gross for each film genre in every year. Exclude groups whose maximum gross is less than 50.

df2.groupby("Genre", 'Year') \
    .agg(
        F.avg("Audience score %").alias("avg_score"), \
        F.max(df2.columns[6]).alias("max_gross")
).where(F.col("max_gross")>=50) \
.show(5)


# Equivalent SQL Query
# SELECT Genre,
#        Year,
#        avg(`Audience score %`) AS avg_score,
#        max(`Worldwide Gross`) AS max_gross
# FROM movies
# GROUP BY Genre, Year
# HAVING max_gross >= 50

orderBy, sort [Transformation]

You can use either the sort() or the orderBy() function of a PySpark DataFrame to sort it in ascending or descending order based on one or multiple columns; you can also sort using PySpark SQL sorting functions.
Example: Calculate the average audience score and the maximum gross for each film genre in every year. Exclude groups whose maximum gross is less than 50.

df2.groupby("Genre", 'Year') \
    .agg(
        F.avg("Audience score %").alias("avg_score"), \
        F.max(df2.columns[6]).alias("max_gross")
).where(F.col("max_gross")>=50) \
.sort(F.col("max_gross").asc(), F.col("avg_score").desc()) \
.show(5)


# Equivalent SQL Query
# SELECT Genre,
#        Year,
#        avg(`Audience score %`) AS avg_score,
#        max(`Worldwide Gross`) AS max_gross
# FROM movies
# GROUP BY Genre, Year
# HAVING max_gross >= 50
# ORDER BY max_gross ASC, avg_score DESC

Join

Join is used to combine two DataFrames, and by chaining joins you can combine multiple DataFrames. It supports all basic join types available in traditional SQL: INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS, and SELF JOIN.

df3 = df2.groupby("Genre", 'Year') \
    .agg(
        F.avg("Audience score %").alias("avg_score"), \
        F.max(df2.columns[6]).alias("max_gross")
).where(F.col("max_gross")>=50)


df3.join(df2, (df2.Genre==df3.Genre) & (df2.Year==df3.Year), how="inner").show(5)
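Since df2 and df3 share the Genre and Year column names, selecting those columns after the join is ambiguous. One way to keep them apart is to alias the DataFrames before joining (a sketch; the alias names m and s are arbitrary):

# Alias both sides so their columns can be referenced unambiguously after the join
m = df2.alias("m")  # the movies DataFrame
s = df3.alias("s")  # the aggregated genre/year statistics

m.join(s, (F.col("m.Genre") == F.col("s.Genre")) & (F.col("m.Year") == F.col("s.Year")), how="inner") \
 .select("m.Film", "m.Genre", "m.Year", "s.avg_score", "s.max_gross") \
 .show(5)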

UDF (User Defined Function)

PySpark UDFs (user defined functions) extend the built-in capabilities of Spark SQL and DataFrames. Below are the steps for creating a UDF that capitalizes the first character of each word in a string:

  1. Create a Python function.
def convertCase(s):
    resStr = ""
    arr = s.split(" ")
    for x in arr:
        if x:  # skip empty tokens to avoid an IndexError
            resStr = resStr + x[0].upper() + x[1:] + " "
    return resStr
  2. Convert the Python function to a PySpark UDF
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
capitalizeUDF = F.udf(lambda x: convertCase(x),StringType())

# Since the default return type of the udf() is StringType, you can write it as follows
capitalizeUDF = F.udf(lambda x: convertCase(x))
  3. Use the UDF
df2.select("Film", capitalizeUDF(F.col("Film")).alias("Capitalized_Film")).show(5, truncate = False)

Note: UDFs are treated as a black box by Spark, so Spark cannot apply its optimizations to them and you lose the optimizations PySpark performs on DataFrames/Datasets. Use UDFs only when an equivalent built-in function is not available.
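If you also want to call the UDF from SQL queries (temporary views and spark.sql() are covered in the Spark SQL section below), you can register it on the session. A minimal sketch; the name capitalizeWords and the view name movies are arbitrary:

# Register the Python function so it can be used inside spark.sql() queries
spark.udf.register("capitalizeWords", convertCase, StringType())

df2.createOrReplaceTempView("movies")
spark.sql("SELECT Film, capitalizeWords(Film) AS Capitalized_Film FROM movies").show(5, truncate=False)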

Save DataFrame to disk

In Spark, you can save (write) a DataFrame to a CSV file on disk by using dataframeObj.write.csv("path"). In the same way, you can also write the DataFrame to HDFS or any other Spark-supported file system.

#Write DataFrame data to CSV file
df.write.csv("movies_df")

#You can also use below
df.write.format("csv").save("movies_df")
    
df.write.format("csv") \
    .option("header", "true") \
    .option("delimiter", ",") \
    .save("movies_df")
    
df.write.option("header","true") \
  .csv("hdfs://localhost:9000/movies_df")

Partitioning in Spark

PySpark partitionBy() is a function of the pyspark.sql.DataFrameWriter class that is used to partition a large dataset (DataFrame) into smaller files based on one or multiple columns while writing to disk. Partitioning the data on the file system is a way to improve query performance when dealing with a large dataset in a data lake.

PySpark supports partition in two ways; partition in memory (DataFrame) and partition on the disk (File system).

  1. Partition in memory: You can partition or repartition the DataFrame by calling the repartition() or coalesce() transformations, as discussed in the RDD section; a short DataFrame-level sketch is shown below.

  2. Partition on disk: While writing the PySpark DataFrame back to disk, you can choose how to partition the data based on columns using partitionBy() of pyspark.sql.DataFrameWriter. This is similar to Hive's partitioning scheme.

Some advantages of partitions are: a) Fast access to the data, b) The ability to perform an operation on a smaller dataset.
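A short DataFrame-level sketch of in-memory partitioning (the partition counts here are arbitrary):

# In-memory partitioning of the DataFrame
print(df.rdd.getNumPartitions())    # current number of partitions

df_re = df.repartition(8)           # full shuffle into 8 partitions
df_re = df.repartition(8, "Genre")  # shuffle, co-locating rows with the same Genre
df_co = df_re.coalesce(2)           # reduce the number of partitions without a full shuffle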

# We can store the dataframe in the disk in partitions based on the values of Genre column.
df.write.option("header",True) \
  .partitionBy("Genre") \
  .csv("movies_df")

# Read only a specific parition `Genre=Animation` of the dataframe.
df = spark.read.csv("movies_df/Genre=Animation", header=True, sep=",")

Save modes

Save operations can optionally take a SaveMode that specifies how to handle existing data, if present. It is important to realize that these save modes do not use any locking and are not atomic. Additionally, when performing an overwrite, the existing data is deleted before the new data is written out. The options are:

  1. append: Append contents of this DataFrame to existing data.
  2. overwrite: Overwrite existing data.
  3. error or errorifexists: Throw an exception if data already exists. (default option)
  4. ignore: Silently ignore this operation if data already exists.
# Save modes
df.write.format("csv") \
    .mode("append") \
    .option("delimiter", "|") \
    .save("/tmp/spark_output/datacsv")

df.write.format("csv") \
    .option("mode","append") \
    .option("delimiter", "|") \
    .save("/tmp/spark_output/datacsv")

Spark Dataset

Spark Dataset is an interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, the ability to use powerful lambda functions) together with the benefits of Spark SQL's optimized execution engine. The Dataset API is available only in statically typed languages such as Java and Scala, since Datasets rely heavily on static typing. A DataFrame is a Dataset organized into named columns. Python is dynamically typed, so it does not expose the Dataset API, but it has full access to Spark's DataFrame API, which offers similar functionality.

Spark SQL

Spark SQL is a module for structured data processing. It allows you to query structured data using either SQL or the DataFrame API.

pyspark.sql is the Spark module used to perform SQL-like operations on data held in memory. You can either use the programmatic DataFrame API to query the data or write ANSI SQL queries as you would against an RDBMS. You can also mix both, for example by applying the API to the result of an SQL query.

Spark SQL is one of the most used Spark modules for processing structured, columnar data. Once a DataFrame is created, you can interact with the data using SQL syntax. In other words, Spark SQL brings raw SQL queries to Spark, meaning you can run traditional ANSI SQL against a Spark DataFrame.

In order to use SQL, first register a temporary table/view on the DataFrame using the createOrReplaceTempView() function. Once created, the view can be accessed throughout the SparkSession using sql(), and it is dropped when the SparkSession terminates. Use the sql() method of the SparkSession object to run the query; it returns a new DataFrame.
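As a minimal sketch of this workflow, using the movies DataFrame loaded earlier (the view name and the filter are arbitrary):

# Register the DataFrame as a temporary view and query it with SQL
df.createOrReplaceTempView("movies")
spark.sql("SELECT Film, Genre, Year FROM movies WHERE Year >= 2010").show(5)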

Data Description

This dataset includes 44,341 results of international football matches starting from the very first official match in 1872 up to 2022. The matches range from FIFA World Cup to FIFI Wild Cup to regular friendly matches. The matches are strictly men’s full internationals and the data does not include Olympic Games or matches where at least one of the teams was the nation’s B-team, U-23 or a league select team.

results.csv includes the following columns: date, home_team, away_team, home_score, away_score, tournament, city, country, and neutral.

For the dataset of scorers and shootouts you can check this Kaggle data card.

Spark SQL Examples

Here we will use the international football results dataset described above.

  1. Create SQL View

    • Load the data and read the results dataframe.
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType
      
      schema = StructType([
          StructField("date", DateType(), False),
          StructField("home_team", StringType(), False),
          StructField("away_team", StringType(), False),
          StructField("home_score", IntegerType(), False),
          StructField("away_score", IntegerType(), False),
          StructField("tournament", StringType(), False),
          StructField("city", StringType(), False),
          StructField("country", StringType(), False),
          StructField("neutral", BooleanType(), False),
      ])
      
      # You can also use spark.read.csv function
      df = spark.read.format("csv").load("results.csv", header = True, schema = schema)
      df
      
    • Create the temporary view.
      df.createOrReplaceTempView("results_table")
      
  2. Spark SQL to Select Columns

    # DataFrame API Select query
    df.select("home_team", "city", "country", "tournament") \
         .show(5)

    # SQL Select query
    spark.sql("SELECT home_team, city, country, tournament FROM RESULTS_TABLE") \
         .show(5)
    
  3. Filter Rows
    To filter the rows from the data, you can use where() function from the DataFrame API.

    # DataFrame API where()
    df.select("country", "city", "home_team", "tournament") \
      .where("city == 'Moscow'") \
      .show(5)
    

    Similarly, in SQL you can use WHERE clause as follows.

    # SQL where
    spark.sql(""" SELECT country, city, home_team, tournament FROM RESULTS_TABLE
              WHERE city = 'Moscow' """) \
         .show(5)
    
    
  4. Sorting

    # sorting
    df.select("country", "city", "home_team", "tournament") \
      .where("city in ('London','Paris','Moscow')") \
      .orderBy("city") \
      .show(10)
    
    
    # SQL ORDER BY
    spark.sql(""" SELECT country, city, home_team, tournament FROM RESULTS_TABLE
              WHERE city in ('London','Paris','Moscow') ORDER BY city """) \
         .show(10)
    
    
  5. Grouping

# grouping
df.groupBy("city").count() \
  .show()

# SQL GROUP BY clause
spark.sql(""" SELECT city, count(*) as count FROM RESULTS_TABLE
          GROUP BY city""") \
     .show()
  6. SQL Join Operations

The PySpark DataFrame join has the syntax below and can be called directly on a DataFrame.

join(self, other, on=None, how=None)

The join() operation takes another DataFrame (other), an optional join condition (on), and an optional join type (how), and returns a DataFrame.

You can also refine a join expression by chaining where() or filter() on the result, and you can join on multiple columns.

emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# Join in pyspark.sql.DataFrame API

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner") \
     .show(truncate=False)


# SQL INNER JOIN
joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

# SQL INNER JOIN
joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

# Left Join in pyspark.sql.DataFrame API
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left") \
    .show(truncate=False)
    
    
# SQL LEFT JOIN
joinDF = spark.sql("select * from EMP e LEFT OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

# Right Join in pyspark.sql.DataFrame API
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "right") \
    .show(truncate=False)
    
    
# SQL RIGHT JOIN
joinDF = spark.sql("select * from EMP e RIGHT OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

# Full Join in pyspark.sql.DataFrame API
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "full") \
    .show(truncate=False)
    
    
# SQL FULL JOIN
joinDF = spark.sql("select * from EMP e FULL OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

You can read more about anti-joins, semi-joins, and unions in the Spark documentation.

References