hadoop mapreduce : handling a text file with a header
I’m playing and learning hadoop MapReduce.
I’m trying to map a VCF file from ( http://en.wikipedia.org/wiki/Variant_Call_Format data: A VCF is a tab-delimited file that starts with a (possibly large) heading. This header is required to get the semantics of the record in the body.
I want to create a mapper that uses this data. The header must be accessible from this Mapper to decode the line.
From http://jayunit100.blogspot.fr/2013/07/hadoop-processing-headers-in-mappers.html, I created this InputFormat with a custom reader:
public static class VcfInputFormat extends FileInputFormat<LongWritable, Text>
{
/* the VCF header is stored here */
private List<String> headerLines=new ArrayList<String>();
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException {
return new VcfRecordReader();
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
private class VcfRecordReader extends LineRecordReader
{
/* reads all lines starting with '#' */
@Override
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
super.initialize(genericSplit, context);
List<String> headerLines=new ArrayList<String>();
while( super.nextKeyValue())
{
String row = super.getCurrentValue().toString();
if(!row.startsWith("#")) throw new IOException("Bad VCF header");
headerLines.add(row);
if(row.startsWith("#CHROM")) break;
}
}
}
}
Now, in Mapper, is there a way to get a pointer to VcfInputFormat.this.headerLines
to decode the row?
public static class VcfMapper
extends Mapper<LongWritable, Text, Text, IntWritable>{
public void map(LongWritable key, Text value, Context context ) throws IOException, InterruptedException {
my. VcfCodec codec=new my. VcfCodec(???????. headerLines);
my. Variant variant =codec.decode(value.toString());
//(....)
}
}
Solution
I think your situation is different from the example you linked to. In this case, header is used in a custom RecordReader
class to provide a single “current value,” which is a single line of all filter words and passed to the mapper. However, in your case, you want to use header information outside of RecordReader
, i.e. in your mapper, which is not possible.
I also think you can mimic the behavior of the linked example by providing the processed information: by reading headers, storing them, and then getting the current value, your mapper can receive a my. VcfCodec
object instead of Text
object (i.e. the getCurrentValue
method returns a my. VcfCodec
object). Your mapper may look similar to….
public static class VcfMapper extends Mapper<LongWritable, my. VcfCodec, Text, IntWritable>{
public void map(LongWritable key, my. VcfCodec value, Context context ) throws IOException, InterruptedException {
whatever you may want to do with the encoded data...
}