Python – Produces both maximum and minimum values in a single mapreduce

I’m a beginner and just started writing MapReduce programs in Python using the MRJob library.

One example in the video tutorial finds the maximum temperature by location_id. Writing another program to find the minimum temperature by location_id is just as simple.

Is there a way to produce both the maximum and minimum temperatures by location_id in a single MapReduce program? Here’s what I tried:

from mrjob.job import MRJob

'''Sample Data
ITE00100554,18000101,TMAX,-75,,,E,
ITE00100554,18000101,TMIN,-148,,,E,
GM000010962,18000101,PRCP,0,,,E,
EZE00100082,18000101,TMAX,-86,,,E,
EZE00100082,18000101,TMIN,-135,,,E,
ITE00100554,18000102,TMAX,-60,,I,E,
ITE00100554,18000102,TMIN,-125,,,E,
GM000010962,18000102,PRCP,0,,,E,
EZE00100082,18000102,TMAX,-44,,,E, 

Output I am expecting to see:
ITE00100554  32.3  20.2
EZE00100082  34.4  19.6
'''

class MaxMinTemperature(MRJob):
    def mapper(self, _, line):
        location, datetime, measure, temperature, w, x, y, z = line.split(',')
        temperature = float(temperature)/10
        if measure == 'TMAX' or measure == 'TMIN':
            yield location, temperature

    def reducer(self, location, temperatures):
        yield location, max(temperatures), min(temperatures)

if __name__ == '__main__':
    MaxMinTemperature.run()

I get the following error:

File "MaxMinTemperature.py", line 12, in reducer
yield location, max(temperatures), min(temperatures)
ValueError: min() arg is an empty sequence

Is this possible?

Thank you for your assistance.

Schiff

Solution

The reducer has two problems:

  1. If you check the type of the temperatures parameter, you will see that it is a generator. A generator can only be traversed once, so you cannot pass the same generator to both min and max: the first call consumes it, and the second one sees an empty sequence, which is the ValueError you got. The correct solution is to traverse it manually in a single pass. The obvious workaround, converting it to a list, can cause an out-of-memory error for sufficiently large input, because a list keeps all its elements in memory and a generator does not.

  2. The reducer must yield a 2-tuple of (key, value). Therefore, you need to combine the minimum and maximum temperatures into a single tuple and yield that as the value.
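The first problem can be reproduced without mrjob at all. The snippet below is only a minimal sketch: reducer_values is a hypothetical stand-in for the generator a reducer receives, filled with the ITE00100554 readings from the sample data.

```python
def reducer_values():
    # Hypothetical stand-in for the generator mrjob passes to a reducer.
    yield from [-7.5, -14.8, -6.0, -12.5]

values = reducer_values()
highest = max(values)      # this call consumes every item
leftover = list(values)    # a second pass finds nothing left

second_pass_failed = False
try:
    min(values)            # same situation as in the question's reducer
except ValueError:
    second_pass_failed = True

print(highest, leftover, second_pass_failed)
```

Because max already walked the generator to the end, min has nothing to iterate over and raises the ValueError shown in the question.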

Complete working solution:

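from mrjob.job import MRJob

class MaxMinTemperature(MRJob):
    def mapper(self, _, line):
        # Fields: station id, date, measurement type, value, then four unused columns
        location, datetime, measure, temperature, w, x, y, z = line.split(',')
        temperature = float(temperature)/10
        if measure in ('TMAX', 'TMIN'):
            yield location, temperature

    def reducer(self, location, temperatures):
        # Walk the generator once, tracking both extremes as we go
        min_temp = next(temperatures)
        max_temp = min_temp
        for item in temperatures:
            min_temp = min(item, min_temp)
            max_temp = max(item, max_temp)
        # The output must be a (key, value) pair, so both results go into one tuple
        yield location, (min_temp, max_temp)

if __name__ == '__main__':
    MaxMinTemperature.run()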
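If you want to check the single-pass logic locally before running a job, it can be pulled out into a plain function. This is only a sketch: min_max is a hypothetical helper name, not part of mrjob, and the sample values are the ITE00100554 readings from the question.

```python
def min_max(values):
    """Return (minimum, maximum) of an iterable using a single pass."""
    iterator = iter(values)
    try:
        lowest = highest = next(iterator)
    except StopIteration:
        # Assumption: an empty input is treated as an error, like min() and max()
        raise ValueError("min_max() needs at least one value") from None
    for item in iterator:
        if item < lowest:
            lowest = item
        elif item > highest:
            highest = item
    return lowest, highest

# A generator, like the one a reducer receives; it is only walked once.
sample = (raw / 10 for raw in [-75, -148, -60, -125])
result = min_max(sample)
print(result)
```

Only two values are held in memory at any time, regardless of how many readings a location has, which is the reason to prefer this over converting the generator to a list.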
