Python – Hadoop returns fewer results than expected


I have two Python scripts, a mapper and a reducer (at this point the reducer basically just prints whatever it receives). Running locally I get 4 results (strings); on Hadoop I get only 3. Why does this happen?

I use Amazon Elastic MapReduce and Hadoop.

mapper.py

#!/usr/bin/env python

import sys
import re
import os

# constants
WINDOW = 10
OVERLAP = 4
START_POSITION = 0
END_POSITION = 0

# regular expressions
pattern = re.compile("[a-z]*", re.IGNORECASE)

a_to_f_pattern = re.compile("[a-f]", re.IGNORECASE)
g_to_l_pattern = re.compile("[g-l]", re.IGNORECASE)
m_to_r_pattern = re.compile("[m-r]", re.IGNORECASE)
s_to_z_pattern = re.compile("[s-z]", re.IGNORECASE)

# variable initialization
converted_word = ""
next_word = ""
new_character = ""
filename = ""
prev_filename = ""
i = 0

# read pairs as lines of input from STDIN
for line in sys.stdin:
    line = line.strip()  # strip() returns a new string; the result must be assigned

    filename = os.environ['mapreduce_map_input_file']
    filename = filename.replace("s3://source123/input/", "")

    # check if it's a new file, and reset the start position
    if filename != prev_filename:
        START_POSITION = 0
        next_word = ""
        converted_word = ""
        prev_filename = filename

    # loop through every word that matches the pattern;
    # convert() is defined elsewhere in the script (not shown here)
    for word in pattern.findall(line):
        # note: the guard on this branch was lost in formatting;
        # a window-length check like the following fits the logic
        if len(converted_word) < WINDOW:
            new_character = convert(word)
            converted_word = converted_word + new_character

            if len(converted_word) > (WINDOW - OVERLAP):
                next_word = next_word + new_character

            # print "word= ", word
            # print "converted_word= ", converted_word
        else:
            END_POSITION = START_POSITION + (len(converted_word) - 1)

            print converted_word + "," + str(filename) + "," + str(START_POSITION) + "," + str(END_POSITION)

            START_POSITION = START_POSITION + (WINDOW - OVERLAP)
            new_character = convert(word)
            converted_word = next_word + new_character
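The position counters in the mapper are local to each map task. Hadoop Streaming also exposes the split's starting byte offset to the task environment (the mapreduce.map.input.start property, visible as mapreduce_map_input_start, alongside the mapreduce_map_input_file variable already used above). As a hedged sketch, not the original author's code, a mapper could fold that offset into its positions; note the offset is counted in bytes, which only approximates the character-window positions tracked above:

```python
import os

def global_position(local_position, environ=os.environ):
    """Convert a position counted from the start of this task's split
    into a position counted from the start of the whole input file,
    using the split's starting byte offset supplied by Hadoop Streaming."""
    split_start = int(environ.get('mapreduce_map_input_start', 0))
    return split_start + local_position
```

When the script is run locally the variable is absent and the offset defaults to 0, so positions match the local single-pass behavior.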

Logs

2016-04-27 19:58:41,293 INFO com.amazon.ws.emr.hadoop.fs.EmrFileSystem (main): Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation
2016-04-27 19:58:41,512 INFO amazon.emr.metrics.MetricsSaver (main): MetricsConfigRecord disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 disableClusterEngine: true maxMemoryMb: 3072 maxInstanceCount: 500 lastModified: 1461784308237 
2016-04-27 19:58:41,512 INFO amazon.emr.metrics.MetricsSaver (main): Created MetricsSaver j-KCDMFZJGYO89:i-995f5a41:RunJar:16480 period:60 /mnt/var/em/raw/i-995f5a41_20160427_RunJar_16480_raw.bin
2016-04-27 19:58:43,477 INFO org.apache.hadoop.yarn.client.RMProxy (main): Connecting to ResourceManager at ip-172-31-38-52.us-west-2.compute.internal/172.31.38.52:8032
2016-04-27 19:58:43,673 INFO org.apache.hadoop.yarn.client.RMProxy (main): Connecting to ResourceManager at ip-172-31-38-52.us-west-2.compute.internal/172.31.38.52:8032
2016-04-27 19:58:44,156 INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem (main): Opening 's3://source123/mapper.py' for reading
2016-04-27 19:58:44,267 INFO amazon.emr.metrics.MetricsSaver (main): Thread 1 created MetricsLockFreeSaver 1
2016-04-27 19:58:44,439 INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem (main): Opening 's3://source123/source_reducer.py' for reading
2016-04-27 19:58:44,628 INFO com.hadoop.compression.lzo.GPLNativeCodeLoader (main): Loaded native gpl library
2016-04-27 19:58:44,630 INFO com.hadoop.compression.lzo.LzoCodec (main): Successfully loaded & initialized native-lzo library [hadoop-lzo rev 426d94a07125cf9447bb0c2b336cf10b4c254375]
2016-04-27 19:58:45,046 INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem (main): listStatus s3://source123/input with recursive false
2016-04-27 19:58:45,265 INFO org.apache.hadoop.mapred.FileInputFormat (main): Total input paths to process : 1
2016-04-27 19:58:45,336 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): number of splits:9
2016-04-27 19:58:45,565 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): Submitting tokens for job: job_1461784297295_0004
2016-04-27 19:58:45,710 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl (main): Submitted application application_1461784297295_0004
2016-04-27 19:58:45,743 INFO org.apache.hadoop.mapreduce.Job (main): The url to track the job: http://ip-172-31-38-52.us-west-2.compute.internal:20888/proxy/application_1461784297295_0004/
2016-04-27 19:58:45,744 INFO org.apache.hadoop.mapreduce.Job (main): Running job: job_1461784297295_0004
2016-04-27 19:58:53,876 INFO org.apache.hadoop.mapreduce.Job (main): Job job_1461784297295_0004 running in uber mode : false
2016-04-27 19:58:53,877 INFO org.apache.hadoop.mapreduce.Job (main):  map 0% reduce 0%
2016-04-27 19:59:11,063 INFO org.apache.hadoop.mapreduce.Job (main):  map 11% reduce 0%
2016-04-27 19:59:14,081 INFO org.apache.hadoop.mapreduce.Job (main):  map 22% reduce 0%
2016-04-27 19:59:16,094 INFO org.apache.hadoop.mapreduce.Job (main):  map 33% reduce 0%
2016-04-27 19:59:18,106 INFO org.apache.hadoop.mapreduce.Job (main):  map 56% reduce 0%
2016-04-27 19:59:19,114 INFO org.apache.hadoop.mapreduce.Job (main):  map 67% reduce 0%
2016-04-27 19:59:26,159 INFO org.apache.hadoop.mapreduce.Job (main):  map 78% reduce 0%
2016-04-27 19:59:29,178 INFO org.apache.hadoop.mapreduce.Job (main):  map 89% reduce 0%
2016-04-27 19:59:30,184 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 0%
2016-04-27 19:59:32,196 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 33%
2016-04-27 19:59:34,207 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 67%
2016-04-27 19:59:38,228 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 100%
2016-04-27 19:59:40,246 INFO org.apache.hadoop.mapreduce.Job (main): Job job_1461784297295_0004 completed successfully
2016-04-27 19:59:40,409 INFO org.apache.hadoop.mapreduce.Job (main): Counters: 55
    File System Counters
        FILE: Number of bytes read=190
        FILE: Number of bytes written=1541379
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=873
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=0
        S3: Number of bytes read=864
        S3: Number of bytes written=130
        S3: Number of read operations=0
        S3: Number of large read operations=0
        S3: Number of write operations=0
    Job Counters 
        Killed map tasks=1
        Launched map tasks=9
        Launched reduce tasks=3
        Data-local map tasks=9
        Total time spent by all maps in occupied slots (ms)=6351210
        Total time spent by all reduces in occupied slots (ms)=2449170
        Total time spent by all map tasks (ms)=141138
        Total time spent by all reduce tasks (ms)=27213
        Total vcore-milliseconds taken by all map tasks=141138
        Total vcore-milliseconds taken by all reduce tasks=27213
        Total megabyte-milliseconds taken by all map tasks=203238720
        Total megabyte-milliseconds taken by all reduce tasks=78373440
    Map-Reduce Framework
        Map input records=5
        Map output records=3
        Map output bytes=124
        Map output materialized bytes=562
        Input split bytes=873
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=562
        Reduce input records=3
        Reduce output records=6
        Spilled Records=6
        Shuffled Maps =27
        Failed Shuffles=0
        Merged Map outputs=27
        GC time elapsed (ms)=2785
        CPU time spent (ms)=11670
        Physical memory (bytes) snapshot=5282500608
        Virtual memory (bytes) snapshot=28472725504
        Total committed heap usage (bytes)=5977407488
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=864
    File Output Format Counters 
        Bytes Written=130
2016-04-27 19:59:40,409 INFO org.apache.hadoop.streaming.StreamJob (main): Output directory: s3://source123/output/

Solution

Hadoop Streaming feeds each map task the lines of its own input split on standard input.

Locally you process all the input in a single sequential pass, but on the cluster the input is divided into splits (the log shows number of splits:9) that are processed by separate, parallel map tasks. Each task starts with fresh state, so variables such as START_POSITION, converted_word and next_word are reset at every split boundary, not only at file boundaries as the script assumes. A partial window buffered at the end of one split is silently discarded when the next task starts, which is why the cluster run emits fewer results than the local run.
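The effect can be reproduced without a cluster. The toy simulation below (hypothetical code, not the original convert() logic) slides a WINDOW-sized buffer over a character stream, keeping OVERLAP characters between emissions, and compares one pass over the full stream against the same stream cut into two "splits" processed with fresh state:

```python
WINDOW, OVERLAP = 10, 4

def run_mapper(chars):
    """Emit every full WINDOW-sized chunk, carrying OVERLAP characters over."""
    out, buf = [], ""
    for c in chars:
        buf += c
        if len(buf) == WINDOW:
            out.append(buf)
            buf = buf[WINDOW - OVERLAP:]  # keep only the overlap
    return out

stream = "abcdefghijklmnopqrstuvwxyz"

# One mapper sees the whole stream...
single = run_mapper(stream)

# ...versus the stream split across two mapper tasks: the partial buffer at
# the end of split 1 is discarded when the second task starts with fresh state.
split = run_mapper(stream[:13]) + run_mapper(stream[13:])

print(len(single), len(split))  # prints: 3 2
```

The split run emits fewer windows than the single pass, mirroring the 4-versus-3 discrepancy in the question.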
