Java – Handling large output values of the reduce step in Hadoop


In the reduce phase of my MapReduce program, the only thing I'm doing is concatenating each value in the Iterator passed to the reducer, as follows:

// Old (org.apache.hadoop.mapred) API; Text comes from org.apache.hadoop.io.
public void reduce(Text key, Iterator<Text> values,
                    OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
    Text next;
    Text outKey = new Text();
    Text outVal = new Text();
    StringBuilder sb = new StringBuilder();
    // Join all values for this key into one comma-separated string.
    while (values.hasNext()) {
        next = values.next();
        sb.append(next.toString());
        if (values.hasNext())
            sb.append(',');
    }
    outKey.set(key.toString());
    outVal.set(sb.toString());
    output.collect(outKey, outVal);
}

My problem is that some reduce output values are huge lines of text, so large that even with a very large initial capacity, the StringBuilder must grow (double its size) several times to accommodate all the contents of the iterator, which causes memory problems.
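To illustrate the growth pattern described above, here is a minimal sketch that records StringBuilder.capacity() while characters are appended one at a time. The class and method names are made up for illustration and are not part of my job. Each growth step allocates a new backing array and copies the old contents into it, so the old and new arrays briefly coexist on the heap.

```java
import java.util.ArrayList;
import java.util.List;

public class BuilderGrowthSketch {

    /**
     * Appends `chars` characters one at a time and returns every distinct
     * capacity the builder passed through, starting with the initial one.
     */
    static List<Integer> capacities(int initialCapacity, int chars) {
        StringBuilder sb = new StringBuilder(initialCapacity);
        List<Integer> seen = new ArrayList<>();
        int last = sb.capacity();
        seen.add(last);
        for (int i = 0; i < chars; i++) {
            sb.append('x');
            if (sb.capacity() != last) {
                // The backing array was reallocated and copied.
                last = sb.capacity();
                seen.add(last);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // Prints the sequence of capacities for a small builder.
        System.out.println(capacities(16, 1000));
    }
}
```

With values in the hundreds of megabytes, the last few growth steps are the ones that exhaust the heap.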

In a traditional Java application, this would suggest writing the output through a buffered file writer. How should I handle very large output key-value pairs in Hadoop? Should I stream the results directly to a file on HDFS (one file per reduce call)? Are there any ways to buffer the output other than the output.collect method?

Note: I have already maximized my memory/heap size. Also, some sources suggest that increasing the number of reducers helps with memory/heap issues, but the problem here comes directly from StringBuilder growing its capacity.

Thanks

Solution

I don't quite understand why you would want such a huge value, but there is a way to do it.

If you write your own OutputFormat, you can change the behavior of the RecordWriter.write(Key, Value) method so that it concatenates values depending on whether the key is null.

That way, in your reducer, you can write code like this (the first output for a key carries the actual key, and every output after that carries a null key):

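public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
  boolean firstKey = true;
  // An Iterator cannot be used in a for-each loop, so iterate explicitly.
  while (values.hasNext()) {
    Text value = values.next();
    // Emit the real key once; later values for the same key use a null key.
    output.collect(firstKey ? key : null, value);
    firstKey = false;
  }
}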

The actual RecordWriter.write() then uses the following logic to join values that arrive with a null key:

    public synchronized void write(K key, V value) throws IOException {

        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }

        if (!nullKey) {
            // if we've written data before, append a new line
            if (dataWritten) {
                out.write(newline);
            }

            // write out the key and separator
            writeObject(key);
            out.write(keyValueSeparator);
        } else if (!nullValue) {
            // write out the value delimiter
            out.write(valueDelimiter);
        }

        // write out the value
        writeObject(value);

        // track that we've written some data
        dataWritten = true;
    }

    public synchronized void close(Reporter reporter) throws IOException {
        // if we've written out any data, append a closing newline
        if (dataWritten) {
            out.write(newline);
        }

        out.close();
    }

You'll notice that the close method has also been modified to write a trailing newline after the last written record.

The full code listing can be found on pastebin; here is the test output:

key1    value1
key2    value1,value2,value3
key3    value1,value2
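The null-key joining behavior can be tried without a Hadoop cluster. The following is only a sketch under assumptions, not the pastebin code: it reimplements the write/close logic against a plain java.io.Writer, uses String instead of Writable types, and assumes a tab as the key/value separator and a comma as the value delimiter. The class name JoiningWriterSketch and the demo method are hypothetical.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;

public class JoiningWriterSketch {
    private final Writer out;
    private final String keyValueSeparator; // assumed: tab
    private final String valueDelimiter;    // assumed: comma
    private boolean dataWritten = false;

    JoiningWriterSketch(Writer out, String keyValueSeparator, String valueDelimiter) {
        this.out = out;
        this.keyValueSeparator = keyValueSeparator;
        this.valueDelimiter = valueDelimiter;
    }

    /** A non-null key starts a new record; a null key appends to the current one. */
    synchronized void write(String key, String value) throws IOException {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            if (dataWritten) {
                out.write('\n'); // terminate the previous record
            }
            out.write(key);
            out.write(keyValueSeparator);
        } else {
            out.write(valueDelimiter); // continue the current record
        }
        if (!nullValue) {
            out.write(value);
        }
        dataWritten = true;
    }

    /** Writes the trailing newline for the last record, then closes the stream. */
    synchronized void close() throws IOException {
        if (dataWritten) {
            out.write('\n');
        }
        out.close();
    }

    /** Feeds the records from the test output above through the writer. */
    static String demo() {
        StringWriter sink = new StringWriter();
        JoiningWriterSketch w = new JoiningWriterSketch(sink, "\t", ",");
        try {
            w.write("key1", "value1");
            w.write("key2", "value1");
            w.write(null, "value2");
            w.write(null, "value3");
            w.write("key3", "value1");
            w.write(null, "value2");
            w.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toString();
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

Because each value goes to the stream as soon as it arrives, no record is ever held in memory as a whole, which is the point of the approach.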
