# A Typical Large Data Problem

Any project that deals with big data can usually be broken down into the following steps:

1. Iterate over a large number of records
2. Extract something of interest from each (**map**)
3. Shuffle and sort intermediate results
4. Aggregate intermediate results (**reduce**)
5. Generate final output

The key idea is to provide a functional abstraction for these tasks and allow the programmer to focus on writing just the **map** and **reduce** programs.
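To make the five steps concrete, here is a minimal single-machine sketch of a word count written in plain Python. It is not Hadoop code: the names `map_fn`, `reduce_fn` and `run_job` are hypothetical, and the "shuffle and sort" step is simulated with an in-memory dictionary.

```python
from collections import defaultdict

def map_fn(record):
    """Map: emit a (word, 1) pair for every word in one input record."""
    for word in record.lower().split():
        yield word, 1

def reduce_fn(key, values):
    """Reduce: aggregate all values that share a key."""
    return key, sum(values)

def run_job(records):
    # Steps 1-2: iterate over the records and apply the map function.
    intermediate = [pair for record in records for pair in map_fn(record)]

    # Step 3: shuffle and sort, i.e. group the values by key (simulated in memory).
    groups = defaultdict(list)
    for key, value in intermediate:
        groups[key].append(value)

    # Steps 4-5: reduce each group in key order and generate the final output.
    return [reduce_fn(key, groups[key]) for key in sorted(groups)]

records = ["the quick brown fox", "the lazy dog", "the fox"]
result = run_job(records)
print(result)
```

Only `map_fn` and `reduce_fn` are specific to the problem; everything inside `run_job` is the part that a framework such as Hadoop takes over.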

## Anatomy of a Hadoop Job

A MapReduce program in Hadoop is called a Hadoop job.

* Jobs are divided into map and reduce tasks
* An instance of running a task is called a **task attempt**
* Multiple jobs can be composed into a **workflow**

The **job submission process** is as follows:

* Client (i.e., driver program) creates a job, configures it, and submits it to the JobTracker
* JobClient computes input splits (on client end)
* Job data (jar, configuration XML) are sent to JobTracker
* JobTracker puts job data in a shared location, enqueues tasks
* TaskTrackers poll for tasks

![](/files/-M5-0WGqVX5LH2OqNSPn)

The above image depicts the entire workflow of a job in Hadoop.

* **InputSplits**: describe how an input file is divided into chunks that can be converted into usable records. This is especially helpful in the case of JSON files, whose multi-line records are enclosed within curly braces `{ }`. HDFS splits the input file into blocks without regard to record boundaries, so a split may begin or end with an incomplete record (i.e. it has a `{` but no matching `}`). In such a case, the rest of the record is read from the block that follows (in Hadoop, this read past the split boundary is carried out by the RecordReader). Note that the InputSplit presents a *byte-oriented* view of the data. This data is sent to the RecordReaders.
* **RecordReaders**: are responsible for converting the output of the InputSplit into a *record-oriented* view. These records are then sent to the mappers.
* **Mappers**: As discussed earlier, the mapper generates tuples ((key, value) pairs) that are then sent to the partitioners. Note that Hadoop can decide the number of mappers required based on the number of file splits/chunks/blocks; basically one map task is spawned per InputSplit.
* **Partitioners**: decide which keys are to be processed by which reducers. They do so using a hash function. HashPartitioner is the default partitioner.
* **Reducers**: perform aggregation to reduce the number of tuples. Note that the number of reducers is specified by the MapReduce programmer using `Job.setNumReduceTasks(int)` (default = 1). We can also have 0 reducers, depending on the job to be performed. As a rule of thumb, the Hadoop documentation suggests setting the number of reducers to 0.95 or 1.75 times (number of nodes × maximum number of containers per node). With 0.95, all of the reducers can launch immediately and start transferring mapper outputs as the maps finish. With 1.75, the faster nodes will finish their first round of reduces and launch a second wave of reducers, doing a much better job of load balancing.
* **RecordWriters**: write records to the output file.
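The hash-based partitioning described above can be sketched in a few lines of Python. The formula (clear the sign bit of the key's hash, then take it modulo the number of reducers) follows Hadoop's `HashPartitioner`. The helper names `java_string_hash` and `get_partition` are hypothetical, and mimicking Java's `String.hashCode()` is an assumption made for determinism: real Hadoop key types such as `Text` compute their hash differently.

```python
def java_string_hash(s: str) -> int:
    """Mimic Java's String.hashCode() as a 32-bit signed int (assumes BMP characters)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF  # keep only the low 32 bits
    return h - (1 << 32) if h >= (1 << 31) else h

def get_partition(key: str, num_reducers: int) -> int:
    """Clear the sign bit of the hash, then take the modulus (HashPartitioner's formula)."""
    return (java_string_hash(key) & 0x7FFFFFFF) % num_reducers

# Every occurrence of a key lands on the same reducer, whichever mapper emitted it.
for key in ["hello", "world", "hello"]:
    print(key, "-> reducer", get_partition(key, 4))
```

Because the partition depends only on the key and the number of reducers, all values for a given key end up at a single reducer.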

**Note**: If we need to write our own InputSplit, we must implement the InputSplit interface. FileSplit is the default InputSplit.
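The following sketch shows how a byte-oriented split can be turned into whole records even when a record straddles a split boundary. It uses newline-delimited records instead of JSON to keep the code short. The convention is similar to the one used by Hadoop's line-oriented record reader: every split except the first skips its leading partial record, and every split reads past its end to finish its last record. The function names `make_splits` and `read_split` are hypothetical.

```python
def make_splits(total_size: int, split_size: int):
    """Cut a file of total_size bytes into (offset, length) pairs, ignoring record boundaries."""
    return [(off, min(split_size, total_size - off))
            for off in range(0, total_size, split_size)]

def read_split(data: bytes, start: int, length: int):
    """Return the complete newline-delimited records owned by one split."""
    end = start + length
    pos = start
    if start != 0:
        # The first (possibly partial) line is owned by the previous split: skip it.
        newline = data.find(b"\n", pos)
        if newline == -1:
            return []
        pos = newline + 1
    records = []
    # Read every line that starts at or before `end`, even if it finishes beyond it.
    while pos <= end and pos < len(data):
        newline = data.find(b"\n", pos)
        if newline == -1:
            records.append(data[pos:])  # last line without a trailing newline
            pos = len(data)
        else:
            records.append(data[pos:newline])
            pos = newline + 1
    return records

data = b"alpha\nbravo\ncharlie\ndelta\n"
for offset, length in make_splits(len(data), 10):
    print((offset, length), read_split(data, offset, length))
```

With this convention, every record is read exactly once regardless of where the split boundaries fall.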

### Shuffle and Sort in Hadoop

**tl;dr**

**Shuffling** refers to the fetching of relevant mapper outputs and **sorting** is the grouping of mapper outputs by key, to prepare the input for the reducers. The shuffle and sort phases occur simultaneously: while mapper outputs are being fetched, they are also merged.

**Detailed Explanation**:

**Shuffling** is the process by which intermediate output from mappers is transferred to the reducers. It occurs as follows:

* Mapper outputs are first transferred to an in-memory buffer
* When the buffer reaches a threshold, its contents are *spilled* to the disk
* Spills are then merged into a single partitioned file (sorted within each partition), from which each reducer fetches its own partition
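The buffer, spill and merge steps above can be simulated as follows. This is only a sketch under simplifying assumptions: the buffer threshold is a record count instead of a fraction of a byte buffer, spills are in-memory lists instead of files on disk, and `map_side_spill` and `first_char_partition` are hypothetical names.

```python
import heapq

def sort_key(entry):
    """Order map output by partition first, then by key."""
    partition, key, _value = entry
    return partition, key

def map_side_spill(pairs, num_partitions, buffer_limit, partition_fn):
    """Buffer map output, spill sorted runs at a threshold, then merge the runs."""
    spills, buffer = [], []
    for key, value in pairs:
        buffer.append((partition_fn(key, num_partitions), key, value))
        if len(buffer) >= buffer_limit:
            # Threshold reached: sort the buffer and "spill" it as one run.
            spills.append(sorted(buffer, key=sort_key))
            buffer = []
    if buffer:
        spills.append(sorted(buffer, key=sort_key))
    # Merge all sorted runs into one sequence, sorted within each partition.
    merged = list(heapq.merge(*spills, key=sort_key))
    return merged, len(spills)

def first_char_partition(key, num_partitions):
    """Toy partitioner (an assumption for this example, not Hadoop's)."""
    return ord(key[0]) % num_partitions

pairs = [("b", 1), ("a", 1), ("c", 1), ("a", 1), ("b", 1)]
merged, num_spills = map_side_spill(pairs, 2, 2, first_char_partition)
print(num_spills, merged)
```

Since each spill is already sorted, the final merge never needs to hold more than one entry per spill in memory at a time, which is why sorting at spill time pays off.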

**Sorting** is a multi-pass merge of mapper outputs. It occurs as follows:

* First, mapper outputs are copied to the reducer machines
* Then, merging of mapper outputs is performed (based on the keys)
* Then, every reducer obtains all values associated with the same key

Sorting therefore helps the reducers to easily distinguish when a new group of values begins. This saves time for the reducers: a reducer simply starts a new call to the reduce function when the next key in the sorted input data is different from the previous key.
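A small sketch of the reduce side, assuming each mapper's output has already been copied over and is sorted by key. The sorted lists are merged, and a new reduce call starts whenever the key changes. The name `reduce_side` and the sample data are hypothetical.

```python
import heapq
from itertools import groupby
from operator import itemgetter

def reduce_side(sorted_map_outputs, reduce_fn):
    """Merge key-sorted mapper outputs and call reduce_fn once per distinct key."""
    # Merge step: combine the sorted inputs into one key-sorted stream.
    merged = heapq.merge(*sorted_map_outputs, key=itemgetter(0))
    results = []
    # groupby starts a new group exactly when the next key differs from the previous one.
    for key, group in groupby(merged, key=itemgetter(0)):
        results.append(reduce_fn(key, [value for _key, value in group]))
    return results

outputs = [
    [("apple", 1), ("cherry", 2)],   # sorted output copied from mapper 1
    [("apple", 3), ("banana", 1)],   # sorted output copied from mapper 2
    [("banana", 4), ("cherry", 1)],  # sorted output copied from mapper 3
]
totals = reduce_side(outputs, lambda key, values: (key, sum(values)))
print(totals)
```

Because the stream is sorted, the reducer never has to look ahead or keep a table of keys it has already seen: a change of key is enough to know that the previous group is complete.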

The diagram below depicts the shuffling and sorting phases in a MapReduce job.

![](/files/-M5-0WGurvGOQxWhxTd6)

*It is important to note that shuffling and sorting will not be performed at all if we specify zero reducers. In such a case, the MapReduce job stops at the map phase itself.*

Another diagram that depicts shuffle and sort operations:

![](/files/-M5-0WGwt5jXQRcCLmrG)

## Hadoop Workflow

The following image depicts the steps involved in executing a job in Hadoop across the HDFS.

![](/files/-M5-0WGyQOm0ZSVRtCn7)

As shown in the image:

1. We must first load data into HDFS
2. Then, we must develop our code locally
3. We must then submit our code (containing map and reduce instructions) to the Hadoop cluster (this code is referred to as the *MapReduce job* in the above image)
4. Steps 2 and 3 can be repeated iteratively to fix errors, if any. Finally, we must retrieve the results from HDFS

## Tools for Synchronization

As we know, in a distributed computing environment such as Hadoop, synchronization is of utmost importance. A few tools that help ensure synchronization include:

* Cleverly-constructed data structures
* Sort order of intermediate keys
* Using a partitioner that decides which keys will be processed by which reducer
* Preserving state in mappers and reducers (i.e. capturing dependencies across multiple keys and values)

The following diagram shows how to preserve state in mappers and reducers:

![](/files/-M5-0WH1WYBqSNRBvTyL)
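One common way to preserve state in a mapper is to accumulate partial results across many `map` calls and emit them only once, when the mapper finishes (often called in-mapper combining). The sketch below assumes a setup/map/cleanup life cycle like the one Hadoop mappers have. The class `StatefulWordCountMapper` is hypothetical, and whether it matches the diagram exactly is an assumption.

```python
from collections import Counter

class StatefulWordCountMapper:
    """Mapper that keeps state across records instead of emitting (word, 1) each time."""

    def setup(self):
        # Called once before any record is processed: initialise the preserved state.
        self.counts = Counter()

    def map(self, record):
        # Called once per record: update the state, emit nothing yet.
        self.counts.update(record.lower().split())

    def cleanup(self):
        # Called once after the last record: emit the aggregated pairs.
        return sorted(self.counts.items())

mapper = StatefulWordCountMapper()
mapper.setup()
for record in ["a b a", "b a"]:
    mapper.map(record)
emitted = mapper.cleanup()
print(emitted)
```

Here the mapper emits two pairs instead of five, which reduces the amount of intermediate data that has to be shuffled. The trade-off is that the state must fit in the mapper's memory.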

## The Issue with Garbage Collection

Garbage Collection is a process by which the JVM (Java Virtual Machine) gets rid of unwanted objects.

Garbage Collection is a time-consuming task. It occurs in every JVM, but it is most damaging in the **NameNode** of HDFS: when a JVM performs a full Garbage Collection, **EVERYTHING** in that JVM **STOPS**. Because the rest of the cluster depends on the NameNode, the entire cluster can come to a grinding halt until the JVM is done with the Garbage Collection task. *Recently, Twitter had one of its clusters stop for 22 hours to perform Garbage Collection!*

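So, we must avoid creating unnecessary objects (for example, by reusing an object instead of allocating a new one for every record). This way, the JVM has fewer unreachable/unwanted objects to get rid of, and Garbage Collection pauses can be kept to a minimum.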

**The next section gives a brief overview of the Google File System and how HDFS differs from it.**

