Hi Gabriel, David,

My mistake here. I thought I should use an Avro schema. I am pretty new to
Hadoop and Crunch.
The Avro types from Crunch work. I also applied the patch by following the
steps you provided, and it works. For the moment, as you recommended, I will
stick to the Avro types until the fix is released.

Thank you for the support and the guidance,
Florin

On Wed, Jul 8, 2015 at 7:41 PM, Gabriel Reid <[email protected]> wrote:

> Hi Florin,
>
> The use of Avros that David and I were referring to basically just
> involves changing the import statements in your original source file from:
>
> import static org.apache.crunch.types.writable.Writables.ints;
> import static org.apache.crunch.types.writable.Writables.strings;
> import static org.apache.crunch.types.writable.Writables.tableOf;
>
> to:
>
> import static org.apache.crunch.types.avro.Avros.ints;
> import static org.apache.crunch.types.avro.Avros.strings;
> import static org.apache.crunch.types.avro.Avros.tableOf;
>
> The specific problem that you seem to be running into now is due to Hadoop
> 2.7.0 still shipping with Avro 1.7.4, in which the underlying Avro bugs
> that contributed to CRUNCH-360 are still present. You should be able to
> get around that by swapping out the Avro jar files under
> share/hadoop/common/lib in your Hadoop installation (as mentioned in
> CRUNCH-360).
>
> To apply the patch on CRUNCH-539, proceed as follows:
>
> 1. Download the patch to your local system
> 2. Clone the Crunch git repo
>    $ git clone https://git-wip-us.apache.org/repos/asf/crunch.git
> 3. Apply the patch with git
>    $ git am path/to/CRUNCH-539.patch
> 4. Rebuild Crunch
>    $ mvn clean install -DskipTests
> 5. Update the pom in your own project to refer to crunch 0.13-SNAPSHOT,
>    and rebuild your own project
>
> - Gabriel
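A quick way to confirm that the jar swap under share/hadoop/common/lib took
effect is to check where the JVM loads the Avro classes from. Below is a
minimal sketch; the class name AvroVersionCheck is made up for illustration,
and running it via "hadoop jar" should generally reflect the classpath the
job sees:

    // AvroVersionCheck.java - prints the jar that provides
    // org.apache.avro.Schema on the current classpath.
    public class AvroVersionCheck {
        public static void main(String[] args) {
            // For classes loaded from a regular jar, the code source points
            // at the jar file, e.g. .../share/hadoop/common/lib/avro-1.7.4.jar
            System.out.println(org.apache.avro.Schema.class
                    .getProtectionDomain().getCodeSource().getLocation());
        }
    }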
> On Wed, Jul 8, 2015 at 11:22 AM Florin Tatu <[email protected]> wrote:
>
>> Hi David, Gabriel,
>>
>> Using Avros I get org.apache.avro.UnresolvedUnionException: Not in union
>>
>> The situation is similar to the Crunch example from the Hadoop book
>> (first make a clone:
>> git clone https://github.com/tomwhite/hadoop-book.git)
>> If you run the following commands in hadoop-book/ch18-crunch:
>>
>> mvn package
>> hadoop jar target/ch18-crunch-4.0-job.jar crunch.AvroGenericMaxTemperatureCrunch ncdc_data output
>>
>> you obtain the following exception:
>>
>> java.lang.Exception: org.apache.avro.UnresolvedUnionException: Not in union
>> [{"type":"record","name":"WeatherRecord","namespace":"org.apache.avro.mapred","doc":"A weather reading.","fields":[{"name":"year","type":"int"},{"name":"temperature","type":"int"},{"name":"stationId","type":"string"}]},"null"]:
>> {"year": 1928, "temperature": 28, "stationId": "011060-99999"}
>> at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
>> at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
>> Caused by: org.apache.avro.UnresolvedUnionException: Not in union
>> [{"type":"record","name":"WeatherRecord","namespace":"org.apache.avro.mapred","doc":"A weather reading.","fields":[{"name":"year","type":"int"},{"name":"temperature","type":"int"},{"name":"stationId","type":"string"}]},"null"]:
>> {"year": 1928, "temperature": 28, "stationId": "011060-99999"}
>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:561)
>> at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:144)
>> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
>> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
>> at org.apache.crunch.types.avro.SafeAvroSerialization$AvroWrapperSerializer.serialize(SafeAvroSerialization.java:128)
>> at org.apache.crunch.types.avro.SafeAvroSerialization$AvroWrapperSerializer.serialize(SafeAvroSerialization.java:113)
>> at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1157)
>> at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
>> at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
>> at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
>> at org.apache.crunch.impl.mr.emit.OutputEmitter.emit(OutputEmitter.java:41)
>> at org.apache.crunch.MapFn.process(MapFn.java:34)
>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>> at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>> at crunch.AvroGenericMaxTemperatureCrunch$2.process(AvroGenericMaxTemperatureCrunch.java:95)
>> at crunch.AvroGenericMaxTemperatureCrunch$2.process(AvroGenericMaxTemperatureCrunch.java:82)
>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>> at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>> at org.apache.crunch.MapFn.process(MapFn.java:34)
>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>> at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
>> at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> From my research it seems to be related to:
>> https://issues.apache.org/jira/browse/CRUNCH-360
>> (but please correct me if I am wrong).
>>
>> If this is a known issue, is there another alternative?
>>
>> PS: @Gabriel Reid: I saw you made a patch for the issue
>> https://issues.apache.org/jira/browse/CRUNCH-539
>> but I am unfamiliar with how to apply it to the project. Can you please
>> provide me some minimal guidance? :)
>>
>> - Florin
>>
>> On Tue, Jul 7, 2015 at 11:45 PM, Gabriel Reid <[email protected]> wrote:
>>
>>> Hi Florin,
>>>
>>> Thanks for the very detailed report. That appears to be a bug, brought
>>> on by the way that ObjectInputStream works with classloaders, together
>>> with how Hadoop manipulates classloaders.
>>>
>>> I've logged a JIRA ticket [1] for this. For now, like David, I would
>>> recommend using Avros instead of Writables, as that should get around
>>> this issue without having any other consequences.
>>>
>>> - Gabriel
>>>
>>> 1. https://issues.apache.org/jira/browse/CRUNCH-539
>>>
>>> On Tue, Jul 7, 2015 at 3:27 PM David Ortiz <[email protected]> wrote:
>>>
>>>> That looks weird. Can you try it using Avros in place of Writables
>>>> and see if it does the same thing?
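To make Gabriel's classloader explanation above concrete: a plain
ObjectInputStream resolves classes against the classloader of its calling
frame rather than the thread context classloader that Hadoop points at the
job jar, which is how a class like TupleWritable can come up missing. The
sketch below illustrates the mechanism and a common workaround; it is an
illustration only, not the actual CRUNCH-539 patch, and the class name is
made up:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectStreamClass;

    // Resolves deserialized classes via the thread context classloader
    // first, so classes visible only through Hadoop's job classloader
    // can still be found.
    class ContextClassLoaderObjectInputStream extends ObjectInputStream {
        ContextClassLoaderObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false,
                        Thread.currentThread().getContextClassLoader());
            } catch (ClassNotFoundException e) {
                // Fall back to the default resolution behavior.
                return super.resolveClass(desc);
            }
        }
    }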
>>>> On Tue, Jul 7, 2015, 3:43 AM Florin Tatu <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a job that processes a set of files containing climatic data
>>>>> (more exactly, data from this location:
>>>>> ftp://ftp.ncdc.noaa.gov/pub/data/noaa/)
>>>>>
>>>>> I downloaded and merged the data using a script, so I have one folder
>>>>> (ncdc_data) with a .gz archive for each year (e.g. 1901.gz, 1902.gz,
>>>>> etc.)
>>>>> Each archive contains only one text file.
>>>>>
>>>>> My code is:
>>>>>
>>>>> import com.google.common.base.Charsets;
>>>>> import com.google.common.io.Files;
>>>>> import org.apache.crunch.*;
>>>>> import org.apache.crunch.fn.Aggregators;
>>>>> import org.apache.crunch.impl.mr.MRPipeline;
>>>>> import org.apache.crunch.io.To;
>>>>> import java.io.File;
>>>>> import static org.apache.crunch.types.writable.Writables.ints;
>>>>> import static org.apache.crunch.types.writable.Writables.strings;
>>>>> import static org.apache.crunch.types.writable.Writables.tableOf;
>>>>>
>>>>> public class MaxTemperatureCrunch {
>>>>>
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         if (args.length != 2) {
>>>>>             System.err.println("Usage: MaxTemperatureCrunch <input path> <output path>");
>>>>>             System.exit(-1);
>>>>>         }
>>>>>
>>>>>         Pipeline pipeline = new MRPipeline(MaxTemperatureCrunch.class);
>>>>>
>>>>>         PCollection<String> records = pipeline.readTextFile(args[0]);
>>>>>
>>>>>         PTable<String, Integer> yearTemperatures = records
>>>>>                 .parallelDo(toYearTempPairsFn(), tableOf(strings(), ints()));
>>>>>
>>>>>         PTable<String, Integer> maxTemps = yearTemperatures
>>>>>                 .groupByKey()
>>>>>                 .combineValues(Aggregators.MAX_INTS())
>>>>>                 .top(1); // LINE THAT CAUSES THE ERROR
>>>>>
>>>>>         maxTemps.write(To.textFile(args[1]));
>>>>>
>>>>>         PipelineResult result = pipeline.done();
>>>>>         String dot = pipeline.getConfiguration().get("crunch.planner.dotfile");
>>>>>         Files.write(dot, new File("pipeline.dot"), Charsets.UTF_8);
>>>>>         Runtime.getRuntime().exec("dot -Tpng -O pipeline.dot");
>>>>>         System.exit(result.succeeded() ? 0 : 1);
>>>>>     }
>>>>>
>>>>>     static DoFn<String, Pair<String, Integer>> toYearTempPairsFn() {
>>>>>         return new DoFn<String, Pair<String, Integer>>() {
>>>>>             NcdcRecordParser parser = new NcdcRecordParser();
>>>>>             @Override
>>>>>             public void process(String input, Emitter<Pair<String, Integer>> emitter) {
>>>>>                 parser.parse(input);
>>>>>                 if (parser.isValidTemperature()) {
>>>>>                     emitter.emit(Pair.of(parser.getYear(), parser.getAirTemperature()));
>>>>>                 }
>>>>>             }
>>>>>         };
>>>>>     }
>>>>> }
>>>>>
>>>>> Hadoop runs locally in standalone mode.
>>>>> Hadoop version: 2.7.0
>>>>> Crunch version: 0.12.0 (maven dependency: 0.12.0-hadoop2)
>>>>>
>>>>> I build my application with: mvn package
>>>>> I run it with: hadoop jar target/crunch-demo-1.0-SNAPSHOT-job.jar ncdc_data/ output
>>>>>
>>>>> If I do not call .top(1) (see comment: // LINE THAT CAUSES THE ERROR)
>>>>> everything works fine, but I obtain only the maximum temperatures for
>>>>> each year, and I want to obtain the overall maximum temperature or the
>>>>> top N temperatures for the whole data set.
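As a side note on the overall-maximum goal stated just above: it can also be
expressed without top(), using only the constructs already in this program,
by collapsing every pair onto one constant key so that a single
combineValues pass computes the global maximum. A sketch, untested; the key
"GLOBAL" is an arbitrary placeholder, and whether this sidesteps the
serialization error reported below was not verified:

    // Re-key every (year, temp) pair onto one constant key, then take the
    // max over that single group. Uses the same tableOf/strings/ints PTypes
    // and Aggregators.MAX_INTS already imported in the program above.
    PTable<String, Integer> globalMax = yearTemperatures
            .parallelDo(new MapFn<Pair<String, Integer>, Pair<String, Integer>>() {
                @Override
                public Pair<String, Integer> map(Pair<String, Integer> input) {
                    return Pair.of("GLOBAL", input.second()); // constant key
                }
            }, tableOf(strings(), ints()))
            .groupByKey()
            .combineValues(Aggregators.MAX_INTS());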
>>>>> If I call .top(1) I obtain the following error:
>>>>>
>>>>> java.lang.Exception: org.apache.crunch.CrunchRuntimeException: Error reloading writable comparable codes
>>>>> at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
>>>>> at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
>>>>> Caused by: org.apache.crunch.CrunchRuntimeException: Error reloading writable comparable codes
>>>>> at org.apache.crunch.types.writable.TupleWritable.setConf(TupleWritable.java:71)
>>>>> at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
>>>>> at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
>>>>> at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66)
>>>>> at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
>>>>> at org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:2247)
>>>>> at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2220)
>>>>> at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.nextKeyValue(SequenceFileRecordReader.java:78)
>>>>> at org.apache.crunch.impl.mr.run.CrunchRecordReader.nextKeyValue(CrunchRecordReader.java:157)
>>>>> at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
>>>>> at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
>>>>> at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
>>>>> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
>>>>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
>>>>> at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
>>>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.crunch.types.writable.TupleWritable
>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>>> at java.lang.Class.forName0(Native Method)
>>>>> at java.lang.Class.forName(Class.java:274)
>>>>> at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>>>>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>>>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>>>> at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1483)
>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1333)
>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>> at com.google.common.collect.Serialization.populateMap(Serialization.java:91)
>>>>> at com.google.common.collect.HashBiMap.readObject(HashBiMap.java:109)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>> at org.apache.crunch.types.writable.Writables.reloadWritableComparableCodes(Writables.java:145)
>>>>> at org.apache.crunch.types.writable.TupleWritable.setConf(TupleWritable.java:69)
>>>>> ... 20 more
>>>>>
>>>>> Has anyone encountered this issue?
>>>>> If you need any other details, please ask.
>>>>>
>>>>> Thank you,
>>>>> Florin
