I’m trying to implement a simple Hadoop MapReduce example using Cloudera 5.5.0.
The map and reduce steps should be implemented using Python 2.6.6.


  • If the scripts are executed on the Unix command line, they work very well and produce the expected output.

cat join2_gen*.txt | ./ | sort | ./

  • But executing the scripts as a Hadoop streaming job fails:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input /user/cloudera/inputTV/join2_gen*.txt -output /user/cloudera/output_tv -mapper /home/cloudera/join3_ -reducer /home/cloudera/ -numReduceTasks 1

16/01/06 12:32:32 INFO mapreduce.Job: Task Id : attempt_1452069211060_0026_r_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(
    at org.apache.hadoop.streaming.PipeReducer.close(
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(
    at org.apache.hadoop.mapred.YarnChild$
    at Native Method)
    at (
    at (
    at org.apache.hadoop.mapred.YarnChild.main(

  • If the hadoop command is executed with -numReduceTasks 0, the mapper works:
    the Hadoop job executes only the map step and ends successfully, and the output directory contains the result files of the map step.

  • I guess there must be something wrong with the reduce step?

  • The stderr logs in Hue show nothing relevant:

Log upload time: Wed Jan 06 12:33:10 -0800 2016
Log length: 222
log4j:WARN No appenders could be found for logger (org.apache.hadoop.ipc.Server).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See ... for more info.

Script code:
First file (mapper):

#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()          # strip surrounding whitespace, including the newline
    tuple2 = line.split(",")     # split line into key and value, returns a list

    if len(tuple2) == 2:         # skip lines that are not exactly "key,value"
        key = tuple2[0]
        value = tuple2[1]
        if value == 'ABC':
            print('%s\t%s' % (key, value))
        elif value.isdigit():
            print('%s\t%s' % (key, value))

Second file (reducer):

#!/usr/bin/env python
import sys

last_key      = None     # initialize these variables
running_total = 0
abcFound      = False
this_key      = None

# -----------------------------------
# Loop over the input
# -----------------------------------
for input_line in sys.stdin:
    input_line = input_line.strip()

    # --------------------------------
    # Get next key/value pair, splitting at tab
    # --------------------------------
    tuple2 = input_line.split("\t")

    this_key = tuple2[0]
    value = tuple2[1]
    if value.isdigit():
        value = int(value)

    # ---------------------------------
    # Key check part
    #    if this current key is the same
    #          as the last one: consolidate
    #    otherwise: emit
    # ---------------------------------
    if last_key == this_key:
        if value == 'ABC':       # filter for only ABC in TV shows
            abcFound = True
        elif isinstance(value, (int, long)):
            running_total += value
    else:
        if last_key:             # this key is different from the last key, and the
                                 #   previous (i.e. last) key is not empty,
                                 #   so output
                                 #   the previous <key running-count>
            if abcFound:
                print('%s\t%s' % (last_key, running_total))
                abcFound = False

        running_total = value    # reset values: the first value of the new key becomes the total
        last_key = this_key

if last_key == this_key:
    print('%s\t%s' % (last_key, running_total))
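A side note on the reset line `running_total = value` in the second script: it makes the first value seen for a key the running total, whatever its type. The following minimal sketch mimics that bookkeeping for a single key (`accumulate` is a hypothetical helper, not part of the scripts above) and shows that the result depends on the order of the values. Locally, `sort` orders whole lines, so the digits typically arrive before ABC; whether Hadoop delivers the values in a different order in this job is an assumption that has not been verified.

```python
def accumulate(values):
    """Mimic the reducer's bookkeeping for ONE key (hypothetical helper)."""
    running_total = None
    for raw in values:
        value = int(raw) if raw.isdigit() else raw
        if running_total is None:
            running_total = value      # first value becomes the total, even if it is 'ABC'
        elif value != 'ABC':
            running_total += value     # raises TypeError if the total is still the string 'ABC'
    return running_total


# Order as produced by the local `sort`: numbers first, 'ABC' last.
print(accumulate(['10', '5', 'ABC']))  # 15

# Same records with 'ABC' first.
try:
    accumulate(['ABC', '10', '5'])
except TypeError as exc:
    print('TypeError: %s' % exc)
```

An uncaught exception like this makes the Python process exit with code 1, which is the exit code reported in the stack trace above.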

I’ve tried various ways of specifying the input files to the hadoop command, with no difference and no success.

What am I doing wrong? Thanks a lot for any tips and ideas.


Lucky punch: I fought with this one for days, and now I got it working:

Since the local (Unix) execution of

cat join2_gen*.txt | ./ | sort | ./

works fine, I had the idea to use 1 merged input file instead of the 6 input files provided, so:

cat join2_gen*.txt >> mergedInputFile.txt

hdfs dfs -put mergedInputFile.txt /user/cloudera/input

Then I executed the same hadoop command again, pointing the input to mergedInputFile.txt in the input folder: perfect result, no problems, no exception, job done.

For me, this raises a question:

  • Why does it work with one merged input file, but not with the 6 smaller files provided? I don’t know (yet).
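
One thing that may be worth checking, as an unverified guess rather than a confirmed diagnosis: Hadoop streaming sorts the reducer input by key only, and the order of the values within a key is not guaranteed by default, whereas the local `sort` orders whole lines. Below is a minimal sketch of a reducer that does not depend on value order. `reduce_lines` and the sample records are hypothetical; in a real streaming script the function would be fed `sys.stdin` instead of the sample list.

```python
from itertools import groupby


def reduce_lines(lines):
    """Yield 'key<TAB>total' for every key that has at least one 'ABC' record.

    Assumes the lines are grouped by key (as after `sort` or the Hadoop shuffle);
    the order of the values inside a group does not matter.
    """
    # Split each "key<TAB>value" line; lines without a tab are skipped.
    pairs = (line.strip().partition("\t") for line in lines if "\t" in line)
    for key, group in groupby(pairs, key=lambda parts: parts[0]):
        total = 0
        abc_found = False
        for _, _, value in group:
            if value == "ABC":
                abc_found = True
            elif value.isdigit():
                total += int(value)
        if abc_found:
            yield "%s\t%d" % (key, total)


# Hypothetical sample: 'ABC' comes first for show1 and last for show3.
sample = [
    "show1\tABC\n", "show1\t10\n", "show1\t5\n",
    "show2\t7\n",
    "show3\t3\n", "show3\tABC\n",
]
for out in reduce_lines(sample):
    print(out)
# show1<TAB>15
# show3<TAB>3
```

Because the total always starts at 0 and ABC only sets a flag, the result is the same whichever record of a key arrives first.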

