We figured it out. The code that caused the problem was left out of the
original email. Here's the full mapper:
    @Override
    protected void map(AvroKey<NetworkRecord> key, NullWritable value,
                       Context context)
            throws IOException, InterruptedException {
        startTime.setTime(key.datum().getStartEpoch());
        avroKey.datum(DateUtils.truncate(startTime,
                Calendar.HOUR_OF_DAY).getTime());
        avroValue.datum(key);
        context.write(avroKey, avroValue);
    }
The problem was in how we set the avroValue: we passed the AvroKey wrapper
itself instead of the record it holds. The fix was to set the datum to the
unwrapped record:

    avroValue.datum(key.datum());
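
To illustrate the difference, here is a minimal, self-contained sketch. It
does not use Avro at all: Wrapper and Record are hypothetical stand-ins for
AvroKey/AvroValue and the generated NetworkRecord class. It also assumes the
value wrapper was declared in a way that let the compiler accept the wrong
argument (a raw type here); the actual declaration of avroValue is not shown
in this thread.

```java
public class DatumUnwrapSketch {

    // Hypothetical stand-in for an Avro wrapper (AvroKey/AvroValue): holds one datum.
    static class Wrapper<T> {
        private T datum;
        Wrapper() {}
        Wrapper(T datum) { this.datum = datum; }
        T datum() { return datum; }                   // getter
        void datum(T datum) { this.datum = datum; }   // setter
    }

    // Hypothetical stand-in for the generated NetworkRecord class.
    static class Record {
        final long startEpoch;
        Record(long startEpoch) { this.startEpoch = startEpoch; }
    }

    // Mirrors the bug: the wrapper itself is stored as the datum.
    // The raw type is an assumption; it is what lets this compile.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Object wrapWrong(Wrapper<Record> key) {
        Wrapper value = new Wrapper();
        value.datum(key);
        return value.datum();
    }

    // Mirrors the fix: the record is unwrapped before it is stored.
    static Object wrapRight(Wrapper<Record> key) {
        Wrapper<Record> value = new Wrapper<>();
        value.datum(key.datum());
        return value.datum();
    }

    public static void main(String[] args) {
        Wrapper<Record> key = new Wrapper<>(new Record(1362092280000L));
        System.out.println(wrapWrong(key) instanceof Record);  // prints "false"
        System.out.println(wrapRight(key) instanceof Record);  // prints "true"
    }
}
```

In the sketch, anything downstream that expects a Record only gets one in the
second case, which is the shape of the mistake we made in the mapper.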
-e
On Thu, Feb 28, 2013 at 6:58 PM, Ramachandran, Karthik <
[email protected]> wrote:
> Hi,
>
> I'm trying to write a Mapper (using the .mapreduce API) that takes an
> Avro 1.7.4 file. I'm using the specific API (via the Maven Avro plugin) to
> generate my objects from an .avdl file.
>
> I've verified that the Avro file is generated correctly and can use the
> avro-to-json function in the Avro tools jar to read the file back.
>
> *The error I'm receiving is:*
>
> java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.lab41.cyprus.domain.NetworkRecord
>     at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:45)
>     at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:23)
>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:263)
>
> *The driver class is as follows:*
>
> public class RollupAvroFiles extends Configured implements Tool {
>
>     @Override
>     public int run(String[] args) throws Exception {
>         Configuration conf = getConf();
>         String[] otherArgs =
>                 new GenericOptionsParser(conf, args).getRemainingArgs();
>
>         String input, output;
>         if (otherArgs.length == 2) {
>             input = otherArgs[0];
>             output = otherArgs[1];
>         } else {
>             return 1;
>         }
>
>         /** configure Job **/
>         Job job = new Job(conf, "RollupAvroFiles");
>         job.setJarByClass(RollupAvroFiles.class);
>         job.setUserClassesTakesPrecedence(true);
>         job.setNumReduceTasks(1);
>
>         FileInputFormat.setInputPaths(job, new Path(input));
>         job.setInputFormatClass(AvroKeyInputFormat.class);
>         AvroJob.setInputKeySchema(job, NetworkRecord.SCHEMA$);
>
>         job.setMapperClass(RollupAvroFilesMapper.class);
>         AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.LONG));
>         AvroJob.setMapOutputValueSchema(job, NetworkRecord.SCHEMA$);
>
>         job.setMapOutputKeyClass(AvroKey.class);
>         job.setMapOutputValueClass(AvroValue.class);
>
>         job.setReducerClass(RollupAvroFilesReducer.class);
>         AvroJob.setOutputKeySchema(job, NetworkRecord.SCHEMA$);
>
>         job.setOutputFormatClass(AvroKeyOutputFormat.class);
>         FileOutputFormat.setOutputPath(job, new Path(output));
>
>         return job.waitForCompletion(true) ? 0 : 1;
>     }
>
>     public static void main(String[] args) throws Exception {
>         int exitCode = ToolRunner.run(new RollupAvroFiles(), args);
>         System.exit(exitCode);
>     }
>
> }
>
> *And the Mapper class reads:*
>
> public class RollupAvroFilesMapper
>         extends Mapper<AvroKey<NetworkRecord>, NullWritable,
>                        AvroKey<Long>, AvroValue<NetworkRecord>> {
>
>     @Override
>     protected void setup(Context context)
>             throws IOException, InterruptedException {
>     }
>
>     @Override
>     protected void map(AvroKey<NetworkRecord> key, NullWritable value,
>                        Context context)
>             throws IOException, InterruptedException {
>         NetworkRecord record = key.datum();
>         ….
>     }
> }
>
> Any thoughts would be appreciated.
> --
> Karthik Ramachandran
> In-Q-Tel
> Software Developer, Lab41
> Mobile: (571) 455-5576
> Email: [email protected]
>
>