Java – Type mismatch in key from map: expected Text, received LongWritable

I have a simple Hadoop application that reads a CSV file, splits each line on “,”, and counts how often each value of the first field occurs.

Below is my code.

package com.bluedolphin;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {
    private final static LongWritable one = new LongWritable(1);

    public static class MapClass extends Mapper<Object, Text, Text, LongWritable> {
        private Text word = new Text();
        public void map(Object key, 
                    Text value, 
                    OutputCollector<Text, LongWritable> output,
                    Reporter reporter) throws IOException, InterruptedException {
            String[] citation = value.toString().split(",");
            word.set(citation[0]);
            output.collect(word, one);
        }
    }

    public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        public void reduce(
                Text key, 
                Iterator<LongWritable> values, 
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException, InterruptedException {
            int sum = 0;

            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));
        }
    }
    public static class Combiner extends Reducer<Text, IntWritable, Text, LongWritable> {
        public void reduce(
                Text key, 
                Iterator<LongWritable> values, 
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException, InterruptedException {
            int sum = 0;

            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, "MyJob");
        job.setJarByClass(MyJob.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(MapClass.class);
        // job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reduce.class);
        // job.setInputFormatClass(KeyValueInputFormat.class);
        job.setInputFormatClass(TextInputFormat.class);
        // job.setOutputFormatClass(KeyValueOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
        return 0;
    }

    public static void main(String args[]) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
}

Here is the error:

11/12/16 22:16:58 INFO mapred.JobClient: Task Id : attempt_201112161948_0005_m_000000_0, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1013)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)

Solution

There are a few things to fix in the code:

  1. The old API (o.a.h.mapred) and the new API (o.a.h.mapreduce) are not compatible and should not be mixed. The question imports from both:

import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

     With the new API, map takes (LongWritable key, Text value, Context context) and reduce takes (Text key, Iterable<LongWritable> values, Context context). Both emit with context.write(...) rather than output.collect(...).

  2. Make sure the input and output types of the mapper and reducer are o.a.h.io.Writable types. The mapper's input key is declared as Object; make it LongWritable.

  3. The Combiner and the Reducer appear to do the same thing, so there is no need to write the logic twice. Register the question's Reduce class as the combiner:

job.setCombinerClass(Reduce.class);
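
Mixing the two APIs is what produces the exception. The map method in the question takes OutputCollector and Reporter, so it does not override the new API's map(key, value, context); it is only an extra overload that the framework never calls. The framework runs the inherited default instead (the stack trace shows Mapper.map, not MapClass.map), and that default passes the input key through unchanged. With TextInputFormat that key is a LongWritable byte offset, hence "expected Text, received LongWritable". Below is a minimal sketch of the same trap in plain Java. BaseMapper, Sink and the other names are hypothetical stand-ins, not Hadoop classes, so it runs without Hadoop on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

public class OverrideDemo {
    // Hypothetical stand-in for the framework's output context.
    interface Sink {
        void write(Object key, Object value);
    }

    // Hypothetical stand-in for the base mapper: the default map is an identity function.
    static class BaseMapper {
        public void map(Object key, String value, Sink sink) {
            sink.write(key, value);
        }
    }

    // Mirrors the question: the parameter list differs from the base method,
    // so this is an overload that the framework never calls.
    static class BrokenMapper extends BaseMapper {
        public void map(Object key, String value, Sink sink, String reporter) {
            sink.write(value.split(",")[0], 1L);
        }
    }

    // Same parameter list as the base method; @Override makes the compiler check it.
    static class FixedMapper extends BaseMapper {
        @Override
        public void map(Object key, String value, Sink sink) {
            sink.write(value.split(",")[0], 1L);
        }
    }

    // Plays the role of the framework, which only knows the base signature.
    static List<String> run(BaseMapper mapper) {
        List<String> emitted = new ArrayList<>();
        Sink sink = (k, v) -> emitted.add(k.getClass().getSimpleName() + ":" + k);
        mapper.map(Long.valueOf(0L), "alice,42", sink);
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(new BrokenMapper())); // [Long:0]
        System.out.println(run(new FixedMapper()));  // [String:alice]
    }
}
```

Annotating map and reduce with @Override in the real job turns this kind of mistake into a compile error rather than a runtime type mismatch.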

Also, you could start from the WordCount example; there is not much difference between your requirement and what WordCount does.
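
WordCount also registers its summing reducer as the combiner. That works when the combiner's input and output types both match the map output types and the operation gives the same result when applied to partial groups first, as summation does. Note that the Combiner in the question declares IntWritable as its input value type while the mapper emits LongWritable, so it would not qualify as written. The sketch below checks the partial-sum property in plain Java; sumByKey, mapLines and the sample records are made up for illustration and no Hadoop types are involved.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CombinerDemo {
    // Sums the counts per key; plays the role of both combiner and reducer.
    static Map<String, Long> sumByKey(List<Map.Entry<String, Long>> pairs) {
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, Long> pair : pairs) {
            totals.merge(pair.getKey(), pair.getValue(), Long::sum);
        }
        return totals;
    }

    // Map step: emit (first CSV field, 1) for every line.
    static List<Map.Entry<String, Long>> mapLines(List<String> lines) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String line : lines) {
            out.add(new AbstractMap.SimpleEntry<String, Long>(line.split(",")[0], 1L));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> split1 = Arrays.asList("a,1", "b,2", "a,3");
        List<String> split2 = Arrays.asList("a,4", "c,5");

        // Without a combiner: reduce over all map output at once.
        List<Map.Entry<String, Long>> all = new ArrayList<>(mapLines(split1));
        all.addAll(mapLines(split2));
        Map<String, Long> direct = sumByKey(all);

        // With a combiner: pre-sum each split, then reduce the partial sums.
        List<Map.Entry<String, Long>> partials = new ArrayList<>();
        partials.addAll(sumByKey(mapLines(split1)).entrySet());
        partials.addAll(sumByKey(mapLines(split2)).entrySet());
        Map<String, Long> combined = sumByKey(partials);

        System.out.println(direct);   // {a=3, b=1, c=1}
        System.out.println(combined); // {a=3, b=1, c=1}
    }
}
```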
