Python – How do I write the output of an EMR streaming job to HDFS?

How do I write the output of an EMR streaming job to HDFS?

I have seen examples of people writing EMR output to HDFS, but I can’t find a description of how it’s done. More importantly, this documentation seems to say that the --output parameter for an EMR streaming job must be an S3 bucket.

When I actually try to run the job (in this case, a Python streaming job using mrjob), it throws an “Invalid S3 URI” error.

The command is as follows:

python my_script.py -r emr \
 --emr-job-flow-id=j-JOBID --conf-path=./mrjob.conf --no-output \
 --output hdfs:///my-output \
 hdfs:///my-input-directory/my-files*.gz

And the traceback:

Traceback (most recent call last):
  File "pipes/sampler.py", line 28, in <module>
    SamplerJob.run()
  File "/Library/Python/2.7/site-packages/mrjob/job.py", line 483, in run
    mr_job.execute()
  File "/Library/Python/2.7/site-packages/mrjob/job.py", line 501, in execute
    super(MRJob, self).execute()
  File "/Library/Python/2.7/site-packages/mrjob/launch.py", line 146, in execute
    self.run_job()
  File "/Library/Python/2.7/site-packages/mrjob/launch.py", line 206, in run_job
    with self.make_runner() as runner:
  File "/Library/Python/2.7/site-packages/mrjob/job.py", line 524, in make_runner
    return super(MRJob, self).make_runner()
  File "/Library/Python/2.7/site-packages/mrjob/launch.py", line 161, in make_runner
    return EMRJobRunner(**self.emr_job_runner_kwargs())
  File "/Library/Python/2.7/site-packages/mrjob/emr.py", line 585, in __init__
    self._output_dir = self._check_and_fix_s3_dir(self._output_dir)
  File "/Library/Python/2.7/site-packages/mrjob/emr.py", line 776, in _check_and_fix_s3_dir
    raise ValueError('Invalid S3 URI: %r' % s3_uri)
ValueError: Invalid S3 URI: 'hdfs:///input/sample'

Judging from the traceback, mrjob’s EMRJobRunner passes the --output directory through _check_and_fix_s3_dir, which rejects anything that is not an s3:// URI. How do I write the output of an EMR streaming job to HDFS? Is it possible?

Solution

I’m not sure how to do it with mrjob, but with Hadoop and streaming jobs written in Java, we do it like this:

  1. Start the cluster
  2. Use s3distcp to copy the data from S3 to the cluster’s HDFS
  3. Run step 1 of our job with that HDFS input
  4. Run step 2 of our job with the same inputs as above

Using the EMR CLI, we do this:

export jobflow=$(elastic-mapreduce --create --alive --plain-output \
  --master-instance-type m1.small --slave-instance-type m1.xlarge --num-instances 21 \
  --name "Cluster Name" \
  --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop \
  --args "--mapred-config-file,s3://myBucket/conf/custom-mapred-config-file.xml")

elastic-mapreduce -j $jobflow \
  --jar s3://us-east-1.elasticmapreduce/libs/s3distcp/1.latest/s3distcp.jar \
  --arg --src --arg 's3://myBucket/input/' --arg --dest --arg 'hdfs:///input'

elastic-mapreduce --jobflow $jobflow --jar s3://myBucket/bin/step1.jar \
  --arg hdfs:///input --arg hdfs:///output-step1 --step-name "Step 1"

elastic-mapreduce --jobflow $jobflow --jar s3://myBucket/bin/step2.jar \
  --arg hdfs:///input,hdfs:///output-step1 --arg s3://myBucket/output/ --step-name "Step 2"
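
If you would rather drive the same flow from Python than from the elastic-mapreduce CLI, here is a minimal sketch using boto 2’s EMR API. The job flow ID, bucket, and mapper/reducer script names are hypothetical, and the scripts are assumed to already be available on the cluster; this mirrors the CLI steps above rather than changing anything about mrjob’s --output check.

# Sketch only: boto 2, hypothetical bucket/script names and job flow ID.
import boto.emr
from boto.emr.step import JarStep, StreamingStep

conn = boto.emr.connect_to_region('us-east-1')

# Copy the input from S3 onto the cluster's HDFS with s3distcp.
copy_step = JarStep(
    name='Copy input from S3 to HDFS',
    jar='s3://us-east-1.elasticmapreduce/libs/s3distcp/1.latest/s3distcp.jar',
    step_args=['--src', 's3://myBucket/input/',
               '--dest', 'hdfs:///input'])

# Run a Hadoop streaming step whose input and output both live on HDFS.
stream_step = StreamingStep(
    name='Streaming step reading and writing HDFS',
    mapper='python my_mapper.py',    # hypothetical scripts
    reducer='python my_reducer.py',
    input='hdfs:///input',
    output='hdfs:///output-step1')

# Attach both steps to the already-running job flow.
conn.add_jobflow_steps('j-JOBID', [copy_step, stream_step])

The streaming step plays the role of step1.jar above; a final step can write its output straight to S3 (as Step 2 does), or you can copy HDFS output back to S3 with another s3distcp step.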
