Java – Hadoop multi-output with speculative execution

My task is to write Avro output to multiple directories, organized by several fields of the input record.

For example: process records of countries across years and write them into a directory structure of country/year, e.g.:

outputs/usa/2015/outputs_usa_2015.avro
outputs/uk/2014/outputs_uk_2014.avro

AvroMultipleOutputs multipleOutputs = new AvroMultipleOutputs(context);
....
....
multipleOutputs.write("output", avroKey, NullWritable.get(),
    OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear()
        + "/outputs_" + record.getCountry() + "_" + record.getYear());
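
For context, a fuller sketch of how this write call typically sits inside a reducer. The Country record type, the OUTPUT_DIR constant, and the getCountry()/getYear() getters are assumptions carried over from the snippet above, not part of any library API:

import java.io.IOException;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Country is an assumed Avro-generated record class with getCountry()/getYear().
public class CountryYearReducer
    extends Reducer<AvroKey<Country>, NullWritable, AvroKey<Country>, NullWritable> {

  private static final String OUTPUT_DIR = "outputs";   // assumed base path
  private AvroMultipleOutputs multipleOutputs;

  @Override
  protected void setup(Context context) {
    multipleOutputs = new AvroMultipleOutputs(context);
  }

  @Override
  protected void reduce(AvroKey<Country> key, Iterable<NullWritable> values, Context context)
      throws IOException, InterruptedException {
    Country record = key.datum();
    // Route the record to a country/year specific base output path.
    String basePath = OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear()
        + "/outputs_" + record.getCountry() + "_" + record.getYear();
    multipleOutputs.write("output", key, NullWritable.get(), basePath);
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // Close all underlying record writers.
    multipleOutputs.close();
  }
}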

Which output committer does this code use to write the output? Is it safe to use with speculative execution?
With speculative execution, this can result in org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException.

In the post Hadoop Reducer: How can I output to multiple directories?, a custom output committer is recommended for writing to multiple directories when speculative execution is enabled.

The following code from Hadoop's AvroMultipleOutputs does not suggest any problem with speculative execution:

private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext,
    String baseFileName) throws IOException, InterruptedException {

  writer = ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
      taskContext.getConfiguration())).getRecordWriter(taskContext);
  ...
}

The write method also does not document any issue with the base output path being outside the job's output directory:

public void write(String namedOutput, Object key, Object value, String baseOutputPath)

Is there a real issue with AvroMultipleOutputs (or other output formats) when writing outside the job directory?
If so, how do I rewrite AvroMultipleOutputs so that it has its own output committer? I don't see any OutputFormat inside AvroMultipleOutputs whose output committer it could use.

Solution

AvroMultipleOutputs will use the OutputFormat that you registered in the job configuration when you added the named outputs via the addNamedOutput API of AvroMultipleOutputs (for example, AvroKeyValueOutputFormat).
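
For reference, the driver-side registration referred to here might look roughly like this; the job name, the schemas, and the named-output name "output" are assumed to match the question:

import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class DriverSetup {
  public static Job configure(Configuration conf, Schema keySchema, Schema valueSchema)
      throws Exception {
    Job job = Job.getInstance(conf, "avro-multi-output");
    // The OutputFormat registered here is the one AvroMultipleOutputs will
    // instantiate (via reflection) when it needs a RecordWriter for "output".
    AvroMultipleOutputs.addNamedOutput(job, "output",
        AvroKeyValueOutputFormat.class, keySchema, valueSchema);
    return job;
  }
}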

With AvroMultipleOutputs, you may not be able to use the speculative task execution feature. Even overriding it won't help, or at least won't be simple.
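
As a stopgap, speculative execution can simply be disabled for the job. A minimal sketch using the standard MRv2 property names (mapreduce.map.speculative / mapreduce.reduce.speculative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class DisableSpeculationExample {
  public static Job newJob() throws Exception {
    Configuration conf = new Configuration();
    // Turn off speculative attempts for both map and reduce tasks so that
    // only one task attempt ever writes a given output file.
    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    return Job.getInstance(conf, "avro-multi-output");
  }
}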

Instead, you should write your own OutputFormat (most likely extending one of the available Avro output formats, such as AvroKeyValueOutputFormat) and override/implement its getRecordWriter method so that it returns a single RecordWriter instance, say MainRecordWriter (the name is just for reference).

This MainRecordWriter would maintain a map of RecordWriter instances (for example AvroKeyValueRecordWriter), each belonging to one of the output files. In the write method of MainRecordWriter, you would look up the actual RecordWriter in the map (based on the record being written) and write the record with that writer. So MainRecordWriter simply acts as a wrapper around multiple RecordWriter instances.
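
A rough, self-contained sketch of this approach follows. To keep it short it extends FileOutputFormat directly rather than an Avro output format, and writes each file with a plain Avro DataFileWriter instead of AvroKeyValueRecordWriter; the class names MainOutputFormat/MainRecordWriter and the record fields country/year are illustrative only:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Custom output format whose only job is to return the wrapper writer.
public class MainOutputFormat extends FileOutputFormat<AvroKey<GenericRecord>, NullWritable> {

  @Override
  public RecordWriter<AvroKey<GenericRecord>, NullWritable> getRecordWriter(
      TaskAttemptContext context) throws IOException, InterruptedException {
    // Write under the task attempt's work directory; the committer promotes it
    // to the final output directory only when the attempt commits.
    FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
    return new MainRecordWriter(context, committer.getWorkPath());
  }

  // Wrapper writer: keeps one Avro writer per country/year sub-path and routes
  // each record to the matching writer.
  public static class MainRecordWriter
      extends RecordWriter<AvroKey<GenericRecord>, NullWritable> {

    private final TaskAttemptContext context;
    private final Path workPath;
    private final Map<String, DataFileWriter<GenericRecord>> writers = new HashMap<>();

    MainRecordWriter(TaskAttemptContext context, Path workPath) {
      this.context = context;
      this.workPath = workPath;
    }

    @Override
    public void write(AvroKey<GenericRecord> key, NullWritable value) throws IOException {
      GenericRecord record = key.datum();
      // Route on the record's fields; "country" and "year" are assumed field names.
      String relative = record.get("country") + "/" + record.get("year")
          + "/outputs_" + record.get("country") + "_" + record.get("year") + ".avro";
      getWriter(relative, record.getSchema()).append(record);
    }

    private DataFileWriter<GenericRecord> getWriter(String relative, Schema schema)
        throws IOException {
      DataFileWriter<GenericRecord> writer = writers.get(relative);
      if (writer == null) {
        Path file = new Path(workPath, relative);   // stays inside the attempt's work dir
        writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
        writer.create(schema, file.getFileSystem(context.getConfiguration()).create(file));
        writers.put(relative, writer);
      }
      return writer;
    }

    @Override
    public void close(TaskAttemptContext taskContext) throws IOException {
      for (DataFileWriter<GenericRecord> writer : writers.values()) {
        writer.close();
      }
    }
  }
}

Because every file stays under the task attempt's work directory until the attempt commits, the standard FileOutputCommitter discards the files of any killed speculative attempt, which is exactly the property the direct-to-final-path writes in the question lack.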

For a somewhat similar implementation, you might like to look into the code of the MultiStorage class from the piggybank library.