Lab 4 - Apache Spark Core & SQL

Course: Big Data - IU S25
Author: Firas Jolha

Datasets

PySpark on Colab

Readings

Agenda

Prerequisites

Objectives

Intro to Apache Spark [review]

This section gives a theoretical introduction to Spark, which will be covered in the lecture. Feel free to skip it if you have enough theoretical knowledge about Spark.

Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, pandas API on Spark for pandas workloads (only for spark 3), MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

Spark introduces the concept of an RDD (Resilient Distributed Dataset), an immutable fault-tolerant, distributed collection of objects that can be operated on in parallel. An RDD can contain any type of object and is created by loading an external dataset or distributing a collection from the driver program. Each RDD is split into multiple partitions (similar pattern with smaller sets), which may be computed on different nodes of the cluster

RDD means:
Resilient – capable of rebuilding data on failure
Distributed – distributes data among various nodes in cluster
Dataset – collection of partitioned data with values

Core Concepts in Spark

Check the glossary from here.

Spark Application Model

Apache Spark is widely considered to be the successor to MapReduce for general purpose data processing on Apache Hadoop clusters. In MapReduce, the highest-level unit of computation is a job. A job loads data, applies a map function, shuffles it, applies a reduce function, and writes data back out to persistent storage. In Spark, the highest-level unit of computation is an application. A Spark application can be used for a single batch job, an interactive session with multiple jobs, or a long-lived server continually satisfying requests. A Spark job can consist of more than just a single map and reduce.

MapReduce starts a process for each task. In contrast, a Spark application can have processes running on its behalf even when it is not running a job. Furthermore, multiple tasks can run within the same executor. Both combine to enable extremely fast task startup time as well as in-memory data storage, resulting in orders of magnitude faster performance over MapReduce.

Spark Execution Model

At runtime, a Spark application maps to a single driver process and a set of executor processes distributed across the hosts in a cluster.

The driver process manages the job flow and schedules tasks and is available the entire time the application is running. Typically, this driver process is the same as the client process used to initiate the job, although when run on YARN, the driver can run in the cluster. In interactive mode, the shell itself is the driver process.

The executors are responsible for executing work, in the form of tasks, as well as for storing any data that you cache. Executor lifetime depends on whether dynamic allocation is enabled. An executor has a number of slots for running tasks, and will run many concurrently throughout its lifetime.

Invoking an action operation inside a Spark application triggers the launch of a job to fulfill it. Spark examines the dataset on which that action depends and formulates an execution plan. The execution plan assembles the dataset transformations into stages. A stage is a collection of tasks that run the same code, each on a different subset of the data.

Spark Components

Spark Driver contains more components responsible for translation of user code into actual jobs executed on cluster:

Spark Architecture

Apache Spark works in a master-slave architecture where the master is called “Driver” and slaves are called “Workers”.

When you run a Spark application, Spark Driver creates a context that is an entry point to your application, and all operations (transformations and actions) are executed on worker nodes, and the resources are managed by Cluster Manager.

How Spark works?

Spark has a small code base and the system is divided in various layers. Each layer has some responsibilities. The layers are independent of each other.

The first layer is the interpreter, Spark uses a Scala interpreter, with some modifications. As you enter your code in spark console (creating RDD’s and applying operators), Spark creates a operator graph. When the user runs an action (like collect), the Graph is submitted to a DAG Scheduler. The DAG scheduler divides operator graph into (map and reduce) stages. A stage is comprised of tasks based on partitions of the input data. The DAG scheduler pipelines operators together to optimize the graph. For e.g. Many map operators can be scheduled in a single stage. This optimization is key to Spark’s performance. The final result of a DAG scheduler is a set of stages. The stages are passed on to the Task Scheduler. The task scheduler launches tasks via cluster manager (Spark Standalone/Yarn/Mesos). The task scheduler doesn’t know about dependencies among stages.

Spark Features

Supported Cluster Managers

Spark supports four cluster managers:

Scala is the native language for writing Spark applications but Apache Spark supports drivers for other languages such as Python (PySpark package), Java, and R.

PySpark

PySpark is a Spark library written in Python to run Python applications using Apache Spark capabilities. Using PySpark we can run applications in parallel on the distributed cluster (multiple nodes). In other words, PySpark is a Python API for Apache Spark.

Spark is written in Scala and later on due to its industry adaptation, its API PySpark released for Python using Py4J Java library that is integrated within PySpark and allows Python to dynamically interface with JVM objects. Hence, to run PySpark you need Java to be installed along with Python, and Apache Spark.

PySpark Modules & Packages

Besides these, if you wanted to use third-party libraries, you can find them at https://spark-packages.org/ . This page is kind of a repository of all Spark third-party libraries.

Install PySpark

There are different ways to install Apache Spark on your machine.

On Colab

You can use the notebook shared in the beginning of this tutorial for installation approach to run PySpark on Colab Notebook.

Using pip

You can install pyspark for Python language by simply installing the package pyspark using pip as follows:

  1. Create a virtual environment before installing the package.
python3 -m venv venv
source venv/bin/activate
  1. Install the package
pip install pyspark

This will install two packages pyspark and py4j. py4j is Python programs running in a Python interpreter to dynamically access Java objects in a Java Virtual Machine. This will install Spark software and you will be able to run Spark applications.

Using Docker

You can also install it using Docker as below.

  1. Pull the image spark:python3
docker pull spark:python3
  1. Run a container and access the pyspark shell.
docker run --name pyspark -it -p 4040:4040 spark:python3 /opt/spark/bin/pyspark

Here we create a container from the image above, publish the port 4040 for the Spark Web UI and run the pyspark shell.

Deploying Spark on Yarn Cluster

You can deploy a fully-distributed Hadoop cluster where YARN is the resource manager. I have prepared a repository which can deploy the cluster using Docker containers. This cluster also can be used for running your Spark applications on the cluster. You can check the repository from here. In order to start the cluster, you need to follow the steps below:

  1. Clone the repository (https://github.com/firas-jolha/docker-spark-yarn-cluster) in a new folder
mkdir -p ~/spark-yarn-test && cd ~/spark-yarn-test
git clone git@github.com:firas-jolha/docker-spark-yarn-cluster.git
cd docker-spark-yarn-cluster
  1. Run the script startHadoopCluster.sh passing the number of slaves N (default value is 2).
N=5
bash startHadoopCluster.sh $N

Here I am running a cluster of one master node and 5 slave nodes. In HDFS, the master node will run the namenode and seconadry namenodes and the slave nodes will run the datanodes. In YARN, the master node will run the resource manager and the slave nodes will run the node managers. In Spark, the master node will run the Spark Cluster master and the slave nodes will run the workers.

  1. Access the master node.
docker exec -it cluster-master bash
  1. Check if the services are running. If some are not running then restart them.

jps -lm



# Starting HDFS
start-dfs.sh

# Starting the namenode
hdfs --daemon start namenode
# Starting the secondary namenode
hdfs --daemon start secondarynamenode
# Starting a datanode (do this on the datanode)
hdfs --daemon start datanode


# Starting YARN
start-yarn.sh

# Starting YARN + HDFS
start-all.sh


# Starting the MapReduce History server
mapred --daemon start historyserver


# Starting the Spark Cluster master
$SPARK_HOME/sbin/start-master.sh

# Starting the Spark Cluster workers
$SPARK_HOME/sbin/start-workers.sh

# Starting the Spark Cluster master + workers
$SPARK_HOME/sbin/start-all.sh

The table below shows some of the default ports for the running services.

Service Port
HDFS namenode 9870
HDFS datanode 9864
HDFS secondary namenode 9868
YARN resource manager 8088
YARN node manager 8042
Spark master 8080
Spark jobs per application 4040
Spark history server 18080
MapReduce history server 19888
  1. When you are done. You can run the script remove_containers.sh (CAUTION) which will remove all containers whose name contains cluster. Be careful, if you have any containers that contains cluster in its name, will also be removed.
bash remove_containers.sh

Or you can remove containers one by one if you do not use this script.

[Optional] Install PySpark on HDP Sandbox

If you have installed Python and pip, then you just need to run the following command:

pip2 install pyspark

Otherwise, you need to return to the install them.

Note: You can use Python3.6 to write Spark applications if you configured it properly in HDP Sandbox.

Running Spark applications

You can execute PySpark statements in interactive mode on the shell pyspark or you can write them in Pyhton file and submit it to Spark using the tool spark-submit. In addition, Spark comes with a set of ready-to-execute examples of Spark applications.

pyspark shell

You can open a spark shell session by running the shell pyspark for Python, spark-shell for Scala, sparkR for R language.

pyspark

The pyspark shell starts a Spark application and initiates/gets the Spark context sc (spark for Spark session) which internally creates a Web UI with URL localhost:4040 (or next port if 4040 is not free). By default, it uses local[*] as master (this means that Spark will run jobs on the same machine using all CPU cores). It also displays Spark, and Python versions.

Note: If you are running two or more Spark applications simultaneously, then the next ports to 4040 (e.g. 4041, 4042, …etc) are not published in Docker container to the host machine and you can set the port for Spark app web UI manually to one of the free ports such as 4042 as follows:

docker run --name pyspark -it -p 4042:4042 spark:python3 /opt/spark/bin/pyspark --conf spark.ui.port=4042

On pyspark shell, you can write only Python code.

You can exit the pyspark shell by executing statement exit() like in a Python shell.

Whereas spark-shell command runs a Spark shell for Scala.

spark-shell

spark-submit

We can use this tool for non-interactive mode of data anlysis. This command accepts the path for the spark application.

Let’s write our simple application in app.py and execute it using spark-submit.

from pyspark.sql import SparkSession
import pandas as pd


spark = SparkSession\
    .builder\
    .appName("PythonSparkApp")\
    .getOrCreate()    

sc = spark.sparkContext

sc.setLogLevel("OFF") # WARN, FATAL, INFO

data = [('hello', 1), ('world', 2)]
print(pd.DataFrame(data))

spark.createDataFrame(data).show()

rdd = sc.parallelize(data)
print(rdd.collect())

Now you can submit this app using spark-submit

# Submit the application main.py with default configs
spark-submit app.py
# master is local[*]

# Submit the application main.py on the YARN cluster where the driver program will run in the client machine
spark-submit \
    --master yarn
    app.p


# Run Spark app example 'SparkPi' to calculate the value of π (pi) with the Monte-Carlo method for 10 iterations locally on 8 cores
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
  /path/to/examples.jar \
  10
# See note below.


# Run Spark app example 'wordcount.py' with the name "my app" to calculate the number of words in the file `/sparkdata/movies.csv` and run the Spark Uu=i web on the custom port 4242 on Yarn cluster where the driver program can use 2G memory and 4 cores and the application will run on 8 executors where each executor process will have 2G and 4 CPU cores.
spark-submit \
  --name "my app" \
  --master yarn \
  --executor-memory 2G \
  --executor-cores 4 \
  --driver-memory 2G \
  --driver-cores 4 \
  --num-executors 8 \
  --conf spark.ui.port=4242 \
  /usr/hdp/current/spark2-client/examples/src/main/python/wordcount.py \
  /sparkdata/movies.csv

The general template for spark-submit tool:

spark-submit \ --name "app name" \ --class <main-class> \ --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key>=<value> \ ... # other options <application-jar> \ [application-arguments]

Some of the commonly used options are:

Python Package Management in PySpark apps

When you want to run your PySpark application on a cluster such as YARN, you need to make sure that your code and all used libraries are available on the executors that actually running on remote machines.

Let’s try to create a PySpark app that imports pandas package and uses a custom module backend.py as a backend.

# /app/app.py
from pyspark.sql import SparkSession
import pandas as pd


spark = SparkSession\
    .builder\
    .appName("PythonPackageManagementPySparkApp")\
    .getOrCreate()    

sc = spark.sparkContext
sc.setLogLevel("OFF") # WARN, FATAL, INFO

data = [('hello', 1), ('world', 2)]
print(pd.DataFrame(data))


spark.createDataFrame(data).show()


rdd = sc.parallelize(data)
print(rdd.collect())

from backend import some_f
some_f(spark)

# /app/backend.py
import pandas


def some_f(session):
    df = pandas.DataFrame()
    print(f"hello from backend where df.shape is {df.shape}")
    print(pandas.__version__)
    session.createDataFrame(df).show()

PySpark allows to upload Python files (.py), zipped Python packages (.zip), and Egg files (.egg) to the executors by setting the configuration setting spark.submit.pyFiles, setting the option --py-files in spark-submit, or directly calling pyspark.SparkContext.addPyFile() in the application. This is the case for custom modules/files but what about packages and virtual environments. You can actually use the python package venv-pack to package your virtual environment and pass it to spark-submit using the option archives.

Here I have a demo on using a backend.py custom module and pandas package in my application.

  1. Create a virtual environment.
python3 -m venv .venv
source .venv/bin/activate
  1. Add the packages that you want to the virtual environment.
pip install pandas
pip install venv-pack
  1. Package the environment using venv-pack
# make sure that you 
venv-pack -o /app/.venv.tar.gz
  1. Configure the environment variables for PySpark driver and excutors.
# Python of the driver (/app/.venv/bin/python)
export PYSPARK_DRIVER_PYTHON=$(which python) 

# Python of the excutor (./.venv/bin/python)
export PYSPARK_PYTHON=./.venv/bin/python
  1. Run the application via spark-submit in client mode.
spark-submit --master yarn \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./.venv/bin/python \
--deploy-mode client \
--archives /app/.venv.tar.gz#.venv \
--py-files /app/backend.py \
/app/app.py
  1. You can also run the application in cluster mode as follows:
unset PYSPARK_DRIVER_PYTHON

spark-submit --master yarn \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./.venv/bin/python \
--deploy-mode cluster \
--archives /app/.venv.tar.gz#.venv \
--py-files /app/backend.py \
/app/app.py

Spark Web UI

Apache Spark provides a suite of web user interfaces (UIs) that you can use to monitor the status and resource consumption of your Spark cluster.

Jobs Tab

The Jobs tab displays a summary page of all jobs in the Spark application and a details page for each job. The summary page shows high-level information, such as the status, duration, and progress of all jobs and the overall event timeline. When you click on a job on the summary page, you see the details page for that job. The details page further shows the event timeline, DAG visualization, and all stages of the job.

The information that is displayed in this section is:

Job details

When you click on a specific job, you can see the detailed information of this job. This page displays the details of a specific job identified by its job ID.

Stages Tab

The Stages tab displays a summary page that shows the current state of all stages of all jobs in the Spark application.

At the beginning of the page is the summary with the count of all stages by status (active, pending, completed, skipped, and failed)

After that are the details of stages per status (active, pending, completed, skipped, failed). In active stages, it’s possible to kill the stage with the kill link. Only in failed stages, failure reason is shown. Task detail can be accessed by clicking on the description.

Stage details

The stage details page begins with information like total time across all tasks, Locality level summary, Shuffle Read Size / Records and Associated Job IDs.

There is also a visual representation of the directed acyclic graph (DAG) of this stage, where vertices represent the RDDs or DataFrames and the edges represent an operation to be applied. Nodes are grouped by operation scope in the DAG visualization and labelled with the operation scope name (BatchScan, WholeStageCodegen, Exchange, etc). Notably, Whole Stage Code Generation operations are also annotated with the code generation id. For stages belonging to Spark DataFrame or SQL execution, this allows to cross-reference Stage execution details to the relevant details in the Web-UI SQL Tab page where SQL plan graphs and execution plans are reported.

You can find more info about the web UI from here.

Spark Core

RDD is a fundamental building block of Spark. RDDs are immutable distributed collections of objects. Immutable meaning once you create an RDD you cannot change it. Each record in RDD is divided into logical partitions, which can be computed on different nodes of the cluster.

In other words, RDDs are a collection of objects similar to list in Python, with the difference being RDD is computed on several processes scattered across multiple physical servers also called nodes in a cluster while a Python collection lives and process in just one process.

Additionally, RDDs provide data abstraction of partitioning and distribution of the data designed to run computations in parallel on several nodes, while doing transformations on RDD we do not have to worry about the parallelism as Spark by default provides.

Spark RDD Operations

There are two main types of Spark operations: Transformations and Actions.

Spark RDD Transformations

Spark RDD Transformations are functions that take an RDD as the input and produce one or many RDDs as the output. They do not change the input RDD (since RDDs are immutable), but always produce one or more new RDDs by applying the computations they represent e.g. map(), filter(), reduceByKey() etc.

Spark supports lazy evaluation and when you apply the transformation on any RDD it will not perform the operation immediately. It will create a DAG(Directed Acyclic Graph) using 1) the applied operation, 2) source RDD and 3) function used for transformation. It will keep on building this graph using the references till you apply any action operation on the last lined up RDDs. That is why the transformations in Spark are lazy.

Transformations construct a new RDD from a previous one.

For example, we can build a simple Spark application for counting the words in a text file. Let’s write a simple wordcount.py application and run it using PySpark. This application accepts any text file and returns the number of words in the text, so it is basically a word counter.

import pyspark

import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(lambda x, y: x + y)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()

A spark job of two stages needs to be created (we do not create stages manually but we define a pipeline for which the stages will be determined):

There are two types of transformations:
Narrow transformation — In Narrow transformation, all the elements that are required to compute the records in single partition live in the single partition of parent RDD. A limited subset of partition is used to calculate the result. Narrow transformations are the result of map(), filter().

Wide transformation — In wide transformation, all the elements that are required to compute the records in the single partition may live in many partitions of parent RDD. Wide transformations are the result of groupbyKey() and reducebyKey().

Spark Pair RDDs are nothing but RDDs containing a key-value pair. Basically, key-value pair (KVP) consists of a two linked data item in it. Here, the key is the identifier, whereas value is the data corresponding to the key value.
Moreover, Spark operations work on RDDs containing any type of objects. However key-value pair RDDs attains few special operations in it. Such as, distributed “shuffle” operations, grouping or aggregating the elements by a key.

The following Spark transformations accept input as an RDD consisting of single values.

Where the transformations below accept an input as a pair RDD consisting of key-value pairs.

Spark RDD Actions

Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS).

The following actions are applied on RDDs which contains single values.

Action is one of the ways of sending data from Executer to the driver. Executors are agents that are responsible for executing a task. While the driver is a JVM process that coordinates workers and execution of the task. The following actions are applied on pair RDDs which contain key-value pairs.

Spark Context

The dataset for this demo is movies data and can be downloaded from the link attached to this document.

Import required packages

import pandas as pd
import pyspark 
import pymongo
import numpy as np
from pyspark.sql import SparkSession
from os.path import join

Create a SparkContext


# Import SparkSession
from pyspark.sql import SparkSession 

# Create SparkSession 
spark = SparkSession \
        .builder \
        .appName("my spark app") \
        .master("local[*]") \ # local[n] it uses n cores/threads for running spark job
        .getOrCreate()

sc = spark.sparkContext

# Or

# Import SparkContext and SparkConf
from pyspark import SparkContext, SparkConf

# Create SparkContext
conf = SparkConf() \
        .setAppName("my spark app")\
        .setMaster("local[*]")

sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf).getOrCreate()
sc = spark.sparkContext

Here we are using the local machine as the resource manager with as many as threads/cores as it has.

A Spark Driver is an application that creates a SparkContext for executing one or more jobs in the cluster. It allows your Spark/PySpark application to access Spark Cluster with the help of Resource Manager.

When you create a SparkSession object, SparkContext is also created and can be retrieved using spark.sparkContext. SparkContext will be created only once for an application; even if you try to create another SparkContext, it still returns existing SparkContext.

Spark RDD

There are multiple ways to create RDDs in PySpark.

1. using parallelize() function

data = [(1,2,3, 'a b c'), (4,5,6, 'd e f'), (7,8,9,'g h i')]
rdd = sc.parallelize(data) # Spark RDD creation
# rdd = sc.parallelize(data, n) # n is minimum number of partitions

This function will split the dataset into multiple partitions. You can get number of partitions by using the function <rdd>.getNumParititions().

2. using textFile or wholeTextFiles functions

Read from a local file

path = "file:///data/movies.csv"

rdd = sc.textFile(path) # a spark rdd

The file movies.csv is uploaded to the local file system and stored in the folder /sparkdata.

Read from HDFS

path = "hdfs://localhost:9000/data/movies.csv"
# path = "/data/movies.csv"
    
rdd3 = sc.textFile(path) # a spark rdd

The file movies.csv is uploaded to HDFS and stored in the folder /data.

The function wholeTextFiles will read all files in the directory and each file will be read as a single row in the RDD whereas the function textFile will read each line of the file as a row in the RDD.

When we use parallelize() or textFile() or wholeTextFiles() methods of SparkContxt to initiate RDD, it automatically splits the data into partitions based on resource availability. when you run it on a local machine it would create partitions as the same number of cores available on your system.

Create empty RDD

rdd = sc.emptyRDD()

rdd = sc.paralellize([], 10)

This will create an empty RDD with 10 partitions.

Repartition and Coalesce

Sometimes we may need to repartition the RDD, PySpark provides two ways to repartition; first using repartition() method which shuffles data from all nodes also called full shuffle and second coalesce() method which shuffle data from minimum nodes, for examples if you have data in 4 partitions and doing coalesce(2) moves data from just 2 nodes.

rdd = sc.parallelize(range(1,100), 10)

print(rdd.getNumPartitions())
print(rdd.collect())

rdd2 = rdd.repartition(4)
print(rdd2.getNumPartitions())
print(rdd2.collect())

rdd3 = rdd.coalesce(4)
print(rdd3.getNumPartitions())
print(rdd3.collect())

Note that repartition() method is a very expensive operation as it shuffles data from all nodes in a cluster. Both functions return RDD.

PySpark RDD Transformations

Transformations are lazy operations, instead of updating an RDD, these operations return another RDD.

map and flatMap

map operation returns a new RDD by applying a function to each element of this RDD. flatMap applies the map operation and then flattens the RDD rows. We use this function when you the map operation returns a list of values and flattening will convert the list of list of values into list of values.

# Read file
rdd1 = sc.textFile("movies.csv")
rdd1.take(10)

# tokenize
rdd2 = rdd1.flatMap(lambda x : x.split(","))
rdd2.take(10)

# Or
# def f(x): 
#     return x.split(",")
# rdd2 = rdd1.flatMap(f)
# rdd2.take(10)

# Remove the additional spaces
rdd3 = rdd2.map(lambda x : x.strip())
rdd3.take(10)

take(k) is a Spark action and returns the first k elements of the RDD.

filter

Returns a new RDD after applying filter function on source dataset.

# Returns only values which are digits
rdd4 = rdd3.filter(lambda x : str(x).isdigit())

print(rdd4.count())

count is a Spark action and returns the number of elements in the RDD.

distinct

Returns a new RDD after eliminating all duplicated elements…

# Returns unique number of values which are only digits
rdd5 = rdd4.distinct()

print(rdd5.count())

sample

Return a sampled subset of this RDD.

rdd6 = rdd5.sample(withReplacement=False, fraction=0.6, seed=0)

print(rdd6.count())

randomSplit

Splits the RDD by the weights specified in the argument. For example rdd.randomSplit(0.7,0.3)

rdd7, rdd8 = rdd1.randomSplit(weights=[0.2, 0.3], seed=0)

print(rdd7.count())
print(rdd8.count())

mapPartitions and mapPartitionsWithIndex

mapPartitions is similar to map, but executes transformation function on each partition, This gives better performance than map function. mapPartitionsWithIndex returns a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition. They both should return a generator.

# a generator function
def f9(iter):
    for v in iter:
        yield (v, 1)

rdd9 = rdd3.mapPartitions(f9)

print(rdd9.take(10))

# a generator function
def f10(index, iter):
    for v in iter:
        yield (v, index)

rdd10 = rdd3.mapPartitionsWithIndex(f10)
print(rdd10.take(10))

sortBy and groupBy

# Compute the word-digit frequency in the file and show the top 10 words.

# Group by all tokens
rdd11 = rdd3.groupBy(lambda x : x)

# Calculate the length of the list of token duplicates
rdd12 = rdd11.map(lambda x: (x[0], len(x[1])))
# or
# rdd12 = rd11.mapValues(len)

# Sort the results
rdd13 = rdd12.sortBy(lambda x : x[1], ascending=False)

# Take the first elements of the RDD and display
print(rdd13.take(10))

sortByKey and reduceByKey

# Compute the digit frequency in the file and show the top 10 words.

# Get all digits
rdd14 = rdd3.filter(lambda x: x.isdigit())

# Initialize the counters
rdd15 = rdd14.map(lambda x : (x, 1))

# Aggregate the counters who have same key which is here a digit
rdd16 = rdd15.reduceByKey(lambda x, y : x+y)

# Sort the results
rdd17 = rdd16.sortBy(lambda x : x[1], ascending=False)

# Take the first elements of the RDD and display
print(rdd17.take(10))

PySpark RDD Actions

RDD Action operations return the values from an RDD to a driver program. In other words, any RDD function that returns non-RDD is considered as an action.

collect

returns the complete dataset as an Array.

Using the action on a large RDD is dangerous because it will collect all elements of the distributed RDD in one machine (the driver) and this can cause the machine to run out of memory since Spark is an in-memory processing engine.

max, min, first, top, take

max returns the maximum value from the dataset whereas min returns the minimum value from the dataset. first returns the first element in the dataset. top returns top n elements from the dataset (after sorting them). take returns the first n elements of the dataset.

The operation take in Spark RDD is the same as head in pandas DataFrame whereas top is interpreted as the first elements after sorting them.

count, countByValue

count returns the count of elements in the dataset.

There are other similar operations. countApprox(timeout, confidence=0.95) which is the approximate version of count() and returns a potentially incomplete result within a timeout, even if not all tasks have finished. countApproxDistinct(relative_accuracy) returns an approximate number of distinct elements in the dataset.

Note: These operations are used when you have very large dataset which takes a lot of time to get the count.

countByValueReturn a dictionary where the key represents each unique value in the dataset and the value represents count of each value present.

print(rdd3.countByValue())

reduce, treeReduce

reduce reduces the elements of the dataset using the specified binary operator. treeReduce reduces the elements of this RDD in a multi-level tree pattern. The output is the same.

result = rdd3 \
    .map(lambda x : 1) \
    .reduce(lambda x, y : x+y)

resultTree = rdd3 \
    .map(lambda x : 1) \
    .treeReduce(lambda x, y : x+y)

# You should get the same results as rdd3.count() operation
assert rdd3.count()==result==resultTree

Here I showed some of the operations, but you can find more in the documentation.

saveAsTextFile

Used to save the rdd to an external data store.

rdd3.saveAsTextFile("/root/myrdd")

RDD Persistence

Persistence is useful due to:

We have different levels for storage like memory, disk, serialized, unserialized, repliacted, unreplicated. You can check here for the avilable options.

RDD Cache

PySpark RDD cache() method by default saves RDD computation to storage level MEMORY_ONLY meaning it will store the data in the JVM heap as unserialized objects.

cachedRDD = rdd.cache()

cachedRDD.collect()

RDD Persist

PySpark persist() method is used to store the RDD to a specific storage level.

import pyspark

persistedRDD = rdd.persist(pyspark.StorageLevel.MEMORY_ONLY)

persistedRDD.collect()

RDD Unpersist

PySpark automatically monitors every persist() and cache() calls you make and it checks usage on each node and drops persisted data if not used or by using least-recently-used (LRU) algorithm. You can also manually remove using unpersist() method. unpersist() marks the RDD as non-persistent, and remove all blocks for it from memory and disk.

unpersistedRDD = persistedRDD.unpersist()

unpersistedRDD.collect()

Shuffling in Spark engine

Shuffling is a mechanism Spark to redistribute the data across different executors and even across machines. PySpark shuffling triggers when we perform certain transformation operations like gropByKey(), reduceByKey(), join() on RDDS.

Shuffling is an expensive operation since it involves the following:

For example, when we perform reduceByKey() operation, PySpark does the following:

  1. Spark engine firstly runs map tasks on all partitions which groups all values for a single key.
  2. The results of the map tasks are kept in memory.
  3. When results do not fit in memory, PySpark stores the data into a disk.
  4. PySpark shuffles the mapped data across partitions, some times it also stores the shuffled data into a disk for reuse when it needs to recalculate.
  5. Run the garbage collection
  6. Finally runs reduce tasks on each partition based on key.

PySpark RDD triggers shuffle and repartition for several operations like repartition(), coalesce(), groupByKey(), and reduceByKey().

Based on your dataset size, a number of cores and specific memory size, can benefit or harm the Spark shuffling. When you deal with less amount of data, you should typically reduce the number of partitions otherwise you will end up with many partitioned files with less number of records in each partition. which results in running many tasks with lesser data to process.

On other hand, when you have too much of data and having less number of partitions results in fewer longer running tasks and some times you may also get out of memory error.

Getting the right size of the shuffle partition is always tricky and takes many runs with different values to achieve the optimized number. This is one of the key properties to look for when you have performance issues on Spark jobs.

Shared Variables

When Spark executes transformation using map or reduce operations, It executes the transformations on a remote node by using the variables that are shipped with the tasks and these variables are not sent back to PySpark Driver hence there is no capability to reuse and sharing the variables across tasks. PySpark shared variables solve this problem using the below two techniques. PySpark provides two types of shared variables.

Broadcast variables

We can create broadcast variables using the function sc.broadcast. A broadcast variable created with SparkContext.broadcast(). Access its value through value.

v = sc.broadcast(range(1, 100))

print(v.value)

Accumulator variables

A shared variable that can be accumulated, i.e., has a commutative and associative add operation. Worker tasks on a Spark cluster can add values to an Accumulator with the += operator, but only the driver program is allowed to access its value, using value. Updates from the workers get propagated automatically to the driver program.

acc = sc.accumulator(0)

acc+=10
acc.add(10)

print(acc.value) # 20

Spark DataFrame

DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.

PySpark DataFrame is mostly similar to Pandas DataFrame with the exception PySpark DataFrames are distributed in the cluster (meaning the data in DataFrame’s are stored in different machines in a cluster) and any operations in PySpark executes in parallel on all machines whereas Panda Dataframe stores and operates on a single machine. Due to parallel execution on all cores on multiple machines, PySpark runs operations faster then pandas.

Each record in the dataframe is of type pyspark.sql.Row whereas each column is of type pyspark.sql.Column. There are multiple ways to create DataFrame in PySpark:

1. using createDataFrame() function

data = [(1,2,3, 'a b c'), (4,5,6, 'd e f'), (7,8,9,'g h i')]
df = spark.createDataFrame(data) # is a dataframe from a list

df.rdd # Convert Spark DataFrame into RDD
    
    
rdd = sc.parallelize(data)
df = spark.createDataFrame(rdd) # is a dataframe from an rdd

2. using toDF() function

rdd = sc.parallelize(data)

    
df = rdd.toDF() # From RDD to Dataframe

# from RDD to Dataframe with custom column names
df = rdd.toDF(["int1", "int2", "int3", "str"])


# from RDD to Dataframe without defining the schema (inferSchema option is true)
df = spark.createDataFrame(rdd) 
    
    


from pyspark.sql.types import *    
    
rdd = rdd.map(lambda t: Row(t._1, t._2, t._3))

schema = StructType([
    # StructField(<fieldname>, <fieldtype>, <nullability>)
    StructField("int1", IntegerType(), True),
    StructField("int2", IntegerType(), True),
    StructField("int3", IntegerType(), True),
    StructField("str", StringType(), True)
    ])

# From RDD to Dataframe with an explicit schema
df = spark.createDataFrame(rdd, schema) 

3. Read from a local file

path = "file:///sparkdata/movies.csv"

df1 = spark.read.format("csv") \
  .option("sep", ",") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .load(path)

df1 = spark.read.csv(path)

df1.show() # Display the dataframe
df1.printSchema() # print the schema of the dataframe

You can find more csv options here.

4. Read from MongoDB via PyMongo

You can put the file movies.csv in the moviesdb database inside the collection movies as follows.

mongoimport --db moviesdb --collection movies --type csv --headerline --file movies.csv
import pymongo

# The default configuration
# localhost:27017
client = pymongo.MongoClient()

db = client['moviesdb'] # client['<db_name>']

# A pymongo Cursor 
# db.<collection_name>
movies_cur = db.movies.find() # Get all documents

# Convert to Pandas DataFrame
df1 = pd.DataFrame(movies_cur)

from pyspark.sql.types import *

schema = StructType([
    # StructField(<fieldname>, <fieldtype>, <nullability>)
    StructField("Film", StringType(), True),
    StructField("Genre", StringType(), True),
    StructField("Lead Studio", StringType(), True),
    StructField("Audience score %", IntegerType(), True),
    StructField("Profitability", FloatType(), True),
    StructField("Rotten Tomatoes %", IntegerType(), True),
    StructField("Worldwide Gross", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("_id", StringType(), True) # Special field for the documents in Mongodb
    ])

# Try to run spark.createDataFrame(movies_cur) 

# Convert immediately to Spark DataFrame
df3 = spark.createDataFrame(movies_cur, schema)
    
    
# Convert to RDD then to Spark DataFrame
df4 = spark.createDataFrame(sc.parallelize(movies_cur), schema) # Convert to Spark DataFrame



schema = StructType([
    StructField("Film", StringType(), True),
    StructField("Genre", StringType(), True),
    StructField("Lead Studio", StringType(), True),
    StructField("Audience score %", IntegerType(), True),
    StructField("Profitability", FloatType(), True),
    StructField("Rotten Tomatoes %", IntegerType(), True),
    StructField("Worldwide Gross", StringType(), True),
    StructField("Year", IntegerType(), True)
    ])
    
# Read from local file system with a schema
df3 = spark.read.csv(path, schema)

You can store the dataframe in Mongodb as follows.

df.write.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb://127.0.0.1:27017/db_name.collection_name") \
    .save()

Note: Make sure that you added Mongodb connector .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') to the configuration of SparkSession.

5. Read from HDFS

path = "hdfs://localhost:9000/data/movies.csv"
df = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true") 
# a spark dataframe

# OR
df = spark.read.csv(path, sep = ",", inferSchema = "true", header = "true") 
# a spark dataframe

df.printSchema()
df.show(truncate=False)

The file movies.csv is uploaded to HDFS and stored in the folder /data.

StructType & StructField

StructType and StructField classes are used to programmatically specify the schema to the DataFrame and create complex columns like nested struct, array, and map columns. StructType is a collection of StructField’s that defines column name, column data type, boolean to specify if the field can be nullable or not and metadata.


from pyspark.sql.types import StructField, StructType, StringType, IntegerType

# A sample data
data = [ (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
]


schema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)

Spark DataFrame Operations

show [Action]

show() is used to display the contents of the DataFrame in a Table Row and Column Format. By default, it shows only 20 Rows, and the column values are truncated at 20 characters.


df.show()
df.show(5)
df.show(5, truncate=False)
df.show(10, truncate=False, vertical=True)

collect [Action]

collect() is an action operation that is used to retrieve all the elements of the dataset (from all nodes) to the driver node. It retrieves all elements in a DataFrame as a list of Row type to the driver node. We should use collect() on smaller dataset usually after filter(), group() e.t.c. Retrieving larger datasets results in OutOfMemory error. You can use head operation to get only the first rows/records.

df.collect() # all elements
df.collect()[0] # first row
df.collect()[0][0] # first cell at first row and first column

Notice that collect on a big dataframe can cause performance and memory issues since this action collects the partitions of the dataframe from all cluster nodes to the memory of one machine. So we suggest more efficient methods as follows:

df.take(10) # first 10 elements
df.take(1)[0] # first row
df.take(1)[0][0] # first cell at first row and first column

select [Transformation]

select() function is used to select single, multiple, column by index, all columns from the list and the nested columns from a DataFrame. This function returns a DataFrame with the selected columns.

from pyspark.sql.functions import col
df.select("name", \
          "name.firstname", \
          df.id, \
          df['gender'], \
          col("salary")) \
.show()

df.select("*").show()

df.select([col for col in df.columns]).show()

df.select(df.columns[:2]).show()

withColumn, withColumnRenamed, drop [Transformation]

withColumn() is a transformation function of DataFrame which is used to change the value, convert the datatype of an existing column, create a new column, and many more.

# Read the data
path = "hdfs://localhost:9000/data/movies.csv"
df = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true") 

# Print Schema
df.printSchema()
  1. Change the datatype of the column.
# Convert the `Worldwide Gross` column to double
# 1. Remove the $ sign
import pyspark.sql.functions as F
df.withColumn("Worldwide Gross", F.translate('Worldwide Gross', '$', '').cast("Double")).show(5)

df.withColumn("Worldwide Gross", F.col("Worldwide Gross").cast("Double"))

# You can merge the previous operations into one operation as shown below

  1. Update the values in a column
col_name = df.columns[3]
df2.withColumn(col_name, F.col(col_name)/100).show(5)

  1. Create a Column from an existing one
col_name = df2.columns[3]
df2.withColumn("score", F.col(col_name)/100).show(5)

  1. Add a New Column with fixed value
df2.withColumn("Country", F.lit("Russia")).show()
  1. Rename a column
df2.withColumnRenamed(df2.columns[3], "score").show(5)

  1. Drop a column
df2.drop("Year").show(5)

filter, where [Transformation]

PySpark filter() function is used to filter the rows from RDD/DataFrame based on the given condition or SQL expression, you can also use where() clause instead of the filter() if you are coming from an SQL background, both these functions operate exactly the same.

df2.filter((df2.Year == 2008) & (df2['Film'].startswith("Wh"))).show(5)

df2.filter(~F.col('Genre').isin(['Comedy', 'Drama'])).show(5)

distinct, dropDuplicates [Transformation]

PySpark distinct() function is used to drop/remove the duplicate rows (all columns) from DataFrame and dropDuplicates() is used to drop rows based on selected (one or multiple) columns.

print(df2.count() - df2.distinct().count())

df2.dropDuplicates(['Genre', 'Lead Studio']).show(5)

groupby [Transformation]

Similar to SQL GROUP BY clause, PySpark groupBy() function is used to collect the identical data into groups on DataFrame and perform count, sum, avg, min, max functions on the grouped data. When we perform groupBy() on PySpark Dataframe, it returns GroupedData object which contains aggregate functions. Some of them are avg, sum, min, max.

Notice that the aggregate functions are transformations and will return a DataFrame. You need to call an action to see the output of the aggregation.

  1. Total gross for each film genre.
df2.groupby("Genre").sum("Worldwide Gross").show()


2. Calculate the average score for audience and max gross for each film genre every year. Exclude elements whose max gross is less than 50.

df2.groupby("Genre", 'Year') \
    .agg(
        F.avg("Audience score %").alias("avg_score"), \
        F.max(df2.columns[6]).alias("max_gross")
).where(F.col("max_gross")>=50) \
.show(5)


# Equivalent SQL Query
# SELECT Genre, 
# Year, 
# avg("Audience score ") AS avg_score, 
# max("Worldwide Gross") AS max_score 
# FROM movies
# GROUP BY Genre, Year
# HAVING max_score >= 50

orderBy, sort [Transformation]

You can use either sort or orderBy function of PySpark DataFrame to sort DataFrame by ascending or descending order based on single or multiple columns, you can also do sorting using PySpark SQL sorting functions.
Example: Calculate the average score for audience and max gross for each film genre every year. Exclude elements whose max gross is less than 50.

df2.groupby("Genre", 'Year') \
    .agg(
        F.avg("Audience score %").alias("avg_score"), \
        F.max(df2.columns[6]).alias("max_gross")
).where(F.col("max_gross")>=50) \
.sort(F.col("max_gross").asc(), F.col("avg_score").desc()) \
.show(5)


# Equivalent SQL Query
# SELECT Genre, 
# Year, 
# avg("Audience score ") AS avg_score, 
# max("Worldwide Gross") AS max_score 
# FROM movies
# GROUP BY Genre, Year
# HAVING max_score >= 50
# ORDER BY max_gross asc, avg_score desc

Join

Join is used to combine two DataFrames and by chaining these you can join multiple DataFrames. it supports all basic join type operations available in traditional SQL like INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS, SELF JOIN.

df3 = df2.groupby("Genre", 'Year') \
    .agg(
        F.avg("Audience score %").alias("avg_score"), \
        F.max(df2.columns[6]).alias("max_gross")
).where(F.col("max_gross")>=50)


df3.join(df2, (df2.Genre==df3.Genre) & (df2.Year==df3.Year), how="inner").show(5)

UDF (User Defined Function)

PySpark UDF is the most useful feature of Spark SQL & DataFrame that is used to extend the PySpark built-in capabilities. I will show here the steps for creating UDF for capitalizing the first character in each word. Steps of creating UDFs are:

  1. Create a Python function.
def convertCase(s):
    resStr=""
    arr = s.split(" ")
    for x in arr:
        resStr =  resStr + x[0].upper() + x[1:len(x)] + " "
    return resStr 
  1. Convert a Python function to PySpark UDF
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
capitalizeUDF = F.udf(lambda x: convertCase(x),StringType())

# Since the default return type of the udf() is StringType, you can write it as follows
capitalizeUDF = F.udf(lambda x: convertCase(x))
  1. Use the UDF
df2.select("Film", capitalizeUDF(F.col("Film")).alias("Capitalized_Film")).show(5, truncate = False)

Note: UDFs are treated as a black box to Spark hence it can not apply optimization and you will lose all the optimization PySpark does on Dataframe/Dataset. We recommend to use UDFs only if you do not have them as built-in functions.

Save DataFrame to disk

In Spark, you can save (write/extract) a DataFrame to a CSV file on disk by using dataframeObj.write.csv(“path”). Using this you can also write DataFrame to HDFS, or any Spark supported file systems.

#Write DataFrame data to CSV file
df.write.csv("movies_df")

#You can also use below
df.write.format("csv").save("movies_df")
    
df.write.format("csv") \
    .option("header", "true") \
    .option("delimiter", ",") \
    .save("movies_df")
    
df.write.option("header","true") \
  .csv("hdfs://localhost:9000/movies_df")

Partitioning in Spark

PySpark partitionBy() is a function of pyspark.sql.DataFrameWriter class which is used to partition the large dataset (DataFrame) into smaller files based on one or multiple columns while writing to disk. Partitioning the data on the file system is a way to improve the performance of the query when dealing with a large dataset in the Data lake.

PySpark supports partition in two ways; partition in memory (DataFrame) and partition on the disk (File system).

  1. Partition in memory: You can partition or repartition the DataFrame by calling repartition() or coalesce() transformations. This is discussed above in RDD section.

  2. Partition on disk: While writing the PySpark DataFrame back to disk, you can choose how to partition the data based on columns using partitionBy() of pyspark.sql.DataFrameWriter. This is similar to Hives partitions scheme.

Some advantages of partitions are: a) Fast access to the data, b) The ability to perform an operation on a smaller dataset.

# We can store the dataframe in the disk in partitions based on the values of Genre column.
df.write.option("header",True) \
  .partitionBy("Genre") \
  .csv("movies_df")

# Read only a specific parition `Genre=Animation` of the dataframe.
df = spark.read.csv("movies_df/Genre=Animation", header=True, sep=",")

Save modes

Save operations can optionally take a SaveMode, that specifies how to handle existing data if present. It is important to realize that these save modes do not utilize any locking and are not atomic. Additionally, when performing an Overwrite, the data will be deleted before writing out the new data. Options include:

  1. append: Append contents of this DataFrame to existing data.
  2. overwrite: Overwrite existing data.
  3. error or errorifexists: Throw an exception if data already exists. (default option)
  4. ignore: Silently ignore this operation if data already exists.
# Save modes
df.write.format("csv") \
    .mode("append") \
    .option("delimiter", "|") \
    .save("/tmp/spark_output/datacsv")

df.write.format("csv") \
    .option("mode","append") \
    .option("delimiter", "|") \
    .save("/tmp/spark_output/datacsv")

Spark Dataset

Spark Dataset is an interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. Spark Dataset API is supported in statically typed languages like Java and Scala since Spark Datasets rely heavily on static typing. A DataFrame is a Dataset organized into named columns. Python is a dynamically typed language, it still has access to Spark’s DataFrame API, which offers similar functionality as Datasets.

Spark SQL

It is a module used for structured data processing. Spark SQL allows you to query structured data using either SQL or DataFrame API.

The pyspark.sql is a module in Spark that is used to perform SQL-like operations on the data stored in memory. You can either leverage using programming API to query the data or use the ANSI SQL queries similar to RDBMS. You can also mix both, for example, use API on the result of an SQL query.

Spark SQL is one of the most used Spark modules for processing structured columnar data format. Once you have a DataFrame created, you can interact with the data by using SQL syntax. In other words, Spark SQL brings native RAW SQL queries on Spark meaning you can run traditional ANSI SQL on Spark Dataframe.

In order to use SQL, first, register a temporary table/view on DataFrame using the createOrReplaceTempView() function. Once created, this table can be accessed throughout the SparkSession using sql() and it will be dropped along with your SparkContext termination. Use sql() method of the SparkSession object to run the query and this method returns a new DataFrame

Data Description

This dataset includes 44,341 results of international football matches starting from the very first official match in 1872 up to 2022. The matches range from FIFA World Cup to FIFI Wild Cup to regular friendly matches. The matches are strictly men’s full internationals and the data does not include Olympic Games or matches where at least one of the teams was the nation’s B-team, U-23 or a league select team.

results.csv includes the following columns:

For the dataset of scorers and shootouts you can check this Kaggle data card.

Spark SQL Examples

Here we will use the dataset

  1. Create SQL View

    • Load the data and read the results dataframe.
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType
      
      schema = StructType([
          StructField("date", DateType(), False),
          StructField("home_team", StringType(), False),
          StructField("away_team", StringType(), False),
          StructField("home_score", IntegerType(), False),
          StructField("away_score", IntegerType(), False),
          StructField("tournament", StringType(), False),
          StructField("city", StringType(), False),
          StructField("country", StringType(), False),
          StructField("neutral", BooleanType(), False),
      ])
      
      # You can also use spark.read.csv function
      df = spark.read.format("csv").load("results.csv", header = True, schema = schema)
      df
      
    • Creat the temporary view.
      df.createOrReplaceTempView("results_table")
      
  2. Spark SQL to Select Columns

    // DataFrame API Select query
    df.select("home_team","city","country","tournament") 
         .show(5)
    
    // SQL Select query
    spark.sql("SELECT home_team, city, country, tournament FROM RESULTS_TABLE") 
         .show(5)
    
  3. Filter Rows
    To filter the rows from the data, you can use where() function from the DataFrame API.

    // DataFrame API where()
    df.select("country","city","home_team","tournament") 
      .where("city == 'Moscow'") 
      .show(5)
    

    Similarly, in SQL you can use WHERE clause as follows.

    // SQL where
    spark.sql(""" SELECT  country, city, home_team, tournament FROM RESULTS_TABLE 
              WHERE city = 'Moscow' """) 
         .show(5)
    
    
  4. Sorting

    // sorting
    df.select("country","city","home_team","tournament") 
      .where("city in ('London','Paris','Moscow')") 
      .orderBy("city")
      .show(10)
    
    
    // SQL ORDER BY
    spark.sql(""" SELECT  country, city, home_team, tournament FROM RESULTS_TABLE 
              WHERE city in ('London','Paris','Moscow') order by city """) 
         .show(10)
    
    
  5. Grouping

// grouping
df.groupBy("city").count() 
  .show()
    
// SQL GROUP BY clause
spark.sql(""" SELECT city, count(*) as count FROM RESULTS_TABLE 
          GROUP BY city""") 
     .show()
  1. SQL Join Operations

PySpark SQL join has a below syntax and it can be accessed directly from DataFrame.

join(self, other, on=None, how=None)

join() operation takes parameters as below and returns DataFrame.

You can also write Join expression by adding where() and filter() methods on DataFrame and can have Join on multiple columns.

emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# Join in pyspark.sql.DataFrame API

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner") \
     .show(truncate=False)


# SQL INNER JOIN
joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

# SQL INNER JOIN
joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

# Left Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id, "left") \ 
    .show(truncate=False)
    
    
# SQL LEFT JOIN
joinDF = spark.sql("select * from EMP e LEFT OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

# Right Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id, "right") \ 
    .show(truncate=False)
    
    
# SQL RIGHT JOIN
joinDF = spark.sql("select * from EMP e RIGHT OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

# Full Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id, "full") \ 
    .show(truncate=False)
    
    
# SQL FULL JOIN
joinDF = spark.sql("select * from EMP e FULL OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

You can read about Anti-joins, semi-joins and unions from here

References