This may be related to https://issues.apache.org/jira/browse/AVRO-1023
If not, open a new ticket. If so, please comment there.

What you describe below seems to be related to how Schema.Parser tracks a
namespace across multiple parse runs.

On 2/21/12 2:49 PM, "Serge Blazhievsky" <[email protected]> wrote:

> Hi Scott,
>
> Thanks for looking into this.
>
> I created a small schema and did some experiments.
>
> Here are my findings:
>
> 1. If neither schema has a namespace, the MapReduce job works.
> 2. If both schemas have namespaces, the MapReduce job works.
> 3. If the first schema in the union does not have a namespace, but the
>    second one does, the MapReduce job works.
> 4. If the first schema in the union has a namespace, but the second one
>    does not, the MapReduce job fails.
>
> For some reason, it assigns the namespace from the first schema to the
> second while running MapReduce.
>
> This feels like a bug somewhere.
>
> This is the schema I am setting:
>
> Union schema:
> [ {
>   "type" : "record",
>   "name" : "FacebookUser",
>   "namespace" : "FacebookUser",
>   "fields" : [ {
>     "name" : "name",
>     "type" : "string"
>   }, {
>     "name" : "num_likes",
>     "type" : "int"
>   }, {
>     "name" : "num_photos",
>     "type" : "int"
>   }, {
>     "name" : "num_groups",
>     "type" : "int"
>   } ]
> }, {
>   "type" : "record",
>   "name" : "FacebookUser2",
>   "fields" : [ {
>     "name" : "name",
>     "type" : "string"
>   }, {
>     "name" : "num_likes",
>     "type" : "int"
>   }, {
>     "name" : "num_photos",
>     "type" : "int"
>   }, {
>     "name" : "num_groups",
>     "type" : "int"
>   } ]
> } ]
>
> and this is the schema that MapReduce gets:
>
> [ {
>   "type" : "record",
>   "name" : "FacebookUser",
>   "namespace" : "FacebookUser",
>   "fields" : [ {
>     "name" : "name",
>     "type" : "string"
>   }, {
>     "name" : "num_likes",
>     "type" : "int"
>   }, {
>     "name" : "num_photos",
>     "type" : "int"
>   }, {
>     "name" : "num_groups",
>     "type" : "int"
>   } ]
> }, {
>   "type" : "record",
>   "name" : "FacebookUser2",
>   "namespace" : "FacebookUser",
>   "fields" : [ {
>     "name" : "name",
>     "type" : "string"
>   }, {
>     "name" : "num_likes",
>     "type" : "int"
>   }, {
>     "name" : "num_photos",
>     "type" : "int"
>   }, {
>     "name" : "num_groups",
>     "type" : "int"
>   } ]
> } ]
>
> The difference is the namespace on the second record.
>
> I would be more than happy to fix this in the code, if you could point me
> to where to look.
>
> Regards,
> Serge
>
> On Tue, Feb 21, 2012 at 9:39 AM, Scott Carey <[email protected]> wrote:
>> As for why the union does not seem to match:
>> The union schemas are not the same as the one in the error: the one in
>> the error does not have a namespace. It finds "AVRO_NCP_ICM" but the
>> union has only "merced.AVRO_NCP_ICM" and "merced.AVRO_IVR_BY_CALLID".
>> The namespace and name must both match.
>>
>> Is your output schema correct? It looks like you are setting both your
>> MapOutputSchema and OutputSchema to be a Pair schema. I suspect you only
>> want the Pair schema as a map output and reducer input, but cannot be
>> sure from the below.
>>
>> From the below, your reducer must create Pair objects and output them,
>> and maybe that is related to the error below. It may also be related to
>> the combiner; does it happen without it?
>>
>> On 2/12/12 11:01 PM, "Serge Blazhievsky" <[email protected]> wrote:
>>
>>> Hi all,
>>>
>>> I am running into an interesting problem with Union. It seems that the
>>> order of the schemas in the union must be the same as the order of the
>>> input paths for the different files.
>>>
>>> This does not look like the right behavior. The code and exception are
>>> below.
>>>
>>> The moment I change the order in the union it works.
>>>
>>> Thanks
>>> Serge
>>>
>>> public int run(String[] strings) throws Exception {
>>>
>>>     JobConf job = new JobConf();
>>>
>>>     job.setNumMapTasks(map);
>>>     job.setNumReduceTasks(reduce);
>>>
>>>     // Uncomment to run locally in a single process
>>>     job.set("mapred.job.tracker", "local");
>>>
>>>     File file = new File(input);
>>>     DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
>>>     DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, reader);
>>>
>>>     Schema s = dataFileReader.getSchema();
>>>
>>>     File lfile = new File(linput);
>>>     DatumReader<GenericRecord> lreader = new GenericDatumReader<GenericRecord>();
>>>     DataFileReader<GenericRecord> ldataFileReader = new DataFileReader<GenericRecord>(lfile, lreader);
>>>
>>>     Schema s2 = ldataFileReader.getSchema();
>>>
>>>     List<Schema> slist = new ArrayList<Schema>();
>>>
>>>     slist.add(s2);
>>>     slist.add(s);
>>>
>>>     System.out.println(s.toString(true));
>>>     System.out.println(s2.toString(true));
>>>
>>>     Schema s_union = Schema.createUnion(slist);
>>>
>>>     AvroJob.setInputSchema(job, s_union);
>>>
>>>     List<Schema.Field> fields = s.getFields();
>>>
>>>     List<Schema.Field> outfields = new ArrayList<Schema.Field>();
>>>
>>>     for (Schema.Field f : fields) {
>>>         outfields.add(new Schema.Field(f.name(), Schema.create(Type.STRING), null, null));
>>>     }
>>>
>>>     boolean b = false;
>>>     Schema outschema = Schema.createRecord("AVRO_IVR_BY_CALLID", "AVRO_IVR_BY_CALLID", "merced", b);
>>>
>>>     outschema.setFields(outfields);
>>>
>>>     Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
>>>
>>>     Schema OUT_SCHEMA = new Pair<String, GenericRecord>("", STRING_SCHEMA,
>>>             new GenericData.Record(outschema), outschema).getSchema();
>>>
>>>     AvroJob.setMapOutputSchema(job, OUT_SCHEMA);
>>>     AvroJob.setOutputSchema(job, OUT_SCHEMA);
>>>
>>>     AvroJob.setMapperClass(job, MapImpl.class);
>>>     AvroJob.setCombinerClass(job, ReduceImpl.class);
>>>     AvroJob.setReducerClass(job, ReduceImpl.class);
>>>
>>>     // FileInputFormat.setInputPaths(job, new Path(input));
>>>
>>>     FileInputFormat.addInputPath(job, new Path(linput));
>>>     FileInputFormat.addInputPath(job, new Path(input));
>>>
>>>     // MultipleInputs.addInputPath(job, new Path(input),
>>>     //         AvroInputFormat<GenericRecord>.class, MapImpl.class);
>>>
>>>     FileOutputFormat.setOutputPath(job, new Path(output));
>>>     FileOutputFormat.setCompressOutput(job, true);
>>>
>>>     int res = 255;
>>>     RunningJob runJob = JobClient.runJob(job);
>>>     if (runJob != null) {
>>>         res = runJob.isSuccessful() ? 0 : 1;
>>>     }
>>>     return res;
>>> }
>>>
>>>
>>> 2/02/12 22:56:52 WARN mapred.LocalJobRunner: job_local_0001
>>> org.apache.avro.AvroTypeException: Found {
>>>   "type" : "record",
>>>   "name" : "AVRO_NCP_ICM",
>>>   "fields" : [ {
>>>     "name" : "DATADATE",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_CALLID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_ELID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_NAME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_SITE",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_SVIEW_USER_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_UNIT_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ANI",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_CTR_UNIT_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_FA_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_FUNCTIONALAREA",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CTI_CALL_IDENTIFIER",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALLDISPOSITION",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENTPERIPHERALNUMBER",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "PERIPHERALID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_CALL_START_TIME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_DERIVED_BAN",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_DERIVED_PTN",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_SESSIONLOOKUPTIME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "IVR_CALLID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "RECOVERYKEY",
>>>     "type" : "string"
>>>   } ]
>>> }, expecting [ {
>>>   "type" : "record",
>>>   "name" : "Pair",
>>>   "namespace" : "org.apache.avro.mapred",
>>>   "fields" : [ {
>>>     "name" : "key",
>>>     "type" : "string",
>>>     "doc" : ""
>>>   }, {
>>>     "name" : "value",
>>>     "type" : {
>>>       "type" : "record",
>>>       "name" : "AVRO_IVR_BY_CALLID",
>>>       "namespace" : "merced",
>>>       "doc" : "AVRO_IVR_BY_CALLID",
>>>       "fields" : [ {
>>>         "name" : "CALLID",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALLTS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "LANGUAGECODE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "MARKETID",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ANI",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "BAN_PTN",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "BAN",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "AUDITTRAIL",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "RECOVERYCALLKEY",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "TOTALTRANSACTIONS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "COMPLETEDTRANSACTIONS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "BILLABLETRANSACTIONS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CUSTOMERBILLABLETRANSACTIONS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "TNTTOCSR",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CSRFIRSTRESOLUTION",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CDC",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "REPEATCALL",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ELAPSEDIVRTIME",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ELAPSEDCALLTIME",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALLPATTERNID",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "DIALEDNUMBERSTRING",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALLTYPE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "PERIPHERALID",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "BILLABLE_CALL_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "REPEAT_PROGRAM_TYPE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "PAST_DUE_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "HYBRID_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "NETWORK_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "AUTHENTICATION_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "HOTLINE_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "SUBSCRIBER_START_DATE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "PREPAID_IND",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ACC_TYPE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "SUB_TYPE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ASL_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALL_ENV",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALL_SEQ",
>>>         "type" : "string"
>>>       } ]
>>>     },
>>>     "doc" : "",
>>>     "order" : "ignore"
>>>   } ]
>>> }, {
>>>   "type" : "record",
>>>   "name" : "AVRO_NCP_ICM",
>>>   "namespace" : "org.apache.avro.mapred",
>>>   "fields" : [ {
>>>     "name" : "DATADATE",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_CALLID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_ELID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_NAME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_SITE",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_SVIEW_USER_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_UNIT_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ANI",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_CTR_UNIT_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_FA_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_FUNCTIONALAREA",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CTI_CALL_IDENTIFIER",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALLDISPOSITION",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENTPERIPHERALNUMBER",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "PERIPHERALID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_CALL_START_TIME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_DERIVED_BAN",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_DERIVED_PTN",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_SESSIONLOOKUPTIME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "IVR_CALLID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "RECOVERYKEY",
>>>     "type" : "string"
>>>   } ]
>>> } ]
>>>     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
>>>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>     at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:206)
>>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
>>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:135)
>>>     at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>     at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:69)
>>>     at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:34)
>>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
>>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
>>>     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>>>     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>>>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
>>> 12/02/12 22:56:53 INFO mapred.JobClient: map 0% reduce 0%
>>> 12/02/12 22:56:53 INFO mapred.JobClient: Job complete: job_local_0001
>>> Job failed with exception:java.io.IOException: Job failed!

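The point made in the thread that "the namespace and name must both match" can be illustrated with a small sketch in plain Java. It deliberately does not use the Avro library: the class FullNameMatch, the Named type, and the findBranch helper are hypothetical names made up for this illustration, and comparing full names is a simplification of what a real schema resolver does. The record names are the ones from the exception in the thread.

```java
import java.util.Arrays;
import java.util.List;

class FullNameMatch {

    // Hypothetical stand-in for a named record schema: only a name and an
    // optional namespace. This is NOT the Avro Schema class.
    static final class Named {
        final String namespace; // null or "" means "no namespace"
        final String name;

        Named(String namespace, String name) {
            this.namespace = namespace;
            this.name = name;
        }

        // Full name is "namespace.name", or just "name" without a namespace.
        String fullName() {
            if (namespace == null || namespace.isEmpty()) {
                return name;
            }
            return namespace + "." + name;
        }
    }

    // Returns the index of the first union branch whose full name equals the
    // writer's full name, or -1 if no branch matches.
    static int findBranch(List<Named> union, Named writer) {
        for (int i = 0; i < union.size(); i++) {
            if (union.get(i).fullName().equals(writer.fullName())) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The "expecting" union from the exception: both branches carry the
        // namespace org.apache.avro.mapred.
        List<Named> expected = Arrays.asList(
                new Named("org.apache.avro.mapred", "Pair"),
                new Named("org.apache.avro.mapred", "AVRO_NCP_ICM"));

        // The "Found" schema from the exception has no namespace.
        Named found = new Named(null, "AVRO_NCP_ICM");
        System.out.println(findBranch(expected, found)); // prints -1

        // With the same namespace the second branch matches.
        Named qualified = new Named("org.apache.avro.mapred", "AVRO_NCP_ICM");
        System.out.println(findBranch(expected, qualified)); // prints 1
    }
}
```

Under this simplified rule, a record that picks up a namespace it was not written with (as in finding 4 above) no longer matches the data in the file, even though its short name and fields are unchanged.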