Java – The first time using Hadoop, MapReduce Job did not run Reduce Phase

The first time using Hadoop, MapReduce Job did not run Reduce Phase… here is a solution to the problem.

The first time using Hadoop, MapReduce Job did not run Reduce Phase

I wrote a simple map reduce job that reads data from DFS and runs a simple algorithm on it. While trying to debug it, I decided to simply let the mapper output a set of keys and values, and the reducer output a completely different set of keys and values. I run this job on a single-node Hadoop 20.2 cluster. After the job completes, the output contains only the value of the mapper output, which leads me to believe that the reducer is not running. I would appreciate any insight into why my code produces this output. I’ve tried setting outputKeyClass and outputValueClass to different things, and setMapOutputKeyClass and setMapOutputValueClass to different things. Currently commenting out the part of our code is the algorithm I’m running, but I’ve changed the map and reduce methods to simply output some value. Similarly, the output of the job contains only the values of the mapper output. This is the class I used to run the job :

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class CalculateHistogram {

public static class HistogramMap extends Mapper<LongWritable, Text, LongWritable, Text> {

private static final int R = 100;
        private int n = 0;

@Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (n == 0) {
                StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
                int counter = 0;
                while (tokens.hasMoreTokens()) {
                    String token = tokens.nextToken();
                    if (tokens.hasMoreTokens()) {
                        context.write(new LongWritable(-2), new Text("HI"));
                        context.write(new LongWritable(counter), new Text(token));
                    }
                    counter++;
                    n++;
                }
            } else {
                n++;
                if (n == R) {
                    n = 0;
                }
                
}
        }
    }

public static class HistogramReduce extends Reducer<LongWritable, Text, LongWritable, HistogramBucket> {

private final static int R = 10;

public void reduce(LongWritable key, Iterator<Text> values, Context context)
                                            throws IOException, InterruptedException {
            if (key.toString().equals("-1")) {
                context.write(key, new HistogramBucket(key));
            }
            Text t = values.next();
            for (char c : t.toString().toCharArray()) {
                if (! Character.isDigit(c) && c != '.') {
                    context.write(key, new HistogramBucket(key));//if this isnt a numerical attribute we ignore it
                }
            }
            context.setStatus("Building Histogram");
            HistogramBucket i = new HistogramBucket(key);
            i.add(new DoubleWritable(Double.parseDouble(t.toString())));
            while (values.hasNext()) {
                for (int j = 0; j < R; j++) {
                    t = values.next();
                }
                if (!i.contains(Double.parseDouble(t.toString()))) {
                    context.setStatus("Writing a value to the Histogram");
                    i.add(new DoubleWritable(Double.parseDouble(t.toString())));
                }
            }
            
context.write(new LongWritable(55555555), new HistogramBucket(new LongWritable(55555555)));
        }
    }

public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

Job job = new Job(conf, "MRDT - Generate Histogram");
        job.setJarByClass(CalculateHistogram.class);
        job.setMapperClass(HistogramMap.class);
        job.setReducerClass(HistogramReduce.class);

job.setOutputValueClass(HistogramBucket.class);
        
job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Solution

The signature of your reduce method is wrong. Your method signature contains Iterator<Text> You must < Text> through Iterable

Your code does not override the method base class of Reduce Reducer. Therefore, the default implementation provided by Reducer uses the base class. This implementation is an identity function.

Use @Override annotations to predict errors like this.

Related Problems and Solutions