Hi Nir,

I'm not an expert with the avro.mapred APIs, but as far as I know, AvroJob does not perform schema evolution, so the schema you pass to AvroJob.setInputSchema has to be exactly the schema with which the mappers' input files were encoded. So if your input isn't actually a union type, but your schema says that it is, decoding of the input file will fail.
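
To illustrate the difference, here is a minimal sketch using only the core Avro Schema API. The Conversion record and the UnionVsRecord class are made-up stand-ins for whatever schema is in your file's header, so treat the details as assumptions rather than a tested fix for your job. It shows that a union wrapping a single record is a different schema from the record itself:

```java
import java.util.Collections;
import org.apache.avro.Schema;

public class UnionVsRecord {

    // Hypothetical stand-in for the schema stored in the input file's header.
    public static Schema recordSchema() {
        return new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Conversion\",\"fields\":["
            + "{\"name\":\"conversion_id\",\"type\":[\"long\",\"null\"]}]}");
    }

    // Wraps a single schema in a union, as the run function in the question does.
    public static Schema unionOf(Schema schema) {
        return Schema.createUnion(Collections.singletonList(schema));
    }

    public static void main(String[] args) {
        Schema record = recordSchema();
        Schema union = unionOf(record);

        System.out.println("record type: " + record.getType());
        System.out.println("union type:  " + union.getType());
        // The one-branch union is not equal to the record it wraps.
        System.out.println("equal: " + union.equals(record));

        // Under the assumption above, the schema to hand to
        // AvroJob.setInputSchema(conf, ...) would be `record`, not `union`.
    }
}
```

If the assumption holds, passing the record schema itself to AvroJob.setInputSchema, without the union wrapper, would be the first thing to try.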

If you want to consume several different input directories with different schemas in the same MapReduce job, you might be able to build that using Hadoop's MultipleInputs, though I don't know how well that plays with Avro. Alternatively, you could take a look at http://crunch.apache.org/ which provides higher-level MapReduce APIs and is designed to play nicely with Avro.

Hope that helps,
Martin

On 24 March 2013 03:23, nir_zamir <[email protected]> wrote:
> Hi,
>
> I have a small M/R which, for the sake of troubleshooting, just reads a
> single Avro file and sends it to an AvroMapper. In order to handle different
> schemas, I set the mapper to work with a Union Schema.
> For the simplicity, the union is a union of only one schema, taken from the
> Avro file. This is my run function:
>
>     public int run2(String[] args) throws Exception {
>         List<Schema> schemas= new ArrayList<Schema>();
>         Schema schema = readSchema(new Path(inputDir), conf);
>         schemas.add(schema);
>         Schema unionSchema = Schema.createUnion(schemas);
>
>         AvroJob.setInputSchema(conf, unionSchema);
>         AvroJob.setMapOutputSchema(conf,
>             Pair.getPairSchema(Schema.create(Type.INT),
>             unionSchema));
>
>         JobClient.runJob(conf);
>         return 0;
>     }
>
> And my mapper is:
>
>     public static class MyMap extends AvroMapper <GenericRecord,
>             Pair<Integer, GenericRecord>> {
>         @Override
>         public void map( GenericRecord datum,
>                 AvroCollector<Pair<Integer,
>                 GenericRecord>> collector, Reporter reporter) throws IOException {
>             collector.collect(new Pair<Integer,
>                 GenericRecord>(conversion_id,
>                 datum));
>         }
>     }
>
> *Side note:* when I extract the schema manually from the Avro file and call
> GenericData.get().resolveUnion - the schema is correctly resolved and found
> in the union schema (the methods returns 0).
>
>
> *THE PROBLEM:*
> the 'collect' method throws an exception saying my datum's schema is not in
> the union:
>
> org.apache.avro.UnresolvedUnionException: Not in union
> [{"type":"record","name":"path_to_conversions_raw","namespace":"org.apache.avro.mapred","doc":"Sqoop
> import of
> path_to_conversions_raw","fields":[{"name":"conversion_id","type":["long","null"],"columnName":"conversion_id","sqlType":"-5"},{"name":"conversion_time","type":["long","null"],"columnName":"conversion_time","sqlType":"93"},{"name":"derived_time","type":["long","null"],"columnName":"derived_time","sqlType":"93"},{"name":"position","type":["int","null"],"columnName":"position","sqlType":"5"},{"name":"event_time","type":["long","null"],"columnName":"event_time","sqlType":"93"},{"name":"event_type_id","type":["int","null"],"columnName":"event_type_id","sqlType":"5"},{"name":"entity_id","type":["int","null"],"columnName":"entity_id","sqlType":"4"},{"name":"entity_type_id","type":["int","null"],"columnName":"entity_type_id","sqlType":"4"},{"name":"sv_click_type","type":["int","null"],"columnName":"sv_click_type","sqlType":"5"},{"name":"placement_id","type":["int","null"],"columnName":"placement_id","sqlType":"4"},{"name":"site_id","type":["int","null"],"columnName":"site_id","sqlType":"4"},{"name":"campaign_id","type":["int","null"],"columnName":"campaign_id","sqlType":"4"},{"name":"days_before_conversion","type":["int","null"],"columnName":"days_before_conversion","sqlType":"5"},{"name":"version_id","type":["int","null"],"columnName":"version_id","sqlType":"4"},{"name":"target_audience_id","type":["int","null"],"columnName":"target_audience_id","sqlType":"4"},{"name":"ip_attributes_id","type":["long","null"],"columnName":"ip_attributes_id","sqlType":"-5"},{"name":"batch_id","type":["long","null"],"columnName":"batch_id","sqlType":"-5"},{"name":"ccs_ad_id","type":["long","null"],"columnName":"ccs_ad_id","sqlType":"-5"}],"tableName":"path_to_conversions_raw"}]:
> {"conversion_id": 552804, "conversion_time": 1325451223000, "derived_time":
> 1330639180000, "position": 2, "event_time": 1325401200000, "event_type_id":
> 1, "entity_id": 4035315, "entity_type_id": 1, "sv_click_type": 2,
> "placement_id": 1978266, "site_id": 9103, "campaign_id": 129001,
> "days_before_conversion": 1, "version_id": 73721, "target_audience_id": -1,
> "ip_attributes_id": 3598, "batch_id": 4601, "ccs_ad_id": null}
>     at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:542)
>     at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:137)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:70)
>     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:104)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:57)
>     at org.apache.avro.mapred.AvroSerialization$AvroWrapperSerializer.serialize(AvroSerialization.java:127)
>     at org.apache.avro.mapred.AvroSerialization$AvroWrapperSerializer.serialize(AvroSerialization.java:110)
>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:916)
>     at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:501)
>     at org.apache.avro.mapred.HadoopMapper$MapCollector.collect(HadoopMapper.java:69)
>     at example.avro.ColorCount$MyMap.map(ColorCount.java:51)
>
>
> Any help would be appreciated..
>
> Thanks!
> Nir
>
>
> --
> View this message in context:
> http://apache-avro.679487.n3.nabble.com/Union-in-AvroMapper-map-Not-in-Union-tp4026706.html
> Sent from the Avro - Users mailing list archive at Nabble.com.
