I declared sessionRecord as follows:

sessionRecord = new AvroValue<GenericData.Record>(null);

Thanks & Regards,
B Anil Kumar.


On Fri, Feb 7, 2014 at 6:11 PM, AnilKumar B <[email protected]> wrote:

> Hi,
>
> In my MapReduce job, I am using AvroKeyValueOutputFormat as one of my
> MultipleOutputs. I declared the named output as follows:
>
>  MultipleOutputs.addNamedOutput(stage2Job,
>         SessionConstants.COMPLETED_SESSIONS,
>         AvroKeyValueOutputFormat.class, AvroKey.class, AvroValue.class);
>
> In the reducer, I am constructing and emitting a GenericData.Record for
> the schema below:
>
> sessionSchema:
> {"namespace": "ci.avro",
>  "type": "record",
>  "name": "Session",
>  "fields": [
>    {"name": "Common",
>     "type": {"type": "map", "values": "string"}},
>    {"name": "events",
>     "type": {
>       "type": "array",
>       "items": {
>         "name": "Event",
>         "type": "map",
>         "values": "string"}
>     }
>    }
>  ]
> }
>
> eventSchema:
> {"namespace": "ci.avro",
>  "type": "record",
>  "name": "AvroEvent",
>  "fields": [
>     {"name":"Event",
>       "type": {
>            "type": "map", "values":"string"
>               }
>     }
>  ]
> }
>
> // record generation
>
>     GenericData.Record record = new GenericData.Record(sessionSchema);
>     GenericData.Record eRecord = new GenericData.Record(eventSchema);
>     GenericData.Array<GenericData.Record> eventRecords =
>         new GenericData.Array<GenericData.Record>(vc.getEvents().size(),
>             sessionSchema.getField("events").schema());
>     record.put("Common", vc.getCommon().getM_parameterMap());
>     for (Event ev : vc.getEvents()) {
>       eRecord = new GenericData.Record(eventSchema);
>       eRecord.put("Event", ev.getM_parameterMap());
>       eventRecords.add(eRecord);
>     }
>     record.put("events", eventRecords);
>
>     sessionRecord.datum(record);
>
>
> // record emitted as below
>     context.getConfiguration().set(CONF_OUTPUT_KEY_SCHEMA,
>         Schema.create(Schema.Type.STRING).toString());
>     context.getConfiguration().set(CONF_OUTPUT_VALUE_SCHEMA,
>         sessionSchema.toString());
>     multipleOutputs.write(SessionConstants.COMPLETED_SESSIONS,
>         new AvroKey<String>(key.toString()), sessionRecord,
>         SessionConstants.COMPLETED_SESSIONS);
>
> But I am getting the exception below, even though I am also setting
> "avro.schema.output.value" to sessionSchema.toString(). What could be the
> issue?
>
> Exception:
> org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.util.Map
>         at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
>         at org.apache.avro.mapreduce.AvroKeyValueRecordWriter.write(AvroKeyValueRecordWriter.java:127)
>         at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs$RecordWriterWithCounter.write(MultipleOutputs.java:304)
>         at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:370)
>         at com.paypal.ci.CISuperSessionStage2AvroReducer.reduce(CISuperSessionStage2AvroReducer.java:156)
>         at com.paypal.ci.CISuperSessionStage2AvroReducer.reduce(CISuperSessionStage2AvroReducer.java:24)
>         at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
>         at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
>         at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
>         at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1232)
>         at org.apache.hadoop.mapred.Child.main(Child.java:249)
> Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.util.Map
>         at org.apache.avro.generic.GenericDatumWriter.getMapSize(GenericDatumWriter.java:194)
>         at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:173)
>         at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:69)
>         at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>         at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:138)
>         at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:64)
>         at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
>         at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>         at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
>         at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
>         at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>         at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>         at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>         at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
>         at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
>         at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>         at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>         at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>         at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
>         at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
>
>
>
> Thanks & Regards,
> B Anil Kumar.
>
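
One possible reading of the trace above, offered as a guess rather than a confirmed diagnosis: sessionSchema declares the items of "events" as a map of strings, but the reducer adds GenericData.Record instances (built from eventSchema) to that array. For each array item the writer appears to take the map branch (GenericDatumWriter.writeMap, then getMapSize in the trace) and casts the datum to java.util.Map, which a Record is not. The sketch below imitates that cast using only the JDK. MapItemCheck, FakeRecord and mapSize are hypothetical names made up for illustration and are not part of Avro.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, JDK-only illustration; none of these names belong to Avro.
class MapItemCheck {

    // Stand-in for a generic record that wraps a map in a field called "Event".
    // Like GenericData.Record, it does NOT implement java.util.Map.
    static final class FakeRecord {
        final Map<String, String> event;

        FakeRecord(Map<String, String> event) {
            this.event = event;
        }
    }

    // Simplified imitation of what a generic writer does once the schema says
    // "map": it casts the datum to Map before looking at its entries.
    static int mapSize(Object datum) {
        return ((Map<?, ?>) datum).size();
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("page", "home");

        // A plain map matches a "map" item schema, so the cast succeeds.
        System.out.println("map item size: " + mapSize(params));

        // A record wrapper does not match, so the same cast fails.
        try {
            mapSize(new FakeRecord(params));
        } catch (ClassCastException e) {
            System.out.println("record item rejected: ClassCastException");
        }
    }
}
```

If that reading is right, two changes seem worth trying: add ev.getM_parameterMap() to eventRecords directly (typing the array as a GenericData.Array of maps instead of records), or change the "items" of "events" in sessionSchema to the AvroEvent record so that the Record instances match the schema. Neither has been verified against this job.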
