Java – Hadoop DistributedCache objects change during a job

I’m trying to run KMeans on AWS, but I’m getting the following exception when trying to read the updated cluster centroids from DistributedCache:

java.io.IOException: The distributed cache object s3://mybucket/centroids_6/part-r-00009 changed during the job from 4/8/13 2:20 PM to 4/8/13 2:20 PM
at org.apache.hadoop.filecache.TrackerDistributedCacheManager.downloadCacheObject(TrackerDistributedCacheManager.java:401)
at org.apache.hadoop.filecache.TrackerDistributedCacheManager.localizePublicCacheObject(TrackerDistributedCacheManager.java:475)
at org.apache.hadoop.filecache.TrackerDistributedCacheManager.getLocalCache(TrackerDistributedCacheManager.java:191)
at org.apache.hadoop.filecache.TaskDistributedCacheManager.setupCache(TaskDistributedCacheManager.java:182)
at org.apache.hadoop.mapred.TaskTracker$4.run(TaskTracker.java:1246)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
at org.apache.hadoop.mapred.TaskTracker.initializeJob(TaskTracker.java:1237)
at org.apache.hadoop.mapred.TaskTracker.localizeJob(TaskTracker.java:1152)
at org.apache.hadoop.mapred.TaskTracker$5.run(TaskTracker.java:2541)
at java.lang.Thread.run(Thread.java:662)

What sets this problem apart from other reports of this error is that it occurs intermittently. I’ve successfully run the same code on a smaller dataset. Also, when I change the number of centroids from 12 (seen in the code above) to 8, it fails at iteration 5 instead of 6 (which you can see in the centroids_6 name above).

This is the relevant DistributedCache code in the main driver running the KMeans loop:

    int iteration = 1;
    long changes = 0; 
    do {
        // First, write the previous iteration's centroids to the dist cache.
        Configuration iterConf = new Configuration();
        Path prevIter = new Path(centroidsPath.getParent(),
                String.format("centroids_%s", iteration - 1));
        FileSystem fs = prevIter.getFileSystem(iterConf);
        Path pathPattern = new Path(prevIter, "part-*");
        FileStatus [] list = fs.globStatus(pathPattern);
        for (FileStatus status : list) {
            DistributedCache.addCacheFile(status.getPath().toUri(), iterConf);
        }

        // Now, set up the job.
        Job iterJob = new Job(iterConf);
        iterJob.setJobName("KMeans " + iteration);
        iterJob.setJarByClass(KMeansDriver.class);
        Path nextIter = new Path(centroidsPath.getParent(), 
                String.format("centroids_%s", iteration));
        KMeansDriver.delete(iterConf, nextIter);

        // Set input/output formats.
        iterJob.setInputFormatClass(SequenceFileInputFormat.class);
        iterJob.setOutputFormatClass(SequenceFileOutputFormat.class);

        // Set Mapper, Reducer, Combiner.
        iterJob.setMapperClass(KMeansMapper.class);
        iterJob.setCombinerClass(KMeansCombiner.class);
        iterJob.setReducerClass(KMeansReducer.class);

        // Set MR formats.
        iterJob.setMapOutputKeyClass(IntWritable.class);
        iterJob.setMapOutputValueClass(VectorWritable.class);
        iterJob.setOutputKeyClass(IntWritable.class);
        iterJob.setOutputValueClass(VectorWritable.class);

        // Set input/output paths.
        FileInputFormat.addInputPath(iterJob, data);
        FileOutputFormat.setOutputPath(iterJob, nextIter);

        iterJob.setNumReduceTasks(nReducers);

        if (!iterJob.waitForCompletion(true)) {
            System.err.println("ERROR: Iteration " + iteration + " failed!");
            System.exit(1);
        }
        iteration++;
        changes = iterJob.getCounters().findCounter(KMeansDriver.Counter.CONVERGED).getValue();
        iterJob.getCounters().findCounter(KMeansDriver.Counter.CONVERGED).setValue(0);
    } while (changes > 0);

How else could the file be modified? The only possibility I can think of is that, after one iteration, the loop starts again before the previous job has finished writing its centroids. But as the code shows, I run each job with waitForCompletion(true), so nothing from the previous job should still be running when the loop restarts. Any ideas?

Solution

It’s not really an answer, but I did realize that it was silly to use DistributedCache the way I did, instead of reading the result of the last iteration directly from HDFS. I wrote this method in the main driver instead:

public static HashMap<Integer, VectorWritable> readCentroids(Configuration conf, Path path)
        throws IOException {
    HashMap<Integer, VectorWritable> centroids = new HashMap<Integer, VectorWritable>();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    FileStatus[] list = fs.globStatus(new Path(path, "part-*"));
    for (FileStatus status : list) {
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
        try {
            IntWritable key;
            VectorWritable value;
            try {
                key = (IntWritable) reader.getKeyClass().newInstance();
                value = (VectorWritable) reader.getValueClass().newInstance();
            } catch (InstantiationException e) {
                // Fail fast instead of continuing with null key/value objects.
                throw new IOException("Cannot instantiate key/value for " + status.getPath(), e);
            } catch (IllegalAccessException e) {
                throw new IOException("Cannot instantiate key/value for " + status.getPath(), e);
            }
            while (reader.next(key, value)) {
                // Copy the value: the same object is refilled on every next() call.
                centroids.put(Integer.valueOf(key.get()),
                        new VectorWritable(value.get(), value.getClusterId(), value.getNumInstances()));
            }
        } finally {
            // Close the reader even if reading this part file fails.
            reader.close();
        }
    }
    return centroids;
}

This is called in the Mapper’s and Reducer’s setup() methods during each iteration to read the centroids of the previous iteration.

protected void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    Path centroidsPath = new Path(conf.get(KMeansDriver.CENTROIDS));
    centroids = KMeansDriver.readCentroids(conf, centroidsPath);
}
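
For context, once setup() has loaded the map, each mapper call would typically assign its input point to the closest centroid. The following is only a minimal plain-Java sketch of that lookup, not the code from my job: the class name `NearestCentroidSketch` and the method `nearest` are hypothetical, and `double[]` stands in for VectorWritable so the example runs without Hadoop.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: double[] replaces the post's VectorWritable.
final class NearestCentroidSketch {

    // Returns the id of the centroid with the smallest squared Euclidean
    // distance to the given point.
    static int nearest(Map<Integer, double[]> centroids, double[] point) {
        if (centroids.isEmpty()) {
            throw new IllegalArgumentException("no centroids loaded");
        }
        int bestId = -1;
        double bestDist = Double.POSITIVE_INFINITY;
        for (Map.Entry<Integer, double[]> e : centroids.entrySet()) {
            double[] c = e.getValue();
            double dist = 0.0;
            for (int i = 0; i < point.length; i++) {
                double d = point[i] - c[i];
                dist += d * d;
            }
            if (dist < bestDist) {
                bestDist = dist;
                bestId = e.getKey();
            }
        }
        return bestId;
    }

    public static void main(String[] args) {
        Map<Integer, double[]> centroids = new HashMap<>();
        centroids.put(0, new double[] {0.0, 0.0});
        centroids.put(1, new double[] {10.0, 10.0});
        System.out.println(nearest(centroids, new double[] {1.0, 2.0})); // prints 0
        System.out.println(nearest(centroids, new double[] {9.0, 8.0})); // prints 1
    }
}
```

Because the map is keyed by cluster id, the returned id can be used directly as the map output key.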

This allows me to remove the block of code in the loop from the original question that adds the centroid files to DistributedCache. I tested it and it now works on both large and small datasets.

I still don’t know why I was getting the error I posted (how can something in the read-only DistributedCache change?), especially since I use a different HDFS path in each iteration. But this approach works and is a simpler way to read the centroids.
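
One detail in the exception text may be worth noting: both timestamps print as 4/8/13 2:20 PM, so whatever differed did so below minute resolution. The sketch below is a plain-Java illustration, not Hadoop code. The class `CacheTimestampSketch`, its methods, and the sample millisecond values are hypothetical, and it assumes the cache check compares raw modification times in milliseconds and only formats them for the message.

```java
import java.text.DateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper, not part of Hadoop.
final class CacheTimestampSketch {

    // Formats epoch millis with a short date/time style, which drops
    // seconds and milliseconds from the printed value.
    static String shortFormat(long epochMillis) {
        DateFormat df = DateFormat.getDateTimeInstance(
                DateFormat.SHORT, DateFormat.SHORT, Locale.US);
        df.setTimeZone(TimeZone.getTimeZone("UTC"));
        return df.format(new Date(epochMillis));
    }

    // Assumed form of the check: a strict comparison of millisecond values.
    static boolean looksChanged(long recordedAtSubmit, long seenAtLocalize) {
        return recordedAtSubmit != seenAtLocalize;
    }

    public static void main(String[] args) {
        long recorded = 1365430805000L;  // illustrative value only
        long seen = recorded + 30_000L;  // 30 seconds later, same minute
        System.out.println(shortFormat(recorded) + " vs " + shortFormat(seen));
        System.out.println("changed? " + looksChanged(recorded, seen));
    }
}
```

If that assumption holds, two modification times that differ by only a few seconds would print identically and still be treated as a change.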
