Oh, okay. Would you humor me and try to use parallelDo(...) instead of mapValues(...) after the groupByKey() call and see if that works? I have this weird feeling that mapValues is doing something it shouldn't be doing to the Iterable.
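[For concreteness, the parallelDo(...) version of the counting step being suggested here might look roughly like the sketch below. This is untested and assumes the names from Lucy's code (ABC, ABCData) plus the standard Crunch DoFn/Emitter/Pair API; it is a sketch of the experiment, not a confirmed fix:]

```java
// Sketch: count grouped values per key with parallelDo instead of
// mapValues. Assumes ABC.groupByKey() yields PGroupedTable<String, ABCData>.
PTable<String, String> lgr = ABC.groupByKey().parallelDo(
    new DoFn<Pair<String, Iterable<ABCData>>, Pair<String, String>>() {
      @Override
      public void process(Pair<String, Iterable<ABCData>> input,
                          Emitter<Pair<String, String>> emitter) {
        int counter = 0;
        for (ABCData d : input.second()) {
          counter++;
        }
        emitter.emit(Pair.of(input.first(), Integer.toString(counter)));
      }
    },
    Avros.tableOf(Avros.strings(), Avros.strings()));
```

[The point of the experiment is that parallelDo hands the grouped Iterable straight to the DoFn, bypassing whatever wrapping mapValues applies to the grouped table.]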
J

On Mon, Apr 13, 2015 at 9:34 PM, Lucy Chen <[email protected]> wrote:

> Hi Josh,
>
> Thanks for your quick response. The code should be as follows. I just
> renamed LingPipeData and copied the code into the email, and forgot to
> change it in a couple of places. I simply changed LingPipeData to ABCData
> to make it easier for you to understand. Here I use the LingPipe package
> inside my Crunch jobs. I suspect the Vector included in ABCData causes
> trouble when it is serialized as an Avro type. When the code excludes the
> parts after "*******" and just writes ABC as an output, it works fine; but
> after adding ABC.groupByKey().mapValues(...), it throws the exception.
>
> Sorry about the typos in my last email.
>
> Thanks!
>
> Lucy
>
> PType<ABCData> ABCDataType = Avros.records(ABCData.class);
>
> PTable<String, ABCData> ABC = input.mapValues(new
>     ConvertToABCData(feat_index_mapping, addIntercept), ABCDataType);
>
> *******************************************************************************
>
> PTable<String, String> lgr = ABC.groupByKey().
>     mapValues(new MapFn<Iterable<ABCData>, String>() {
>
>         @Override
>         public String map(Iterable<ABCData> input)
>         {
>             Iterator<ABCData> ite1 = input.iterator();
>             int counter = 0;
>             while (ite1.hasNext())
>             {
>                 ite1.next();
>                 counter++;
>             }
>             return Integer.toString(counter);
>         }
>     }, Avros.strings());
>
> lgr.write(At.textFile(output_path), WriteMode.OVERWRITE);
>
> *******************************************************************************
>
> public class ConvertToABCData extends MapFn<InputType, ABCData> {
>
>     private FeatIndexMapping feat_index_mapping;
>     private boolean addIntercept;
>
>     public ConvertToABCData(FeatIndexMapping feat_index_mapping,
>                             boolean addIntercept)
>     {
>         this.feat_index_mapping = feat_index_mapping;
>         this.addIntercept = addIntercept;
>     }
>
>     @Override
>     public ABCData map(InputType input)
>     {
>         return new ABCData(input, feat_index_mapping, addIntercept);
>     }
> }
>
> public class ABCData implements java.io.Serializable, Cloneable {
>
>     private int label;
>     private Vector feature;
>     private int dim;
>     private final static Logger logger = Logger
>             .getLogger(ABCData.class.getName());
>     ......
> }
>
> On Mon, Apr 13, 2015 at 4:24 PM, Josh Wills <[email protected]> wrote:
>
>> Hey Lucy,
>>
>> I don't grok the last MapFn before the lgr gets written out; it looks
>> like it's defined over an Iterable<ABCData>, but the map() function defined
>> inside the class is over Iterable<LingPipeData>. I assume that's the source
>> of the problem-- the value that is getting printed out is the string form
>> of a LingPipeData object, which isn't what the system expects to see.
>> >> J >> >> On Mon, Apr 13, 2015 at 7:12 PM, Lucy Chen <[email protected]> >> wrote: >> >>> Hi, >>> >>> I have an exception of org.apache.avro.UnresolvedUnionException >>> thrown out by the following codes: >>> >>> >>> PType<ABCData> ABCDataType = Avros.records(ABCData.class); >>> >>> PTable<String, ABCData> ABC = input.mapValues(new >>> ConvertToABCData(feat_index_mapping, addIntercept), ABCDataType); >>> >>> >>> ******************************************************************************************************* >>> >>> >>> PTable<String, String> lgr = ABC.groupByKey(). >>> >>> mapValues(new MapFn<Iterable<ABCData>, String> { >>> >>> @Override >>> >>> public String map(Iterable<LingPipeData> input) >>> >>> { >>> >>> Iterator<LingPipeData> ite1 = input.iterator(); >>> >>> int counter=0; >>> >>> while(ite1.hasNext()) >>> >>> { >>> >>> counter++; >>> >>> } >>> >>> return Integer.toString(counter); >>> >>> >>> } >>> >>> }, Avros.strings()); >>> >>> lgr.write(At.textFile(output_path), WriteMode.OVERWRITE); >>> >>> >>> **************************************************************************************************************** >>> >>> >>> public class ConvertToABCData extends MapFn<InputType, ABCData>{ >>> >>> >>> private FeatIndexMapping feat_index_mapping; >>> >>> private boolean addIntercept; >>> >>> public ConvertToABCData(FeatIndexMapping feat_index_mapping, boolean >>> addIntercept) >>> >>> { >>> >>> this.feat_index_mapping = feat_index_mapping; >>> >>> this.addIntercept = addIntercept; >>> >>> } >>> >>> @Override >>> >>> public ABCData map(InputType input) >>> >>> { >>> >>> return new ABCData(input, feat_index_mapping, addIntercept); >>> >>> } >>> >>> >>> } >>> >>> >>> public class ABCData implements java.io.Serializable, Cloneable{ >>> >>> >>> private int label; >>> >>> private Vector feature; >>> >>> private int dim; >>> >>> private final static Logger logger = Logger >>> >>> .getLogger(ABCData.class.getName()); >>> >>> ...... 
>>> }
>>>
>>> Here Vector comes from a third-party package: com.aliasi.matrix.Vector.
>>> The code runs fine up to the starred line, but once it includes
>>> ABC.groupByKey().mapValues(), the following exception is thrown. Can
>>> anyone tell me how to solve the problem?
>>>
>>> Thanks.
>>>
>>> Lucy
>>>
>>> The logs look like:
>>>
>>> org.apache.crunch.CrunchRuntimeException:
>>> org.apache.avro.file.DataFileWriter$AppendWriteException:
>>> org.apache.avro.UnresolvedUnionException: Not in union
>>> ["null",{"type":"record","name":"Vector","namespace":"com.aliasi.matrix","fields":[]}]:
>>> 0=1.0 8=0.0917 9=0.0734 14=0.0336 22=0.0485 36=0.0795 40=0.0611 59=0.079
>>> 101=0.1065 127=0.1101 131=0.0969 135=0.1016 151=0.079 154=0.1847 177=0.0858
>>> 199=0.1131 200=0.0485 269=0.1096 271=0.1275 335=0.1299 588=0.165 799=0.2264
>>> 1200=0.1321 1286=0.2796 1482=0.1299 1702=0.4409 2170=0.2236 3644=0.2319
>>> 4824=0.2624 5040=0.3815 5584=0.2258 5937=0.2466
>>> at org.apache.crunch.impl.mr.emit.MultipleOutputEmitter.emit(MultipleOutputEmitter.java:45)
>>> at org.apache.crunch.MapFn.process(MapFn.java:34)
>>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>> at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>> at org.apache.crunch.MapFn.process(MapFn.java:34)
>>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>> at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>> at com.apple.rsp.Utils.RetrieveDataFromJoin.process(RetrieveDataFromJoin.java:14)
>>> at com.apple.rsp.Utils.RetrieveDataFromJoin.process(RetrieveDataFromJoin.java:10)
>>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>> at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>> at org.apache.crunch.lib.join.InnerJoinFn.join(InnerJoinFn.java:66)
>>> at org.apache.crunch.lib.join.JoinFn.process(JoinFn.java:79)
>>> at org.apache.crunch.lib.join.JoinFn.process(JoinFn.java:32)
>>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>> at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>> at org.apache.crunch.MapFn.process(MapFn.java:34)
>>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>> at org.apache.crunch.impl.mr.run.RTNode.processIterable(RTNode.java:113)
>>> at org.apache.crunch.impl.mr.run.CrunchReducer.reduce(CrunchReducer.java:57)
>>> at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
>>> at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
>>> at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
>>> at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:744)
>>>
>>> Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException:
>>> org.apache.avro.UnresolvedUnionException: Not in union
>>> ["null",{"type":"record","name":"Vector","namespace":"com.aliasi.matrix","fields":[]}]:
>>> 0=1.0 8=0.0917 9=0.0734 14=0.0336 22=0.0485 36=0.0795 40=0.0611 59=0.079
>>> 101=0.1065 127=0.1101 131=0.0969 135=0.1016 151=0.079 154=0.1847 177=0.0858
>>> 199=0.1131 200=0.0485 269=0.1096 271=0.1275 335=0.1299 588=0.165 799=0.2264
>>> 1200=0.1321 1286=0.2796 1482=0.1299 1702=0.4409 2170=0.2236 3644=0.2319
>>> 4824=0.2624 5040=0.3815 5584=0.2258 5937=0.2466
>>> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263)
>>> at org.apache.crunch.types.avro.AvroOutputFormat$1.write(AvroOutputFormat.java:87)
>>> at org.apache.crunch.types.avro.AvroOutputFormat$1.write(AvroOutputFormat.java:84)
>>> at org.apache.crunch.io.CrunchOutputs.write(CrunchOutputs.java:133)
>>> at org.apache.crunch.impl.mr.emit.MultipleOutputEmitter.emit(MultipleOutputEmitter.java:41)
>>> ... 28 more
>>>
>>> Caused by: org.apache.avro.UnresolvedUnionException: Not in union
>>> ["null",{"type":"record","name":"Vector","namespace":"com.aliasi.matrix","fields":[]}]:
>>> 0=1.0 8=0.0917 9=0.0734 14=0.0336 22=0.0485 36=0.0795 40=0.0611 59=0.079
>>> 101=0.1065 127=0.1101 131=0.0969 135=0.1016 151=0.079 154=0.1847 177=0.0858
>>> 199=0.1131 200=0.0485 269=0.1096 271=0.1275 335=0.1299 588=0.165 799=0.2264
>>> 1200=0.1321 1286=0.2796 1482=0.1299 1702=0.4409 2170=0.2236 3644=0.2319
>>> 4824=0.2624 5040=0.3815 5584=0.2258 5937=0.2466
>>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:561)
>>> at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:144)
>>> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
>>> at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:104)
>>> at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:106)
>>> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>>> at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:104)
>>> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
>>> at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:104)
>>> at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:106)
>>> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>>> at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:104)
>>> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
>>> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257)
>>> ... 32 more
>>>
>>> 2015-04-13 15:49:16,876 INFO [Thread-500] mapred.LocalJobRunner
>>> (LocalJobRunner.java:runTasks(456)) - reduce task executor complete.
>>>
>>> 2015-04-13 15:49:16,879 WARN [Thread-500] mapred.LocalJobRunner
>>> (LocalJobRunner.java:run(560)) - job_local918028004_0008
>>>
>>> java.lang.Exception: org.apache.crunch.CrunchRuntimeException:
>>> org.apache.avro.file.DataFileWriter$AppendWriteException:
>>> org.apache.avro.UnresolvedUnionException: Not in union
>>> ["null",{"type":"record","name":"Vector","namespace":"com.aliasi.matrix","fields":[]}]:
>>> 0=1.0 8=0.0917 9=0.0734 14=0.0336 22=0.0485 36=0.0795 40=0.0611 59=0.079
>>> 101=0.1065 127=0.1101 131=0.0969 135=0.1016 151=0.079 154=0.1847 177=0.0858
>>> 199=0.1131 200=0.0485 269=0.1096 271=0.1275 335=0.1299 588=0.165 799=0.2264
>>> 1200=0.1321 1286=0.2796 1482=0.1299 1702=0.4409 2170=0.2236 3644=0.2319
>>> 4824=0.2624 5040=0.3815 5584=0.2258 5937=0.2466
>>> at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
>>> at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
>>> Caused by: org.apache.crunch.CrunchRuntimeException:
>>> org.apache.avro.file.DataFileWriter$AppendWriteException:
>>> org.apache.avro.UnresolvedUnionException: Not in union
>>> ["null",{"type":"record","name":"Vector","namespace":"com.aliasi.matrix","fields":[]}]:
>>> 0=1.0 8=0.0917 9=0.0734 14=0.0336 22=0.0485 36=0.0795 40=0.0611 59=0.079
>>> 101=0.1065 127=0.1101 131=0.0969 135=0.1016 151=0.079 154=0.1847 177=0.0858
>>> 199=0.1131 200=0.0485 269=0.1096 271=0.1275 335=0.1299 588=0.165 799=0.2264
>>> 1200=0.1321 1286=0.2796 1482=0.1299 1702=0.4409 2170=0.2236 3644=0.2319
>>> 4824=0.2624 5040=0.3815
>>> 5584=0.2258 5937=0.2466
>>>
>>> 2 job failure(s) occurred:
>>>
>>> (5): Depending job with jobID 1 failed.
>>> com.apple.rsp.CrossValidation.CrossValidationDriver:
>>> [[Text(/Users/luren/Lu/Project/model_testing/training_set... ID=1 (5/6)(1):
>>> Job failed!

--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
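[A note on the root cause visible in the logs: the schema in the error, {"type":"record","name":"Vector","namespace":"com.aliasi.matrix","fields":[]}, shows that Avro's reflection mapped the third-party Vector to a record with no fields, so no value can ever be resolved against that union. One common workaround, sketched below with a hypothetical stand-in class rather than LingPipe's actual Vector, is to carry such fields as plain bytes via Java serialization and rebuild the object after reading:]

```java
import java.io.*;
import java.util.Arrays;

// Hypothetical stand-in for a third-party class that Avro reflection
// cannot map (in the thread, com.aliasi.matrix.Vector, whose reflected
// schema came out empty: {"name":"Vector","fields":[]}).
class DenseVector implements Serializable {
    final double[] values;
    DenseVector(double[] values) { this.values = values; }
}

public class VectorBytesDemo {

    // Serialize the object to a byte[]; an Avro record can store this as
    // a plain bytes field instead of a reflected Vector record.
    static byte[] toBytes(DenseVector v) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(v);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rebuild the object from the stored bytes after reading the record.
    static DenseVector fromBytes(byte[] bytes) {
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (DenseVector) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DenseVector original = new DenseVector(new double[]{1.0, 0.0917, 0.0734});
        DenseVector restored = fromBytes(toBytes(original));
        System.out.println(Arrays.equals(original.values, restored.values)); // prints "true"
    }
}
```

[With this approach the Avro record holds a byte[] field, which Avro handles natively, instead of a reflected record for the third-party class.]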
