
Deep dive into the Hadoop MapReduce framework
The story of Hadoop started with HDFS and MapReduce. Hadoop version 1 provided the basic features for storing and processing data on a distributed platform, and the framework has evolved a lot since then. Hadoop version 2 added major changes, such as NameNode high availability and a new resource management framework called YARN. However, the high-level flow for MapReduce processing did not change despite various changes in its API.
MapReduce consists of two major steps, map and reduce, plus multiple minor steps that are part of the process flow from the map tasks to the reduce tasks. Mappers are responsible for performing the map tasks, while reducers are responsible for the reduce tasks. The job of the mapper is to process the input splits derived from blocks stored on HDFS, the distributed storage system. Let's look at the following MapReduce flow diagram:

We will walk through the processing flow as follows:
- InputFormat: The MapReduce process starts by reading files stored on HDFS. These files can be of any supported type, such as text, Avro, and so on. How a file is read is controlled by the InputFormat. There are multiple implementations of InputFormat; one such implementation is TextInputFormat. The abstract InputFormat class looks as follows:
public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException;
}
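To make this concrete, here is a minimal sketch of a concrete InputFormat (the class name DemoInputFormat is illustrative, not from the Hadoop library) that extends FileInputFormat, inherits its getSplits() implementation, and reuses Hadoop's LineRecordReader to deliver one line per record:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Hypothetical concrete InputFormat: reuses FileInputFormat's split calculation
// and hands each split to the stock line-oriented RecordReader.
public class DemoInputFormat extends FileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new LineRecordReader();
    }
}
Because FileInputFormat already knows how to compute splits from the files' HDFS blocks, only createRecordReader() needs to be provided here.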
- RecordReader and input split: The input file is divided into chunks, and these chunks are known as input splits. An input split is nothing but a logical chunk of the file, and its size is controlled by the mapred.max.split.size and mapred.min.split.size parameters (the sketch at the end of this item shows how these limits can be set per job). By default, the input split size is the same as the block size, and you should not change this unless a specific case requires it. For non-splittable file formats such as gzip, the input split will be equal to the size of a single .gzip file, which means that if there are 12 .gzip files, there will be 12 input splits, and for each input split one mapper will be launched to process it.
The RecordReader is responsible for reading records from the input splits stored on HDFS. The default input format is TextInputFormat, and its RecordReader uses the newline character (\n) as the record delimiter, which means one line is treated as one record. Remember that you can always customize this behavior by supplying your own RecordReader implementation. The RecordReader reads each record and passes it to the mapper.
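The split-size limits mentioned above can also be set per job. The following is a minimal sketch, with illustrative values, using the FileInputFormat helper methods that write the corresponding split-size properties into the job configuration:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeDemo {
    public static void main(String[] args) throws IOException {
        Job job = Job.getInstance(new Configuration(), "split-size-demo");
        // Illustrative values: at least 128 MB and at most 256 MB per split
        FileInputFormat.setMinInputSplitSize(job, 128L * 1024 * 1024);
        FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
    }
}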
- Mapper: The Mapper class is responsible for processing the input split. The RecordReader reads each record from the input split and passes it to the map function of the mapper. The map function is executed once for each record, which means that if one input split has 100 records, the map function will be executed 100 times.
The mapper also contains the setup and cleanup methods. The setup method is executed before the mapper starts processing the records of its input split, so any initialization operation, such as reading from the distributed cache or initializing a connection, should be done inside it. The cleanup method is executed once all records in the input split have been processed, so any cleanup operation should be performed inside this method. The mapper processes each record and emits its output using the context object. The context object enables the mapper and reducer to interact with other parts of the Hadoop framework, such as accessing the job configuration and writing emitted records to files. It also enables communication between the Mapper, Combiner, and Reducer. The Mapper class looks as follows:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class DemoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // One-time initialization, for example reading from the distributed cache
        super.setup(context);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Record processing logic goes here; called once per record
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // One-time cleanup, for example closing connections
        super.cleanup(context);
    }
}
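To illustrate what the record-processing logic might look like, here is a minimal word-count-style sketch (the class name and logic are illustrative, not part of the original listing) that tokenizes each line and emits a count of 1 per word:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical word-count mapper: emits (word, 1) for every token in the line
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, ONE);
        }
    }
}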
- Partitioner: The job of a partitioner is to assign a partition number to each record emitted by the mapper, so that records with the same key always get the same partition number, which ensures that records with the same key always go to the same reducer. The partition index for a record is computed when the mapper calls context.write(). The general formula for the partition index is as follows (a sketch of a partitioner that applies this formula appears after the next item):
partitionIndex = (key.hashCode() & Integer.MAX_VALUE) % numReducers
- Shuffling and sorting: The process of transferring data from the mappers to the reducers is known as shuffling. Each reducer launches threads that read, over the HTTP protocol, all the partitions that belong to it from the mapper machines. Different mappers may have records for the same key, so the reducer merge-sorts the records by key. The shuffling and sorting phases occur in parallel, which means that while map outputs are being fetched they are merged, so that the reducer receives all the values for the same key together as a list.
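As promised above, here is a minimal sketch of a custom partitioner that applies the partition-index formula (the class name is illustrative; the logic mirrors what Hadoop's default HashPartitioner does):
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Assigns a partition by hashing the key, exactly as the formula above describes
public class DemoPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}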
- Reducer: The number of reducers that the Hadoop framework launches depends on the map output and various other parameters, but we can also control the number of reducers explicitly (the driver sketch after this list shows how). A common formula for estimating the number of reducers is as follows:
1.75 * <number of nodes> * mapred.tasktracker.reduce.tasks.maximum
The Reducer contains the reduce() method, which is executed once for each unique key emitted by the mappers, as follows:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class DemoReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // One-time initialization before any key is processed
        super.setup(context);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Default (identity) behavior: writes every value with its key;
        // replace this with your own aggregation logic
        super.reduce(key, values, context);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // One-time cleanup after all keys have been processed
        super.cleanup(context);
    }
}
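For a concrete aggregation, here is a word-count-style sketch that pairs with the hypothetical WordCountMapper shown earlier (again illustrative, not part of the original listing); it sums the counts emitted for each word:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical word-count reducer: sums all the 1s emitted for each word
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}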
- Combiner: The combiner, also known as a mini reducer or localized reducer, runs on the mapper's machine. It takes the intermediate keys emitted by the mapper and applies a user-defined, reducer-like combine function to them on the same machine. For each mapper, one combiner runs on that mapper's machine. The combiner significantly reduces the amount of data shuffled from the mappers to the reducers and thus helps improve performance. However, combiners are not guaranteed to be executed.
- Output format: The output format translates the final key/value pairs emitted by the reduce function, and the record writer writes them to a file on HDFS. By default, the key and value are separated by a tab and records are separated by a newline character. This default behavior can be changed by implementing your own output format.
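To show how these components are wired together, here is a minimal driver sketch. DemoMapper and DemoReducer come from the listings above; using DemoReducer as the combiner, the reducer count, the job name, and the input/output paths are illustrative assumptions:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class DemoDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "demo-job");
        job.setJarByClass(DemoDriver.class);

        // Input and output formats (TextInputFormat/TextOutputFormat are the defaults)
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Wire up the map, combine, and reduce phases
        job.setMapperClass(DemoMapper.class);
        job.setCombinerClass(DemoReducer.class);
        job.setReducerClass(DemoReducer.class);

        // Output key/value types emitted by the reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Explicitly control the number of reduce tasks (illustrative value)
        job.setNumReduceTasks(2);

        // Input and output directories passed as command-line arguments
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Running this class with an input directory and an output directory as arguments submits the job and waits for it to complete.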
We have covered the important components of the MapReduce framework; we will examine how the execution flow works in the upcoming sections. The idea is to give you enough understanding of the framework that, before making any significant change, you know where its impact will be.