Python – Why does hadoop mapReduce with python fail but the script runs on the command line?


I’m trying to implement a simple Hadoop map/reduce example using Cloudera 5.5.0.
The map and reduce steps are implemented in Python 2.6.6.


  • If the scripts are executed on the Unix command line, they work very well and produce the expected output.

cat join2_gen*.txt | ./ | sort | ./

  • But executing the script as a hadoop task is very unsuccessful:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input /user/cloudera/inputTV/join2_gen*.txt -output /user/cloudera/output_tv -mapper /home/cloudera/join3_ -reducer /home/cloudera/ -numReduceTasks 1

16/01/06 12:32:32 INFO mapreduce.Job: Task Id : attempt_1452069211060_0026_r_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(
    at org.apache.hadoop.streaming.PipeReducer.close(
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(
    at org.apache.hadoop.mapred.YarnChild$
    (Native Method)
    at (
    at (
    at org.apache.hadoop.mapred.YarnChild.main(

  • If the hadoop command is executed with -numReduceTasks 0, the mapper works:
    the Hadoop job executes only the map step, ends successfully, and the output directory contains the result files of the map step.

  • So I guess there must be something wrong with the reduce step?

  • The stderr logs in Hue show nothing relevant:

Log upload time: Wed Jan 06 12:33:10 -0800 2016
Log length: 222
log4j:WARN No appenders could be found for logger (org.apache.hadoop.ipc.Server).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Script code:
First file (the mapper):

#!/usr/bin/env python

import sys

for line in sys.stdin:
    line   = line.strip()      # strip out carriage return
    tuple2 = line.split(",")   # split line into key and value, returns a list

    if len(tuple2) == 2:
        key   = tuple2[0]
        value = tuple2[1]
        if value == 'ABC':
            print('%s\t%s' % (key, value))
        elif value.isdigit():
            print('%s\t%s' % (key, value))
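The mapper logic can be exercised locally without Hadoop. Here is a minimal sketch of such a check; the `map_line` helper and the sample lines are mine, introduced only for illustration, not part of the original script:

```python
def map_line(line):
    """Apply the mapper's filter to one 'key,value' line.

    Returns 'key\tvalue' when the value is 'ABC' or numeric, else None.
    """
    tuple2 = line.strip().split(",")
    if len(tuple2) == 2:
        key, value = tuple2
        if value == 'ABC' or value.isdigit():
            return '%s\t%s' % (key, value)
    return None  # malformed or filtered-out line

sample = ["show1,ABC", "show1,100", "show2,XYZ", "no comma here"]
results = [r for r in (map_line(l) for l in sample) if r is not None]
print(results)  # only the 'ABC' line and the numeric line survive
```

Running such a harness on a few hand-written lines confirms the filtering behaviour before ever submitting the job.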

Second file (the reducer):

#!/usr/bin/env python
import sys

last_key      = None              # initialize these variables
running_total = 0
abcFound      = False
this_key      = None

# -----------------------------------
# Loop over the input
# -----------------------------------
for input_line in sys.stdin:
    input_line = input_line.strip()

    # --------------------------------
    # Get next key/value pair, splitting at tab
    # --------------------------------
    tuple2 = input_line.split("\t")

    this_key = tuple2[0]
    value    = tuple2[1]
    if value.isdigit():
        value = int(value)

    # ---------------------------------
    # Key check part:
    #    if this current key is the same
    #    as the last one, consolidate;
    #    otherwise emit
    # ---------------------------------
    if last_key == this_key:
        if value == 'ABC':        # filter for only ABC in TV shows
            abcFound = True
        elif isinstance(value, (int, long)):
            running_total += value
    else:
        if last_key:              # this key is different from the last key, and the
                                  # previous (i.e. last) key is not empty,
                                  # so output the previous <key running-count>
            if abcFound:
                print('%s\t%s' % (last_key, running_total))

        # reset values for the new key
        abcFound      = (value == 'ABC')
        running_total = value if isinstance(value, (int, long)) else 0
        last_key      = this_key

if last_key == this_key:
    print('%s\t%s' % (last_key, running_total))
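A common reason for `PipeMapRed.waitOutputThreads(): subprocess failed with code 1` is simply that the reducer raised an uncaught exception, e.g. an `IndexError` when a record without a tab reaches `tuple2[1]`. A defensive split avoids that; this is only a sketch of the idea, and the `parse_kv` name is mine, not from the original script:

```python
def parse_kv(input_line):
    """Split a streaming record at the first tab.

    Returns (key, value) for well-formed records, or None for malformed
    lines instead of raising IndexError and killing the task.
    """
    parts = input_line.strip().split("\t", 1)
    if len(parts) != 2:
        return None  # skip the record rather than crash with exit code 1
    return parts[0], parts[1]

print(parse_kv("show1\t100"))   # ('show1', '100')
print(parse_kv("no-tab-here"))  # None
```

Lines the reducer cannot parse are then skipped (or logged to stderr) instead of terminating the whole attempt.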

I’ve tried various ways of declaring the input files to the hadoop command, with no success.

What am I doing wrong? Thanks a lot for any tips and ideas.


What a lucky punch! I fought with this for days, and now I know how I made it work:

Since the local (Unix) execution

cat join2_gen*.txt | ./ | sort | ./

works fine, I wanted to use 1 merged input file instead of the 6 input files provided, so:

cat join2_gen*.txt >> mergedInputFile.txt

hdfs dfs -put mergedInputFile.txt /user/cloudera/input

Then I executed the same hadoop command again, pointing the input to mergedInputFile.txt in the input folder: perfect result, no problems, no exceptions, job done.
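The merge step can also be done in Python instead of `cat`, which is handy when the files live on a machine without a shell at hand. A minimal sketch, assuming the file names from the commands above; `merge_files` is a hypothetical helper of mine:

```python
import glob

def merge_files(pattern, out_path):
    """Concatenate all files matching `pattern` into one output file,
    mirroring `cat join2_gen*.txt >> mergedInputFile.txt`."""
    with open(out_path, "w") as out:
        for path in sorted(glob.glob(pattern)):  # sort for a stable order
            with open(path) as f:
                out.write(f.read())

# merge_files("join2_gen*.txt", "mergedInputFile.txt")
```

The merged file would then be uploaded with `hdfs dfs -put` exactly as above.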

For me, it raises a question:

  • Why does it work with one merged input file, but fail with the 6 smaller files? I don’t know (yet).
