Hadoop multi-output with speculative execution
I need to write Avro output to multiple directories organized by several fields of the input record.
For example: process records of countries across years and write them into a country/year directory structure, e.g.:
outputs/usa/2015/outputs_usa_2015.avro
outputs/uk/2014/outputs_uk_2014.avro
AvroMultipleOutputs multipleOutputs = new AvroMultipleOutputs(context);
....
....
multipleOutputs.write("output", avroKey, NullWritable.get(),
        OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear() + "/outputs_" + record.getCountry() + "_" + record.getYear());
Which output committer will the above code use to write the output? Is it unsafe to use it with speculative execution?
With speculative execution enabled, this causes (or may cause) org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException.
The following code from AvroMultipleOutputs (in avro-mapred) does not seem to account for any problems with speculative execution:
private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext,
        String baseFileName) throws IOException, InterruptedException {
    writer =
        ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
            taskContext.getConfiguration())).getRecordWriter(taskContext);
    ...
}
The write method also does not report any issue if the base output path is outside the job's output directory:
public void write(String namedOutput, Object key, Object value, String baseOutputPath)
Is there a real problem with AvroMultipleOutputs (and other multiple-output implementations) when writing outside the job's output directory?
If so, how do I override AvroMultipleOutputs so that it has its own output committer? I don't see any OutputFormat inside AvroMultipleOutputs whose output committer it uses.
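To see why writing outside the job's output directory clashes with speculative execution, here is a small illustration using only the JDK. It assumes, as in the avro-mapred sources, that the Avro output format names each file by resolving the baseOutputPath (plus a task suffix such as -r-00000.avro) against the output committer's work directory for the current task attempt; treat that as an assumption to verify for your Avro version. java.nio.file.Path.resolve is used here as a stand-in for Hadoop's Path(parent, child), since both ignore the parent when the child is absolute. AttemptPaths, outputFileFor, and the attempt directory names are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Illustration only: java.nio.file.Path.resolve stands in for Hadoop's
 * Path(parent, child); both ignore the parent when the child is absolute.
 */
final class AttemptPaths {
    private AttemptPaths() {}

    /**
     * Returns the file a given task attempt would create for baseOutputPath.
     * The "-r-00000.avro" suffix assumes reduce task 0 and the ".avro" extension.
     */
    static Path outputFileFor(String attemptWorkDir, String baseOutputPath) {
        return Paths.get(attemptWorkDir).resolve(baseOutputPath + "-r-00000.avro");
    }
}
```

With an absolute base path such as "/outputs/usa/2015/outputs_usa_2015", two speculative attempts of the same reduce task resolve to the very same file, which is consistent with the LeaseExpiredException described above. With a relative base path such as "usa/2015/outputs_usa_2015", each attempt's file stays inside its own work directory, where the output committer can discard or promote it.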
Solution
AvroMultipleOutputs uses whichever OutputFormat you registered in the job configuration when adding the named outputs with its addNamedOutput API (for example, AvroKeyValueOutputFormat).
With AvroMultipleOutputs, you may not be able to use speculative task execution. Even overriding it either won't help or won't be simple.
Instead, you should write your own OutputFormat (most likely extending one of the available Avro output formats, such as AvroKeyValueOutputFormat) and override/implement its getRecordWriter API so that it returns a RecordWriter instance, say MainRecordWriter (the name is just for reference).
This MainRecordWriter maintains a map of RecordWriter instances (such as AvroKeyValueRecordWriter), each of which belongs to one of the output files. In the write API of MainRecordWriter, you get the actual RecordWriter from the map (based on the record you want to write) and write the record with it. MainRecordWriter thus works as a wrapper over multiple RecordWriter instances.
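A minimal sketch of the MainRecordWriter idea, using only the JDK so that it runs on its own. SimpleRecordWriter is a hypothetical stand-in for Hadoop's RecordWriter (write and close only, without Hadoop's checked exceptions or TaskAttemptContext), and pathFor and writerFactory are hypothetical constructor parameters. In a real OutputFormat, writerFactory is where you would open, for example, an Avro writer for that path under the task attempt's work directory, so that the job's output committer decides which attempt's files survive.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for Hadoop's RecordWriter<K, V>: write records, then close. */
interface SimpleRecordWriter<K, V> {
    void write(K key, V value);

    void close();
}

/**
 * Sketch of a MainRecordWriter: keeps one underlying writer per output file
 * and routes each record to the writer for its path.
 */
final class MainRecordWriter<K, V> implements SimpleRecordWriter<K, V> {
    private final Function<K, String> pathFor;                              // record -> relative output path
    private final Function<String, SimpleRecordWriter<K, V>> writerFactory; // path -> newly opened writer
    private final Map<String, SimpleRecordWriter<K, V>> writers = new LinkedHashMap<>();

    MainRecordWriter(Function<K, String> pathFor,
                     Function<String, SimpleRecordWriter<K, V>> writerFactory) {
        this.pathFor = pathFor;
        this.writerFactory = writerFactory;
    }

    @Override
    public void write(K key, V value) {
        String path = pathFor.apply(key);
        // Open the writer for this file on first use; later records for the same path reuse it.
        writers.computeIfAbsent(path, writerFactory).write(key, value);
    }

    @Override
    public void close() {
        // Close every underlying file; in Hadoop the framework closes the writer before committing the task.
        for (SimpleRecordWriter<K, V> writer : writers.values()) {
            writer.close();
        }
        writers.clear();
    }

    /** Number of output files currently open (useful for testing). */
    int openWriterCount() {
        return writers.size();
    }
}
```

Because writers are opened lazily, a task only creates files for the country/year combinations it actually sees, and the map holds at most one open writer per path.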
For a similar implementation, you might want to look at the code of the MultiStorage class from Pig's piggybank library.