I declared sessionRecord as below:

    sessionRecord = new AvroValue<GenericData.Record>(null);

Thanks & Regards,
B Anil Kumar.

On Fri, Feb 7, 2014 at 6:11 PM, AnilKumar B <[email protected]> wrote:

> Hi,
>
> In my MapReduce job, I am using AvroKeyValueOutputFormat as one of my
> MultipleOutputs. So I declared my multiple outputs as below.
>
> MultipleOutputs.addNamedOutput(stage2Job,
>     SessionConstants.COMPLETED_SESSIONS,
>     AvroKeyValueOutputFormat.class, AvroKey.class, AvroValue.class);
>
> And in the reducer, I am constructing and emitting a GenericData.Record
> for the below schema:
>
> sessionSchema:
> {"namespace": "ci.avro",
>  "type": "record",
>  "name": "Session",
>  "fields": [
>    {"name":"Common", "type": {
>        "type": "map", "values":"string"}},
>    {"name":"events",
>     "type": {
>       "type": "array",
>       "items":{
>         "name":"Event",
>         "type":"map",
>         "values":"string"}
>     }
>    }
>  ]
> }
>
> eventSchema:
> {"namespace": "ci.avro",
>  "type": "record",
>  "name": "AvroEvent",
>  "fields": [
>    {"name":"Event",
>     "type": {
>       "type": "map", "values":"string"
>     }
>    }
>  ]
> }
>
> //record generation
>
> GenericData.Record record = new GenericData.Record(sessionSchema);
> GenericData.Record eRecord = new GenericData.Record(eventSchema);
> GenericData.Array<GenericData.Record> eventRecords =
>     new GenericData.Array<GenericData.Record>(vc.getEvents().size(),
>         sessionSchema.getField("events").schema());
> record.put("Common", vc.getCommon().getM_parameterMap());
> for (Event ev : vc.getEvents()) {
>     eRecord = new GenericData.Record(eventSchema);
>     eRecord.put("Event", ev.getM_parameterMap());
>     eventRecords.add(eRecord);
> }
> record.put("events", eventRecords);
>
> sessionRecord.datum(record);
>
>
> // record emitted as below
> context.getConfiguration().set(CONF_OUTPUT_KEY_SCHEMA,
>     Schema.create(Schema.Type.STRING).toString());
> context.getConfiguration().set(CONF_OUTPUT_VALUE_SCHEMA,
>     sessionSchema.toString());
> multipleOutputs.write(SessionConstants.COMPLETED_SESSIONS,
>     new AvroKey<String>(key.toString()), sessionRecord,
>     SessionConstants.COMPLETED_SESSIONS);
>
> But I am getting the below exception. I am also declaring
> "avro.schema.output.value" as sessionSchema.toString(). What could be the
> issue?
>
> Exception:
> org.apache.avro.file.DataFileWriter$AppendWriteException:
> java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record
> cannot be cast to java.util.Map
>     at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
>     at org.apache.avro.mapreduce.AvroKeyValueRecordWriter.write(AvroKeyValueRecordWriter.java:127)
>     at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs$RecordWriterWithCounter.write(MultipleOutputs.java:304)
>     at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:370)
>     at com.paypal.ci.CISuperSessionStage2AvroReducer.reduce(CISuperSessionStage2AvroReducer.java:156)
>     at com.paypal.ci.CISuperSessionStage2AvroReducer.reduce(CISuperSessionStage2AvroReducer.java:24)
>     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
>     at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
>     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
>     at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:396)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1232)
>     at org.apache.hadoop.mapred.Child.main(Child.java:249)
> Caused by: java.lang.ClassCastException:
> org.apache.avro.generic.GenericData$Record cannot be cast to java.util.Map
>     at org.apache.avro.generic.GenericDatumWriter.getMapSize(GenericDatumWriter.java:194)
>     at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:173)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:69)
>     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>     at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:138)
>     at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:64)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
>     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
>     at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
>     at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
>     at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
>
> Thanks & Regards,
> B Anil Kumar.
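
A possible reading of the trace, offered as an assumption rather than a confirmed diagnosis: sessionSchema declares the items of "events" as a map (the "name":"Event" attribute on a map type does not define a record), while the reducer adds GenericData.Record instances built from eventSchema to that array. The writer follows the schema, reaches writeMap for each array item, and fails when it casts the record to java.util.Map. One way to make the schema agree with the reducer code would be to declare the array items as the AvroEvent record, as in this sketch of a revised sessionSchema:

```json
{"namespace": "ci.avro",
 "type": "record",
 "name": "Session",
 "fields": [
   {"name": "Common", "type": {"type": "map", "values": "string"}},
   {"name": "events",
    "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "AvroEvent",
        "fields": [
          {"name": "Event", "type": {"type": "map", "values": "string"}}
        ]
      }
    }
   }
 ]
}
```

With a schema like this, each eRecord could be created from sessionSchema.getField("events").schema().getElementType() so that the array and its items share one schema. The alternative, keeping the original sessionSchema, would be to add ev.getM_parameterMap() to the array directly instead of wrapping it in a record, assuming getM_parameterMap() returns a Map with string keys and values.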
