Java – hadoop mapreduce : handling a text file with a header


I’m playing with Hadoop MapReduce to learn it.

I’m trying to map the data from a VCF file (http://en.wikipedia.org/wiki/Variant_Call_Format). A VCF is a tab-delimited file that starts with a (possibly large) header. This header is required to get the semantics of the records in the body.

http://wiki.bits.vib.be/index.php/NGS_Exercise.5

I want to create a mapper that uses this data. The header must be accessible from this Mapper to decode the line.

From http://jayunit100.blogspot.fr/2013/07/hadoop-processing-headers-in-mappers.html, I created this InputFormat with a custom reader:

  public static class VcfInputFormat extends FileInputFormat<LongWritable, Text>
    {
    /* the VCF header is stored here */
    private List<String> headerLines = new ArrayList<String>();

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException,
            InterruptedException {
        return new VcfRecordReader();
        }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
        }

    private class VcfRecordReader extends LineRecordReader
        {
        /* reads all lines starting with '#' */
        @Override
        public void initialize(InputSplit genericSplit,
                TaskAttemptContext context) throws IOException {
            super.initialize(genericSplit, context);
            /* fill the enclosing VcfInputFormat.headerLines;
               re-declaring a local list here would shadow that field */
            while (super.nextKeyValue())
                {
                String row = super.getCurrentValue().toString();
                if (!row.startsWith("#")) throw new IOException("Bad VCF header");
                headerLines.add(row);
                if (row.startsWith("#CHROM")) break;
                }
            }
        }
    }

Now, in Mapper, is there a way to get a pointer to VcfInputFormat.this.headerLines to decode the row?

  public static class VcfMapper
       extends Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        my.VcfCodec codec = new my.VcfCodec(???????.headerLines);
        my.Variant variant = codec.decode(value.toString());
        //(....)
        }
    }

Solution

I think your situation is different from the example you linked to. In that example, the header is used inside the custom RecordReader class to build a single “current value”, a line composed of all the filtered words, which is then passed to the mapper. In your case, however, you want to use the header information outside of the RecordReader, i.e. in your mapper, which is not possible.

I also think you can mimic the behavior of the linked example by handing the mapper information that has already been processed: the reader reads the headers, stores them, and uses them when it builds the current value, so your mapper receives a my.VcfCodec object instead of a Text object (i.e. the getCurrentValue method returns a my.VcfCodec object). Your mapper might then look similar to this:

public static class VcfMapper extends Mapper<LongWritable, my.VcfCodec, Text, IntWritable> {
    public void map(LongWritable key, my.VcfCodec value, Context context)
            throws IOException, InterruptedException {
        // whatever you may want to do with the decoded data...
    }
}
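
To make the “read the header in the reader, hand decoded values to the mapper” idea concrete, here is a minimal plain-Java sketch. It deliberately does not use Hadoop, so it can be run on its own: the class VcfHeaderReaderSketch and its methods are hypothetical names that only imitate the initialize / nextKeyValue / getCurrentValue cycle of a RecordReader, and a Map of column name to field value stands in for whatever my.VcfCodec would produce. The sample VCF text is made up for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Hadoop-free stand-in for a header-aware RecordReader. */
final class VcfHeaderReaderSketch {
    private final BufferedReader in;
    private final List<String> headerLines = new ArrayList<>();
    private String[] columns;            // column names taken from the #CHROM line
    private Map<String, String> current; // decoded record handed to the "mapper"

    VcfHeaderReaderSketch(BufferedReader in) {
        this.in = in;
    }

    /** Consumes the leading '#' lines and stops after the #CHROM line. */
    void initialize() throws IOException {
        String row;
        while ((row = in.readLine()) != null) {
            if (!row.startsWith("#")) throw new IOException("Bad VCF header");
            headerLines.add(row);
            if (row.startsWith("#CHROM")) {
                columns = row.substring(1).split("\t"); // drop the leading '#'
                return;
            }
        }
        throw new IOException("Missing #CHROM line");
    }

    /** Reads the next body line and decodes it using the stored header. */
    boolean nextKeyValue() throws IOException {
        String row = in.readLine();
        if (row == null) {
            current = null;
            return false;
        }
        String[] fields = row.split("\t", -1);
        current = new LinkedHashMap<>();
        for (int i = 0; i < columns.length && i < fields.length; i++) {
            current.put(columns[i], fields[i]);
        }
        return true;
    }

    /** The consumer sees a decoded record, never the raw header. */
    Map<String, String> getCurrentValue() {
        return current;
    }

    List<String> getHeaderLines() {
        return headerLines;
    }

    public static void main(String[] args) throws IOException {
        String vcf = "##fileformat=VCFv4.1\n"
                   + "#CHROM\tPOS\tID\tREF\tALT\n"
                   + "20\t14370\trs6054257\tG\tA\n";
        VcfHeaderReaderSketch reader =
                new VcfHeaderReaderSketch(new BufferedReader(new StringReader(vcf)));
        reader.initialize();
        while (reader.nextKeyValue()) {
            System.out.println(reader.getCurrentValue());
        }
    }
}
```

In a real job the same logic would presumably live in the custom RecordReader returned by VcfInputFormat, with the decoded type as the mapper's input value type; the header never has to leave the reader.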
