Python – Why does Hadoop MapReduce with Python fail when the scripts run fine on the command line?

I’m trying to implement a simple Hadoop MapReduce example using Cloudera 5.5.0.
The map and reduce steps should be implemented in Python 2.6.6.

Question:

  • If the scripts are executed on the Unix command line, they work fine and produce the expected output:

cat join2*.txt | ./join3_mapper.py | sort | ./join3_reducer.py

  • But running the same scripts as a Hadoop job fails:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input /user/cloudera/inputTV/join2_gen*.txt -output /user/cloudera/output_tv -mapper /home/cloudera/join3_mapper.py -reducer /home/cloudera/join3_reducer.py -numReduceTasks 1

16/01/06 12:32:32 INFO mapreduce.Job: Task Id : attempt_1452069211060_0026_r_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134)
    at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:244)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:459)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

  • If the hadoop command is executed with -numReduceTasks 0, the job runs only the map step and finishes successfully;
    the output directory then contains the result files of the map step.

  • I guess there must be something wrong with the reduce step?

  • The stderr logs in Hue show nothing relevant:

Log upload time: Wed Jan 06 12:33:10 -0800 2016
Log length: 222
log4j:WARN No appenders could be found for logger (org.apache.hadoop.ipc.Server).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
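When the task's stderr log contains only log4j warnings, one option is to have the streaming script report problematic input itself, since anything the script writes to stderr should end up in the task logs. The following is only a minimal sketch, not part of the original scripts: the helper name parse_pairs is hypothetical, and it assumes the script reads tab-separated key/value lines, as a streaming reducer does.

```python
import sys


def parse_pairs(lines, stderr=sys.stderr):
    """Yield (key, value) tuples from tab-separated lines.

    Lines that do not split into exactly two fields are reported on
    stderr (with line number and raw content) instead of raising an
    IndexError later in the script.
    """
    for lineno, line in enumerate(lines, 1):
        parts = line.rstrip("\n").split("\t")
        if len(parts) != 2:
            stderr.write("skipping malformed line %d: %r\n" % (lineno, line))
            continue
        yield parts[0], parts[1]
```

A reducer could then loop over parse_pairs(sys.stdin) instead of splitting each line by hand, so an unexpected input line leaves a trace in the log rather than only a generic exit code 1.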

Script code:
First file: join3_mapper.py

#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()        # strip surrounding whitespace, including the newline
    tuple2 = line.split(",")   # split the line into key and value; returns a list

    if len(tuple2) == 2:
        key = tuple2[0]
        value = tuple2[1]
        if value == 'ABC':
            print('%s\t%s' % (key, value))
        elif value.isdigit():
            print('%s\t%s' % (key, value))

Second file: join3_reducer.py

#!/usr/bin/env python
import sys

last_key      = None              # initialize these variables
running_total = 0
abcFound      = False
this_key      = None

# -----------------------------------
# Loop over the input
# -----------------------------------
for input_line in sys.stdin:
    input_line = input_line.strip()

    # --------------------------------
    # Get next key/value pair, splitting at tab
    # --------------------------------
    tuple2 = input_line.split("\t")

    this_key = tuple2[0]
    value = tuple2[1]
    if value.isdigit():
        value = int(value)

    # ---------------------------------
    # Key check part
    #    if the current key is the same
    #          as the last one, consolidate
    #    otherwise emit
    # ---------------------------------
    if last_key == this_key:
        if value == 'ABC':  # filter for only ABC in TV shows
            abcFound = True
        else:
            if isinstance(value, (int, long)):
                running_total += value

    else:
        if last_key:         # if this key is different from the last key, and the previous
                             #   (i.e. last) key is not empty,
                             #   then output
                             #   the previous <key running-count>
            if abcFound:
                print('%s\t%s' % (last_key, running_total))
                abcFound = False

        running_total = value    # reset values
        last_key = this_key

if last_key == this_key:
    print('%s\t%s' % (last_key, running_total))
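For comparison, the per-key logic can also be written with itertools.groupby, which makes it easier to test without Hadoop. This is only a sketch under an assumption about the intent: that for each key the numeric values should be summed, and the key should be emitted only if it also has an 'ABC' record. The function name reduce_pairs is hypothetical, and the behaviour differs slightly from the script above (for example, the last key is also filtered by 'ABC').

```python
from itertools import groupby


def reduce_pairs(pairs):
    """Sum the numeric values per key; emit only keys that have an 'ABC' record.

    `pairs` must be an iterable of (key, value) string tuples that is
    already sorted by key, as the reducer input is after the sort step.
    """
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        total = 0
        abc_found = False
        for _, value in group:
            if value == 'ABC':
                abc_found = True
            elif value.isdigit():
                total += int(value)
        if abc_found:
            yield key, total
```

Feeding it a small sorted list of pairs shows which keys survive the 'ABC' filter before the same logic is run on the cluster.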

I’ve tried various ways of passing the input files to the hadoop command, with no difference and no success.

What am I doing wrong? Thanks a lot for any hints and ideas.

Solution

What a lucky punch: after fighting with this for days, I finally got it working.

Since the local (Unix) execution of

cat join2_gen*.txt | ./join2_mapper.py | sort | ./join2_reducer.py

worked fine, I had the idea to use one merged input file instead of the 6 input files provided, so:

cat join2_gen*.txt >> mergedInputFile.txt

hdfs dfs -put mergedInputFile.txt /user/cloudera/input

Then I executed the same hadoop command again, pointing the input to mergedInputFile.txt in the input folder: perfect result, no problems, no exceptions, job done.

For me, this raises a question:

  • Why does it work with one merged input file, but not with the 6 smaller files provided? I don’t know (yet).
