Hi David, Gabriel,

Using Avros, I get org.apache.avro.UnresolvedUnionException: Not in union.
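(To be precise, "using Avros" here means swapping the Writables type family in the code quoted at the bottom of this mail for the Avro one. Roughly the following change, unless I have the imports wrong:)

// Instead of the three org.apache.crunch.types.writable.Writables imports:
import static org.apache.crunch.types.avro.Avros.ints;
import static org.apache.crunch.types.avro.Avros.strings;
import static org.apache.crunch.types.avro.Avros.tableOf;

// The pipeline code itself is unchanged; tableOf/strings/ints now come from
// the Avro type family, so the intermediate data is serialized with Avro
// instead of Writables:
PTable<String, Integer> yearTemperatures = records
        .parallelDo(toYearTempPairsFn(), tableOf(strings(), ints()));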
The situation is similar to the crunch example from the hadoop book (first make a clone: git clone https://github.com/tomwhite/hadoop-book.git). If you run the following commands in hadoop-book/ch18-crunch:

mvn package
hadoop jar target/ch18-crunch-4.0-job.jar crunch.AvroGenericMaxTemperatureCrunch ncdc_data output

you obtain the following exception:

java.lang.Exception: org.apache.avro.UnresolvedUnionException: Not in union [{"type":"record","name":"WeatherRecord","namespace":"org.apache.avro.mapred","doc":"A weather reading.","fields":[{"name":"year","type":"int"},{"name":"temperature","type":"int"},{"name":"stationId","type":"string"}]},"null"]: {"year": 1928, "temperature": 28, "stationId": "011060-99999"}
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: org.apache.avro.UnresolvedUnionException: Not in union [{"type":"record","name":"WeatherRecord","namespace":"org.apache.avro.mapred","doc":"A weather reading.","fields":[{"name":"year","type":"int"},{"name":"temperature","type":"int"},{"name":"stationId","type":"string"}]},"null"]: {"year": 1928, "temperature": 28, "stationId": "011060-99999"}
    at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:561)
    at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:144)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at org.apache.crunch.types.avro.SafeAvroSerialization$AvroWrapperSerializer.serialize(SafeAvroSerialization.java:128)
    at org.apache.crunch.types.avro.SafeAvroSerialization$AvroWrapperSerializer.serialize(SafeAvroSerialization.java:113)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1157)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at org.apache.crunch.impl.mr.emit.OutputEmitter.emit(OutputEmitter.java:41)
    at org.apache.crunch.MapFn.process(MapFn.java:34)
    at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
    at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
    at crunch.AvroGenericMaxTemperatureCrunch$2.process(AvroGenericMaxTemperatureCrunch.java:95)
    at crunch.AvroGenericMaxTemperatureCrunch$2.process(AvroGenericMaxTemperatureCrunch.java:82)
    at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
    at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
    at org.apache.crunch.MapFn.process(MapFn.java:34)
    at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
    at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
    at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

As far as I can tell from my research, it seems to be related to https://issues.apache.org/jira/browse/CRUNCH-360 (but please correct me if I am wrong). If this is a known issue, is there an alternative?

PS: @Gabriel Reid: I saw you made a patch for https://issues.apache.org/jira/browse/CRUNCH-539. I am unfamiliar with how to apply it to the project; can you please give me some minimal guidance? :) (I have put my current understanding at the bottom of this mail.)

- Florin

On Tue, Jul 7, 2015 at 11:45 PM, Gabriel Reid <[email protected]> wrote:

> Hi Florin,
>
> Thanks for the very detailed report. That appears to be a bug, brought on
> by the way that ObjectInputStream works with classloaders, together with
> how Hadoop manipulates classloaders.
>
> I've logged a JIRA ticket [1] for this. For now, like David I would
> recommend using Avros instead of Writables, as that should get around this
> issue without having any other consequences for now.
>
> - Gabriel
>
> 1. https://issues.apache.org/jira/browse/CRUNCH-539
>
> On Tue, Jul 7, 2015 at 3:27 PM David Ortiz <[email protected]> wrote:
>
>> That looks weird. Can you try it using Avros in place of Writables and
>> see if it does the same thing?
>>
>> On Tue, Jul 7, 2015, 3:43 AM Florin Tatu <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I have a job that processes a set of files containing climatic data
>>> (specifically, data from this location: ftp://ftp.ncdc.noaa.gov/pub/data/noaa/).
>>>
>>> I downloaded and merged the data using a script, so I have one folder
>>> (ncdc_data) containing a .gz archive for each year (e.g. 1901.gz, 1902.gz, etc.).
>>> Each archive contains only one text file.
>>>
>>> My code is:
>>>
>>> import com.google.common.base.Charsets;
>>> import com.google.common.io.Files;
>>> import org.apache.crunch.*;
>>> import org.apache.crunch.fn.Aggregators;
>>> import org.apache.crunch.impl.mr.MRPipeline;
>>> import org.apache.crunch.io.To;
>>>
>>> import java.io.File;
>>>
>>> import static org.apache.crunch.types.writable.Writables.ints;
>>> import static org.apache.crunch.types.writable.Writables.strings;
>>> import static org.apache.crunch.types.writable.Writables.tableOf;
>>>
>>> public class MaxTemperatureCrunch {
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         if (args.length != 2) {
>>>             System.err.println("Usage: MaxTemperatureCrunch <input path> <output path>");
>>>             System.exit(-1);
>>>         }
>>>
>>>         Pipeline pipeline = new MRPipeline(MaxTemperatureCrunch.class);
>>>
>>>         PCollection<String> records = pipeline.readTextFile(args[0]);
>>>
>>>         PTable<String, Integer> yearTemperatures = records
>>>                 .parallelDo(toYearTempPairsFn(), tableOf(strings(), ints()));
>>>
>>>         PTable<String, Integer> maxTemps = yearTemperatures
>>>                 .groupByKey()
>>>                 .combineValues(Aggregators.MAX_INTS())
>>>                 .top(1); // LINE THAT CAUSES THE ERROR
>>>
>>>         maxTemps.write(To.textFile(args[1]));
>>>
>>>         PipelineResult result = pipeline.done();
>>>         String dot = pipeline.getConfiguration().get("crunch.planner.dotfile");
>>>         Files.write(dot, new File("pipeline.dot"), Charsets.UTF_8);
>>>         Runtime.getRuntime().exec("dot -Tpng -O pipeline.dot");
>>>         System.exit(result.succeeded() ?
>>>                 0 : 1);
>>>     }
>>>
>>>     static DoFn<String, Pair<String, Integer>> toYearTempPairsFn() {
>>>         return new DoFn<String, Pair<String, Integer>>() {
>>>             NcdcRecordParser parser = new NcdcRecordParser();
>>>
>>>             @Override
>>>             public void process(String input, Emitter<Pair<String, Integer>> emitter) {
>>>                 parser.parse(input);
>>>                 if (parser.isValidTemperature()) {
>>>                     emitter.emit(Pair.of(parser.getYear(), parser.getAirTemperature()));
>>>                 }
>>>             }
>>>         };
>>>     }
>>> }
>>>
>>> Hadoop runs locally in standalone mode.
>>> Hadoop version: 2.7.0
>>> Crunch version: 0.12.0 (maven dependency: 0.12.0-hadoop2)
>>>
>>> I build my application with: mvn package
>>> I run it with: hadoop jar target/crunch-demo-1.0-SNAPSHOT-job.jar ncdc_data/ output
>>>
>>> If I do not call .top(1) (see comment: // LINE THAT CAUSES THE ERROR),
>>> everything works fine, but then I obtain only the maximum temperature for
>>> each year, and I want the overall maximum temperature, or the top N
>>> temperatures for the whole data set.
>>>
>>> If I call .top(1), I obtain the following error:
>>>
>>> java.lang.Exception: org.apache.crunch.CrunchRuntimeException: Error reloading writable comparable codes
>>>     at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
>>>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
>>> Caused by: org.apache.crunch.CrunchRuntimeException: Error reloading writable comparable codes
>>>     at org.apache.crunch.types.writable.TupleWritable.setConf(TupleWritable.java:71)
>>>     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
>>>     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
>>>     at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66)
>>>     at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
>>>     at org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:2247)
>>>     at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2220)
>>>     at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.nextKeyValue(SequenceFileRecordReader.java:78)
>>>     at org.apache.crunch.impl.mr.run.CrunchRecordReader.nextKeyValue(CrunchRecordReader.java:157)
>>>     at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
>>>     at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
>>>     at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
>>>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
>>>     at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.ClassNotFoundException: org.apache.crunch.types.writable.TupleWritable
>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>     at java.lang.Class.forName0(Native Method)
>>>     at java.lang.Class.forName(Class.java:274)
>>>     at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>>     at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1483)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1333)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>     at com.google.common.collect.Serialization.populateMap(Serialization.java:91)
>>>     at com.google.common.collect.HashBiMap.readObject(HashBiMap.java:109)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>     at org.apache.crunch.types.writable.Writables.reloadWritableComparableCodes(Writables.java:145)
>>>     at org.apache.crunch.types.writable.TupleWritable.setConf(TupleWritable.java:69)
>>>     ... 20 more
>>>
>>> Did anyone encounter this issue?
>>> If you need any other details, please ask.
>>>
>>> Thank you,
>>> Florin
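PPS: To better understand the error, here is a minimal standalone sketch (plain Avro, no Hadoop or Crunch involved) of the union-resolution step that fails in the stack trace above; the schema and field values are copied from the exception message. It is only an illustration of GenericData.resolveUnion, not a fix:

import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class UnionResolutionCheck {

    // The record schema, copied from the exception message
    private static final String WEATHER_RECORD_SCHEMA =
            "{\"type\":\"record\",\"name\":\"WeatherRecord\","
            + "\"namespace\":\"org.apache.avro.mapred\",\"doc\":\"A weather reading.\","
            + "\"fields\":[{\"name\":\"year\",\"type\":\"int\"},"
            + "{\"name\":\"temperature\",\"type\":\"int\"},"
            + "{\"name\":\"stationId\",\"type\":\"string\"}]}";

    public static void main(String[] args) {
        Schema record = new Schema.Parser().parse(WEATHER_RECORD_SCHEMA);
        // The union from the exception message: [WeatherRecord, "null"]
        Schema union = Schema.createUnion(
                Arrays.asList(record, Schema.create(Schema.Type.NULL)));

        // The datum from the exception message
        GenericData.Record datum = new GenericData.Record(record);
        datum.put("year", 1928);
        datum.put("temperature", 28);
        datum.put("stationId", "011060-99999");

        // resolveUnion matches a generic record against the union branches by
        // the full name of its schema and throws UnresolvedUnionException when
        // no branch matches; this is the call at GenericData.java:561 above.
        int branch = GenericData.get().resolveUnion(union, datum);
        System.out.println("Resolved to union branch " + branch);
    }
}

If I read GenericData correctly, this should resolve to branch 0 by the schema's full name, so the datum itself looks fine, and the failure in the job is presumably about which schema or classloader is in play at runtime, which is why CRUNCH-360 caught my eye. Please correct me if that reasoning is off.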

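Also, regarding my PS about the CRUNCH-539 patch: is the usual workflow simply the following (the patch file name below is my guess for whatever the JIRA attachment is called)?

git clone https://github.com/apache/crunch.git
cd crunch
git apply /path/to/CRUNCH-539.patch
mvn clean install -DskipTests

and then pointing the crunch dependency version in my project's pom.xml at the SNAPSHOT that this build installs into the local Maven repository? Any corrections are welcome.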