What is Hadoop Streaming?
Hadoop streaming is a utility that comes with the Hadoop distribution. This utility allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.
- Mapper and Reducer are just normal Linux executables.
- Mapper: takes an input stream from standard input (STDIN); emits key-value pairs to standard output (STDOUT). Each key-value pair occupies one line and is formatted as '%s\t%s' % (key, value).
- Reducer: takes input key-value pairs from STDIN; outputs key-value pairs to STDOUT.
- By default, the key and value are separated by a tab. If there is no tab ('\t'), the whole line is treated as the key and the value is null.
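The default key/value convention can be sketched in a few lines of plain Python (a standalone illustration of the line format, not Hadoop code; format_pair and parse_line are our own helper names):

```python
def format_pair(key, value):
    # How a mapper typically emits one key-value pair
    return '%s\t%s' % (key, value)

def parse_line(line):
    # How the framework interprets a line: text before the first tab
    # is the key; if there is no tab, the whole line is the key and
    # the value is null (None here).
    if '\t' in line:
        key, value = line.split('\t', 1)
        return key, value
    return line, None

print(parse_line(format_pair('hello', 1)))  # ('hello', '1')
print(parse_line('no_tab_here'))            # ('no_tab_here', None)
```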
Run with Hadoop Streaming
The command format is as follows:
hduser@master:~$ hadoop jar hadoop-streaming-2.7.1.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper java_class_or_executable \
    -reducer java_class_or_executable
If you are running Hadoop on the DIC cluster, you can find
-input directoryname or filename: Input location in the HDFS for mapper. Required.
-output directoryname: Output location in the HDFS for reducer. Required.
-mapper executable or JavaClassName: Mapper executable in the local file system. Required.
-reducer executable or JavaClassName: Reducer executable in the local file system.
This time, instead of running the word count example shipped with Hadoop (/usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar), we are going to write the mapper and reducer ourselves and run them with Hadoop Streaming.
In this tutorial we will use Python. First of all, we design our mapper (mapper.py) as:
#!/usr/bin/env python
import sys

for line in sys.stdin:
    for word in line.split():
        print '%s\t%s' % (word, 1)
Write the reducer (reducer.py) as:
#!/usr/bin/env python
import sys

cur_key = None
cur_count = 0
for line in sys.stdin:
    key, value = line.split()
    # Accumulate the count for a word
    if key == cur_key:
        cur_count += int(value)
    else:
        # Finish counting for the previous word
        if cur_key:
            print '%s\t%s' % (cur_key, cur_count)
        cur_key = key
        cur_count = int(value)
# Output the last word (guard against empty input)
if cur_key:
    print '%s\t%s' % (cur_key, cur_count)
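Because the two scripts only use STDIN/STDOUT, you can test the whole job locally before submitting it, e.g. with a shell pipeline like cat input.txt | ./mapper.py | sort | ./reducer.py. The same map / sort / reduce flow can also be simulated in-process (a sketch for checking the logic; run_wordcount is our own helper, not part of Hadoop):

```python
def run_wordcount(text):
    # Map phase: emit one "word\t1" line per word
    mapped = ['%s\t%s' % (w, 1)
              for line in text.splitlines() for w in line.split()]
    # Shuffle phase: Hadoop sorts the intermediate pairs by key
    mapped.sort()
    # Reduce phase: the same accumulation logic as reducer.py
    counts = {}
    cur_key, cur_count = None, 0
    for pair in mapped:
        key, value = pair.split('\t')
        if key == cur_key:
            cur_count += int(value)
        else:
            if cur_key is not None:
                counts[cur_key] = cur_count
            cur_key, cur_count = key, int(value)
    if cur_key is not None:
        counts[cur_key] = cur_count
    return counts

print(run_wordcount('the quick fox\nthe lazy dog the'))
```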
Run it with Hadoop Streaming:
hduser@master:~$ hadoop jar hadoop-streaming-2.7.1.jar \
    -file mapper.py -mapper mapper.py \
    -file reducer.py -reducer reducer.py \
    -input /largefile -output /result
The most frequent words
Following up the previous word count example, how can we get the most frequent words using MapReduce? Check the reducer output in the previous job.
hduser@master:~$ hadoop dfs -cat /result/part-00000 | head -n 5
We know Hadoop will sort the intermediate results by key. Let's leverage this property and design our mapper (swap.py) as:
#!/usr/bin/env python
import sys

for line in sys.stdin:
    word, count = line.split()
    print '%07d\t%s' % (int(count), word)
Note: by default, the shuffle stage performs string sorting, not integer sorting. We played a trick, %07d, padding the count with leading zeros so that string sorting and integer sorting are equivalent in this case.
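The effect of the padding can be verified in a couple of lines (a standalone check, independent of Hadoop; note the trick assumes non-negative counts below 10^7):

```python
counts = [3, 27, 101, 9]

# String sort on the raw digits does not match numeric order...
plain = sorted('%d' % c for c in counts)
print(plain)   # ['101', '27', '3', '9']

# ...but with leading zeros, string order equals numeric order
padded = sorted('%07d' % c for c in counts)
print(padded)  # ['0000003', '0000009', '0000027', '0000101']
```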
We don't need a reducer in this case. Run with Hadoop Streaming:
hduser@master:~$ hadoop jar hadoop-streaming-2.7.1.jar -mapper swap.py -file swap.py -input /result -output /result_sorted
Check the output:
hduser@master:~$ hadoop dfs -cat /result_sorted/part-00000 | tail -n 5
hadoop dfs -cat can cause a lot of network traffic in practice if you have a really large data set. Don't use -cat in real work or homework (just add a reducer if necessary).
There are many parameters you can specify in the command line argument. Try the following command:
hduser@master:~$ hadoop jar hadoop-streaming-2.7.1.jar \
    -D stream.num.map.output.key.fields=2 \
    -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D mapred.text.key.comparator.options=-k2,2nr \
    -mapper cat -reducer cat \
    -input /result -output /result_sorted2
-D property=value: Use value for given property
-D stream.num.map.output.key.fields: Specify how many fields as the key
-D mapred.output.key.comparator.class: Use the library class, KeyFieldBasedComparator, as the comparator, allowing the Map/Reduce framework to compare the map outputs based on certain key fields, not the whole keys.
-D mapred.text.key.comparator.options: Specify the comparator rule --
-k2,2 means sort on the second field only;
n means in numerical order;
r means in reverse (descending) order.
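The rule -k2,2nr follows the same conventions as Unix sort(1); its effect can be sketched in Python (an illustration of the resulting order on sample records, not the Hadoop comparator class itself):

```python
# Sort tab-separated "word\tcount" records by the 2nd field,
# numerically (n), in reverse order (r) -- what -k2,2nr requests.
records = ['the\t3', 'fox\t1', 'lazy\t12']
ordered = sorted(records, key=lambda r: int(r.split('\t')[1]), reverse=True)
print(ordered)  # ['lazy\t12', 'the\t3', 'fox\t1']
```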
Other useful streaming command options
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner: Use the library class, KeyFieldBasedPartitioner, as the Partitioner, allowing the Map/Reduce framework to partition the map outputs based on certain key fields, not the whole keys. (What's the difference between partition and comparator?)
-D mapred.text.key.partitioner.options: Specify the rule to partition the intermediate tuples.
-D stream.map.output.field.separator: Specify your own separator between key and value.
-D mapred.reduce.tasks: Specify the number of reducers.
-D mapred.map.tasks: A hint for the number of mappers. If it does not work, you may want to change mapred.min.split.size in mapred-site.xml.
What is Secondary Sort?
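In short, a secondary sort makes the values within each key group arrive at the reducer in a defined order. In Streaming this is done with a composite key: partition on the first key field (KeyFieldBasedPartitioner) but compare on the full key (KeyFieldBasedComparator). A toy sketch of the idea (our own simulation with illustrative names and data, not the Hadoop API; we use a deterministic partition function for clarity):

```python
# Composite keys (k1, k2): partition on k1 only, sort on (k1, k2),
# so each reducer receives one k1 group with values ordered by k2.
records = [('b', 5), ('a', 3), ('b', 1), ('a', 7)]

def shuffle(recs, num_reducers=2):
    # Partition on the first key field only (a stand-in for
    # KeyFieldBasedPartitioner)
    partitions = [[] for _ in range(num_reducers)]
    for k1, k2 in recs:
        partitions[ord(k1) % num_reducers].append((k1, k2))
    # Sort each partition on the full composite key, as the
    # comparator would
    for p in partitions:
        p.sort()
    return partitions

for p in shuffle(records):
    print(p)
# [('b', 1), ('b', 5)]
# [('a', 3), ('a', 7)]
```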
- Hadoop Streaming: https://hadoop.apache.org/docs/r1.2.1/streaming.html
- Python Tutorial: https://docs.python.org/2/tutorial/
- Java Word Count Example: https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
- Secondary Sort Example: http://dl.farinsoft.ir/files/94/Data-Algorithms-Recipes-Scaling-Hadoop.pdf
- DIC user guide: http://mobitec.ie.cuhk.edu.hk/engg4030Fall2016/homework/AguidetouseIEDICcluster.pdf