Java – Parallelizing Ruby reducer in Hadoop?


A simple wordcount reducer in Ruby looks like this:

#!/usr/bin/env ruby
wordcount = Hash.new
STDIN.each_line do |line|
  keyval = line.split("|")
  wordcount[keyval[0]] = wordcount[keyval[0]].to_i + keyval[1].to_i
end

wordcount.each_pair do |word, count|
  puts "#{word}|#{count}"
end

It reads the intermediate values from all mappers on STDIN, not just the values for one specific key.
So it looks as if there is really only one reducer (not one reducer per word or group of words).

However, in the Java example, I see that this interface gets a key and a list of values as input. This means that the intermediate map values are grouped by key before they are reduced, so reducers can run in parallel:

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

Is this a feature unique to Java? Or can I use Ruby to do it with Hadoop Streaming?

Solution

Reducers run in parallel whether or not you use streaming, provided the job is allowed more than one reduce task (if you don't see this, check mapred.reduce.tasks in your job or cluster configuration). The difference is that with Java the framework packages the input a little more nicely for you than it does with streaming.

For Java, the reduce task gets an iterator over all the values for a particular key. This makes it easy to walk the values if you are, say, summing the map output in your reduce task. In streaming, you literally just get a stream of key-value pairs. You are guaranteed that the pairs arrive sorted by key and that the pairs for a given key are not split across reduce tasks, but any state tracking you need is up to you. For example, in Java your map output arrives at your reducer in the symbolic form:

key1, {val1, val2, val3}
key2, {val7, val8}

With streaming, the same map output instead looks like:

key1, val1
key1, val2
key1, val3
key2, val7
key2, val8
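
Because the pairs for one key are adjacent in the stream, the grouped view can be rebuilt on the Ruby side. The following is only a sketch: the method name each_key_group is made up for illustration, and it assumes the "|" separator from the question's script and input that is already sorted by key. It relies on Enumerable#chunk, which merges only consecutive elements with the same key.

```ruby
require "stringio"

# Turns a key-sorted stream of "key|value" lines into (key, [values]) groups,
# similar to the key -> iterator view the Java API provides.
# Assumption: "|" is the separator, as in the question's script.
def each_key_group(input)
  # chunk groups only *consecutive* lines with the same key, which is enough
  # because the reducer's input is sorted by key. Blank lines yield a nil key
  # and are dropped by chunk.
  keyed = input.each_line.chunk { |line| line.chomp.split("|", 2).first }
  keyed.each do |key, lines|
    yield key, lines.map { |line| line.chomp.split("|", 2).last }
  end
end

# Demonstration with in-memory data instead of STDIN.
stream = StringIO.new("key1|val1\nkey1|val2\nkey1|val3\nkey2|val7\nkey2|val8\n")
each_key_group(stream) do |key, values|
  puts "#{key}, {#{values.join(', ')}}"
end
```

Run on the sample data, this prints the two grouped lines shown for the Java case. In a real streaming job you would pass STDIN instead of the StringIO.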

For example, to write a reducer that calculates the sum of values for each key, you need one variable to store the last key you saw and one to store the running sum. Each time you read a new key-value pair, you do the following:

  1. Check if the key is different from the last key.
  2. If so, output the last key and the current sum, and reset the sum to zero.
  3. Add the current value to your sum and set the last key to the current key.

When the input ends, output the last key and its sum as well, otherwise the final key is lost.
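
The steps above can be sketched in Ruby roughly as follows. This is a minimal sketch, not a definitive implementation: the method name sum_by_key is hypothetical, and it assumes the "|" separator from the question's script and input sorted by key, as a streaming reducer receives it.

```ruby
#!/usr/bin/env ruby
require "stringio"

# Sums the counts for each key in a stream of "key|count" lines.
# Assumptions: lines arrive sorted by key, and "|" is the separator
# (taken from the question's script).
def sum_by_key(input, output)
  last_key = nil
  sum = 0

  input.each_line do |line|
    key, value = line.chomp.split("|", 2)
    next if key.nil? # skip blank lines

    # Steps 1 and 2: the key changed, so the previous key is complete.
    if !last_key.nil? && key != last_key
      output.puts "#{last_key}|#{sum}"
      sum = 0
    end

    # Step 3: accumulate the value and remember the key.
    sum += value.to_i
    last_key = key
  end

  # Flush the final key once the input is exhausted.
  output.puts "#{last_key}|#{sum}" unless last_key.nil?
end

# Demonstration with in-memory data instead of STDIN/STDOUT.
sample = StringIO.new("apple|1\napple|2\nbanana|5\n")
result = StringIO.new
sum_by_key(sample, result)
print result.string
```

As a streaming reducer, the script would call sum_by_key(STDIN, STDOUT) instead of running the demonstration. Unlike the hash-based version in the question, it holds only one key and one sum in memory at a time.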

HTH.
