Hadoop MapReduce Example

Now that you're familiar with the basics of Hadoop and HDFS, it's time to explore Hadoop MapReduce.

MapReduce is Hadoop's primary framework for processing big data on a shared cluster. It works by splitting the input into smaller chunks that are processed in parallel by map tasks. The outputs of these map tasks are then used as inputs for reduce tasks, which produce the final result set. For a deeper dive on MapReduce concepts, check out MapReduce Quick Explanation.

Every MapReduce application has an associated job configuration. This includes the input/output locations and corresponding map/reduce functions.

You can run MapReduce jobs via the Hadoop command line. Typically, your map/reduce functions are packaged in a jar file that you submit with the Hadoop CLI.
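In its most general form, the invocation looks like this (the jar name, class name, and paths below are placeholders, not real files):

$ hadoop jar your-application.jar YourMainClass /path/to/input /path/to/output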

MapReduce Basic Example

Hadoop comes with a basic MapReduce example out of the box. Navigate to /hadoop/share/hadoop/mapreduce/ and you'll find a jar file named hadoop-mapreduce-examples-2.7.4.jar. This jar contains several sample MapReduce classes, including a WordCount class for...counting words.

The source code for the WordCount class is as follows:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                Context context
                ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Hadoop provides the WordCount class as a basic MapReduce application example. Notice the TokenizerMapper and IntSumReducer classes. These hold the map/reduce functions and are explained in more detail below:

The mapper function:

You'll notice the TokenizerMapper class has the map function. This function receives the input text line by line and emits a key/value pair of (word, 1) for each word in that line.

Note that even if the same word appears multiple times, the map function still produces a separate key/value pair for each occurrence it finds.
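If you want to adapt the mapper, the same signature applies. As a minimal sketch, here is a hypothetical variant (not part of the bundled example) that lower-cases each token before emitting it, so "Hello" and "hello" would be counted as the same word:

  public static class LowerCaseTokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        // Normalize case so different capitalizations map to a single key.
        word.set(itr.nextToken().toLowerCase());
        context.write(word, one);
      }
    }
  }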

The reducer function:

The IntSumReducer class implements the reduce function. For each word, this function iterates over the values grouped under that key and sums them up. It then emits a new set of key/value pairs with the total count for each word.
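The reduce signature can be adapted in the same way. As a purely illustrative sketch (the class name and threshold below are hypothetical, not part of Hadoop's example), here is a variant that only emits words appearing at least a minimum number of times:

  public static class MinCountReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private static final int MIN_COUNT = 2; // illustrative threshold
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                Context context
                ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      // Only emit words that appear at least MIN_COUNT times.
      if (sum >= MIN_COUNT) {
        result.set(sum);
        context.write(key, result);
      }
    }
  }

Note that a filtering reducer like this shouldn't be reused as a combiner, since the combiner only sees partial counts.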

You'll notice the WordCount class executes the job via the main function. It first configures a new MapReduce job by creating a new Job instance and setting the Mapper and Reducer classes.

You'll also notice a combiner class is set to the same reducer class. The combiner is an optional class that summarizes map output records with the same key. This is really just a way to pre-process the map outputs before they are "fed" to the reducer. Reusing the reducer as the combiner works here because summing is both commutative and associative, so applying it to partial results doesn't change the final counts.

It then uses the FileInputFormat and FileOutputFormat classes to set the input and output paths based on argument values provided at run time.

Finally, the application waits for the job to complete and then exits with a status code that reflects whether the job succeeded.
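One practical note: FileOutputFormat will fail the job if the output directory already exists, so rerunning the example requires removing the old output first. As a minimal sketch (an optional addition, not part of the bundled example), you could delete it programmatically in main() before submitting the job; this requires importing org.apache.hadoop.fs.FileSystem:

    // Delete the output directory if it already exists so the job can be rerun.
    // Assumes args[1] is the output path, as in the example above.
    FileSystem fs = FileSystem.get(conf);
    Path outputPath = new Path(args[1]);
    if (fs.exists(outputPath)) {
      fs.delete(outputPath, true); // true = delete recursively
    }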

Running the WordCount MapReduce Application

In order to run the WordCount job, you'll need to specify the input/output directory in HDFS. Furthermore, you'll need actual text files to process as inputs.

1. Create the input directory

To create an input directory in HDFS, run:

$ hdfs dfs -mkdir -p /example/wordcount/input

2. Create the input files

You'll want to create two input files, input1.txt and input2.txt. You can do this by running these two commands:

$ echo 'Hello MapReduce Hello' >> input1.txt
$ echo 'Goodbye MapReduce Goodbye' >> input2.txt

Note that this will create the input files in whatever directory you run the commands in.

3. Add input files to HDFS

After creating these two files, you'll want to add them to the input directory in HDFS:

$ hdfs dfs -put input1.txt /example/wordcount/input
$ hdfs dfs -put input2.txt /example/wordcount/input
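If you want to confirm the files landed in HDFS before running the job, you can list the input directory:

$ hdfs dfs -ls /example/wordcount/input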

4. Run the MapReduce Application

To run the MapReduce job, run the following:

$ hadoop jar hadoop-mapreduce-examples-2.7.4.jar wordcount /example/wordcount/input /example/wordcount/output

Notice how we provide the input and output paths as arguments. The results of the MapReduce job will be written to /example/wordcount/output.
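When the job finishes successfully, the output directory will contain a _SUCCESS marker file and one part file per reducer (a single part-r-00000 here). You can list it with:

$ hdfs dfs -ls /example/wordcount/output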

5. Verify Results

After the job completes, you can verify the output via:

$ hdfs dfs -cat /example/wordcount/output/part-r-00000
Goodbye 2
Hello 2
MapReduce 2

Notice how the output displays the total word count for our input files!

To summarize, the map tasks produce the following key/value pairs for the two input files:

< Hello, 1 >
< MapReduce, 1 >
< Hello, 1 >
< Goodbye, 1 >
< MapReduce, 1 >
< Goodbye, 1 >

The combiner then aggregates the same words within each map task's output to produce:

< Hello, 2 >
< MapReduce, 1 >
< Goodbye, 2 >
< MapReduce, 1 >

Finally, the reducer sums up the total word count (for all of our inputs) to produce:

< Goodbye, 2 >
< Hello, 2 >
< MapReduce, 2 >

Conclusion

While this is a basic example using a provided MapReduce job, it demonstrates how to run MapReduce applications on Hadoop. You're now ready to write your own MapReduce jobs and look at more advanced MapReduce topics.

Your thoughts?