Thanks, Alan, it works!
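
For anyone finding this thread later, the three steps Alan describes in the quoted message could be sketched roughly as follows. This is a minimal sketch, not the code from either message. It assumes aSchema and bSchema are record schemas named "A" and "B" and that the parsed objects are GenericRecord instances. The names SplitByTypeJob, unionOf, namedOutputFor, configure, ParseMapper, parse and SplitReducer are hypothetical, and keying the map output by schema name is just one possible choice. AvroJob.setMapOutputValueSchema sets the map output value class to AvroValue and registers Avro serialization. The NullPointerException in MapOutputBuffer.init is probably Hadoop failing to find a serializer for GenericData, but that is a reading of the trace, not something confirmed in the thread.

```java
import java.io.IOException;
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

class SplitByTypeJob {

  // Step 1: the map output value schema is a union of both target schemas.
  static Schema unionOf(Schema aSchema, Schema bSchema) {
    return Schema.createUnion(Arrays.asList(aSchema, bSchema));
  }

  // Picks the named output from the datum's record name.
  // Assumption: the record names match the named outputs "A" and "B".
  static String namedOutputFor(GenericRecord datum) {
    return datum.getSchema().getName();
  }

  // Job setup: union schema for map output values, one named output per schema.
  static void configure(Job job, Schema aSchema, Schema bSchema) {
    job.setMapOutputKeyClass(Text.class);
    AvroJob.setMapOutputValueSchema(job, unionOf(aSchema, bSchema));
    AvroMultipleOutputs.addNamedOutput(job, "A", AvroKeyOutputFormat.class, aSchema, null);
    AvroMultipleOutputs.addNamedOutput(job, "B", AvroKeyOutputFormat.class, bSchema, null);
  }

  // Step 2: the mapper wraps each parsed datum in an AvroValue.
  abstract static class ParseMapper
      extends Mapper<LongWritable, Text, Text, AvroValue<GenericRecord>> {

    // Hypothetical placeholder: turns one input line into an A or B record.
    protected abstract GenericRecord parse(String line);

    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      GenericRecord datum = parse(line.toString());
      context.write(new Text(namedOutputFor(datum)), new AvroValue<GenericRecord>(datum));
    }
  }

  // Step 3: the reducer inspects each datum's schema and writes to the matching output.
  static class SplitReducer
      extends Reducer<Text, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {

    private AvroMultipleOutputs outputs;

    @Override
    protected void setup(Context context) {
      outputs = new AvroMultipleOutputs(context);
    }

    @Override
    protected void reduce(Text key, Iterable<AvroValue<GenericRecord>> values, Context context)
        throws IOException, InterruptedException {
      for (AvroValue<GenericRecord> value : values) {
        GenericRecord datum = value.datum();
        outputs.write(namedOutputFor(datum), new AvroKey<GenericRecord>(datum), NullWritable.get());
      }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      outputs.close(); // flushes and closes the per-output record writers
    }
  }
}
```

In the driver, configure(job, aSchema, bSchema) would be called before submitting the job, along with job.setMapperClass on a concrete subclass of ParseMapper and job.setReducerClass(SplitReducer.class).
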
2014-03-07 11:21 GMT+08:00 Alan Paulsen <[email protected]>:

> Hi Fengyun,
>
> Here's what I've done in the past when facing a similar issue:
>
> 1) Set the map output schema to a UNION of both of your target schemas, A and B.
> 2) Serialize the data in the mappers, using the avro datum as the value.
> 3) Figure out what the avro schema is for each datum and write out the data in the reducer.
>
> Thanks,
>
> Alan
>
> *From:* Fengyun RAO [mailto:[email protected]]
> *Sent:* Thursday, March 06, 2014 2:14 AM
> *To:* [email protected]; [email protected]
> *Subject:* Re: MapReduce: How to output multiplt Avro files?
>
> add avro user mail-list
>
> 2014-03-06 16:09 GMT+08:00 Fengyun RAO <[email protected]>:
>
> Our input is a line of text which may be parsed to e.g. an A or a B object.
> We want all A objects written to "A.avro" files, and all B objects written to "B.avro".
>
> I looked into the AvroMultipleOutputs class:
> http://avro.apache.org/docs/1.7.4/api/java/org/apache/avro/mapreduce/AvroMultipleOutputs.html
> There is an example, but it's not quite clear.
>
> For job submission, it uses AvroMultipleOutputs.addNamedOutput to add schemas for A and B.
> My program looks like:
>
>     AvroMultipleOutputs.addNamedOutput(job, "A", AvroKeyOutputFormat.class, aSchema, null);
>     AvroMultipleOutputs.addNamedOutput(job, "B", AvroKeyOutputFormat.class, bSchema, null);
>
> I believe this is for the Reducer output files.
>
> *My question is* what the Mapper output should be, specifically what "job.setMapOutputValueClass" should be,
> since the Mapper output could be an A or a B object, with schema aSchema or bSchema.
>
> In my program, I simply set it to GenericData, but get the error below:
>
> 14/03/06 15:55:34 INFO mapreduce.Job: Task Id : attempt_1393817780522_0012_m_000010_2, Status : FAILED
> Error: java.lang.NullPointerException
>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:989)
>     at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:390)
>     at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:79)
>     at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:674)
>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:746)
>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>     at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:165)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>     at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:160)
>
> I have no idea what this means.
