Java – Hadoop MapReduce: Strange Result when Storing Previous Value in Memory in a Reduce Class (Java)

In the reduce method, I want to store the current value from the values iterator so I can compare it with the next value. Hadoop requires me to clone it; simply assigning its reference to a temporary variable does not work.

Below is the code for my reducer.

You will see two parts:

  1. The main method, tested in Eclipse
  2. The reduce method, executed in Hadoop

You’ll notice that these two blocks of code are identical, except for the following:

  1. The main method gets its Iterator from an ArrayList I hard-coded into it, while the reduce method gets its Iterator from the Iterable that Hadoop passes in, built from the mapper output.
  2. The main method does not call context.write.

This is the code the two share, almost verbatim:

MMI currentMMI = null;
MMI previousMMI = null;
UltraAggregation currentAggregation = null;

while (values.hasNext()) {
    currentMMI = values.next();
    if (currentAggregation == null) {
        currentAggregation = new UltraAggregation(currentMMI);
    }
    if (previousMMI == null) {
        //previousMMI = new MMI(currentMMI);
        previousMMI = currentMMI;
        continue;
    }
    System.out.println();
    System.out.println("currentMMI = " + currentMMI);
    System.out.println("previousMMI = " + previousMMI);
    System.out.println("equals? " + currentMMI.equals(previousMMI));
    System.out.println("==? " + (currentMMI == previousMMI));
    System.out.println();

    // Business logic goes here and involves a context.write on certain conditions

    previousMMI = currentMMI;
}
// final context.write

At the end of each iteration, I assign the reference to the MMI I just processed (currentMMI) to previousMMI. In the next iteration, I assign the result of next() to currentMMI. When I run my main method in Eclipse, both of the following expressions evaluate to false, as expected:

currentMMI == previousMMI;
currentMMI.equals(previousMMI);  

However, when the job runs in Hadoop, both expressions always evaluate to true:

currentMMI == previousMMI;
currentMMI.equals(previousMMI);

They only evaluate to false when I change the previousMMI = currentMMI lines to previousMMI = new MMI(currentMMI). (I wrote a copy constructor for the MMI class that essentially makes a shallow clone of its argument.)
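Whether a shallow clone is enough depends on how MMI’s fields are refilled when the next value is loaded, which the question does not show. The sketch below uses a hypothetical `Reading` class (not the real MMI) whose `refill()` method imitates a `readFields()` that updates a mutable `Date` field in place. Under that assumption, a shallow copy still shares the `Date` with the reused object, while a copy constructor that copies the `Date` does not.

```java
import java.util.Date;

public class CopyConstructorDemo {
    // Hypothetical stand-in for MMI with one immutable and one mutable field.
    static class Reading {
        String deviceId;  // String is immutable, so sharing the reference is safe
        Date deviceTime;  // Date is mutable, so sharing the reference is not

        Reading() {
            this.deviceTime = new Date(0L);
        }

        // Copy constructor that gives the copy its own Date object.
        Reading(Reading other) {
            this.deviceId = other.deviceId;
            this.deviceTime = new Date(other.deviceTime.getTime());
        }

        // Shallow copy: the copy shares other's Date object.
        static Reading shallowCopy(Reading other) {
            Reading copy = new Reading();
            copy.deviceId = other.deviceId;
            copy.deviceTime = other.deviceTime;
            return copy;
        }

        // Assumption: imitates a readFields() that updates existing field objects in place.
        void refill(String id, long millis) {
            this.deviceId = id;
            this.deviceTime.setTime(millis);
        }
    }

    public static void main(String[] args) {
        Reading reused = new Reading();
        reused.refill("D1", 1000L);

        Reading shallow = Reading.shallowCopy(reused);
        Reading deep = new Reading(reused);

        // The "framework" loads the next record into the same object.
        reused.refill("D1", 2000L);

        System.out.println("shallow: " + shallow.deviceTime.getTime()); // 2000, overwritten through the shared Date
        System.out.println("deep:    " + deep.deviceTime.getTime());    // 1000, unaffected
    }
}
```

If MMI’s readFields() instead assigns fresh objects (or only primitives and Strings) on every call, the shallow copy is already sufficient.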

Why do I have to clone the object, rather than just keep a reference to it, in the Hadoop reducer but not in the main method?

Here is the full reducer class. It has two parts: the main method for testing in Eclipse and the reduce method actually used in Hadoop.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.cisco.webex.hadoop.ultrautility.models.MMI;
import com.cisco.webex.hadoop.ultrautility.models.UltraAggregation;

public class MMIReducer extends Reducer<Text, MMI, Object, UltraAggregation> {
    public static void main(String[] args) {
        ArrayList<MMI> mmis = new ArrayList<MMI>();
        mmis.add(new MMI("961864,1,1,1,D1,10,0,2013-08-02 06:00:00.0,USA,N,N"));
        mmis.add(new MMI("961865,1,1,1,D1,10,1,2013-08-02 07:00:00.0,USA,N,N"));
        mmis.add(new MMI("961866,1,1,1,D1,10,2,2013-08-02 08:00:00.0,USA,N,N"));
        mmis.add(new MMI("961867,1,1,1,D1,10,3,2013-08-02 09:00:00.0,USA,N,N"));
        mmis.add(new MMI("961868,1,1,1,D1,10,4,2013-08-02 10:00:00.0,USA,N,N"));
        mmis.add(new MMI("961869,1,1,1,D1,10,5,2013-08-02 11:00:00.0,USA,N,N"));
        mmis.add(new MMI("961870,1,1,1,D1,10,6,2013-08-02 12:00:00.0,USA,N,N"));
        mmis.add(new MMI("961871,1,1,1,D1,10,7,2013-08-02 13:00:00.0,USA,N,N"));
        mmis.add(new MMI("961872,1,1,1,D1,10,8,2013-08-02 14:00:00.0,USA,N,N"));
        mmis.add(new MMI("961873,1,1,1,D1,10,9,2013-08-02 15:00:00.0,USA,N,N"));

        Iterator<MMI> values = mmis.iterator();

        MMI currentMMI = null;
        MMI previousMMI = null;
        UltraAggregation currentAggregation = null;

        while (values.hasNext()) {
            currentMMI = values.next();
            if (currentAggregation == null) {
                currentAggregation = new UltraAggregation(currentMMI);
            }
            if (previousMMI == null) {
                //previousMMI = new MMI(currentMMI);
                previousMMI = currentMMI;
                continue;
            }
            System.out.println();
            System.out.println("currentMMI = " + currentMMI);
            System.out.println("previousMMI = " + previousMMI);
            System.out.println("equals? " + currentMMI.equals(previousMMI));
            System.out.println("==? " + (currentMMI == previousMMI));
            System.out.println();

            // Business logic goes here and involves a context.write on certain conditions

            //previousMMI = new MMI(currentMMI);
            /*
             * THIS DOESN'T CAUSE LOGIC ERRORS IN MAIN METHOD
             */
            previousMMI = currentMMI;
        }
        // There is no Context in main, so the final write is not executed here:
        // context.write(null, currentAggregation);
    }

    @Override
    public void reduce(Text key, Iterable<MMI> vals, Context context) throws IOException, InterruptedException {
        Iterator<MMI> values = vals.iterator();

        // key = deviceId
        MMI currentMMI = null;
        MMI previousMMI = null;
        UltraAggregation currentAggregation = null;

        while (values.hasNext()) {
            currentMMI = values.next();
            if (currentAggregation == null) {
                currentAggregation = new UltraAggregation(currentMMI);
            }
            if (previousMMI == null) {
                System.out.println("PreviousMMI is null, setting previousMMI to current MMI and continuing");
                //previousMMI = new MMI(currentMMI);
                previousMMI = currentMMI;
                continue;
            }
            System.out.println();
            System.out.println("currentMMI = " + currentMMI);
            System.out.println("previousMMI = " + previousMMI);
            System.out.println("equals? " + currentMMI.equals(previousMMI));
            System.out.println("==? " + (currentMMI == previousMMI));
            System.out.println();

            // Business logic goes here and involves a context.write on certain conditions

            //previousMMI = new MMI(currentMMI); // Acts as intended
            /*
             * THIS CAUSES ERRORS WHEN EXECUTED THROUGH HADOOP
             */
            previousMMI = currentMMI; // Causes errors
        }
        context.write(null, currentAggregation);
    }
}

This is truncated stdout from running the main method with the hard-coded values in Eclipse:

currentMMI = Device Id|D1; Entitlement Tag|10;Device Time| Fri Aug 02 07:00:00 PDT 2013; Uptime|1.0
previousMMI = Device Id|D1; Entitlement Tag|10;Device Time| Fri Aug 02 06:00:00 PDT 2013; Uptime|0.0
equals? false
==? false

currentMMI = Device Id|D1; Entitlement Tag|10;Device Time| Fri Aug 02 08:00:00 PDT 2013; Uptime|2.0
previousMMI = Device Id|D1; Entitlement Tag|10;Device Time| Fri Aug 02 07:00:00 PDT 2013; Uptime|1.0
equals? false
==? false

This is truncated output from running the job with hadoop jar:

currentMMI = Device Id|D1; Entitlement Tag|10;Device Time| Sun Aug 04 06:00:00 PDT 2013; Uptime|0.0
previousMMI = Device Id|D1; Entitlement Tag|10;Device Time| Sun Aug 04 06:00:00 PDT 2013; Uptime|0.0
equals? true
==? true

currentMMI = Device Id|D1; Entitlement Tag|10;Device Time| Sun Aug 04 07:00:00 PDT 2013; Uptime|1.0
previousMMI = Device Id|D1; Entitlement Tag|10;Device Time| Sun Aug 04 07:00:00 PDT 2013; Uptime|1.0
equals? true
==? true

Why do I have to clone it in Hadoop but not in Eclipse?

Solution

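Hadoop does not keep all the values for a key in memory, which would be very inefficient. Instead, the values iterator reuses a single MMI instance and deserializes each value into it in turn, so every call to next() returns the same reference with new contents. After previousMMI = currentMMI, both variables point to that one object, which is why == and equals() both return true. In Eclipse, the ArrayList holds ten distinct MMI objects, so keeping a reference works. See also this other SO question for a good explanation. Summary: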

[…] when looping through the Iterable value list, each Object instance is re-used, so it only keeps one instance around at a given time.
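The difference can be reproduced without Hadoop. In this minimal sketch, `ReusingIterator` is a hypothetical stand-in for the values iterator Hadoop passes to reduce(): it keeps one `Value` instance and overwrites its contents on every `next()`, imitating the reuse described above. `Value` is a hypothetical, simplified MMI with a copy constructor. The same loop runs over an ArrayList of distinct objects (the Eclipse case), over the reusing iterator while keeping a reference, and over the reusing iterator while copying.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ReuseDemo {
    // Hypothetical, simplified stand-in for MMI.
    static class Value {
        int uptime;

        Value(int uptime) { this.uptime = uptime; }

        // Copy constructor, like the MMI(MMI) constructor in the question.
        Value(Value other) { this.uptime = other.uptime; }
    }

    // Imitates Hadoop's values iterator: one Value instance, overwritten on each next().
    static class ReusingIterator implements Iterator<Value> {
        private final int[] data;
        private final Value reused = new Value(0);
        private int pos = 0;

        ReusingIterator(int... data) { this.data = data; }

        @Override
        public boolean hasNext() { return pos < data.length; }

        @Override
        public Value next() {
            if (!hasNext()) throw new NoSuchElementException();
            reused.uptime = data[pos++]; // refill the same object in place
            return reused;
        }
    }

    // Describes each previous/current comparison the reducer loop makes.
    static List<String> pairs(Iterator<Value> values, boolean copyPrevious) {
        List<String> out = new ArrayList<>();
        Value previous = null;
        while (values.hasNext()) {
            Value current = values.next();
            if (previous != null) {
                String same = (previous == current) ? " (same object)" : "";
                out.add(previous.uptime + "->" + current.uptime + same);
            }
            previous = copyPrevious ? new Value(current) : current;
        }
        return out;
    }

    public static void main(String[] args) {
        // Eclipse case: the list holds distinct objects, so keeping a reference works.
        Iterator<Value> list = Arrays.asList(new Value(0), new Value(1), new Value(2)).iterator();
        System.out.println(pairs(list, false)); // [0->1, 1->2]

        // Hadoop-like case, keeping a reference: previous and current are one object.
        System.out.println(pairs(new ReusingIterator(0, 1, 2), false)); // [1->1 (same object), 2->2 (same object)]

        // Hadoop-like case, copying: previous keeps the old contents.
        System.out.println(pairs(new ReusingIterator(0, 1, 2), true)); // [0->1, 1->2]
    }
}
```

In the real reducer, the equivalent of `new Value(current)` is the `MMI(MMI)` copy constructor. `WritableUtils.clone(currentMMI, context.getConfiguration())` from `org.apache.hadoop.io.WritableUtils` should also work, assuming MMI implements `Writable` with a no-argument constructor, since it copies the object through its own serialization methods.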
