Java – Two identical keys do not reach the same reducer

Two identical keys do not reach the same reducer… here is a solution to the problem.


I’m using the MapReduce framework to build Hadoop applications in Java.

I only use Text keys and values for inputs and outputs. Before the final reduce, I use a combiner to perform additional computation.

But the problem I’m having is that identical keys don’t all go to the same reducer.
I create and emit key/value pairs like this in the combiner:

public static class Step4Combiner extends Reducer&lt;Text,Text,Text,Text&gt; {
    private static Text key0 = new Text();
    private static Text key1 = new Text();

    public void reduce(Text key, Iterable&lt;Text&gt; values, Context context)
            throws IOException, InterruptedException {
        key0.set("KeyOne");
        key1.set("KeyTwo");
        context.write(key0, new Text("some value"));
        context.write(key1, new Text("some other value"));
    }
}

public static class Step4Reducer extends Reducer&lt;Text,Text,Text,Text&gt; {

    public void reduce(Text key, Iterable&lt;Text&gt; values, Context context)
            throws IOException, InterruptedException {
        System.out.print("Key:" + key.toString() + " Value: ");
        String theOutput = "";
        for (Text val : values) {
            System.out.print("," + val);
        }
        System.out.print("\n");

        context.write(key, new Text(theOutput));
    }
}

This is roughly how I set up the job:

Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

Job job4 = new Job(conf, "Step 4");
job4.setJarByClass(Step4.class);

job4.setMapperClass(Step4.Step4Mapper.class);
job4.setCombinerClass(Step4.Step4Combiner.class);
job4.setReducerClass(Step4.Step4Reducer.class);

job4.setInputFormatClass(TextInputFormat.class);
job4.setOutputKeyClass(Text.class);
job4.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job4, new Path(outputPath));
FileOutputFormat.setOutputPath(job4, new Path(finalOutputPath));            

System.exit(job4.waitForCompletion(true) ? 0 : 1);

The standard output printed from the reducer looks like this:

Key:KeyOne Value: ,some value
Key:KeyTwo Value: ,some other value
Key:KeyOne Value: ,some value
Key:KeyTwo Value: ,some other value
Key:KeyOne Value: ,some value
Key:KeyTwo Value: ,some other value

This doesn’t make sense: there are only two distinct keys, so there should be two reduce calls, each with three identical values in its Iterable.

Hope you can help me figure this out 🙂

Solution

This is most likely because your combiner runs in both the map and reduce phases (a little-known “feature”).

Basically, you are modifying the keys in the combiner, and the combiner may or may not run when the map outputs are merged together on the reduce side. After the combiner runs (reduce side), the keys are fed through the grouping comparator to determine which values are grouped into the Iterable passed to the reduce method. (I’m glossing over the streaming aspect of the reduce phase here: the Iterable is not backed by a set or list of values; rather, successive calls to the iterator’s hasNext() return true as long as the grouping comparator determines that the current key and the previous key are the same.)
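To make the grouping behavior concrete, here is a minimal plain-Java sketch (no Hadoop dependency; the class and method names are illustrative, not part of any API) of how a sorted stream of records is batched into one reduce call per run of consecutive equal keys:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class GroupingSketch {
    // One reduce invocation per run of consecutive equal keys, mirroring
    // how the grouping comparator batches sorted map output into Iterables.
    static List<Map.Entry<String, List<String>>> group(List<String[]> sortedRecords) {
        List<Map.Entry<String, List<String>>> calls = new ArrayList<>();
        for (String[] kv : sortedRecords) {
            // A new reduce call starts whenever the key differs from the previous one.
            if (calls.isEmpty() || !calls.get(calls.size() - 1).getKey().equals(kv[0])) {
                calls.add(new AbstractMap.SimpleEntry<>(kv[0], new ArrayList<>()));
            }
            calls.get(calls.size() - 1).getValue().add(kv[1]);
        }
        return calls;
    }

    public static void main(String[] args) {
        // Properly sorted input: one reduce call per distinct key.
        List<String[]> sorted = Arrays.asList(
                new String[]{"KeyOne", "a"},
                new String[]{"KeyOne", "b"},
                new String[]{"KeyTwo", "c"});
        System.out.println(group(sorted).size()); // 2 reduce calls
    }
}
```

If the stream is *not* sorted (which is exactly what key rewriting in the combiner causes), each alternation between keys opens a fresh group, even for keys seen before.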

You can try to detect which side the combiner is currently running on (map or reduce) by inspecting the context (there is a Context.getTaskAttemptID().isMap() method, but I have some memory that this is also problematic; there may even be a JIRA ticket about it somewhere).

Most importantly, do not modify the key in the combiner, unless you can find a way to work around this behavior when the combiner runs on the reduce side.
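As a contrast, here is a minimal plain-Java sketch of the contract a combiner is supposed to honor: fold the values it was handed, and emit them under the *same* key, so the map output stays sorted and groupable. (The class and method names are illustrative, not a Hadoop API.)

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class CombinerContract {
    // A well-behaved combiner only folds the values for the key it was
    // handed; re-emitting the SAME key preserves the sort order the
    // reduce-side merge and grouping comparator rely on.
    static Map.Entry<String, String> combine(String key, List<String> values) {
        return new AbstractMap.SimpleEntry<>(key, String.join(",", values));
    }

    public static void main(String[] args) {
        Map.Entry<String, String> out =
                combine("KeyOne", Arrays.asList("some value", "another value"));
        System.out.println(out.getKey() + "=" + out.getValue());
        // KeyOne=some value,another value
    }
}
```

The question’s Step4Combiner violates this by discarding the incoming key and emitting two brand-new keys, which is what breaks the sorted-output assumption described below.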

Edit
So, investigating @Amar’s comments, I put together some code (pastebin link) with some verbose comparators, combiners, reducers, etc. If you run a single-map job, no combiner runs during the reduce phase, and the map output is not re-sorted, because it is already assumed to be sorted.

It is assumed to be sorted before being handed to the combiner class, and it is assumed that the keys come out intact, and therefore still sorted. Remember that combiners are designed to combine the values of a given key.

Therefore, with a single map and your combiner, the reducer sees the keys in KeyOne, KeyTwo, KeyOne, KeyTwo, KeyOne, KeyTwo order. The grouping comparator sees a change at every transition between them, so the reduce function is called 6 times.
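You can check this count with a small plain-Java sketch (no Hadoop; the names are illustrative): a new reduce call starts every time the grouping comparator sees the key change, so an alternating key stream yields one call per record.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceCallCount {
    // The framework starts a new reduce call whenever the grouping
    // comparator reports that the current key differs from the previous
    // one; for an unsorted alternating stream, every record is a new call.
    static int reduceCalls(List<String> keys) {
        int calls = 0;
        String prev = null;
        for (String k : keys) {
            if (!k.equals(prev)) {
                calls++;
            }
            prev = k;
        }
        return calls;
    }

    public static void main(String[] args) {
        // The key order produced by the key-rewriting combiner in the question.
        List<String> keys = Arrays.asList(
                "KeyOne", "KeyTwo", "KeyOne", "KeyTwo", "KeyOne", "KeyTwo");
        System.out.println(reduceCalls(keys)); // 6
    }
}
```

With a properly sorted stream (KeyOne, KeyOne, KeyOne, KeyTwo, KeyTwo, KeyTwo) the same logic yields only 2 calls, matching the behavior the question expected.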

If you use two mappers, the reducer knows it has two sorted segments (one per map), so they still need to be merged before reducing. But because the number of segments is below the threshold, the merge is done as an inline streaming merge sort (again assuming each segment is sorted). You still get the erroneous output from both mappers (10 records output from the reduce phase).

Again, do not modify the key in the combiner; that is not what the combiner is for.
