hadoop streaming: mapper 'wrapping' a binary executable


After much googling (etc.) I figured out how to make executable binaries/scripts/modules accessible to your mappers/reducers. The trick is to upload all your files to Hadoop (HDFS) first.

$ bin/hadoop dfs -copyFromLocal /local/file/system/module.py module.py

Then you need to format your streaming command like the following template:

$ ./bin/hadoop jar /local/file/system/hadoop-0.21.0/mapred/contrib/streaming/hadoop-0.21.0-streaming.jar \
    -file /local/file/system/data/data.txt \
    -file /local/file/system/mapper.py \
    -file /local/file/system/reducer.py \
    -cacheFile hdfs://localhost:9000/user/you/module.py#module.py \
    -input data.txt \
    -output output/ \
    -mapper mapper.py \
    -reducer reducer.py \
    -verbose

If you're linking a Python module, you'll need to add the following code to your mapper/reducer scripts:

import sys
sys.path.append('.')  # the -cacheFile symlink places module.py in the task's working directory
import module

I finally got the binary itself running from inside the mapper by wrapping it with Perl's open2:

use IPC::Open2;
$pid = open2(my $out, my $in, "./binary") or die "could not run open2";
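
The same wrapping idea can be sketched in Python instead of Perl. This is not from the original answer, just a minimal illustration assuming the executable (here called ./binary, matching the snippet above) has been shipped to the task's working directory with -file or -cacheFile:

#!/usr/bin/python
# Hypothetical streaming mapper that wraps a binary executable.
# './binary' is a placeholder; ship the real executable with -file
# or -cacheFile so it lands in the task's working directory.
import subprocess
import sys

# Launch the binary with pipes attached to its stdin/stdout.
proc = subprocess.Popen(['./binary'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE)

# Feed the map input to the binary and emit whatever it prints,
# which is all Hadoop Streaming expects from a mapper.
out, _ = proc.communicate(sys.stdin.read())
sys.stdout.write(out)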

Suggestion : 2

Hadoop streaming is a utility that comes with the Hadoop distribution. It allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer. In the example below, both the mapper and the reducer are Python scripts that read their input from standard input and emit their output to standard output. The utility creates a Map/Reduce job, submits it to an appropriate cluster, and monitors the progress of the job until it completes.

For Hadoop streaming we consider the word-count problem. Any job in Hadoop must have two phases: mapper and reducer. We have written the mapper and the reducer in Python to run under Hadoop; the same can also be written in Perl or Ruby.

Save the mapper and reducer code as mapper.py and reducer.py in the Hadoop home directory, and make sure both files have execute permission (chmod +x mapper.py and chmod +x reducer.py). Since Python is indentation-sensitive, take care to preserve the whitespace when copying the code below.

Mapper Phase Code

#!/usr/bin/python

import sys

# Input is taken from standard input
for myline in sys.stdin:
    # Remove whitespace on either side
    myline = myline.strip()

    # Break the line into words
    words = myline.split()

    # Iterate over the words list
    for myword in words:
        # Write the results to standard output
        print '%s\t%s' % (myword, 1)

Reducer Phase Code

#!/usr/bin/python

from operator import itemgetter
import sys

current_word = ""
current_count = 0
word = ""

# Input is taken from standard input
for myline in sys.stdin:
    # Remove whitespace on either side
    myline = myline.strip()

    # Split the input we got from mapper.py
    word, count = myline.split('\t', 1)

    # Convert count variable to integer
    try:
        count = int(count)
    except ValueError:
        # Count was not a number, so silently ignore this line
        continue

    if current_word == word:
        current_count += count
    else:
        if current_word:
            # Write result to standard output
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# Do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
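
Because a streaming job is nothing more than a pipeline over standard input and output, the two scripts can be sanity-checked locally before submitting anything to the cluster. Here is a minimal sketch (not part of the original tutorial), assuming mapper.py, reducer.py, and a small data.txt sample sit in the current directory:

#!/usr/bin/python
# Local simulation of the streaming pipeline:
#   cat data.txt | mapper.py | sort | reducer.py
import subprocess

with open('data.txt') as sample:
    mapper = subprocess.Popen(['python', 'mapper.py'],
                              stdin=sample, stdout=subprocess.PIPE)
    # Hadoop sorts by key between the map and reduce phases;
    # a plain 'sort' stands in for that shuffle step here.
    sorter = subprocess.Popen(['sort'],
                              stdin=mapper.stdout, stdout=subprocess.PIPE)
    reducer = subprocess.Popen(['python', 'reducer.py'],
                               stdin=sorter.stdout)
    mapper.stdout.close()
    sorter.stdout.close()
    reducer.communicate()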

Execution of WordCount Program

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
    -input input_dirs \
    -output output_dir \
    -mapper <path>/mapper.py \
    -reducer <path>/reducer.py

Suggestion : 3

Action nodes define the jobs, which are the individual units of work that are chained together to make up the Oozie workflow. Actions do the actual processing in the workflow. An action node can run a variety of jobs: MapReduce, Pig, Hive, and more.

Streaming jobs run binaries or scripts and obviously need a mapper and reducer executable. These are packaged through the <file> and <archive> elements as explained in the previous section. If the <file> element is missing for a streaming job, the executables are assumed to be available in the specified path on the local Hadoop nodes. If it's a relative path, it's assumed to be relative to the workflow root directory.

Example 4-1. Action node
  <action name="identity-MR">
     <map-reduce>
        <job-tracker>localhost:8032</job-tracker>
        <name-node>hdfs://localhost:8020</name-node>
        <prepare>
           <delete path="/user/joe/data/output" />
        </prepare>
        <configuration>
           <property>
              <name>mapred.mapper.class</name>
              <value>org.apache.hadoop.mapred.lib.IdentityMapper</value>
           </property>
           <property>
              <name>mapred.reducer.class</name>
              <value>org.apache.hadoop.mapred.lib.IdentityReducer</value>
           </property>
           <property>
              <name>mapred.input.dir</name>
              <value>/user/joe/data/input</value>
           </property>
           <property>
              <name>mapred.output.dir</name>
               <value>/user/joe/data/output</value>
           </property>
        </configuration>
     </map-reduce>
     <ok to="success" />
     <error to="fail" />
  </action>

The action needs to know the JobTracker (JT) and the NameNode (NN) of the underlying Hadoop cluster where Oozie has to run the MapReduce job. The <job-tracker> and <name-node> elements are meant for specifying them and are required for this action:

...
<job-tracker>localhost:8032</job-tracker>
<name-node>hdfs://localhost:8020</name-node>
...

The <prepare> section is optional and is typically used as a preprocessor to delete output directories or HCatalog table partitions or to create some directories required for the action. This delete helps make the action repeatable and enables retries after failure. Without this cleanup, retries of Hadoop jobs will fail because Hadoop checks for nonexistence of the output directories and tries to create them for the job. So deleting them before running the action is a common use case for this element. Using <prepare> to create directories is also supported, but not as common as the delete in usage:

...
<prepare>
   <delete path="hdfs://localhost:8020/user/joe/output" />
</prepare>
...

The <job-xml> element(s) and/or the <configuration> section can be used to capture all of the Hadoop job configuration properties. The worker code for the MapReduce action is specified as part of this configuration using the mapred.mapper.class and the mapred.reducer.class properties. These properties specify the actual Java classes to be run as map and reduce as part of this action:

...
<configuration>
   <property>
      <name>mapred.mapper.class</name>
      <value>org.myorg.FirstJob.Map</value>
   </property>
   <property>
      <name>mapred.reducer.class</name>
      <value>org.myorg.FirstJob.Reduce</value>
   </property>
</configuration>
...

Now, putting all the pieces together, a sample <map-reduce> action is shown here:

...
    <action name="myMapReduceAction">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${myMapReduceActionOutput}"/>
            </prepare>
            <job-xml>/myfirstjob.xml</job-xml>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.myorg.FirstJob.Map</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.myorg.FirstJob.Reduce</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${myMapReduceActionInput}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${myMapReduceActionOutput}</value>
                </property>
                <property>
                    <name>mapred.reduce.tasks</name>
                    <value>${JobNumReducers}</value>
                </property>
            </configuration>
            <file>myDir1/myFile.txt#file1</file>
            <archive>mytar.tgz#mygzdir</archive>
        </map-reduce>
    </action>
...

You can optionally give a <record-reader> and <record-reader-mapping> through those elements to the streaming MapReduce job. Refer to the Hadoop documentation for more information on those properties. The <env> element comes in handy to set some environment variables required by the scripts. Here is an example of a streaming section:

...
<streaming>
   <mapper>python MyCustomMapper.py</mapper>
   <reducer>python MyCustomReducer.py</reducer>
   <record-reader>StreamXmlRecordReader</record-reader>
   <env>output_dir=/tmp/output</env>
</streaming>
...
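
Inside the streaming script, such an <env> variable is read like any other environment variable. A hypothetical sketch, using the output_dir name from the example above:

#!/usr/bin/python
# Hypothetical streaming script reading the variable set via <env>.
# 'output_dir' is just the name used in the example above, not a
# standard Hadoop property.
import os
import sys

output_dir = os.environ.get('output_dir', '/tmp/output')

for line in sys.stdin:
    # ... process the record; output_dir is available if the script
    # needs to know where the job's results will eventually land.
    sys.stdout.write(line)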


The <program> element is the most important in the list and it points to the C++ executable to be run. This executable needs to be packaged with the workflow application and deployed on HDFS. You can also optionally specify the <map> class, <reduce> class, <inputformat>, <partitioner>, and <writer> elements. Refer to the Hadoop documentation on pipes for more details. Here is an example of a pipes section in the Oozie action:

...
<pipes>
   <program>hdfs://localhost:8020/user/myUser/wf/bin/
      wordcount-simple#wordcount-simple</program>
</pipes>
...

Now, let’s look at a specific example of how a Hadoop MapReduce job is run on the command line and convert it into an Oozie action definition. You’re likely already familiar with running basic Hadoop jobs from the command line. Using that as a starting point and converting it to an action definition in Oozie will make it easier for you to become familiar with the workflow syntax. Here’s an example:

$ hadoop jar /user/joe/myApp.jar myAppClass -Dmapred.job.reduce.memory.mb=8192 \
    /hdfs/user/joe/input/ /hdfs/user/joe/output prod

In “Action Types”, we covered how a typical Java MapReduce program has a main driver class that is not needed in Oozie. You just need to specify the mapper and reducer class in the action definition. But this also requires knowing the actual mapper and reducer class in the JAR to be able to write the Oozie <map-reduce> action. In the command line above, myAppClass is the main driver class. This is part of the main driver code for the preceding Hadoop example:

...
/**
 * The main driver for the map/reduce program.
 * Invoke this method to submit the map/reduce job.
 */
public static void main(String[] args) throws IOException {
      JobConf conf = new JobConf(myAppClass.class);
      conf.setJobName("myAppClass");

      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);

      conf.setMapperClass(MyMapClass.class);
      conf.setReducerClass(MyRedClass.class);
      ...

Given this, the command line for the preceding Hadoop job submission can be specified in an Oozie workflow action as shown here:

    <action name="myMapReduceAction">
    <map-reduce>
       <job-tracker>jt.mycompany.com:8032</job-tracker>
       <name-node>hdfs://nn.mycompany.com:8020</name-node>
       <prepare>
          <delete path="hdfs://nn.mycompany.com:8020/hdfs/user/joe/output" />
       </prepare>
       <configuration>
          <property>
             <name>mapred.mapper.class</name>
             <value>com.myBiz.mr.MyMapClass</value>
          </property>
          <property>
             <name>mapred.reducer.class</name>
             <value>com.myBiz.mr.MyRedClass</value>
          </property>
          <property>
             <name>mapred.job.reduce.memory.mb</name>
             <value>8192</value>
          </property>
          <property>
             <name>mapred.input.dir</name>
             <value>/hdfs/user/joe/input</value>
          </property>
          <property>
             <name>mapred.output.dir</name>
             <value>/hdfs/user/joe/output</value>
          </property>
       </configuration>
    </map-reduce>
    <ok to="success" />
    <error to="fail" />
    </action>

Let’s look at a Python streaming job invoked using the Hadoop client:

$ hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-*streaming*.jar \
    -file /home/joe/mapper.py -mapper /home/joe/mapper.py \
    -file /home/joe/reducer.py -reducer /home/joe/reducer.py \
    -input hdfs://nn.mycompany.com:8020/hdfs/user/joe/input/ \
    -output hdfs://nn.mycompany.com:8020/hdfs/user/joe/output/

Example 4-2. MapReduce streaming action
<action name="myStreamingMRAction">
   <map-reduce>
      <job-tracker>jt.mycompany.com:8032</job-tracker>
      <name-node>hdfs://nn.mycompany.com:8020</name-node>
      <prepare>
         <delete path="hdfs://nn.mycompany.com:8020/hdfs/user/joe/output" />
      </prepare>
      <streaming>
         <mapper>python mapper.py</mapper>
         <reducer>python reducer.py</reducer>
      </streaming>
      <configuration>
         <property>
            <name>mapred.input.dir</name>
            <value>/hdfs/user/joe/input</value>
         </property>
         <property>
            <name>mapred.output.dir</name>
            <value>/hdfs/user/joe/output</value>
         </property>
      </configuration>
      <file>wfDir/mapper.py#mapper.py</file>
   <file>wfDir/reducer.py#reducer.py</file>
   </map-reduce>
   <ok to="success" />
   <error to="fail" />
</action>