Python – Process LZO sequence files with mrjob


I’m writing an mrjob task that computes various statistics over the Google Ngrams data: https://aws.amazon.com/datasets/8172056142375670

I developed and tested my script locally against a tab-delimited, uncompressed text subset of the data. When I try to run the job against the real dataset, it fails with this error:

Traceback (most recent call last):
  File "ngram_counts.py", line 74, in <module>
    MRNGramCounts.run()
  File "/usr/lib/python2.6/dist-packages/mrjob/job.py", line 500, in run
    mr_job.execute()
  File "/usr/lib/python2.6/dist-packages/mrjob/job.py", line 509, in execute
    self.run_mapper(self.options.step_num)
  File "/usr/lib/python2.6/dist-packages/mrjob/job.py", line 574, in run_mapper
    for out_key, out_value in mapper(key, value) or ():
  File "ngram_counts.py", line 51, in mapper
    (ngram, year, _mc, _pc, _vc) = line.split('\t')
ValueError: need more than 2 values to unpack
(while reading from s3://datasets.elasticmapreduce/ngrams/books/20090715/eng-1M/5gram/data)

Presumably this is because of the compression scheme of the public dataset (quoting from the link above):

We store the datasets in a single object in Amazon S3. The file is in
sequence file format with block level LZO compression. The sequence
file key is the row number of the dataset stored as a LongWritable and
the value is the raw data stored as TextWritable.

Any guidance on how to set up a workflow that can handle these files? I’ve searched exhaustively for tips but haven’t found anything useful.

(I’m relatively new to mrjob and Hadoop.)

Solution

I finally figured it out. EMR handles the LZO decompression for you, but to read the sequence file format you also need to set the HADOOP_INPUT_FORMAT field on your MRJob class:

from mrjob.job import MRJob

class MyMRJob(MRJob):

    # Have Hadoop decode each sequence-file record and pass it to the mapper as text.
    HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.SequenceFileAsTextInputFormat'

    def mapper(self, _, line):
        # mapper code...
        pass

    def reducer(self, key, values):
        # reducer code...
        pass

if __name__ == '__main__':
    MyMRJob.run()

There is another gotcha (quoting the AWS-hosted Google Ngrams page):

The sequence file key is the row number of the dataset stored as a LongWritable and the value is the raw data stored as TextWritable.

This means that each line the mapper receives carries an extra row-number-plus-tab prefix, so any row parsing you do in the mapper needs to strip that prefix first, as in the sketch below.
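For example, a minimal mapper sketch that strips the prefix before unpacking the five tab-separated ngram fields from the question (the yielded pair is just an illustrative placeholder):

def mapper(self, _, line):
    # SequenceFileAsTextInputFormat emits 'key<TAB>value', so the first
    # field is the LongWritable row number; split it off before parsing.
    _row_number, data = line.split('\t', 1)
    ngram, year, match_count, page_count, volume_count = data.split('\t')
    # Placeholder output; emit whatever statistic your job actually needs.
    yield ngram, int(match_count)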
