Course: Big Data - IU S25
Author: Firas Jolha
Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executes the failed tasks.
The MapReduce framework version 1 consists of a single master JobTracker and one slave TaskTracker per cluster node. The master is responsible for scheduling the jobs’ component tasks on the slaves, monitoring them and re-executing the failed tasks. The slaves execute the tasks as directed by the master. Both daemons are deprecated in MRv2 (Hadoop version 2) and replaced by the ResourceManager, ApplicationMaster and NodeManager daemons.
In versions 2 and 3, the Hadoop MapReduce framework (running on YARN) consists of a single master ResourceManager, one worker NodeManager per cluster node, and one MRAppMaster per application.
Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.
A classic example showing the MapReduce framework at work is word count: the input is first split across multiple worker nodes, each map task emits a count for every word it encounters, the framework shuffles and sorts the intermediate pairs, and the reduce tasks finally generate the output word counts.
Independently of Hadoop, map-reduce is a programming model that has its roots in functional programming. In addition to often producing short, elegant code for problems involving lists or collections, this model has proven very useful for large-scale, highly parallel data processing. In Python, we can write a simple map-reduce program as follows.
from functools import reduce
from operator import add, mul
def mapper(x):
    return mul(x, x)
inp = list(range(1,100)) # input is [1, 2, 3, ..., 99]
res = map(mapper, inp) # returns iterator over [1*1, 2*2, 3*3, ..., 99*99]
output = reduce(add, res) # computes (((((1+4)+9)+16)+25)...+9801)
print(output) # output is 328350
Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract classes. These, and other job parameters, comprise the job configuration. Although the Hadoop framework is implemented in Java™, MapReduce applications need not be written in Java. Here we will use the Python language to write MapReduce apps.
Hadoop Streaming is a utility which allows users to create and run Map-Reduce jobs with any executables (e.g. shell utilities) as the mapper and/or the reducer.
The Hadoop job client then submits the job and configuration to the ResourceManager which then assumes the responsibility of distributing the software/configuration to the workers, scheduling tasks and monitoring them, providing status and diagnostic information to the job-client.
The MapReduce framework operates exclusively on <key, value> pairs, that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.
Input and Output types of a MapReduce job are:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
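To make this flow concrete, here is a minimal pure-Python sketch of the word-count pipeline (no Hadoop involved). The names map_fn, combine_fn and reduce_fn are illustrative only, not Hadoop APIs; the combine step stands for the optional map-side aggregation.

from collections import defaultdict

def map_fn(offset, line):
    # (k1, v1) = (byte offset, line of text) -> (k2, v2) = (word, 1)
    for word in line.split():
        yield word, 1

def combine_fn(pairs):
    # optional map-side aggregation: the (word, 1) pairs of ONE map task
    # are summed locally before being shuffled; the type is still (k2, v2)
    local = defaultdict(int)
    for word, count in pairs:
        local[word] += count
    return local.items()

def reduce_fn(word, counts):
    # (k2, list of v2) -> (k3, v3) = (word, total count)
    return word, sum(counts)

lines = ["hello map reduce hello", "map reduce world"]   # two toy input splits

# map + combine per split; the shuffle/sort step then groups values by key
grouped = defaultdict(list)
for offset, line in enumerate(lines):
    for word, count in combine_fn(map_fn(offset, line)):
        grouped[word].append(count)

for word in sorted(grouped):          # Hadoop sorts the keys for the reducer
    k3, v3 = reduce_fn(word, grouped[word])
    print(f"{k3}\t{v3}")              # hello 2, map 2, reduce 2, world 1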
Mapper maps input key/value pairs to a set of intermediate key/value pairs. Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records do not need to be of the same type as the input records. A given input pair may map to zero or many output pairs.
The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files. The right level of parallelism for maps seems to be around 10-100 maps per-node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute. Thus, if you expect 10TB of input data and have a blocksize of 128MB, you’ll end up with 82,000 maps.
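A quick back-of-the-envelope check of this arithmetic (a rough estimate that ignores per-file block rounding):

# Rough estimate of the number of map tasks for 10 TB of input
# split into 128 MB blocks (ignores per-file rounding).
input_size_mb = 10 * 1024 * 1024      # 10 TB expressed in MB
block_size_mb = 128
print(input_size_mb // block_size_mb)  # 81920, i.e. roughly 82,000 maps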
Reducer reduces a set of intermediate values which share a key to a smaller set of values. Reducer has 3 primary phases: shuffle, sort and reduce. The input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP. The framework groups Reducer inputs by keys (since different mappers may have output the same key) in this stage. The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are merged.
The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>). With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing. Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures. The scaling factors above are slightly less than whole numbers to reserve a few reduce slots in the framework for speculative-tasks and failed tasks. It is legal to set the number of reduce-tasks to zero if no reduction is desired.
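For example, with a hypothetical cluster of 10 nodes and 8 maximum containers per node (both values are assumptions for illustration), the two heuristics give:

# Heuristic number of reducers; the cluster size below is hypothetical.
nodes = 10
max_containers_per_node = 8
print(int(0.95 * nodes * max_containers_per_node))  # 76: one wave of reduces
print(int(1.75 * nodes * max_containers_per_node))  # 140: two waves, better load balancing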
Partitioner partitions the key space. The Partitioner controls the partitioning of the keys of the intermediate map outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the reduce tasks the intermediate key (and hence the record) is sent to for reduction. HashPartitioner is the default Partitioner; a minimal sketch of this hash-based assignment is shown after the history-server commands below.
You can start the MapReduce history server as follows.
mapred --daemon start historyserver
You can access the web UI of this server at http://localhost:19888, where 19888 is the default port.
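As promised above, here is a minimal Python sketch of the hash-based key-to-reducer assignment. It only mimics the idea behind HashPartitioner; Hadoop's actual implementation uses Java's hashCode, so the concrete partition numbers will differ.

import hashlib

# Conceptual sketch of hash partitioning: every intermediate key is
# assigned to one of the reduce tasks by hashing the key. This mimics
# the idea of HashPartitioner, not its exact Java behaviour.
def partition(key: str, num_reduce_tasks: int) -> int:
    # use a deterministic hash (Python's built-in hash() is salted per process)
    digest = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return digest % num_reduce_tasks

for word in ["hello", "map", "reduce", "world"]:
    print(word, "-> reducer", partition(word, 3))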
mapred-site.xml
This is the configuration file for MapReduce in Hadoop. It lives in the Hadoop home directory under the folder etc/hadoop. You can find the documentation of this file on the official website. Here I present some of the important properties.
mapreduce.framework.name: the runtime framework for executing MapReduce jobs; set it to yarn to run on YARN, or local for the local runner.
mapreduce.job.maps: the default number of map tasks per job.
mapreduce.job.reduces: the default number of reduce tasks per job.
mapreduce.map.memory.mb: the amount of memory, in MB, to request from the scheduler for each map task.
mapreduce.reduce.memory.mb: the amount of memory, in MB, to request from the scheduler for each reduce task.
mapreduce.jobhistory.address: the MapReduce JobHistory Server address, in host:port form. The default value is 0.0.0.0:10020.
mapreduce.jobhistory.webapp.address: the MapReduce JobHistory Server web UI address, in host:port form. The default value is 0.0.0.0:19888.
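Each of these properties is set in mapred-site.xml as a <property> element with <name> and <value> children, the same XML format shown for capacity-scheduler.xml at the end of this document.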
Here we will build a simple word counter program and run it on the Hadoop MapReduce engine. We need to prepare two modules/files, one is mapper.py and another is reducer.py, then we pass the files to Hadoop MapReduce via the Hadoop Streaming utility. The input to the mapper will be read from STDIN and its output will be written to STDOUT, which in turn will be the input to the reducer, whose output will also be written to STDOUT.
# app/mapreduce/wordcount/mapper.py
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # emit a count of 1 for every word
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print(f'{word}\t1')
# app/mapreduce/wordcount/reducer.py
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print(f'{current_word}\t{current_count}')
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print(f'{current_word}\t{current_count}')
We can test the source code on a simple input text as follows:
# Testing the mapper
printf "hello map reduce world map \n reduce hadoop engine" | python3 /app/mapreduce/wordcount/mapper.py
# Testing the reducer
printf "hello map reduce world map \n reduce hadoop engine" | python3 /app/mapreduce/wordcount/mapper.py | sort -k 1 | python3 /app/mapreduce/wordcount/reducer.py
We can also pass a file to the program as follows:
cat /app/data/story.txt | \
python3 /app/mapreduce/wordcount/mapper.py | \
sort -k 1 | \
python3 /app/mapreduce/wordcount/reducer.py
Let’s download some text files from Project Gutenberg (https://www.gutenberg.org) and run the MapReduce application on the Hadoop MapReduce engine as follows:
# Here I will download plain text files
wget https://www.gutenberg.org/ebooks/20417.txt.utf-8 -P /app/data/gutenberg/
wget https://www.gutenberg.org/ebooks/20418.txt.utf-8 -P /app/data/gutenberg/
wget https://www.gutenberg.org/ebooks/20419.txt.utf-8 -P /app/data/gutenberg/
wget https://www.gutenberg.org/ebooks/20420.txt.utf-8 -P /app/data/gutenberg/
hdfs dfs -put /app/data/gutenberg /
hdfs dfs -ls /gutenberg
mapred streaming \
-files mapper.py,reducer.py \
-mapper 'python3 mapper.py' \
-reducer 'python3 reducer.py' \
-input /gutenberg/* -output /gutenberg/gutenberg.output
hdfs dfs -ls /gutenberg/gutenberg.output
hdfs dfs -cat /gutenberg/gutenberg.output/part-00000
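Each part file holds the output of one reducer: tab-separated word and count pairs, sorted by key within that file. Since this job used the default single reducer, all words end up in part-00000.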
You can change the number of output partitions using the option -D mapred.reduce.tasks=<number> as follows.
mapred streaming \
-D mapred.reduce.tasks=5 \
-files mapper.py,reducer.py \
-mapper 'python3 mapper.py' \
-reducer 'python3 reducer.py' \
-input /gutenberg/* -output /gutenberg/gutenberg.output
An Inverted Index is a data structure used in information retrieval systems to efficiently retrieve documents or web pages containing a specific term or set of terms. In an inverted index, the index is organized by terms (words), and each term points to a list of documents or web pages that contain that term.
Inverted indexes are widely used in search engines, database systems, and other applications where efficient text search is required. They are especially useful for large collections of documents, where searching through all the documents would be prohibitively slow.
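To see why this layout makes retrieval efficient, here is a minimal Python sketch of querying an inverted index (the index contents below are made up for illustration): answering a multi-term query is just an intersection of the posting sets, with no scan over the documents themselves.

# Minimal sketch of querying an inverted index. The index maps each
# term to the set of document ids containing it (made-up data).
inverted_index = {
    "hadoop": {1, 3, 7},
    "mapreduce": {1, 2, 3},
    "yarn": {2, 3, 7},
}

def search(index, *terms):
    # documents containing ALL query terms = intersection of their posting sets
    postings = [index.get(term, set()) for term in terms]
    return set.intersection(*postings) if postings else set()

print(search(inverted_index, "hadoop", "mapreduce"))  # {1, 3}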
Here we will build a MapReduce application for creating the inverted index of the input documents, where each input line is treated as a single document.
# app/mapreduce/inverted_index/mapper.py
import sys

i = 1
# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        # We are storing here the line number as document identifier
        # where lines start from 1
        print(f'{word}\t{i}')
    i += 1
# app/mapreduce/inverted_index/reducer.py
import sys

# word -> set of document ids (here: line numbers) that contain it
index = {}

# input comes from STDIN (the sorted mapper output)
for line in sys.stdin:
    word, loc = line.split('\t', 1)
    word = word.strip()
    loc = loc.strip()
    if word in index:
        index[word].add(loc)
    else:
        index[word] = set([loc])

# emit each word together with the set of documents containing it
for word, docs in index.items():
    print('{}\t{}'.format(word, docs))
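Note a design difference from the word-count reducer: this reducer accumulates the whole index in a Python dictionary before printing it. That is simpler to write, but each reducer must then hold its share of the index in memory; for very large inputs you would instead exploit the fact that the input is already sorted by key, as the word-count reducer does.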
We can test the source code on a simple input text as follows:
# Testing the mapper
printf "hello map reduce world map \n reduce hadoop engine \n world engine hello" | python3 /app/mapreduce/inverted_index/mapper.py
# Testing the reducer
printf "hello map reduce world map \n reduce hadoop engine \n world engine hello" | python3 /app/mapreduce/inverted_index/mapper.py | sort -k 1 | python3 /app/mapreduce/inverted_index/reducer.py
We can run this MapReduce job on the gutenberg data as follows.
cd /app/mapreduce/inverted_index
mapred streaming -files mapper.py,reducer.py -mapper 'python3 mapper.py' -reducer 'python3 reducer.py' -input /gutenberg/* -output /gutenberg/gutenberg.output1
Check the output in HDFS.
hdfs dfs -ls /gutenberg/gutenberg.output1
hdfs dfs -cat /gutenberg/gutenberg.output1/part-00000
You can change the number of output partitions by changing the number of reducers in the file mapred-site.xml, or you can set it in the command as follows.
mapred streaming -D mapreduce.job.reduces=3 -files mapper.py,reducer.py -mapper 'python3 mapper.py' -reducer 'python3 reducer.py' -input /gutenberg/* -output /gutenberg/gutenberg.output3
This will create 3 reducers for the job and the output in HDFS will be as follows.
hdfs dfs -ls /gutenberg/gutenberg.output3
# output
# -rw-r--r-- 1 root supergroup 0 2025-03-11 06:03 /gutenberg/gutenberg.output3/_SUCCESS
# -rw-r--r-- 1 root supergroup 4087036 2025-03-11 06:03 /gutenberg/gutenberg.output3/part-00000
# -rw-r--r-- 1 root supergroup 4236668 2025-03-11 06:03 /gutenberg/gutenberg.output3/part-00001
# -rw-r--r-- 1 root supergroup 4545167 2025-03-11 06:03 /gutenberg/gutenberg.output3/part-00002
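Note that each word appears in exactly one of the three part files: the partitioner assigns every intermediate key to a single reduce task, so the output is split across the part files rather than replicated in each of them.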
When an application wants to run, the client launches the ApplicationMaster, which then negotiates with the ResourceManager to get resources in the cluster in the form of containers. A container represents CPUs (cores) and memory allocated on a single node to be used to run tasks and processes. Containers are supervised by the NodeManager and scheduled by the ResourceManager.
The fundamental idea of YARN is to split up the functionalities of resource management and job scheduling/monitoring into separate daemons. The idea is to have a global ResourceManager (RM) and per-application ApplicationMaster (AM). An application is either a single job or a DAG of jobs.
The ResourceManager and the NodeManager form the data-computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system. The NodeManager is the per-machine framework agent who is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler.
The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.
The ResourceManager has two main components: Scheduler and ApplicationsManager. The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc. The Scheduler is a pluggable component; the current schedulers such as the CapacityScheduler and the FairScheduler are examples of such plug-ins. The ApplicationsManager is responsible for accepting job submissions, negotiating the first container for executing the application-specific ApplicationMaster and providing the service for restarting the ApplicationMaster container on failure. The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring progress.
Hadoop YARN is included in the Hadoop installation and you can start its components as follows.
# Starting YARN
start-yarn.sh
# Starting YARN + HDFS
start-all.sh
You can also start each component individually. For instance, you can start the resource manager as follows:
yarn --daemon start resourcemanager
Similarly you can start the node manager as follows. Notice that you should run this on the slave nodes and not the master node.
yarn --daemon start nodemanager # On slave node
# Using worker mode
yarn --workers --daemon start nodemanager # On master node
Note: If you want to add a new node to the YARN cluster, you need to add its domain to the $HADOOP_HOME/etc/hadoop/workers file.
You can access the web UI for YARN resource manager via (http://localhost:8088).
yarn tool
This tool allows you to manage the YARN cluster. Here I present some of its important functions.
yarn version: prints the Hadoop and YARN version information.
yarn top: opens a top-like window for monitoring the apps on the cluster.
yarn app|application: manages YARN applications, for example:
yarn app -list
yarn app -list -appStates <States>
yarn app -list -appStates RUNNING,KILLED
yarn app -status <Application Name or ID>
yarn app -kill <Application ID>
You can run a MapReduce job in the background using the option -background as follows.
mapred streaming -D mapreduce.framework.name=yarn -files mapper.py,reducer.py -background -mapper 'python3 mapper.py' -reducer 'python3 reducer.py' -input /gutenberg/* -output /gutenberg/gutenberg.output
yarn-site.xml
This file stores the configuration of the YARN cluster. You can check the official resource for more info about the properties. Here I present some of them.
yarn.resourcemanager.scheduler.class: the scheduler implementation to use. The default value is org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.
yarn.scheduler.minimum-allocation-mb: the minimum amount of memory, in MB, that can be requested for a container.
yarn.scheduler.maximum-allocation-mb: the maximum amount of memory, in MB, that can be requested for a container.
yarn.scheduler.minimum-allocation-vcores: the minimum number of virtual CPU cores that can be requested for a container.
yarn.scheduler.maximum-allocation-vcores: the maximum number of virtual CPU cores that can be requested for a container.
yarn.resourcemanager.cluster-id: the identifier of the cluster.
yarn.nodemanager.resource.memory-mb: the amount of physical memory, in MB, that can be allocated for containers on a node.
yarn.nodemanager.resource.cpu-vcores: the number of virtual CPU cores that can be allocated for containers on a node.
yarn.nodemanager.resource.percentage-physical-cpu-limit: the percentage of physical CPU that can be used by containers on a node.
yarn.nodemanager.webapp.address: the NodeManager web UI address, in host:port form (default port 8042).
capacity-scheduler.xml
This file is the configuration file for the CapacityScheduler. The CapacityScheduler has a predefined queue called root. All queues in the system are children of the root queue. The configuration for CapacityScheduler uses a concept called queue path to configure the hierarchy of queues. The queue path is the full path of the queue’s hierarchy, starting at root, with . (dot) as the delimiter.
Here is an example with three top-level child-queues a, b and c and some sub-queues for a and b:
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>a,b,c</value>
<description>The queues at this level (root is the root queue).
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.a.queues</name>
<value>a1,a2</value>
<description>The queues at this level (root is the root queue).
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.b.queues</name>
<value>b1,b2,b3</value>
<description>The queues at this level (root is the root queue).
</description>
</property>
Next we need to define the capacity for each queue in the hierarchy. For example, for the queue whose path is root.a.a1, the capacity is 50% of the capacity of queue root.a.
<property>
<name>yarn.scheduler.capacity.root.a.a1.capacity</name>
<value>50</value>
<description>Default queue target capacity.</description>
</property>
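Capacities of sibling queues are percentages of their parent queue and should add up to 100 at each level, so the effective cluster share of a leaf queue is the product of the fractions along its path. A tiny worked example, where the 50% for root.a.a1 comes from the XML above and the 40% for root.a is a hypothetical value:

# Effective cluster share of a leaf queue = product of the capacities
# (as fractions) along its path from root. root.a's 40% is hypothetical;
# root.a.a1's 50% is the value configured above.
root_a = 40 / 100
root_a_a1 = 50 / 100
print(f"root.a.a1 gets {root_a * root_a_a1:.0%} of the cluster")  # 20%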