Hadoop DistributedCache objects change during a job

I’m trying to run KMeans on AWS, but I’m getting the following exception when trying to read the updated cluster centroids from DistributedCache:

    The distributed cache object s3://mybucket/centroids_6/part-r-00009 changed during the job from 4/8/13 2:20 PM to 4/8/13 2:20 PM
    at org.apache.hadoop.filecache.TrackerDistributedCacheManager.downloadCacheObject(
    at org.apache.hadoop.filecache.TrackerDistributedCacheManager.localizePublicCacheObject(
    at org.apache.hadoop.filecache.TrackerDistributedCacheManager.getLocalCache(
    at org.apache.hadoop.filecache.TaskDistributedCacheManager.setupCache(
    at org.apache.hadoop.mapred.TaskTracker$
    at Method)
    at org.apache.hadoop.mapred.TaskTracker.initializeJob(
    at org.apache.hadoop.mapred.TaskTracker.localizeJob(
    at org.apache.hadoop.mapred.TaskTracker$

What sets this problem apart from this one is that the error is intermittent. I’ve successfully run the same code on a smaller dataset. Also, when I change the number of centroids from 12 (seen in the code above) to 8, it fails at iteration 5 instead of 6 (which you can see in the centroids_6 name above).

This is the relevant DistributedCache code in the main driver running the KMeans loop:

    int iteration = 1;
    long changes = 0;
    do {
        // First, write the previous iteration's centroids to the dist cache.
        Configuration iterConf = new Configuration();
        Path prevIter = new Path(centroidsPath.getParent(),
                String.format("centroids_%s", iteration - 1));
        FileSystem fs = prevIter.getFileSystem(iterConf);
        Path pathPattern = new Path(prevIter, "part-*");
        FileStatus [] list = fs.globStatus(pathPattern);
        for (FileStatus status : list) {
            DistributedCache.addCacheFile(status.getPath().toUri(), iterConf);
        }

        // Now, set up the job.
        Job iterJob = new Job(iterConf);
        iterJob.setJobName("KMeans " + iteration);
        Path nextIter = new Path(centroidsPath.getParent(),
                String.format("centroids_%s", iteration));
        KMeansDriver.delete(iterConf, nextIter);

        // Set input/output formats.

        // Set Mapper, Reducer, Combiner

        // Set MR formats.

        // Set input/output paths.
        FileInputFormat.addInputPath(iterJob, data);
        FileOutputFormat.setOutputPath(iterJob, nextIter);

        // Run the job and block until it has finished.
        if (!iterJob.waitForCompletion(true)) {
            System.err.println("ERROR: Iteration " + iteration + " failed!");
            System.exit(1);
        }
        iteration++;
        changes = iterJob.getCounters().findCounter(KMeansDriver.Counter.CONVERGED).getValue();
    } while (changes > 0);

How else could the file be modified? The only possibility I can think of is that after one iteration, the loop starts again before the previous job has finished writing its centroids. But as noted in the comments, I run each job with waitForCompletion(true), so nothing from the previous job should still be running when the loop restarts. Any ideas?


This isn’t really an answer, but I realized it was silly to use DistributedCache the way I was, instead of reading the results of the last iteration directly from HDFS. I wrote this method in the main driver instead:

public static HashMap<Integer, VectorWritable> readCentroids(Configuration conf, Path path)
        throws IOException {
    HashMap<Integer, VectorWritable> centroids = new HashMap<Integer, VectorWritable>();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    // Every reducer of the previous iteration wrote one part-* file.
    FileStatus [] list = fs.globStatus(new Path(path, "part-*"));
    for (FileStatus status : list) {
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
        try {
            IntWritable key = (IntWritable)reader.getKeyClass().newInstance();
            VectorWritable value = (VectorWritable)reader.getValueClass().newInstance();
            // The reader reuses key and value, so copy each centroid before storing it.
            while (reader.next(key, value)) {
                centroids.put(key.get(),
                        new VectorWritable(value.get(), value.getClusterId(), value.getNumInstances()));
            }
        } catch (InstantiationException e) {
            throw new IOException("Could not instantiate key/value class", e);
        } catch (IllegalAccessException e) {
            throw new IOException("Could not instantiate key/value class", e);
        } finally {
            reader.close();
        }
    }
    return centroids;
}

This is called in the Mapper’s and Reducer’s setup() methods during each iteration to read the centroids of the previous iteration.

protected void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    // The driver stores the previous iteration's output path under this key.
    Path centroidsPath = new Path(conf.get(KMeansDriver.CENTROIDS));
    centroids = KMeansDriver.readCentroids(conf, centroidsPath);
}

This allows me to remove the block of code in the loop from the original question that adds the centroids to DistributedCache. I tested it, and it now works on both large and small datasets.
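For this to work, the driver has to tell each iteration's tasks where the previous centroids live. Here is a minimal, Hadoop-free sketch of that bookkeeping, following the centroids_%s naming from the question. The class name, the helper methods, and the key string are hypothetical (the actual value of KMeansDriver.CENTROIDS isn't shown above), and a plain Map stands in for Hadoop's Configuration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CentroidPathSketch {
    // Hypothetical key; the real value of KMeansDriver.CENTROIDS is not shown in the post.
    static final String CENTROIDS_KEY = "kmeans.centroids.path";

    // Directory written by the given iteration, using the centroids_%s naming from the question.
    static String centroidsDir(String parent, int iteration) {
        return parent + "/" + String.format("centroids_%s", iteration);
    }

    // Stand-in for building iterConf: a Map plays the role of Hadoop's Configuration.
    static Map<String, String> confForIteration(String parent, int iteration) {
        Map<String, String> conf = new LinkedHashMap<>();
        // Tasks of iteration N read the output of iteration N - 1.
        conf.put(CENTROIDS_KEY, centroidsDir(parent, iteration - 1));
        return conf;
    }

    public static void main(String[] args) {
        String parent = "s3://mybucket";
        for (int iteration = 1; iteration <= 3; iteration++) {
            Map<String, String> conf = confForIteration(parent, iteration);
            System.out.println("iteration " + iteration
                    + " reads " + conf.get(CENTROIDS_KEY)
                    + ", writes " + centroidsDir(parent, iteration));
        }
    }
}
```

In the real driver, the equivalent step would presumably be something like iterConf.set(KMeansDriver.CENTROIDS, prevIter.toString()) before the Job is created, so that setup() can read the path back with conf.get().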


I still don’t know why I was getting the error I posted (how can something in the read-only DistributedCache change?), especially since I change the HDFS path in each iteration. But this all seems to work, and it is an easier way to read the centroids.
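One possible clue is in the message itself: both timestamps print as 4/8/13 2:20 PM. If the check compares full-precision modification times and only the message is rendered to the minute (an assumption on my part; I haven't confirmed it against the Hadoop source), then two times a few seconds apart would look identical in the log. A small sketch of that effect, with a made-up pattern and made-up times chosen to mimic the message:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class TimestampResolutionSketch {
    // Minute-resolution pattern chosen to mimic the text in the error message (an assumption).
    static final DateTimeFormatter MINUTES =
            DateTimeFormatter.ofPattern("M/d/yy h:mm a", Locale.US);

    // Full-precision value, as a modification time would be stored.
    static long toMillis(LocalDateTime t) {
        return t.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Minute-resolution text, as it would appear in a log message.
    static String render(LocalDateTime t) {
        return MINUTES.format(t);
    }

    public static void main(String[] args) {
        // Two hypothetical modification times, 37 seconds apart within the same minute.
        LocalDateTime recorded = LocalDateTime.of(2013, 4, 8, 14, 20, 5);
        LocalDateTime observed = LocalDateTime.of(2013, 4, 8, 14, 20, 42);

        System.out.println(render(recorded) + " vs " + render(observed));
        System.out.println("same text:   " + render(recorded).equals(render(observed)));
        System.out.println("same millis: " + (toMillis(recorded) == toMillis(observed)));
    }
}
```

This doesn't explain why the modification time would differ in the first place, only why the two values in the message can look the same while still failing an equality check.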

