Hi,
In my Mapreduce job, I am using AvroKeyValueOutputFormat as one of my
MultipleOutputs. So I declared my multiple outputs as below.
MultipleOutputs.addNamedOutput(stage2Job,
SessionConstants.COMPLETED_SESSIONS,
AvroKeyValueOutputFormat.class, AvroKey.class, AvroValue.class);
And In reducer, I am constructing and emitting GenericData.Record for the
below schema:
sessionSchema:
{"namespace": "ci.avro",
"type": "record",
"name": "Session",
"fields": [
{"name":"Common", "type": {
"type": "map", "values":"string"}},
{"name":"events",
"type": {
"type": "array",
"items":{
"name":"Event",
"type":"map",
"values":"string"}
}
}
]
}
eventSchema:
{"namespace": "ci.avro",
"type": "record",
"name": "AvroEvent",
"fields": [
{"name":"Event",
"type": {
"type": "map", "values":"string"
}
}
]
}
//record generation
GenericData.Record record = new GenericData.Record(sessionSchema);
GenericData.Record eRecord = new GenericData.Record(eventSchema);
GenericData.Array<GenericData.Record> eventRecords =
new GenericData.Array<GenericData.Record>(vc.getEvents().size(),
sessionSchema.getField("events").schema());
record.put("Common", vc.getCommon().getM_parameterMap());
for (Event ev : vc.getEvents()) {
eRecord = new GenericData.Record(eventSchema);
eRecord.put("Event", ev.getM_parameterMap());
eventRecords.add(eRecord);
}
record.put("events", eventRecords);
sessionRecord.datum(record);
// record emmitted as below
context.getConfiguration().set(CONF_OUTPUT_KEY_SCHEMA,
Schema.create(Schema.Type.STRING).toString());
context.getConfiguration().set(CONF_OUTPUT_VALUE_SCHEMA,
sessionSchema.toString());
multipleOutputs.write(SessionConstants.COMPLETED_SESSIONS,
new AvroKey<String>(key.toString()), sessionRecord,
SessionConstants.COMPLETED_SESSIONS);
But I am getting below exception. I am also declaring
"avro.schema.output.value" as sessionSchema.toString(). What could be the
issue?
Exception:
org.apache.avro.file.DataFileWriter$AppendWriteException:
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record
cannot be cast to java.util.Map
at
org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
at
org.apache.avro.mapreduce.AvroKeyValueRecordWriter.write(AvroKeyValueRecordWriter.java:127)
at
org.apache.hadoop.mapreduce.lib.output.MultipleOutputs$RecordWriterWithCounter.write(MultipleOutputs.java:304)
at
org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:370)
at
com.paypal.ci.CISuperSessionStage2AvroReducer.reduce(CISuperSessionStage2AvroReducer.java:156)
at
com.paypal.ci.CISuperSessionStage2AvroReducer.reduce(CISuperSessionStage2AvroReducer.java:24)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
at
org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1232)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.ClassCastException:
org.apache.avro.generic.GenericData$Record cannot be cast to java.util.Map
at
org.apache.avro.generic.GenericDatumWriter.getMapSize(GenericDatumWriter.java:194)
at
org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:173)
at
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:69)
at
org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at
org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:138)
at
org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:64)
at
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
at
org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at
org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at
org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at
org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at
org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at
org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at
org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at
org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at
org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at
org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
Thanks & Regards,
B Anil Kumar.