Course: Big Data - IU S25
Author: Firas Jolha
This section gives a theoretical introduction to Spark, which will be covered in the lecture. Feel free to skip it if you have enough theoretical knowledge about Spark.
Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, pandas API on Spark for pandas workloads (only for spark 3), MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

Spark introduces the concept of an RDD (Resilient Distributed Dataset), an immutable, fault-tolerant, distributed collection of objects that can be operated on in parallel. An RDD can contain any type of object and is created by loading an external dataset or distributing a collection from the driver program. Each RDD is split into multiple partitions (smaller subsets of the data), which may be computed on different nodes of the cluster.
RDD means:
Resilient – capable of rebuilding data on failure
Distributed – distributes data among various nodes in cluster
Dataset – collection of partitioned data with values
The driver program is the process running the main() function of the application and creating the SparkContext. The cluster manager (master) is selected with the --master option of the spark-submit tool, and the deploy mode with the --deploy-mode option. Check the glossary from here.
Apache Spark is widely considered to be the successor to MapReduce for general purpose data processing on Apache Hadoop clusters. In MapReduce, the highest-level unit of computation is a job. A job loads data, applies a map function, shuffles it, applies a reduce function, and writes data back out to persistent storage. In Spark, the highest-level unit of computation is an application. A Spark application can be used for a single batch job, an interactive session with multiple jobs, or a long-lived server continually satisfying requests. A Spark job can consist of more than just a single map and reduce.
MapReduce starts a process for each task. In contrast, a Spark application can have processes running on its behalf even when it is not running a job. Furthermore, multiple tasks can run within the same executor. Both combine to enable extremely fast task startup time as well as in-memory data storage, resulting in orders of magnitude faster performance over MapReduce.
At runtime, a Spark application maps to a single driver process and a set of executor processes distributed across the hosts in a cluster.
The driver process manages the job flow and schedules tasks and is available the entire time the application is running. Typically, this driver process is the same as the client process used to initiate the job, although when run on YARN, the driver can run in the cluster. In interactive mode, the shell itself is the driver process.
The executors are responsible for executing work, in the form of tasks, as well as for storing any data that you cache. Executor lifetime depends on whether dynamic allocation is enabled. An executor has a number of slots for running tasks, and will run many concurrently throughout its lifetime.

Invoking an action operation inside a Spark application triggers the launch of a job to fulfill it. Spark examines the dataset on which that action depends and formulates an execution plan. The execution plan assembles the dataset transformations into stages. A stage is a collection of tasks that run the same code, each on a different subset of the data.

The Spark driver contains several components responsible for translating user code into the actual jobs executed on the cluster:

Apache Spark works in a master-slave architecture where the master is called “Driver” and slaves are called “Workers”.

When you run a Spark application, Spark Driver creates a context that is an entry point to your application, and all operations (transformations and actions) are executed on worker nodes, and the resources are managed by Cluster Manager.
Spark has a small code base, and the system is divided into various layers. Each layer has its own responsibilities, and the layers are independent of each other.

The first layer is the interpreter; Spark uses a Scala interpreter with some modifications. As you enter your code in the Spark console (creating RDDs and applying operators), Spark builds an operator graph. When the user runs an action (like collect), the graph is submitted to a DAG scheduler. The DAG scheduler divides the operator graph into (map and reduce) stages. A stage is comprised of tasks based on partitions of the input data. The DAG scheduler pipelines operators together to optimize the graph; for example, many map operators can be scheduled in a single stage. This optimization is key to Spark's performance. The final result of the DAG scheduler is a set of stages. The stages are passed on to the task scheduler, which launches tasks via the cluster manager (Spark Standalone/YARN/Mesos). The task scheduler doesn't know about dependencies among stages.
Spark supports four cluster managers: Standalone (Spark's built-in cluster manager), Hadoop YARN, Apache Mesos, and Kubernetes.
Scala is the native language for writing Spark applications but Apache Spark supports drivers for other languages such as Python (PySpark package), Java, and R.
PySpark is a Spark library written in Python to run Python applications using Apache Spark capabilities. Using PySpark we can run applications in parallel on the distributed cluster (multiple nodes). In other words, PySpark is a Python API for Apache Spark.
Spark is written in Scala; later, due to its industry adoption, the PySpark API was released for Python. It uses the Py4J Java library, which is integrated within PySpark and allows Python to dynamically interface with JVM objects. Hence, to run PySpark you need Java installed along with Python and Apache Spark.
The main PySpark modules are:
- Spark Core and RDDs (pyspark.rdd)
- Spark SQL and DataFrames (pyspark.sql)
- pandas API on Spark (pyspark.pandas)
- MLlib, the RDD-based machine learning API (pyspark.mllib)
- ML, the DataFrame-based machine learning API (pyspark.ml)
- Spark Streaming (pyspark.streaming)
- Structured Streaming (pyspark.sql.streaming)
- Resource management (pyspark.resource)
Besides these, if you wanted to use third-party libraries, you can find them at https://spark-packages.org/ . This page is kind of a repository of all Spark third-party libraries.
There are different ways to install Apache Spark on your machine.
You can use the notebook shared at the beginning of this tutorial as an installation approach for running PySpark in a Colab notebook.
You can install pyspark for Python language by simply installing the package pyspark using pip as follows:
python3 -m venv venv
source venv/bin/activate
pip install pyspark
This will install two packages, pyspark and py4j. py4j enables Python programs running in a Python interpreter to dynamically access Java objects in a Java Virtual Machine. This installs the Spark software, and you will be able to run Spark applications.
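As a quick sanity check (assuming the virtual environment above is still active), you can print the installed version:
python -c "import pyspark; print(pyspark.__version__)"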
You can also install it using Docker as below.
Pull the image spark:python3:
docker pull spark:python3
Run the pyspark shell:
docker run --name pyspark -it -p 4040:4040 spark:python3 /opt/spark/bin/pyspark
Here we create a container from the image above, publish the port 4040 for the Spark Web UI and run the pyspark shell.
You can deploy a fully-distributed Hadoop cluster where YARN is the resource manager. I have prepared a repository which deploys the cluster using Docker containers. This cluster can also be used for running your Spark applications. You can check the repository from here. In order to start the cluster, follow the steps below:
mkdir -p ~/spark-yarn-test && cd ~/spark-yarn-test
git clone git@github.com:firas-jolha/docker-spark-yarn-cluster.git
cd docker-spark-yarn-cluster
Run the script startHadoopCluster.sh, passing the number of slaves (the default value is 2).
N=5
bash startHadoopCluster.sh $N
Here I am running a cluster of one master node and 5 slave nodes. In HDFS, the master node will run the namenode and secondary namenode and the slave nodes will run the datanodes. In YARN, the master node will run the resource manager and the slave nodes will run the node managers. In Spark, the master node will run the Spark cluster master and the slave nodes will run the workers.
docker exec -it cluster-master bash
jps -lm
# Starting HDFS
start-dfs.sh
# Starting the namenode
hdfs --daemon start namenode
# Starting the secondary namenode
hdfs --daemon start secondarynamenode
# Starting a datanode (do this on the datanode)
hdfs --daemon start datanode
# Starting YARN
start-yarn.sh
# Starting YARN + HDFS
start-all.sh
# Starting the MapReduce History server
mapred --daemon start historyserver
# Starting the Spark Cluster master
$SPARK_HOME/sbin/start-master.sh
# Starting the Spark Cluster workers
$SPARK_HOME/sbin/start-workers.sh
# Starting the Spark Cluster master + workers
$SPARK_HOME/sbin/start-all.sh
The table below shows some of the default ports for the running services.
| Service | Port |
|---|---|
| HDFS namenode | 9870 |
| HDFS datanode | 9864 |
| HDFS secondary namenode | 9868 |
| YARN resource manager | 8088 |
| YARN node manager | 8042 |
| Spark master | 8080 |
| Spark jobs per application | 4040 |
| Spark history server | 18080 |
| MapReduce history server | 19888 |
To stop and delete the cluster, you can run the script remove_containers.sh (CAUTION), which will remove all containers whose name contains cluster. Be careful: if you have any other containers whose name contains cluster, they will also be removed.
bash remove_containers.sh
Or you can remove containers one by one if you do not use this script.
If you have installed Python and pip, then you just need to run the following command:
pip2 install pyspark
Otherwise, you need to go back and install them first.
Note: You can use Python3.6 to write Spark applications if you configured it properly in HDP Sandbox.
You can execute PySpark statements in interactive mode in the pyspark shell, or you can write them in a Python file and submit it to Spark using the spark-submit tool. In addition, Spark comes with a set of ready-to-execute example applications.
You can open a Spark shell session by running pyspark for Python, spark-shell for Scala, or sparkR for R.
pyspark

The pyspark shell starts a Spark application and initiates/gets the Spark context sc (and the Spark session spark), which internally creates a web UI at localhost:4040 (or the next port if 4040 is not free). By default, it uses local[*] as the master (this means that Spark will run jobs on the same machine using all CPU cores). It also displays the Spark and Python versions.
Note: If you are running two or more Spark applications simultaneously, the ports next to 4040 (e.g. 4041, 4042, etc.) are not published from the Docker container to the host machine, so you can set the port of the Spark web UI manually to one of the free ports, such as 4042, as follows:
docker run --name pyspark -it -p 4042:4042 spark:python3 /opt/spark/bin/pyspark --conf spark.ui.port=4042
On pyspark shell, you can write only Python code.
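For example, a couple of one-liners you can try in the shell (a small sketch using the pre-created sc and spark objects):
sc.parallelize(range(10)).sum()   # 45
spark.range(5).show()             # a tiny DataFrame with a single `id` column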

You can exit the pyspark shell by executing exit(), as in a Python shell.
The spark-shell command, in contrast, runs a Spark shell for Scala.
spark-shell

We can use the spark-submit tool for the non-interactive mode of data analysis. This command accepts the path of the Spark application.
Let’s write our simple application in app.py and execute it using spark-submit.
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession\
.builder\
.appName("PythonSparkApp")\
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("OFF") # WARN, FATAL, INFO
data = [('hello', 1), ('world', 2)]
print(pd.DataFrame(data))
spark.createDataFrame(data).show()
rdd = sc.parallelize(data)
print(rdd.collect())
Now you can submit this app using spark-submit
# Submit the application main.py with default configs
spark-submit app.py
# master is local[*]
# Submit the application main.py on the YARN cluster where the driver program will run in the client machine
spark-submit \
--master yarn \
app.py
# Run Spark app example 'SparkPi' to calculate the value of π (pi) with the Monte-Carlo method for 10 iterations locally on 8 cores
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[8] \
/path/to/examples.jar \
10
# See note below.
# Run Spark app example 'wordcount.py' with the name "my app" to count the words in the file `/sparkdata/movies.csv`, with the Spark web UI on the custom port 4242, on the YARN cluster, where the driver program can use 2G memory and 4 cores, and the application will run on 8 executors, each with 2G memory and 4 CPU cores.
spark-submit \
--name "my app" \
--master yarn \
--executor-memory 2G \
--executor-cores 4 \
--driver-memory 2G \
--driver-cores 4 \
--num-executors 8 \
--conf spark.ui.port=4242 \
/usr/hdp/current/spark2-client/examples/src/main/python/wordcount.py \
/sparkdata/movies.csv
The general template for spark-submit tool:
spark-submit \
--name "app name" \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
Some of the commonly used options are:
- --class: The entry point for your application (e.g. org.apache.spark.examples.SparkPi).
- --master: The master URL for the cluster (e.g. spark://23.195.26.187:7077).
- --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client). This option will be explained in the next lab.
- --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces, wrap "key=value" in quotes (as shown). Multiple configurations should be passed as separate arguments (e.g. --conf <key>=<value> --conf <key2>=<value2>).
- application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside your cluster, for instance an hdfs:// path or a file:// path that is present on all nodes.
- application-arguments: Arguments passed to the main method of your main class, if any.
When you want to run your PySpark application on a cluster such as YARN, you need to make sure that your code and all used libraries are available on the executors that are actually running on remote machines.
Let’s try to create a PySpark app that imports pandas package and uses a custom module backend.py as a backend.
# /app/app.py
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession\
.builder\
.appName("PythonPackageManagementPySparkApp")\
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("OFF") # WARN, FATAL, INFO
data = [('hello', 1), ('world', 2)]
print(pd.DataFrame(data))
spark.createDataFrame(data).show()
rdd = sc.parallelize(data)
print(rdd.collect())
from backend import some_f
some_f(spark)
# /app/backend.py
import pandas
def some_f(session):
    # A small pandas DataFrame to demonstrate that pandas is available
    # (an empty DataFrame would make createDataFrame fail to infer a schema)
    df = pandas.DataFrame({"x": [1, 2], "y": ["a", "b"]})
    print(f"hello from backend where df.shape is {df.shape}")
    print(pandas.__version__)
    session.createDataFrame(df).show()
PySpark allows you to upload Python files (.py), zipped Python packages (.zip), and Egg files (.egg) to the executors by setting the configuration property spark.submit.pyFiles, by passing the --py-files option to spark-submit, or by directly calling pyspark.SparkContext.addPyFile() in the application. This covers custom modules/files, but what about packages and virtual environments? You can use the Python package venv-pack to package your virtual environment and pass it to spark-submit using the --archives option.
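For a single custom module, a minimal sketch uses pyspark.SparkContext.addPyFile() inside the application (assuming backend.py exists at this path on the machine that starts the driver):
sc.addPyFile("/app/backend.py")
import backend   # the module is now importable on the driver and is shipped to the executors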
Here I have a demo on using a backend.py custom module and pandas package in my application.
python3 -m venv .venv
source .venv/bin/activate
pip install pandas
pip install venv-pack
Package the virtual environment using venv-pack (make sure that you have activated the environment and installed all required packages in it).
venv-pack -o /app/.venv.tar.gz
# Python of the driver (/app/.venv/bin/python)
export PYSPARK_DRIVER_PYTHON=$(which python)
# Python of the executor (./.venv/bin/python)
export PYSPARK_PYTHON=./.venv/bin/python
Submit the application using spark-submit in client mode.
spark-submit --master yarn \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./.venv/bin/python \
--deploy-mode client \
--archives /app/.venv.tar.gz#.venv \
--py-files /app/backend.py \
/app/app.py
# For cluster deploy mode, unset the driver-side Python variable before submitting
unset PYSPARK_DRIVER_PYTHON
spark-submit --master yarn \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./.venv/bin/python \
--deploy-mode cluster \
--archives /app/.venv.tar.gz#.venv \
--py-files /app/backend.py \
/app/app.py
Apache Spark provides a suite of web user interfaces (UIs) that you can use to monitor the status and resource consumption of your Spark cluster.
The Jobs tab displays a summary page of all jobs in the Spark application and a details page for each job. The summary page shows high-level information, such as the status, duration, and progress of all jobs and the overall event timeline. When you click on a job on the summary page, you see the details page for that job. The details page further shows the event timeline, DAG visualization, and all stages of the job.
The information that is displayed in this section is:
- User: the current Spark user
- Total uptime: time since the Spark application started
- Scheduling mode: the job scheduling mode
- Number of jobs per status: Active, Completed, Failed
When you click on a specific job, you can see the detailed information of this job. This page displays the details of a specific job identified by its job ID.
The Stages tab displays a summary page that shows the current state of all stages of all jobs in the Spark application.
At the beginning of the page is the summary with the count of all stages by status (active, pending, completed, skipped, and failed).

After that are the details of stages per status (active, pending, completed, skipped, failed). In active stages, it is possible to kill the stage with the kill link. The failure reason is shown only for failed stages. Task details can be accessed by clicking on the description.

The stage details page begins with information like total time across all tasks, Locality level summary, Shuffle Read Size / Records and Associated Job IDs.

There is also a visual representation of the directed acyclic graph (DAG) of this stage, where vertices represent the RDDs or DataFrames and the edges represent an operation to be applied. Nodes are grouped by operation scope in the DAG visualization and labelled with the operation scope name (BatchScan, WholeStageCodegen, Exchange, etc.). Notably, Whole Stage Code Generation operations are also annotated with the code generation id. For stages belonging to Spark DataFrame or SQL execution, this allows you to cross-reference stage execution details with the relevant details in the SQL tab of the web UI, where SQL plan graphs and execution plans are reported.

You can find more info about the web UI from here.
RDD is a fundamental building block of Spark. RDDs are immutable distributed collections of objects. Immutable meaning once you create an RDD you cannot change it. Each record in RDD is divided into logical partitions, which can be computed on different nodes of the cluster.
In other words, RDDs are a collection of objects similar to a list in Python, with the difference that an RDD is computed across several processes scattered over multiple physical servers (called nodes) in a cluster, while a Python collection lives and is processed in just one process.
Additionally, RDDs provide an abstraction over data partitioning and distribution, designed to run computations in parallel on several nodes; while doing transformations on an RDD, we do not have to worry about parallelism, as Spark provides it by default.
There are two main types of Spark operations: Transformations and Actions.

Spark RDD Transformations are functions that take an RDD as the input and produce one or many RDDs as the output. They do not change the input RDD (since RDDs are immutable), but always produce one or more new RDDs by applying the computations they represent e.g. map(), filter(), reduceByKey() etc.
Spark supports lazy evaluation: when you apply a transformation on any RDD, it will not perform the operation immediately. Instead, it builds a DAG (Directed Acyclic Graph) using 1) the applied operation, 2) the source RDD, and 3) the function used for the transformation. It keeps building this graph using references until you apply an action on the last lined-up RDD. That is why transformations in Spark are lazy.
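A minimal sketch of this laziness (assuming an active SparkContext sc, e.g. from the pyspark shell): the transformations below return immediately without touching the data, and only the collect() action triggers the actual job.
rdd = sc.parallelize(range(10))          # nothing is computed yet
squares = rdd.map(lambda x: x * x)       # still nothing is computed
evens = squares.filter(lambda x: x % 2 == 0)
print(evens.collect())                   # the action triggers the whole chain: [0, 4, 16, 36, 64]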
Transformations construct a new RDD from a previous one.

For example, we can build a simple Spark application for counting the words in a text file. Let's write a simple wordcount.py application and run it using PySpark. This application accepts any text file and returns the number of occurrences of each word in the text, so it is basically a word counter.
import pyspark
import sys
from pyspark.sql import SparkSession
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda x, y: x + y)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()
A Spark job of two stages will be created (we do not create stages manually; we define a pipeline from which the stages are determined):
Stage 1 – read the text file, split each line into words, and map each word to a (word, 1) pair (including the map-side part of reduceByKey).
Stage 2 – shuffle the pairs by key and sum the counts for each word; collect then returns the results to the driver.


There are two types of transformations:
Narrow transformation — In a narrow transformation, all the elements that are required to compute the records in a single partition live in a single partition of the parent RDD. A limited subset of partitions is used to calculate the result. Narrow transformations are the result of map() and filter().

Wide transformation — In a wide transformation, all the elements that are required to compute the records in a single partition may live in many partitions of the parent RDD. Wide transformations are the result of groupByKey() and reduceByKey().
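A small sketch contrasting the two (assuming an active sc): map() is narrow, while reduceByKey() is wide and needs a shuffle, which you can see in the RDD lineage.
pairs = sc.parallelize(["a", "b", "a", "c"]).map(lambda w: (w, 1))   # narrow transformation
counts = pairs.reduceByKey(lambda x, y: x + y)                       # wide transformation (shuffle)
print(counts.toDebugString().decode())   # the lineage shows the shuffle (stage) boundary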
Spark pair RDDs are simply RDDs containing key-value pairs. A key-value pair (KVP) consists of two linked data items: the key is the identifier, whereas the value is the data corresponding to that key.
Spark operations work on RDDs containing any type of object; however, key-value pair RDDs gain a few special operations, such as distributed "shuffle" operations and grouping or aggregating the elements by key.
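A small example of pair-RDD-specific operations (a sketch; the data is made up):
kv = sc.parallelize([("ru", 1), ("de", 2), ("ru", 3)])
print(kv.keys().collect())                           # ['ru', 'de', 'ru']
print(kv.mapValues(lambda v: v * 10).collect())      # [('ru', 10), ('de', 20), ('ru', 30)]
print(kv.reduceByKey(lambda a, b: a + b).collect())  # e.g. [('ru', 4), ('de', 2)] (order may vary)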
The following Spark transformations accept input as an RDD consisting of single values.

Where the transformations below accept an input as a pair RDD consisting of key-value pairs.

Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS).
The following actions are applied on RDDs which contains single values.

An action is one of the ways of sending data from the executors to the driver. Executors are agents that are responsible for executing tasks, while the driver is a JVM process that coordinates the workers and the execution of the task. The following actions are applied on pair RDDs which contain key-value pairs.

The dataset for this demo is movies data and can be downloaded from the link attached to this document.
import pandas as pd
import pyspark
import pymongo
import numpy as np
from pyspark.sql import SparkSession
from os.path import join
# Import SparkSession
from pyspark.sql import SparkSession
# Create SparkSession
# local[n] uses n cores/threads for running Spark jobs; local[*] uses all available cores
spark = SparkSession \
    .builder \
    .appName("my spark app") \
    .master("local[*]") \
    .getOrCreate()
sc = spark.sparkContext
# Or
# Import SparkContext and SparkConf
from pyspark import SparkContext, SparkConf
# Create SparkContext
conf = SparkConf() \
.setAppName("my spark app")\
.setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
Here we are using the local machine as the resource manager, with as many threads/cores as it has.
A Spark Driver is an application that creates a SparkContext for executing one or more jobs in the cluster. It allows your Spark/PySpark application to access Spark Cluster with the help of Resource Manager.
When you create a SparkSession object, SparkContext is also created and can be retrieved using spark.sparkContext. SparkContext will be created only once for an application; even if you try to create another SparkContext, it still returns existing SparkContext.
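A quick illustration of this (a sketch): building a second session returns the same underlying context.
spark2 = SparkSession.builder.appName("another name").getOrCreate()
print(spark2.sparkContext is spark.sparkContext)   # True: the existing SparkContext is reused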
There are multiple ways to create RDDs in PySpark.
Using the parallelize() function:
data = [(1,2,3, 'a b c'), (4,5,6, 'd e f'), (7,8,9,'g h i')]
rdd = sc.parallelize(data) # Spark RDD creation
# rdd = sc.parallelize(data, n) # n is minimum number of partitions
This function will split the dataset into multiple partitions. You can get the number of partitions by using the function <rdd>.getNumPartitions().
Using the textFile or wholeTextFiles functions:
path = "file:///sparkdata/movies.csv"
rdd = sc.textFile(path) # a spark rdd
The file movies.csv is uploaded to the local file system and stored in the folder /sparkdata.
path = "hdfs://localhost:9000/data/movies.csv"
# path = "/data/movies.csv"
rdd3 = sc.textFile(path) # a spark rdd
The file movies.csv is uploaded to HDFS and stored in the folder /data.
The function wholeTextFiles will read all files in the directory, and each file will be read as a single row in the RDD (as a pair of file path and content), whereas the function textFile will read each line of the file as a row in the RDD.
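For example (a sketch assuming a local folder /sparkdata containing text/CSV files):
lines = sc.textFile("file:///sparkdata/movies.csv")   # one RDD element per line
files = sc.wholeTextFiles("file:///sparkdata")        # one (file path, file content) pair per file
print(lines.count())
print(files.keys().collect())                         # the file paths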
When we use the parallelize(), textFile(), or wholeTextFiles() methods of SparkContext to initiate an RDD, it automatically splits the data into partitions based on resource availability. When you run it on a local machine, it creates as many partitions as the number of cores available on your system.
rdd = sc.emptyRDD()
rdd = sc.parallelize([], 10)
This will create an empty RDD with 10 partitions.
Sometimes we may need to repartition the RDD. PySpark provides two ways to repartition: the repartition() method, which shuffles data from all nodes (also called a full shuffle), and the coalesce() method, which shuffles data from a minimum number of nodes; for example, if you have data in 4 partitions, doing coalesce(2) moves data from just 2 nodes.
rdd = sc.parallelize(range(1,100), 10)
print(rdd.getNumPartitions())
print(rdd.collect())
rdd2 = rdd.repartition(4)
print(rdd2.getNumPartitions())
print(rdd2.collect())
rdd3 = rdd.coalesce(4)
print(rdd3.getNumPartitions())
print(rdd3.collect())
Note that repartition() method is a very expensive operation as it shuffles data from all nodes in a cluster. Both functions return RDD.
collect is a Spark action and returns all elements of the RDD. repartition and coalesce are considered transformations in Spark. Transformations are lazy operations: instead of updating an RDD, they return a new RDD.
The map operation returns a new RDD by applying a function to each element of this RDD. flatMap applies the map operation and then flattens the resulting rows. We use flatMap when the mapping function returns a list of values; flattening converts the resulting list of lists into a single list of values.
# Read file
rdd1 = sc.textFile("movies.csv")
rdd1.take(10)
# tokenize
rdd2 = rdd1.flatMap(lambda x : x.split(","))
rdd2.take(10)
# Or
# def f(x):
# return x.split(",")
# rdd2 = rdd1.flatMap(f)
# rdd2.take(10)
# Remove the additional spaces
rdd3 = rdd2.map(lambda x : x.strip())
rdd3.take(10)
take(k) is a Spark action and returns the first k elements of the RDD.
filter() returns a new RDD after applying a filter function on the source dataset.
# Returns only values which are digits
rdd4 = rdd3.filter(lambda x : str(x).isdigit())
print(rdd4.count())
count is a Spark action and returns the number of elements in the RDD.
distinct() returns a new RDD after eliminating all duplicated elements.
# Returns unique number of values which are only digits
rdd5 = rdd4.distinct()
print(rdd5.count())
sample() returns a sampled subset of this RDD.
rdd6 = rdd5.sample(withReplacement=False, fraction=0.6, seed=0)
print(rdd6.count())
randomSplit() splits the RDD by the weights specified in the argument, for example rdd.randomSplit([0.7, 0.3]).
rdd7, rdd8 = rdd1.randomSplit(weights=[0.2, 0.3], seed=0)
print(rdd7.count())
print(rdd8.count())
mapPartitions is similar to map, but executes the transformation function on each partition, which gives better performance than map. mapPartitionsWithIndex returns a new RDD by applying a function to each partition of this RDD while tracking the index of the original partition. Both functions should return a generator.
# a generator function
def f9(iter):
    for v in iter:
        yield (v, 1)
rdd9 = rdd3.mapPartitions(f9)
print(rdd9.take(10))
# a generator function
def f10(index, iter):
    for v in iter:
        yield (v, index)
rdd10 = rdd3.mapPartitionsWithIndex(f10)
print(rdd10.take(10))
# Compute the word-digit frequency in the file and show the top 10 words.
# Group by all tokens
rdd11 = rdd3.groupBy(lambda x : x)
# Calculate the length of the list of token duplicates
rdd12 = rdd11.map(lambda x: (x[0], len(x[1])))
# or
# rdd12 = rdd11.mapValues(len)
# Sort the results
rdd13 = rdd12.sortBy(lambda x : x[1], ascending=False)
# Take the first elements of the RDD and display
print(rdd13.take(10))

# Compute the digit frequency in the file and show the top 10 words.
# Get all digits
rdd14 = rdd3.filter(lambda x: x.isdigit())
# Initialize the counters
rdd15 = rdd14.map(lambda x : (x, 1))
# Aggregate the counters who have same key which is here a digit
rdd16 = rdd15.reduceByKey(lambda x, y : x+y)
# Sort the results
rdd17 = rdd16.sortBy(lambda x : x[1], ascending=False)
# Take the first elements of the RDD and display
print(rdd17.take(10))

RDD Action operations return the values from an RDD to a driver program. In other words, any RDD function that returns non-RDD is considered as an action.
collect() returns the complete dataset as a list.
Using the action on a large RDD is dangerous because it will collect all elements of the distributed RDD in one machine (the driver) and this can cause the machine to run out of memory since Spark is an in-memory processing engine.
max returns the maximum value from the dataset whereas min returns the minimum value from the dataset. first returns the first element in the dataset. top returns top n elements from the dataset (after sorting them). take returns the first n elements of the dataset.

The operation take in Spark RDD is the same as head in pandas DataFrame whereas top is interpreted as the first elements after sorting them.
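For example (a small sketch on a throwaway RDD):
nums = sc.parallelize([5, 1, 9, 3, 7])
print(nums.max(), nums.min())   # 9 1
print(nums.first())             # 5
print(nums.top(2))              # [9, 7]  (largest values, sorted descending)
print(nums.take(2))             # [5, 1]  (first values, no sorting)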
count returns the count of elements in the dataset.
There are other similar operations. countApprox(timeout, confidence=0.95) which is the approximate version of count() and returns a potentially incomplete result within a timeout, even if not all tasks have finished. countApproxDistinct(relative_accuracy) returns an approximate number of distinct elements in the dataset.
Note: These operations are used when you have very large dataset which takes a lot of time to get the count.
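For example (a sketch; countApprox takes a timeout in milliseconds and a confidence, countApproxDistinct takes a relative accuracy):
big = sc.parallelize(range(1000000), 16)
print(big.countApprox(timeout=1000, confidence=0.95))   # approximate count within 1 second
print(big.countApproxDistinct(0.05))                    # approximate number of distinct elements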
countByValue returns a dictionary where each key is a unique value in the dataset and the corresponding value is the number of times it is present.
print(rdd3.countByValue())

reduce reduces the elements of the dataset using the specified binary operator. treeReduce reduces the elements of this RDD in a multi-level tree pattern. The output is the same.
result = rdd3 \
.map(lambda x : 1) \
.reduce(lambda x, y : x+y)
resultTree = rdd3 \
.map(lambda x : 1) \
.treeReduce(lambda x, y : x+y)
# You should get the same results as rdd3.count() operation
assert rdd3.count()==result==resultTree
Here I showed some of the operations, but you can find more in the documentation.
saveAsTextFile is used to save the RDD to an external data store.
rdd3.saveAsTextFile("/root/myrdd")

Persistence is useful because a cached/persisted RDD can be reused across multiple actions without recomputing its whole lineage, which saves computation time and resources.
We have different levels of storage, such as memory, disk, serialized/deserialized, and replicated/unreplicated. You can check here for the available options.
PySpark RDD cache() method by default saves RDD computation to storage level MEMORY_ONLY meaning it will store the data in the JVM heap as unserialized objects.
cachedRDD = rdd.cache()
cachedRDD.collect()
PySpark persist() method is used to store the RDD to a specific storage level.
import pyspark
persistedRDD = rdd.persist(pyspark.StorageLevel.MEMORY_ONLY)
persistedRDD.collect()
PySpark automatically monitors every persist() and cache() call you make, checks usage on each node, and drops persisted data that is not used, following a least-recently-used (LRU) policy. You can also remove data manually with the unpersist() method, which marks the RDD as non-persistent and removes all of its blocks from memory and disk.
unpersistedRDD = persistedRDD.unpersist()
unpersistedRDD.collect()
Shuffling is a mechanism Spark uses to redistribute data across different executors and even across machines. PySpark shuffling is triggered when we perform certain transformation operations like groupByKey(), reduceByKey(), or join() on RDDs.
Shuffling is an expensive operation since it involves disk I/O, data serialization and deserialization, and network I/O.
For example, when we perform the reduceByKey() operation, PySpark first runs map tasks on all partitions, grouping the values by key locally; it then shuffles the intermediate results across partitions so that all values of the same key end up in the same partition, where they are finally reduced.
PySpark RDD triggers shuffle and repartition for several operations like repartition(), coalesce(), groupByKey(), and reduceByKey().
Depending on your dataset size, the number of cores, and the memory available, shuffling can benefit or harm your Spark jobs. When you deal with a small amount of data, you should typically reduce the number of partitions; otherwise you will end up with many partitions with few records in each, which results in running many tasks that each process very little data.
On the other hand, when you have a lot of data, having too few partitions results in fewer, longer-running tasks, and sometimes you may also get an out-of-memory error.
Getting the right size of the shuffle partition is always tricky and takes many runs with different values to achieve the optimized number. This is one of the key properties to look for when you have performance issues on Spark jobs.
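For DataFrame/SQL workloads, the number of shuffle partitions is controlled by the spark.sql.shuffle.partitions property (default 200); a small sketch of inspecting and tuning it:
print(spark.conf.get("spark.sql.shuffle.partitions"))   # "200" by default
spark.conf.set("spark.sql.shuffle.partitions", "64")    # fewer partitions for smaller data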
When Spark executes a transformation using map or reduce operations, it executes the transformation on a remote node using the variables that are shipped with the tasks; these variables are not sent back to the PySpark driver, so there is no way to reuse and share variables across tasks. PySpark solves this problem with two types of shared variables.
We can create broadcast variables using SparkContext.broadcast() and access their contents through the value attribute.
v = sc.broadcast(list(range(1, 100)))
print(v.value)
A shared variable that can be accumulated, i.e., has a commutative and associative add operation. Worker tasks on a Spark cluster can add values to an Accumulator with the += operator, but only the driver program is allowed to access its value, using value. Updates from the workers get propagated automatically to the driver program.
acc = sc.accumulator(0)
acc+=10
acc.add(10)
print(acc.value) # 20
DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.
PySpark DataFrame is mostly similar to a pandas DataFrame, with the exception that PySpark DataFrames are distributed across the cluster (meaning the data in a DataFrame is stored on different machines) and any operation in PySpark executes in parallel on all machines, whereas a pandas DataFrame stores and operates on a single machine. Due to parallel execution on all cores of multiple machines, PySpark runs operations faster than pandas.
Each record in the dataframe is of type pyspark.sql.Row whereas each column is of type pyspark.sql.Column. There are multiple ways to create DataFrame in PySpark:
Using the createDataFrame() function:
data = [(1,2,3, 'a b c'), (4,5,6, 'd e f'), (7,8,9,'g h i')]
df = spark.createDataFrame(data) # is a dataframe from a list
df.rdd # Convert Spark DataFrame into RDD
rdd = sc.parallelize(data)
df = spark.createDataFrame(rdd) # is a dataframe from an rdd
Using the toDF() function:
rdd = sc.parallelize(data)
df = rdd.toDF() # From RDD to Dataframe
# from RDD to Dataframe with custom column names
df = rdd.toDF(["int1", "int2", "int3", "str"])
# from RDD to Dataframe without defining the schema (inferSchema option is true)
df = spark.createDataFrame(rdd)
from pyspark.sql import Row
from pyspark.sql.types import *
rdd = rdd.map(lambda t: Row(t[0], t[1], t[2], t[3]))
schema = StructType([
# StructField(<fieldname>, <fieldtype>, <nullability>)
StructField("int1", IntegerType(), True),
StructField("int2", IntegerType(), True),
StructField("int3", IntegerType(), True),
StructField("str", StringType(), True)
])
# From RDD to Dataframe with an explicit schema
df = spark.createDataFrame(rdd, schema)
path = "file:///sparkdata/movies.csv"
df1 = spark.read.format("csv") \
.option("sep", ",") \
.option("inferSchema", "true") \
.option("header", "true") \
.load(path)
df1 = spark.read.csv(path)
df1.show() # Display the dataframe
df1.printSchema() # print the schema of the dataframe
You can find more csv options here.
You can put the file movies.csv in the moviesdb database inside the collection movies as follows.
mongoimport --db moviesdb --collection movies --type csv --headerline --file movies.csv
import pymongo
# The default configuration
# localhost:27017
client = pymongo.MongoClient()
db = client['moviesdb'] # client['<db_name>']
# A pymongo Cursor
# db.<collection_name>
movies_cur = db.movies.find() # Get all documents
# Convert to Pandas DataFrame
df1 = pd.DataFrame(movies_cur)
from pyspark.sql.types import *
schema = StructType([
# StructField(<fieldname>, <fieldtype>, <nullability>)
StructField("Film", StringType(), True),
StructField("Genre", StringType(), True),
StructField("Lead Studio", StringType(), True),
StructField("Audience score %", IntegerType(), True),
StructField("Profitability", FloatType(), True),
StructField("Rotten Tomatoes %", IntegerType(), True),
StructField("Worldwide Gross", StringType(), True),
StructField("Year", IntegerType(), True),
StructField("_id", StringType(), True) # Special field for the documents in Mongodb
])
# Note: a pymongo cursor can be consumed only once; it was already consumed by pd.DataFrame above,
# so rewind it before reusing it
movies_cur.rewind()
# Try to run spark.createDataFrame(movies_cur)
# Convert immediately to Spark DataFrame
df3 = spark.createDataFrame(movies_cur, schema)
# Convert to RDD then to Spark DataFrame (rewind the cursor again and materialize it as a list)
movies_cur.rewind()
df4 = spark.createDataFrame(sc.parallelize(list(movies_cur)), schema)
schema = StructType([
StructField("Film", StringType(), True),
StructField("Genre", StringType(), True),
StructField("Lead Studio", StringType(), True),
StructField("Audience score %", IntegerType(), True),
StructField("Profitability", FloatType(), True),
StructField("Rotten Tomatoes %", IntegerType(), True),
StructField("Worldwide Gross", StringType(), True),
StructField("Year", IntegerType(), True)
])
# Read from local file system with a schema
df3 = spark.read.csv(path, schema)
You can store the dataframe in Mongodb as follows.
df.write.format("com.mongodb.spark.sql.DefaultSource") \
.option("uri", "mongodb://127.0.0.1:27017/db_name.collection_name") \
.save()
Note: Make sure that you added Mongodb connector .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') to the configuration of SparkSession.
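A sketch of such a session configuration (the package coordinates are taken from the note above; the connector version may differ in your setup):
spark = SparkSession.builder \
    .appName("mongo demo") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .getOrCreate()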
path = "hdfs://localhost:9000/data/movies.csv"
df = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true")
# a spark dataframe
# OR
df = spark.read.csv(path, sep = ",", inferSchema = "true", header = "true")
# a spark dataframe
df.printSchema()
df.show(truncate=False)
The file movies.csv is uploaded to HDFS and stored in the folder /data.
StructType and StructField classes are used to programmatically specify the schema to the DataFrame and create complex columns like nested struct, array, and map columns. StructType is a collection of StructField’s that defines column name, column data type, boolean to specify if the field can be nullable or not and metadata.
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
# A sample data
data = [ (("James","","Smith"),"36636","M",3100),
(("Michael","Rose",""),"40288","M",4300),
(("Robert","","Williams"),"42114","M",1400),
(("Maria","Anne","Jones"),"39192","F",5500),
(("Jen","Mary","Brown"),"","F",-1)
]
schema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])),
StructField('id', StringType(), True),
StructField('gender', StringType(), True),
StructField('salary', IntegerType(), True)
])
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
show() is used to display the contents of the DataFrame in a Table Row and Column Format. By default, it shows only 20 Rows, and the column values are truncated at 20 characters.
df.show()
df.show(5)
df.show(5, truncate=False)
df.show(10, truncate=False, vertical=True)
collect() is an action operation that is used to retrieve all the elements of the dataset (from all nodes) to the driver node. It retrieves all elements of a DataFrame as a list of Row objects on the driver node. We should use collect() on smaller datasets, usually after filter(), group(), etc. Retrieving larger datasets results in an OutOfMemory error. You can use the head or take operations to get only the first rows/records.
df.collect() # all elements
df.collect()[0] # first row
df.collect()[0][0] # first cell at first row and first column
Notice that collect on a big dataframe can cause performance and memory issues since this action collects the partitions of the dataframe from all cluster nodes to the memory of one machine. So we suggest more efficient methods as follows:
df.take(10) # first 10 elements
df.take(1)[0] # first row
df.take(1)[0][0] # first cell at first row and first column
select() function is used to select single, multiple, column by index, all columns from the list and the nested columns from a DataFrame. This function returns a DataFrame with the selected columns.
from pyspark.sql.functions import col
df.select("name", \
"name.firstname", \
df.id, \
df['gender'], \
col("salary")) \
.show()

df.select("*").show()

df.select([col for col in df.columns]).show()

df.select(df.columns[:2]).show()

withColumn() is a transformation function of DataFrame which is used to change the value, convert the datatype of an existing column, create a new column, and many more.
# Read the data
path = "hdfs://localhost:9000/data/movies.csv"
df = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true")
# Print Schema
df.printSchema()
# Convert the `Worldwide Gross` column to double
# 1. Remove the $ sign
import pyspark.sql.functions as F
df.withColumn("Worldwide Gross", F.translate('Worldwide Gross', '$', '').cast("Double")).show(5)
df.withColumn("Worldwide Gross", F.col("Worldwide Gross").cast("Double"))
# You can merge the previous operations into one operation as shown below
df2 = df.withColumn("Worldwide Gross", F.translate('Worldwide Gross', '$', '').cast("Double"))
col_name = df.columns[3]
df2.withColumn(col_name, F.col(col_name)/100).show(5)

col_name = df2.columns[3]
df2.withColumn("score", F.col(col_name)/100).show(5)

df2.withColumn("Country", F.lit("Russia")).show()
df2.withColumnRenamed(df2.columns[3], "score").show(5)

df2.drop("Year").show(5)

PySpark filter() function is used to filter the rows from RDD/DataFrame based on the given condition or SQL expression, you can also use where() clause instead of the filter() if you are coming from an SQL background, both these functions operate exactly the same.
df2.filter((df2.Year == 2008) & (df2['Film'].startswith("Wh"))).show(5)

df2.filter(~F.col('Genre').isin(['Comedy', 'Drama'])).show(5)

PySpark distinct() function is used to drop/remove the duplicate rows (all columns) from DataFrame and dropDuplicates() is used to drop rows based on selected (one or multiple) columns.
print(df2.count() - df2.distinct().count())

df2.dropDuplicates(['Genre', 'Lead Studio']).show(5)

Similar to SQL GROUP BY clause, PySpark groupBy() function is used to collect the identical data into groups on DataFrame and perform count, sum, avg, min, max functions on the grouped data. When we perform groupBy() on PySpark Dataframe, it returns GroupedData object which contains aggregate functions. Some of them are avg, sum, min, max.
Notice that the aggregate functions are transformations and will return a DataFrame. You need to call an action to see the output of the aggregation.
df2.groupby("Genre").sum("Worldwide Gross").show()

2. Calculate the average score for audience and max gross for each film genre every year. Exclude elements whose max gross is less than 50.
df2.groupby("Genre", 'Year') \
.agg(
F.avg("Audience score %").alias("avg_score"), \
F.max(df2.columns[6]).alias("max_gross")
).where(F.col("max_gross")>=50) \
.show(5)
# Equivalent SQL Query
# SELECT Genre,
# Year,
# avg("Audience score ") AS avg_score,
# max("Worldwide Gross") AS max_score
# FROM movies
# GROUP BY Genre, Year
# HAVING max_score >= 50

You can use either sort or orderBy function of PySpark DataFrame to sort DataFrame by ascending or descending order based on single or multiple columns, you can also do sorting using PySpark SQL sorting functions.
Example: Calculate the average score for audience and max gross for each film genre every year. Exclude elements whose max gross is less than 50.
df2.groupby("Genre", 'Year') \
.agg(
F.avg("Audience score %").alias("avg_score"), \
F.max(df2.columns[6]).alias("max_gross")
).where(F.col("max_gross")>=50) \
.sort(F.col("max_gross").asc(), F.col("avg_score").desc()) \
.show(5)
# Equivalent SQL Query
# SELECT Genre,
# Year,
# avg("Audience score ") AS avg_score,
# max("Worldwide Gross") AS max_score
# FROM movies
# GROUP BY Genre, Year
# HAVING max_score >= 50
# ORDER BY max_gross asc, avg_score desc
Join is used to combine two DataFrames, and by chaining joins you can combine multiple DataFrames. It supports all basic join types available in traditional SQL: INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS, and SELF JOIN.
df3 = df2.groupby("Genre", 'Year') \
.agg(
F.avg("Audience score %").alias("avg_score"), \
F.max(df2.columns[6]).alias("max_gross")
).where(F.col("max_gross")>=50)
df3.join(df2, (df2.Genre==df3.Genre) & (df2.Year==df3.Year), how="inner").show(5)

PySpark UDFs (user-defined functions) are a very useful feature of Spark SQL & DataFrame, used to extend the PySpark built-in capabilities. I will show here the steps of creating a UDF that capitalizes the first character of each word. The steps are:
def convertCase(s):
    resStr = ""
    arr = s.split(" ")
    for x in arr:
        resStr = resStr + x[0].upper() + x[1:len(x)] + " "
    return resStr
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
capitalizeUDF = F.udf(lambda x: convertCase(x),StringType())
# Since the default return type of the udf() is StringType, you can write it as follows
capitalizeUDF = F.udf(lambda x: convertCase(x))
df2.select("Film", capitalizeUDF(F.col("Film")).alias("Capitalized_Film")).show(5, truncate = False)

Note: UDFs are treated as a black box by Spark, so it cannot apply optimizations to them, and you lose the optimizations PySpark performs on DataFrames/Datasets. We recommend using UDFs only when an equivalent built-in function does not exist.
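For this particular task a built-in function already exists: initcap capitalizes the first letter of each word (and lowercases the rest), which is close to the UDF above; for example:
df2.select("Film", F.initcap(F.col("Film")).alias("Capitalized_Film")).show(5, truncate=False)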
In Spark, you can save (write/extract) a DataFrame to a CSV file on disk by using dataframeObj.write.csv("path"). Using this, you can also write a DataFrame to HDFS or any Spark-supported file system.
#Write DataFrame data to CSV file
df.write.csv("movies_df")
#You can also use below
df.write.format("csv").save("movies_df")
df.write.format("csv") \
.option("header", "true") \
.option("delimiter", ",") \
.save("movies_df")
df.write.option("header","true") \
.csv("hdfs://localhost:9000/movies_df")
PySpark partitionBy() is a function of pyspark.sql.DataFrameWriter class which is used to partition the large dataset (DataFrame) into smaller files based on one or multiple columns while writing to disk. Partitioning the data on the file system is a way to improve the performance of the query when dealing with a large dataset in the Data lake.
PySpark supports partition in two ways; partition in memory (DataFrame) and partition on the disk (File system).
Partition in memory: You can partition or repartition the DataFrame by calling repartition() or coalesce() transformations. This is discussed above in RDD section.
Partition on disk: While writing the PySpark DataFrame back to disk, you can choose how to partition the data based on columns using partitionBy() of pyspark.sql.DataFrameWriter. This is similar to Hive's partitioning scheme.
Some advantages of partitions are: a) Fast access to the data, b) The ability to perform an operation on a smaller dataset.
# We can store the dataframe in the disk in partitions based on the values of Genre column.
df.write.option("header",True) \
.partitionBy("Genre") \
.csv("movies_df")
# Read only a specific parition `Genre=Animation` of the dataframe.
df = spark.read.csv("movies_df/Genre=Animation", header=True, sep=",")
Save operations can optionally take a SaveMode that specifies how to handle existing data if present. It is important to realize that these save modes do not use any locking and are not atomic. Additionally, when performing an Overwrite, the data will be deleted before the new data is written out. The options are: append (add the data to what already exists), overwrite (replace the existing data), ignore (silently skip the write if data already exists), and error or errorifexists (the default: throw an error if data already exists).
# Save modes
df.write.format("csv") \
.mode("append") \
.option("delimiter", "|") \
.save("/tmp/spark_output/datacsv")
df.write.format("csv") \
.option("mode","append") \
.option("delimiter", "|") \
.save("/tmp/spark_output/datacsv")
Spark Dataset is an interface added in Spark 1.6 that combines the benefits of RDDs (strong typing, the ability to use powerful lambda functions) with the benefits of Spark SQL's optimized execution engine. The Dataset API is supported in statically typed languages like Java and Scala, since Datasets rely heavily on static typing. A DataFrame is a Dataset organized into named columns. Python is a dynamically typed language, so the Dataset API is not available in PySpark, but Python still has access to Spark's DataFrame API, which offers similar functionality.
Spark SQL is a module used for structured data processing. It allows you to query structured data using either SQL or the DataFrame API.
The pyspark.sql is a module in Spark that is used to perform SQL-like operations on the data stored in memory. You can either leverage using programming API to query the data or use the ANSI SQL queries similar to RDBMS. You can also mix both, for example, use API on the result of an SQL query.
Spark SQL is one of the most used Spark modules for processing structured columnar data format. Once you have a DataFrame created, you can interact with the data by using SQL syntax. In other words, Spark SQL brings native RAW SQL queries on Spark meaning you can run traditional ANSI SQL on Spark Dataframe.
In order to use SQL, first register a temporary table/view on the DataFrame using the createOrReplaceTempView() function. Once created, this table can be accessed throughout the SparkSession using sql(), and it will be dropped when the SparkSession terminates. Use the sql() method of the SparkSession object to run the query; this method returns a new DataFrame.
This dataset includes results of international football matches, starting from the very first official match up to the present. The matches range from the FIFA World Cup to the FIFI Wild Cup to regular friendly matches. The matches are strictly men's full internationals, and the data does not include Olympic Games or matches where at least one of the teams was the nation's B-team, U-23, or a league select team.
results.csv includes the following columns: date, home_team, away_team, home_score, away_score, tournament, city, country, and neutral.
For the datasets of scorers and shootouts, you can check this Kaggle data card.
Here we will use the results.csv dataset.
Create SQL View
Define the schema and load the results dataframe.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType
schema = StructType([
StructField("date", DateType(), False),
StructField("home_team", StringType(), False),
StructField("away_team", StringType(), False),
StructField("home_score", IntegerType(), False),
StructField("away_score", IntegerType(), False),
StructField("tournament", StringType(), False),
StructField("city", StringType(), False),
StructField("country", StringType(), False),
StructField("neutral", BooleanType(), False),
])
# You can also use spark.read.csv function
df = spark.read.format("csv").load("results.csv", header = True, schema = schema)
df
df.createOrReplaceTempView("results_table")
Spark SQL to Select Columns
# DataFrame API select query
df.select("home_team", "city", "country", "tournament") \
    .show(5)
# SQL select query
spark.sql("SELECT home_team, city, country, tournament FROM RESULTS_TABLE") \
    .show(5)
Filter Rows
To filter the rows from the data, you can use the where() function from the DataFrame API.
# DataFrame API where()
df.select("country", "city", "home_team", "tournament") \
    .where("city == 'Moscow'") \
    .show(5)
Similarly, in SQL you can use WHERE clause as follows.
# SQL WHERE clause
spark.sql(""" SELECT country, city, home_team, tournament FROM RESULTS_TABLE
              WHERE city = 'Moscow' """) \
    .show(5)
Sorting
# Sorting with the DataFrame API
df.select("country", "city", "home_team", "tournament") \
    .where("city in ('London','Paris','Moscow')") \
    .orderBy("city") \
    .show(10)
# SQL ORDER BY clause
spark.sql(""" SELECT country, city, home_team, tournament FROM RESULTS_TABLE
              WHERE city in ('London','Paris','Moscow') ORDER BY city """) \
    .show(10)
Grouping
# Grouping with the DataFrame API
df.groupBy("city").count() \
    .show()
# SQL GROUP BY clause
spark.sql(""" SELECT city, count(*) as count FROM RESULTS_TABLE
              GROUP BY city """) \
    .show()
PySpark SQL join has the syntax below, and it can be accessed directly from a DataFrame.
join(self, other, on=None, how=None)
The join() operation takes the following parameters and returns a DataFrame: other (the DataFrame on the right side of the join), on (a column name, a list of column names, or a join expression), and how (the join type; default inner).
You can also refine a join expression by chaining where() and filter() methods on the DataFrame, and you can join on multiple columns.
emp = [(1,"Smith",-1,"2018","10","M",3000), \
(2,"Rose",1,"2010","20","M",4000), \
(3,"Williams",1,"2010","10","M",1000), \
(4,"Jones",2,"2005","10","F",2000), \
(5,"Brown",2,"2010","40","",-1), \
(6,"Brown",2,"2010","50","",-1) \
]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
"emp_dept_id","gender","salary"]
empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)
dept = [("Finance",10), \
("Marketing",20), \
("Sales",30), \
("IT",40) \
]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
# Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"inner") \
.show(truncate=False)
# SQL INNER JOIN
joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id") \
.show(truncate=False)
# SQL INNER JOIN
joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
.show(truncate=False)
# Left Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "left") \
.show(truncate=False)
# SQL LEFT JOIN
joinDF = spark.sql("select * from EMP e LEFT OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
.show(truncate=False)
# Right Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "right") \
.show(truncate=False)
# SQL RIGHT JOIN
joinDF = spark.sql("select * from EMP e RIGHT OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
.show(truncate=False)
# Full Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "full") \
.show(truncate=False)
# SQL FULL JOIN
joinDF = spark.sql("select * from EMP e FULL OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
.show(truncate=False)
You can read about Anti-joins, semi-joins and unions from here
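As a brief sketch using the DataFrames defined above: left_semi keeps only the matching rows of the left side, left_anti keeps only the non-matching ones, and union concatenates DataFrames with the same schema.
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left_semi").show(truncate=False)
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left_anti").show(truncate=False)
deptDF.union(deptDF).show(truncate=False)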