After much googling (etc.) I figured out how to include executable binaries/scripts/modules so that they are accessible to your mappers/reducers. The trick is to upload all your files to HDFS first.
$ bin/hadoop dfs -copyFromLocal /local/file/system/module.py module.py
Then you need to format your streaming command like the following template:
$ ./bin/hadoop jar /local/file/system/hadoop-0.21.0/mapred/contrib/streaming/hadoop-0.21.0-streaming.jar \
    -file /local/file/system/data/data.txt \
    -file /local/file/system/mapper.py \
    -file /local/file/system/reducer.py \
    -cacheFile hdfs://localhost:9000/user/you/module.py#module.py \
    -input data.txt \
    -output output/ \
    -mapper mapper.py \
    -reducer reducer.py \
    -verbose
If you're linking a Python module, you'll need to add the following code to your mapper/reducer scripts:
import sys
sys.path.append('.')
import module
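For context, here is a minimal sketch of what a mapper using the shipped module might look like. Note that module.process() is a hypothetical function standing in for whatever your module actually provides:
#!/usr/bin/python
import sys

# The -cacheFile symlink (module.py) lands in the task's working directory,
# so make the current directory importable before importing it.
sys.path.append('.')
import module

for line in sys.stdin:
    line = line.strip()
    # module.process() is hypothetical -- call whatever your module exposes
    print module.process(line)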
Got it running finally:
$pid = open2(my $out, my $in, "./binary") or die "could not run open2";
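If your streaming script is written in Python rather than Perl, the same idea (driving the bundled binary from inside the mapper) can be sketched with the standard subprocess module. Here ./binary is the executable shipped with the job as described above, and the error handling is intentionally minimal:
import subprocess
import sys

# Launch the bundled binary, pipe the mapper's stdin through it,
# and write whatever it produces back to stdout for Hadoop to collect.
proc = subprocess.Popen(['./binary'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
out, _ = proc.communicate(sys.stdin.read())
sys.stdout.write(out)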
Hadoop streaming is a utility that comes with the Hadoop distribution. It allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.

In the example below, both the mapper and the reducer are Python scripts that read input from standard input and emit output to standard output. The utility creates a Map/Reduce job, submits the job to an appropriate cluster, and monitors the progress of the job until it completes.

For Hadoop streaming, we are considering the word-count problem. Any job in Hadoop must have two phases: mapper and reducer. We have written the mapper and reducer code in Python to run under Hadoop; the same can also be written in Perl or Ruby.

Save the mapper and reducer code as mapper.py and reducer.py in the Hadoop home directory, and make sure these files have execute permission (chmod +x mapper.py and chmod +x reducer.py). Since Python is indentation-sensitive, the same code can also be downloaded from the link below.
Mapper Phase Code
#!/usr/bin/python
import sys

# Input comes from standard input
for myline in sys.stdin:
    # Remove whitespace on either side
    myline = myline.strip()
    # Break the line into words
    words = myline.split()
    # Iterate over the words list
    for myword in words:
        # Write the results to standard output
        print '%s\t%s' % (myword, 1)
Reducer Phase Code
#!/usr/bin/python
from operator import itemgetter
import sys

current_word = ""
current_count = 0
word = ""

# Input comes from standard input
for myline in sys.stdin:
    # Remove whitespace on either side
    myline = myline.strip()
    # Split the input we got from mapper.py
    word, count = myline.split('\t', 1)
    # Convert count variable to integer
    try:
        count = int(count)
    except ValueError:
        # Count was not a number, so silently ignore this line
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # Write result to standard output
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# Do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
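Before submitting anything to the cluster, it is worth testing the two scripts locally with a plain shell pipeline that imitates Hadoop's sort-and-shuffle step (a quick sanity check, not part of the original tutorial):
$ echo "foo foo bar" | python mapper.py | sort -k1,1 | python reducer.py
This should print bar and foo with counts 1 and 2 respectively, each word tab-separated from its count.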
Execution of WordCount Program
$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
    -input input_dirs \
    -output output_dir \
    -mapper <path/mapper.py \
    -reducer <path/reducer.py
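Once the job completes, the results are written to output_dir on HDFS; assuming the default part-file naming, they can be inspected with:
$ $HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000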
Action nodes define the jobs, which are the individual units of work that are chained together to make up the Oozie workflow. Actions do the actual processing in the workflow. An action node can run a variety of jobs: MapReduce, Pig, Hive, and more.

Streaming jobs run binaries or scripts and obviously need a mapper and reducer executable. These are packaged through the <file> and <archive> elements as explained in the previous section. If the <file> element is missing for a streaming job, the executables are assumed to be available in the specified path on the local Hadoop nodes. If it's a relative path, it's assumed to be relative to the workflow root directory.
Example 4-1. Action node
<action name="identity-MR">
<map-reduce>
<job-tracker>localhost:8032</job-tracker>
<name-node>hdfs://localhost:8020</name-node>
<prepare>
<delete path="/user/joe/data/output" />
</prepare>
<configuration>
<property>
<name>mapred.mapper.class</name>
<value>org.apache.hadoop.mapred.lib.IdentityMapper</value>
</property>
<property>
<name>mapred.reducer.class</name>
<value>org.apache.hadoop.mapred.lib.IdentityReducer</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>/user/joe/data/input</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>/user/joe/data/output</value>
</property>
</configuration>
</map-reduce>
<ok to="success" />
<error to="fail" />
</action>
The action needs to know the JobTracker (JT) and the NameNode (NN) of the underlying Hadoop cluster where Oozie has to run the MapReduce job. The first two elements in the previous list are meant for specifying them. These are required elements for this action:
...
<job-tracker>localhost:8032</job-tracker>
<name-node>hdfs://localhost:8020</name-node>
...
The <prepare> section is optional and is typically used as a preprocessor to delete output directories or HCatalog table partitions or to create some directories required for the action. This delete helps make the action repeatable and enables retries after failure. Without this cleanup, retries of Hadoop jobs will fail because Hadoop checks for nonexistence of the output directories and tries to create them for the job. So deleting them before running the action is a common use case for this element. Using <prepare> to create directories is also supported, but not as common as the delete in usage:
...
<prepare>
<delete path="hdfs://localhost:8020/user/joe/output" />
</prepare>
...
The <job-xml> element(s) and/or the <configuration> section can be used to capture all of the Hadoop job configuration properties. The worker code for the MapReduce action is specified as part of this configuration using the mapred.mapper.class and the mapred.reducer.class properties. These properties specify the actual Java classes to be run as map and reduce as part of this action:
...
<configuration>
<property>
<name>mapred.mapper.class</name>
<value>org.myorg.FirstJob.Map</value>
</property>
<property>
<name>mapred.reducer.class</name>
<value>org.myorg.FirstJob.Reduce</value>
</property>
</configuration>
...
Now, putting all the pieces together, a sample <map-reduce> action is shown here:
...
<action name="myMapReduceAction">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${myMapReduceActionOutput}"/>
</prepare>
<job-xml>/myfirstjob.xml</job-xml>
<configuration>
<property>
<name>mapred.mapper.class</name>
<value>org.myorg.FirstJob.Map</value>
</property>
<property>
<name>mapred.reducer.class</name>
<value>org.myorg.FirstJob.Reduce</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>${myMapReduceActionInput}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${myMapReduceActionOutput}</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>${JobNumReducers}</value>
</property>
</configuration>
<file>myDir1/myFile.txt#file1</file>
<archive>mytar.tgz#mygzdir</archive>
</map-reduce>
</action>
...
You can optionally give a <record-reader> and <record-reader-mapping> through those elements to the streaming MapReduce job. Refer to the Hadoop documentation for more information on those properties. The <env> element comes in handy to set some environment variables required by the scripts. Here is an example of a streaming section:
...
<streaming>
<mapper>python MyCustomMapper.py</mapper>
<reducer>python MyCustomReducer.py</reducer>
<record-reader>StreamXmlRecordReader</record-reader>
<env>output_dir=/tmp/output</env>
</streaming>
...
The <program> element is the most important in the list and it points to the C++ executable to be run. This executable needs to be packaged with the workflow application and deployed on HDFS. You can also optionally specify the <map> class, <reduce> class, <inputformat>, <partitioner>, and <writer> elements. Refer to the Hadoop documentation on pipes for more details. Here is an example of a pipes section in the Oozie action:
...
<pipes>
<program>hdfs://localhost:8020/user/myUser/wf/bin/wordcount-simple#wordcount-simple</program>
</pipes>
...
Now, let’s look at a specific example of how a Hadoop MapReduce job is run on the command line and convert it into an Oozie action definition. You’re likely already familiar with running basic Hadoop jobs from the command line. Using that as a starting point and converting it to an action definition in Oozie will make it easier for you to become familiar with the workflow syntax. Here’s an example:
$ hadoop jar /user/joe/myApp.jar myAppClass -Dmapred.job.reduce.memory.mb=8192 /hdfs/user/joe/input /hdfs/user/joe/output prod
In “Action Types”, we covered how a typical Java MapReduce program has a main driver class that is not needed in Oozie. You just need to specify the mapper and reducer class in the action definition. But this also requires knowing the actual mapper and reducer class in the JAR to be able to write the Oozie <map-reduce> action. In the command line above, myAppClass is the main driver class. This is part of the main driver code for the preceding Hadoop example:
...
/**
* The main driver for the map/reduce program.
* Invoke this method to submit the map/reduce job.
*/
public static void main(String[] args) throws IOException {
JobConf conf = new JobConf(myAppClass.class);
conf.setJobName("myAppClass");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MyMapClass.class);
conf.setReducerClass(MyRedClass.class);
...
Given this, the command line for the preceding Hadoop job submission can be specified in an Oozie workflow action as shown here:
<action name="myHadoopMRAction">
<map-reduce>
<job-tracker>jt.mycompany.com:8032</job-tracker>
<name-node>hdfs://nn.mycompany.com:8020</name-node>
<prepare>
<delete path="hdfs://nn.mycompany.com:8020/hdfs/user/joe/output" />
</prepare>
<configuration>
<property>
<name>mapred.mapper.class</name>
<value>com.myBiz.mr.MyMapClass</value>
</property>
<property>
<name>mapred.reducer.class</name>
<value>com.myBiz.mr.MyRedClass</value>
</property>
<property>
<name>mapred.job.reduce.memory.mb</name>
<value>8192</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>/hdfs/user/joe/input</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>/hdfs/user/joe/output</value>
</property>
</configuration>
</map-reduce>
<ok to="success" />
<error to="fail" />
</action>
Here is a similar example of a Python streaming job run on the command line, followed by the equivalent Oozie streaming action (Example 4-2):
$ hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-*streaming*.jar \
    -file /home/joe/mapper.py -mapper /home/joe/mapper.py \
    -file /home/joe/reducer.py -reducer /home/joe/reducer.py \
    -input hdfs://nn.mycompany.com:8020/hdfs/user/joe/input/ \
    -output hdfs://nn.mycompany.com:8020/hdfs/user/joe/output/
Example 4-2. MapReduce streaming action
<action name="myStreamingMRAction">
<map-reduce>
<job-tracker>jt.mycompany.com:8032</job-tracker>
<name-node>hdfs://nn.mycompany.com:8020</name-node>
<prepare>
<delete path="hdfs://nn.mycompany.com:8020/hdfs/user/joe/output" />
</prepare>
<streaming>
<mapper>python mapper.py</mapper>
<reducer>python reducer.py</reducer>
</streaming>
<configuration>
<property>
<name>mapred.input.dir</name>
<value>/hdfs/user/joe/input</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>/hdfs/user/joe/output</value>
</property>
</configuration>
<file>wfDir/mapper.py#mapper.py</file>
<file>wfDir/reducer.py#reducer.py</file>
</map-reduce>
<ok to="success" />
<error to="fail" />
</action>
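To actually execute an action like Example 4-2, the workflow.xml containing it is deployed to HDFS and the workflow is submitted through the Oozie command-line client. A minimal sketch, assuming a job.properties file that defines oozie.wf.application.path (plus any parameters such as ${jobTracker} and ${nameNode} used in the workflow) and an Oozie server running at its default URL:
$ oozie job -oozie http://localhost:11000/oozie -config job.properties -run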