Thanks a lot, Harsh.
As you suggested, I modified my schema and code as below, and it is
working now.
sessionSchema:
{"namespace": "com.paypal.ci.avro",
"type": "record",
"name": "Session",
"fields": [
{"name":"Common", "type": {"type": "map", "values":"string"}},
{"name":"events",
"type": {
"type": "array",
"items":{
"type": "record",
"namespace": "com.paypal.ci.avro",
"name": "AvroEvent",
"fields": [
{"name":"Event",
"type": {
"type": "map", "values":"string"
}
}
]
}
}
}
]
}
// Wrapper for the output value; its datum is set once the record is built.
sessionRecord = new AvroValue<GenericData.Record>(null);
// Build the Avro session record.
GenericData.Record record = new GenericData.Record(sessionSchema);
// "events" is an array whose element type is the inlined AvroEvent record.
Schema eventsSchema = sessionSchema.getField("events").schema();
eventSchema = eventsSchema.getElementType();
GenericData.Array<GenericData.Record> eventRecords =
    new GenericData.Array<GenericData.Record>(vc.getEvents().size(),
        eventsSchema);
record.put("Common", vc.getVisitCommon().getM_parameterMap());
for (Event ev : vc.getEvents()) {
    // One AvroEvent record per event, carrying that event's parameter map.
    GenericData.Record eRecord = new GenericData.Record(eventSchema);
    eRecord.put("Event", ev.getM_parameterMap());
    eventRecords.add(eRecord);
}
record.put("events", eventRecords);
sessionRecord.datum(record);
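
For anyone finding this thread later, here is a minimal plain-Java sketch of why the first version failed. It is a toy stand-in, not Avro's actual code: the names SchemaDrivenWriteSketch, ItemType, ToyRecord and writeItems are all hypothetical. It only assumes what Harsh described, namely that the writer trusts the declared type of the array items and casts each datum accordingly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a schema-driven writer; NOT Avro's implementation. */
final class SchemaDrivenWriteSketch {

    /** Hypothetical declared item types for the "events" array. */
    enum ItemType { MAP, RECORD }

    /** Hypothetical stand-in for a record with a single map field "Event". */
    static final class ToyRecord {
        final Map<String, String> event;

        ToyRecord(Map<String, String> event) {
            this.event = event;
        }
    }

    /** Counts the map entries written, trusting the declared item type. */
    static int writeItems(ItemType declared, List<Object> items) {
        int entries = 0;
        for (Object item : items) {
            if (declared == ItemType.MAP) {
                // The schema promises a map, so the datum is cast unchecked.
                entries += ((Map<?, ?>) item).size();
            } else {
                // The schema promises a record, so its map field is written.
                entries += ((ToyRecord) item).event.size();
            }
        }
        return entries;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("page", "home");
        List<Object> events = new ArrayList<>();
        events.add(new ToyRecord(params));

        // Declared type matches the data, so the write succeeds.
        System.out.println(writeItems(ItemType.RECORD, events));

        // Declared type says MAP but the data are records, so the cast fails.
        try {
            writeItems(ItemType.MAP, events);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

With the declared type set to RECORD the items are written; with MAP the same items fail with a ClassCastException, which is the same kind of mismatch the original sessionSchema had with the reducer code.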
Thanks & Regards,
B Anil Kumar.
On Sat, Feb 8, 2014 at 11:46 PM, Harsh J <[email protected]> wrote:
> Hello Anil,
>
> Your eventSchema is a record that carries a single Map type field named
> "Event".
> Your sessionSchema is a record that carries a Map type field "Common"
> and an array of _Maps_ called "events".
>
> What your writer is assuming instead is that the sessionSchema carries
> an array of _eventSchema_ records. Therefore, in expecting a map data
> type when writing the array, it gets a record data type and throws
> "Record cannot be cast to java.util.Map".
>
> Does this help you understand the error?
>
> If you want to instead write an array of eventSchema records (as you
> are apparently doing in your code), you should alter your
> sessionSchema to inline the eventSchema record schema as the array's
> contained data type, such as the below:
>
> {"namespace": "ci.avro",
> "type": "record",
> "name": "Session",
> "fields": [
> {"name":"Common", "type": {"type": "map", "values":"string"}},
> {"name":"events",
> "type": {
> "type": "array",
> "items":{
> "type": "record",
> "namespace": "ci.avro",
> "name": "AvroEvent",
> "fields": [
> {"name":"Event",
> "type": {
> "type": "map", "values":"string"
> }
> }
> ]
> }
> }
> }
> ]
> }
>
> On Fri, Feb 7, 2014 at 6:11 PM, AnilKumar B <[email protected]> wrote:
> > Hi,
> >
> > In my MapReduce job, I am using AvroKeyValueOutputFormat as one of my
> > MultipleOutputs. So I declared my multiple outputs as below.
> >
> > MultipleOutputs.addNamedOutput(stage2Job,
> > SessionConstants.COMPLETED_SESSIONS,
> > AvroKeyValueOutputFormat.class, AvroKey.class, AvroValue.class);
> >
> > In the reducer, I am constructing and emitting a GenericData.Record for
> > the schema below:
> >
> > sessionSchema:
> > {"namespace": "ci.avro",
> > "type": "record",
> > "name": "Session",
> > "fields": [
> > {"name":"Common", "type": {
> > "type": "map", "values":"string"}},
> > {"name":"events",
> > "type": {
> > "type": "array",
> > "items":{
> > "name":"Event",
> > "type":"map",
> > "values":"string"}
> > }
> > }
> > ]
> > }
> >
> > eventSchema:
> > {"namespace": "ci.avro",
> > "type": "record",
> > "name": "AvroEvent",
> > "fields": [
> > {"name":"Event",
> > "type": {
> > "type": "map", "values":"string"
> > }
> > }
> > ]
> > }
> >
> > //record generation
> >
> > GenericData.Record record = new GenericData.Record(sessionSchema);
> > GenericData.Record eRecord = new GenericData.Record(eventSchema);
> > GenericData.Array<GenericData.Record> eventRecords =
> > new GenericData.Array<GenericData.Record>(vc.getEvents().size(),
> > sessionSchema.getField("events").schema());
> > record.put("Common", vc.getCommon().getM_parameterMap());
> > for (Event ev : vc.getEvents()) {
> > eRecord = new GenericData.Record(eventSchema);
> > eRecord.put("Event", ev.getM_parameterMap());
> > eventRecords.add(eRecord);
> > }
> > record.put("events", eventRecords);
> >
> > sessionRecord.datum(record);
> >
> >
> > // record emitted as below
> > context.getConfiguration().set(CONF_OUTPUT_KEY_SCHEMA,
> > Schema.create(Schema.Type.STRING).toString());
> > context.getConfiguration().set(CONF_OUTPUT_VALUE_SCHEMA,
> > sessionSchema.toString());
> > multipleOutputs.write(SessionConstants.COMPLETED_SESSIONS,
> > new AvroKey<String>(key.toString()), sessionRecord,
> > SessionConstants.COMPLETED_SESSIONS);
> >
> > But I am getting the exception below. I am also declaring
> > "avro.schema.output.value" as sessionSchema.toString(). What could be the
> > issue?
> >
> > Exception:
> > org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.util.Map
> >     at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
> >     at org.apache.avro.mapreduce.AvroKeyValueRecordWriter.write(AvroKeyValueRecordWriter.java:127)
> >     at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs$RecordWriterWithCounter.write(MultipleOutputs.java:304)
> >     at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:370)
> >     at com.paypal.ci.CISuperSessionStage2AvroReducer.reduce(CISuperSessionStage2AvroReducer.java:156)
> >     at com.paypal.ci.CISuperSessionStage2AvroReducer.reduce(CISuperSessionStage2AvroReducer.java:24)
> >     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
> >     at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
> >     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
> >     at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
> >     at java.security.AccessController.doPrivileged(Native Method)
> >     at javax.security.auth.Subject.doAs(Subject.java:396)
> >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1232)
> >     at org.apache.hadoop.mapred.Child.main(Child.java:249)
> > Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.util.Map
> >     at org.apache.avro.generic.GenericDatumWriter.getMapSize(GenericDatumWriter.java:194)
> >     at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:173)
> >     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:69)
> >     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
> >     at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:138)
> >     at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:64)
> >     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
> >     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
> >     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
> >     at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
> >     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
> >     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
> >     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
> >     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
> >     at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
> >     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
> >     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
> >     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
> >     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
> >     at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
> >
> >
> >
> > Thanks & Regards,
> > B Anil Kumar.
>
>
>
> --
> Harsh J
>