Java – MapReduce – reducer does not combine keys


I have a simple MapReduce job where I’m building a reverse index.

My mapper works fine (I checked) and emits the word index as the key and a docID:TFIDF string as the value:

Mapper (output statement only):

context.write(new IntWritable(wordIndex), new Text(index + ":" + tfidf));

The only job of the reducer is to merge these values. Here is my implementation:

public static class IndexerReducer extends Reducer<Text, IntWritable, IntWritable, Text>
{
    public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        StringBuilder sb = new StringBuilder();

        for (Text value : values)
        {
            sb.append(value.toString() + " ");
        }

        context.write(key, new Text(sb.toString()));
    }
}

However, it doesn’t combine anything, and the output looks basically the same as the mapper’s: there are lines in the output that share the same key. Since the reducer is supposed to merge all values for a key, every key in the output file should be unique, right?

Here is an example of my reducer output (note that this is a simplified example):

1 15:2.1
1 13:4.3
2 9:9.3
2 43:7.9
etc

I expect this:

1 15:2.1 13:4.3
2 9:9.3 43:7.9

For completeness, I’ve included the run method:

@Override
public int run(String[] arguments) throws Exception {
    ArgumentParser parser = new ArgumentParser("TextPreprocessor");

    parser.addArgument("input", true, true, "specify input directory");
    parser.addArgument("output", true, true, "specify output directory");

    parser.parseAndCheck(arguments);

    Path inputPath = new Path(parser.getString("input"));
    Path outputDir = new Path(parser.getString("output"));

    // Create configuration.
    Configuration conf = getConf();

    // Add distributed cache file with vocabulary.
    DistributedCache
            .addCacheFile(new URI("/user/myslima3/vocab.txt"), conf);

    // Create job.
    Job job = new Job(conf, "WordCount");
    job.setJarByClass(IndexerMapper.class);

    // Setup MapReduce.
    job.setMapperClass(IndexerMapper.class);
    job.setReducerClass(IndexerReducer.class);

    // Sort the output words in reversed order.
    job.setSortComparatorClass(WordCountComparator.class);

    job.setNumReduceTasks(1);

    // Specify (key, value) types.
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    // Input.
    FileInputFormat.addInputPath(job, inputPath);
    job.setInputFormatClass(TextInputFormat.class);

    // Output.
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileSystem hdfs = FileSystem.get(conf);

    // Delete output directory (if it exists).
    if (hdfs.exists(outputDir))
        hdfs.delete(outputDir, true);

    // Execute the job.
    return job.waitForCompletion(true) ? 0 : 1;
}

I would be happy to get any hints about what is going on. I’m new to MapReduce. Thanks for any debugging tips!

Solution

Always use the @Override annotation.

You declared

public static class IndexerReducer extends Reducer<Text, IntWritable, IntWritable, Text>

so for your method to actually override the framework’s reduce, it would have to look like this:

@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException

Your reduce(IntWritable key, Iterable<Text> values, ...) has a different signature, so it merely overloads the method and Hadoop never calls it. Instead, the default Reducer.reduce runs, which is an identity function that writes every (key, value) pair straight through — exactly why your output looks like the mapper’s. Since your mapper emits IntWritable keys and Text values, the actual fix here is to change the class declaration to Reducer<IntWritable, Text, IntWritable, Text>, keep your current reduce signature, and add @Override so the compiler catches any future mismatch.