As for why the union does not seem to match: The Union schemas are not the same as the one in the error the one in the error does not have a namespace. It finds "AVRO_NCP_ICM" but the union has only "merced.AVRO_NCP_ICM" and "merced. AVRO_IVR_BY_CALLID". The namespace and name must both match.
Is your output schema correct? It looks like you are setting both your MapOutputSchema and OutputSchema to be a Pair schema. I suspect you only want the Pair schema as a map output and reducer input, but cannot be sure from the below. >From the below, your reducer must create Pair objects and output them, and maybe that is related to the error below. It may also be related to the combiner, does it happen without it? On 2/12/12 11:01 PM, "Serge Blazhievsky" <[email protected]> wrote: > Hi all, > > I am running into an interesting problem with Union. It seems that order of > the schema in union must be in the same order as input path for different > files. > > This does not look like right behavior. The code and exception are below. > > The moment I change the order in union it works. > > > Thanks > Serge > > > public int run(String[] strings) throws Exception { > > JobConf job = new JobConf(); > > > job.setNumMapTasks(map); > job.setNumReduceTasks(reduce); > > > // Uncomment to run locally in a single process > job.set("mapred.job.tracker", "local"); > > File file = new File(input); > DatumReader<GenericRecord> reader = new > GenericDatumReader<GenericRecord>(); > DataFileReader<GenericRecord> dataFileReader = new > DataFileReader<GenericRecord>(file, reader); > > Schema s = dataFileReader.getSchema(); > > > > > > File lfile = new File(linput); > DatumReader<GenericRecord> lreader = new > GenericDatumReader<GenericRecord>(); > DataFileReader<GenericRecord> ldataFileReader = new > DataFileReader<GenericRecord>(lfile, lreader); > > Schema s2 = ldataFileReader.getSchema(); > > > > List<Schema> slist= new ArrayList<Schema>(); > > slist.add(s2); > slist.add(s); > > > > System.out.println(s.toString(true)); > System.out.println(s2.toString(true)); > > > > Schema s_union=Schema.createUnion(slist); > > > > AvroJob.setInputSchema(job, s_union); > > > > List<Schema.Field> fields = s.getFields(); > > List<Schema.Field> outfields = new ArrayList<Schema.Field>(); > > > for (Schema.Field f : fields) { > > outfields.add(new Schema.Field(f.name <http://f.name> (), > Schema.create(Type.STRING), null, null)); > } > > boolean b = false; > Schema outschema = Schema.createRecord("AVRO_IVR_BY_CALLID", > "AVRO_IVR_BY_CALLID", "merced", b); > > outschema.setFields(outfields); > > > > Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING); > > > Schema OUT_SCHEMA = new Pair<String, GenericRecord>("", STRING_SCHEMA, > new GenericData.Record(outschema), outschema).getSchema(); > > > AvroJob.setMapOutputSchema(job, OUT_SCHEMA); > AvroJob.setOutputSchema(job, OUT_SCHEMA); > > AvroJob.setMapperClass(job, MapImpl.class); > AvroJob.setCombinerClass(job, ReduceImpl.class); > AvroJob.setReducerClass(job, ReduceImpl.class); > > // FileInputFormat.setInputPaths(job, new Path(input)); > > > FileInputFormat.addInputPath(job, new Path(linput)); > FileInputFormat.addInputPath(job, new Path(input)); > > > > > // MultipleInputs.addInputPath(job, new Path(input), > AvroInputFormat<GenericRecord>.class, MapImpl.class); > > FileOutputFormat.setOutputPath(job, new Path(output)); > FileOutputFormat.setCompressOutput(job, true); > > int res = 255; > RunningJob runJob = JobClient.runJob(job); > if (runJob != null) { > res = runJob.isSuccessful() ? 0 : 1; > } > return res; > } > > > 2/02/12 22:56:52 WARN mapred.LocalJobRunner: job_local_0001 > org.apache.avro.AvroTypeException: Found { > "type" : "record", > "name" : "AVRO_NCP_ICM", > "fields" : [ { > "name" : "DATADATE", > "type" : "string" > }, { > "name" : "ICM_CALLID", > "type" : "string" > }, { > "name" : "AGENT_ELID", > "type" : "string" > }, { > "name" : "AGENT_NAME", > "type" : "string" > }, { > "name" : "AGENT_SITE", > "type" : "string" > }, { > "name" : "AGENT_SVIEW_USER_ID", > "type" : "string" > }, { > "name" : "AGENT_UNIT_ID", > "type" : "string" > }, { > "name" : "ANI", > "type" : "string" > }, { > "name" : "CALL_CTR_UNIT_ID", > "type" : "string" > }, { > "name" : "CALL_FA_ID", > "type" : "string" > }, { > "name" : "CALL_FUNCTIONALAREA", > "type" : "string" > }, { > "name" : "CTI_CALL_IDENTIFIER", > "type" : "string" > }, { > "name" : "CALLDISPOSITION", > "type" : "string" > }, { > "name" : "AGENTPERIPHERALNUMBER", > "type" : "string" > }, { > "name" : "PERIPHERALID", > "type" : "string" > }, { > "name" : "ICM_CALL_START_TIME", > "type" : "string" > }, { > "name" : "ICM_DERIVED_BAN", > "type" : "string" > }, { > "name" : "ICM_DERIVED_PTN", > "type" : "string" > }, { > "name" : "ICM_SESSIONLOOKUPTIME", > "type" : "string" > }, { > "name" : "IVR_CALLID", > "type" : "string" > }, { > "name" : "RECOVERYKEY", > "type" : "string" > } ] > }, expecting [ { > "type" : "record", > "name" : "Pair", > "namespace" : "org.apache.avro.mapred", > "fields" : [ { > "name" : "key", > "type" : "string", > "doc" : "" > }, { > "name" : "value", > "type" : { > "type" : "record", > "name" : "AVRO_IVR_BY_CALLID", > "namespace" : "merced", > "doc" : "AVRO_IVR_BY_CALLID", > "fields" : [ { > "name" : "CALLID", > "type" : "string" > }, { > "name" : "CALLTS", > "type" : "string" > }, { > "name" : "LANGUAGECODE", > "type" : "string" > }, { > "name" : "MARKETID", > "type" : "string" > }, { > "name" : "ANI", > "type" : "string" > }, { > "name" : "BAN_PTN", > "type" : "string" > }, { > "name" : "BAN", > "type" : "string" > }, { > "name" : "AUDITTRAIL", > "type" : "string" > }, { > "name" : "RECOVERYCALLKEY", > "type" : "string" > }, { > "name" : "TOTALTRANSACTIONS", > "type" : "string" > }, { > "name" : "COMPLETEDTRANSACTIONS", > "type" : "string" > }, { > "name" : "BILLABLETRANSACTIONS", > "type" : "string" > }, { > "name" : "CUSTOMERBILLABLETRANSACTIONS", > "type" : "string" > }, { > "name" : "TNTTOCSR", > "type" : "string" > }, { > "name" : "CSRFIRSTRESOLUTION", > "type" : "string" > }, { > "name" : "CDC", > "type" : "string" > }, { > "name" : "REPEATCALL", > "type" : "string" > }, { > "name" : "ELAPSEDIVRTIME", > "type" : "string" > }, { > "name" : "ELAPSEDCALLTIME", > "type" : "string" > }, { > "name" : "CALLPATTERNID", > "type" : "string" > }, { > "name" : "DIALEDNUMBERSTRING", > "type" : "string" > }, { > "name" : "CALLTYPE", > "type" : "string" > }, { > "name" : "PERIPHERALID", > "type" : "string" > }, { > "name" : "BILLABLE_CALL_FLG", > "type" : "string" > }, { > "name" : "REPEAT_PROGRAM_TYPE", > "type" : "string" > }, { > "name" : "PAST_DUE_FLG", > "type" : "string" > }, { > "name" : "HYBRID_FLG", > "type" : "string" > }, { > "name" : "NETWORK_FLG", > "type" : "string" > }, { > "name" : "AUTHENTICATION_FLG", > "type" : "string" > }, { > "name" : "HOTLINE_FLG", > "type" : "string" > }, { > "name" : "SUBSCRIBER_START_DATE", > "type" : "string" > }, { > "name" : "PREPAID_IND", > "type" : "string" > }, { > "name" : "ACC_TYPE", > "type" : "string" > }, { > "name" : "SUB_TYPE", > "type" : "string" > }, { > "name" : "ASL_FLG", > "type" : "string" > }, { > "name" : "CALL_ENV", > "type" : "string" > }, { > "name" : "CALL_SEQ", > "type" : "string" > } ] > }, > "doc" : "", > "order" : "ignore" > } ] > }, { > "type" : "record", > "name" : "AVRO_NCP_ICM", > "namespace" : "org.apache.avro.mapred", > "fields" : [ { > "name" : "DATADATE", > "type" : "string" > }, { > "name" : "ICM_CALLID", > "type" : "string" > }, { > "name" : "AGENT_ELID", > "type" : "string" > }, { > "name" : "AGENT_NAME", > "type" : "string" > }, { > "name" : "AGENT_SITE", > "type" : "string" > }, { > "name" : "AGENT_SVIEW_USER_ID", > "type" : "string" > }, { > "name" : "AGENT_UNIT_ID", > "type" : "string" > }, { > "name" : "ANI", > "type" : "string" > }, { > "name" : "CALL_CTR_UNIT_ID", > "type" : "string" > }, { > "name" : "CALL_FA_ID", > "type" : "string" > }, { > "name" : "CALL_FUNCTIONALAREA", > "type" : "string" > }, { > "name" : "CTI_CALL_IDENTIFIER", > "type" : "string" > }, { > "name" : "CALLDISPOSITION", > "type" : "string" > }, { > "name" : "AGENTPERIPHERALNUMBER", > "type" : "string" > }, { > "name" : "PERIPHERALID", > "type" : "string" > }, { > "name" : "ICM_CALL_START_TIME", > "type" : "string" > }, { > "name" : "ICM_DERIVED_BAN", > "type" : "string" > }, { > "name" : "ICM_DERIVED_PTN", > "type" : "string" > }, { > "name" : "ICM_SESSIONLOOKUPTIME", > "type" : "string" > }, { > "name" : "IVR_CALLID", > "type" : "string" > }, { > "name" : "RECOVERYKEY", > "type" : "string" > } ] > } ] > at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231) > at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) > at > org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:206) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:135) > at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) > at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:69) > at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:34) > at > org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:2 > 08) > at > org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193) > at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48) > at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391) > at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325) > at > org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210) > 12/02/12 22:56:53 INFO mapred.JobClient: map 0% reduce 0% > 12/02/12 22:56:53 INFO mapred.JobClient: Job complete: job_local_0001 > Job failed with exception:java.io.IOException: Job failed!
