Hi,
I'm trying to write a Mapper (using the .mapreduce API) that takes an Avro
1.7.4 file. I'm using the specific API (via the Avro Maven plugin) to generate
my objects from an .avdl file.
I've verified that the Avro file is generated correctly: I can read it back
with the tojson command in the Avro tools jar.
The error I'm receiving is:
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.lab41.cyprus.domain.NetworkRecord
    at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:45)
    at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:23)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:263)
The driver class is as follows:
public class RollupAvroFiles extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        String input, output;
        if (otherArgs.length == 2) {
            input = otherArgs[0];
            output = otherArgs[1];
        } else {
            return 1;
        }

        /** configure Job **/
        Job job = new Job(conf, "RollupAvroFiles");
        job.setJarByClass(RollupAvroFiles.class);
        job.setUserClassesTakesPrecedence(true);
        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, new Path(input));
        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, NetworkRecord.SCHEMA$);

        job.setMapperClass(RollupAvroFilesMapper.class);
        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.LONG));
        AvroJob.setMapOutputValueSchema(job, NetworkRecord.SCHEMA$);
        job.setMapOutputKeyClass(AvroKey.class);
        job.setMapOutputValueClass(AvroValue.class);

        job.setReducerClass(RollupAvroFilesReducer.class);
        AvroJob.setOutputKeySchema(job, NetworkRecord.SCHEMA$);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(output));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new RollupAvroFiles(), args);
        System.exit(exitCode);
    }
}
And the Mapper class reads:
public class RollupAvroFilesMapper
        extends Mapper<AvroKey<NetworkRecord>, NullWritable, AvroKey<Long>, AvroValue<NetworkRecord>> {

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
    }

    @Override
    protected void map(AvroKey<NetworkRecord> key, NullWritable value, Context context)
            throws IOException, InterruptedException {
        NetworkRecord record = key.datum();
        ….
    }
}
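One observation on the trace: the exception is raised inside map() rather than in the record reader. My assumption is that this is generic type erasure at work: the reader can place a GenericData$Record inside an AvroKey without any runtime check, and the cast is only attempted when key.datum() is assigned to a NetworkRecord. The stand-alone sketch below reproduces that pattern with stand-in classes (Key, GenericRec and SpecificRec are hypothetical names, not Avro types), so it only illustrates where the failure surfaces, not why the reader produces generic records in the first place.

```java
// Illustration only: Key, GenericRec and SpecificRec are stand-ins, not Avro classes.
class ErasureDemo {
    // Simulates a reader that builds a generic record but hands it out through
    // a wrapper the caller believes is Key<SpecificRec>.
    @SuppressWarnings("unchecked")
    static Key<SpecificRec> readKey() {
        Key<?> raw = new Key<GenericRec>(new GenericRec());
        return (Key<SpecificRec>) raw; // unchecked cast: nothing is verified at runtime
    }

    // Returns true if the ClassCastException appears only at the datum() assignment.
    static boolean failsAtAssignment() {
        Key<SpecificRec> key = readKey(); // no exception here
        try {
            SpecificRec rec = key.datum(); // the compiler-inserted cast fails here
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("fails at assignment: " + failsAtAssignment());
    }
}

// Plays the role of GenericData.Record.
class GenericRec { }

// Plays the role of the generated NetworkRecord class.
class SpecificRec { }

// Plays the role of AvroKey<T>: a typed wrapper around a datum.
class Key<T> {
    private final T datum;
    Key(T datum) { this.datum = datum; }
    T datum() { return datum; }
}
```

If that reading is right, the Mapper's type parameters are not the problem, and the thing to chase is why the input side is building generic rather than specific records.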
Any thoughts would be appreciated.
--
Karthik Ramachandran
In-Q-Tel
Software Developer, Lab41
Mobile: (571) 455-5576
Email: [email protected]