~ Tutorial 3 ~



Hadoop Streaming

What is Hadoop Streaming?

Hadoop streaming is a utility that comes with the Hadoop distribution. This utility allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.

  • Mapper and Reducer are just normal Linux executables.
  • Mapper: reads its input stream from standard input (STDIN) and emits key-value pairs to standard output (STDOUT). Each key-value pair takes one line and is formatted as '%s\t%s' % (key, value).
  • Reducer: reads key-value pairs from STDIN and writes key-value pairs to STDOUT.
  • By default, the key and value are separated by a tab. If a line contains no tab ('\t'), the whole line is treated as the key and the value is null.

Run with Hadoop Streaming

The command format is as follows:

hduser@master:~$ hadoop jar hadoop-streaming-2.7.1.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper java_class_or_executable \
    -reducer java_class_or_executable

If you are running Hadoop on the DIC cluster, you can find hadoop-streaming.jar at /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar.

  • -input directoryname or filename: Input location in the HDFS for mapper. Required.
  • -output directoryname: Output location in the HDFS for reducer. Required.
  • -mapper executable or JavaClassName: Mapper executable in the local file system. Required.
  • -reducer executable or JavaClassName: Reducer executable in the local file system.
  • -file filename: Ship a local file (e.g. the mapper or reducer script) to the compute nodes as part of the job submission, so that it is available when the tasks run.

This time, instead of running word count with hadoop-mapreduce-examples.jar (under /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar), we are going to write the mapper and reducer ourselves and run them with hadoop-streaming.jar.

In this tutorial we will use Python. First of all, we design our mapper (mapper.py) as follows:

#!/usr/bin/env python

import sys

for line in sys.stdin:
    for word in line.split():
        print '%s\t%s' % (word, 1)
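
You can sanity-check the mapper locally before involving Hadoop at all, since it only reads STDIN and writes STDOUT. A quick test (the sample sentence is made up) looks like this:

hduser@master:~$ echo "the cat and the hat" | python mapper.py
the	1
cat	1
and	1
the	1
hat	1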

Write the reducer (reducer.py) as follows:

#!/usr/bin/env python

import sys

cur_key = None
cur_count = 0

for line in sys.stdin:
    key, value = line.split()

    #Accumulate count for a word
    if key == cur_key:
        cur_count += int(value)
    else:
        #finish counting for a certain word
        if cur_key:
            print '%s\t%s' % (cur_key, cur_count)
        cur_key = key
        cur_count = int(value)

#Output the last word (skip if the input was empty)
if cur_key:
    print '%s\t%s' % (cur_key, cur_count)
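
Because both scripts only use STDIN and STDOUT, you can simulate the whole job on a single machine, with sort standing in for the shuffle stage (the sample sentence is made up):

hduser@master:~$ echo "the cat and the hat" | python mapper.py | sort -k1,1 | python reducer.py
and	1
cat	1
hat	1
the	2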

Run it with Hadoop Streaming:

hduser@master:~$ hadoop jar hadoop-streaming-2.7.1.jar -file mapper.py -mapper mapper.py -file reducer.py -reducer reducer.py -input /largefile -output /result
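
When the job finishes, the output directory holds one part-* file per reducer (plus a _SUCCESS marker). You can list it with:

hduser@master:~$ hadoop dfs -ls /result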

The most frequent words

Following up on the previous word count example, how can we get the most frequent words using MapReduce? First, check the reducer output from the previous job.

hduser@master:~$ hadoop dfs -cat /result/part-00000 | head -n 5

Option 1

We know Hadoop will sort the intermediate results by the key. Let's leverage this property and design our mapper (swap.py) as follows:

#!/usr/bin/env python

import sys

for line in sys.stdin:
    word, count = line.split()
    print '%07d\t%s' % (int(count), word)

Note: by default, the shuffle stage performs string sorting, not integer sorting. The %07d trick pads the count with leading zeros so that string sorting and integer sorting produce the same order in this case.
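
A tiny Python snippet (not part of the job, just an illustration) shows why the padding matters:

counts = [5, 40, 312]

# Plain string sorting compares character by character, so '312' < '40' < '5':
print(sorted(str(c) for c in counts))        # ['312', '40', '5']

# With %07d padding, lexicographic order agrees with numeric order:
print(sorted('%07d' % c for c in counts))    # ['0000005', '0000040', '0000312']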

We don't need a reducer in this case. Run with Hadoop Streaming:

hduser@master:~$ hadoop jar hadoop-streaming-2.7.1.jar -mapper swap.py -file swap.py -input /result -output /result_sorted

Check the output:

hduser@master:~$ hadoop dfs -cat /result_sorted/part-00000 | tail -n 5

Caution: hadoop dfs -cat can cause a lot of network traffic in practice if you have a really large data set. Don't use -cat like this in real work or in the homework (just add a reducer if necessary).

Option 2

There are many parameters you can specify as command line arguments. Try the following command:

hduser@master:~$ hadoop jar hadoop-streaming-2.7.1.jar \
    -D stream.num.map.output.key.fields=2 \
    -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D mapred.text.key.comparator.options=-k2,2nr \
    -mapper cat -reducer cat -input /result -output /result_sorted2

  • -D property=value: Use value for the given property.
  • -D stream.num.map.output.key.fields: Specify how many fields of the map output are treated as the key.
  • -D mapred.output.key.comparator.class: Use the library class, KeyFieldBasedComparator, as the comparator, allowing the Map/Reduce framework to compare the map outputs based on certain key fields, not the whole keys.
  • -D mapred.text.key.comparator.options: Specify the comparator rule: -k2,2 means sort on the second field, n means numerical order, and r means reverse (descending). See the local sort example after this list.
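
The same -k2,2nr syntax is understood by GNU sort, so you can get a feel for the rule locally (the three sample lines are made up):

hduser@master:~$ printf 'apple\t5\nbanana\t40\ncherry\t7\n' | sort -t$'\t' -k2,2nr
banana	40
cherry	7
apple	5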

Other useful streaming command options

  • -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner: Use the library class, KeyFieldBasedPartitioner, as the Partitioner, allowing the Map/Reduce framework to partition the map outputs based on certain key fields, not the whole keys. (What's the difference between the partitioner and the comparator? See the sketch after this list.)
  • -D mapred.text.key.partitioner.options: Specify the rule to partition the intermediate tuples.
  • -D stream.map.output.field.separator: Specify your own separator between key and value.
  • -D mapred.reduce.tasks: Specify the number of reducers.
  • -D mapred.map.tasks: A hint for the number of mappers. If it does not take effect, you may want to change mapred.min.split.size in mapred-site.xml.
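
As a sketch of how these options combine (the output path /result_partitioned is made up; the rest reuses the word count output from above): treat the first two fields of the map output as the key, partition only on the first field so that all tuples for the same word go to the same reducer, and use two reducers. The partitioner decides which reducer receives a tuple; the comparator decides the order of tuples within each reducer's input.

hduser@master:~$ hadoop jar hadoop-streaming-2.7.1.jar \
    -D stream.num.map.output.key.fields=2 \
    -D mapred.text.key.partitioner.options=-k1,1 \
    -D mapred.reduce.tasks=2 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -mapper cat -reducer cat -input /result -output /result_partitioned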

Secondary Sort

What is Secondary Sort?

Reference