Hi Florin,

Thanks for the very detailed report. That appears to be a bug, brought on by the way ObjectInputStream works with classloaders, together with how Hadoop manipulates classloaders.
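The ClassNotFoundException in the trace below comes from ObjectInputStream.resolveClass looking up org.apache.crunch.types.writable.TupleWritable through a classloader (the system application classloader, in the trace) that evidently does not have the Crunch classes from the job jar on it. As an illustration of the mechanism only, and not necessarily what the fix for CRUNCH-539 will end up doing, the usual shape of a fix for this kind of problem is to deserialize with a stream that consults the thread context classloader:

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

/**
 * Illustration only: an ObjectInputStream that resolves classes against the
 * thread context classloader (which Hadoop points at the job jar) before
 * falling back to the default resolution used by ObjectInputStream.
 */
public class ContextClassLoaderObjectInputStream extends ObjectInputStream {

    public ContextClassLoaderObjectInputStream(InputStream in) throws IOException {
        super(in);
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        if (contextLoader != null) {
            try {
                // Try the context classloader first; this is where the job jar's
                // classes (e.g. org.apache.crunch.types.writable.TupleWritable) live.
                return Class.forName(desc.getName(), false, contextLoader);
            } catch (ClassNotFoundException e) {
                // Fall through to the default lookup below.
            }
        }
        return super.resolveClass(desc);
    }
}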
I've logged a JIRA ticket [1] for this. For now, like David, I would recommend using Avros instead of Writables; that should get around this issue without any other consequences (a sketch of that change is included below, after the quoted message).

- Gabriel

1. https://issues.apache.org/jira/browse/CRUNCH-539

On Tue, Jul 7, 2015 at 3:27 PM David Ortiz <[email protected]> wrote:

> That looks weird. Can you try it using Avros in place of Writables and
> see if it does the same thing?
>
> On Tue, Jul 7, 2015, 3:43 AM Florin Tatu <[email protected]> wrote:
>
>> Hi,
>>
>> I have a job that processes a set of files containing climatic data
>> (specifically, data from this location:
>> ftp://ftp.ncdc.noaa.gov/pub/data/noaa/).
>>
>> I downloaded and merged the data with a script, so I have one folder
>> (ncdc_data) containing a .gz archive for each year (e.g. 1901.gz, 1902.gz, etc.).
>> Each archive contains only one text file.
>>
>> My code is:
>>
>> import com.google.common.base.Charsets;
>> import com.google.common.io.Files;
>> import org.apache.crunch.*;
>> import org.apache.crunch.fn.Aggregators;
>> import org.apache.crunch.impl.mr.MRPipeline;
>> import org.apache.crunch.io.To;
>>
>> import java.io.File;
>>
>> import static org.apache.crunch.types.writable.Writables.ints;
>> import static org.apache.crunch.types.writable.Writables.strings;
>> import static org.apache.crunch.types.writable.Writables.tableOf;
>>
>> public class MaxTemperatureCrunch {
>>
>>     public static void main(String[] args) throws Exception {
>>         if (args.length != 2) {
>>             System.err.println("Usage: MaxTemperatureCrunch <input path> <output path>");
>>             System.exit(-1);
>>         }
>>
>>         Pipeline pipeline = new MRPipeline(MaxTemperatureCrunch.class);
>>
>>         PCollection<String> records = pipeline.readTextFile(args[0]);
>>
>>         PTable<String, Integer> yearTemperatures = records
>>                 .parallelDo(toYearTempPairsFn(), tableOf(strings(), ints()));
>>
>>         PTable<String, Integer> maxTemps = yearTemperatures
>>                 .groupByKey()
>>                 .combineValues(Aggregators.MAX_INTS())
>>                 .top(1); // LINE THAT CAUSES THE ERROR
>>
>>         maxTemps.write(To.textFile(args[1]));
>>
>>         PipelineResult result = pipeline.done();
>>         String dot = pipeline.getConfiguration().get("crunch.planner.dotfile");
>>         Files.write(dot, new File("pipeline.dot"), Charsets.UTF_8);
>>         Runtime.getRuntime().exec("dot -Tpng -O pipeline.dot");
>>         System.exit(result.succeeded() ? 0 : 1);
>>     }
>>
>>     static DoFn<String, Pair<String, Integer>> toYearTempPairsFn() {
>>         return new DoFn<String, Pair<String, Integer>>() {
>>             NcdcRecordParser parser = new NcdcRecordParser();
>>
>>             @Override
>>             public void process(String input, Emitter<Pair<String, Integer>> emitter) {
>>                 parser.parse(input);
>>                 if (parser.isValidTemperature()) {
>>                     emitter.emit(Pair.of(parser.getYear(), parser.getAirTemperature()));
>>                 }
>>             }
>>         };
>>     }
>> }
>>
>> Hadoop runs locally in standalone mode.
>> Hadoop version is: 2.7.0
>> Crunch version is: 0.12.0 (maven dependency: 0.12.0-hadoop2)
>>
>> I build my application with: mvn package
>> I run it with: hadoop jar target/crunch-demo-1.0-SNAPSHOT-job.jar ncdc_data/ output
>>
>> If I do not call .top(1) (see the comment: // LINE THAT CAUSES THE ERROR),
>> everything works fine, but then I only obtain the maximum temperature for each
>> year, and I want to obtain the overall maximum temperature or the top N
>> temperatures for the whole data set.
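(NcdcRecordParser is not included in the snippet above. For anyone trying to reproduce this, a minimal, hypothetical stand-in that provides just the methods the DoFn uses could look like the following. The field offsets follow the fixed-width NOAA ISD record layout, and the class is Serializable so the DoFn that captures it can be serialized; treat it as an approximation, not the parser actually used.)

import java.io.Serializable;

// Hypothetical minimal stand-in for NcdcRecordParser, exposing only the
// methods called by toYearTempPairsFn().
public class NcdcRecordParser implements Serializable {

    private static final int MISSING_TEMPERATURE = 9999;

    private String year;
    private int airTemperature;
    private String quality;

    public void parse(String record) {
        year = record.substring(15, 19);
        // Temperature is recorded in tenths of a degree Celsius with an
        // explicit sign character.
        if (record.charAt(87) == '+') {
            airTemperature = Integer.parseInt(record.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(record.substring(87, 92));
        }
        quality = record.substring(92, 93);
    }

    public boolean isValidTemperature() {
        return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");
    }

    public String getYear() {
        return year;
    }

    public int getAirTemperature() {
        return airTemperature;
    }
}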
>>
>> If I call .top(1) I obtain the following error:
>>
>> java.lang.Exception: org.apache.crunch.CrunchRuntimeException: Error reloading writable comparable codes
>>     at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
>>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
>> Caused by: org.apache.crunch.CrunchRuntimeException: Error reloading writable comparable codes
>>     at org.apache.crunch.types.writable.TupleWritable.setConf(TupleWritable.java:71)
>>     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
>>     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
>>     at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66)
>>     at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
>>     at org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:2247)
>>     at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2220)
>>     at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.nextKeyValue(SequenceFileRecordReader.java:78)
>>     at org.apache.crunch.impl.mr.run.CrunchRecordReader.nextKeyValue(CrunchRecordReader.java:157)
>>     at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
>>     at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
>>     at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
>>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
>>     at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.ClassNotFoundException: org.apache.crunch.types.writable.TupleWritable
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>     at java.lang.Class.forName0(Native Method)
>>     at java.lang.Class.forName(Class.java:274)
>>     at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>     at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1483)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1333)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>     at com.google.common.collect.Serialization.populateMap(Serialization.java:91)
>>     at com.google.common.collect.HashBiMap.readObject(HashBiMap.java:109)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>     at org.apache.crunch.types.writable.Writables.reloadWritableComparableCodes(Writables.java:145)
>>     at org.apache.crunch.types.writable.TupleWritable.setConf(TupleWritable.java:69)
>>     ... 20 more
>>
>> Has anyone encountered this issue?
>> If you need any other details, please ask.
>>
>> Thank you,
>> Florin
>>
>
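For completeness, here is roughly what the Avro-based variant recommended above would look like. This is only a sketch: the hypothetical class MaxTemperatureCrunchAvro reuses toYearTempPairsFn() from the posted MaxTemperatureCrunch (assumed to be in the same package) and leaves NcdcRecordParser untouched; the only real change is taking the PTypes from Avros instead of Writables.

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.To;

// Avro PTypes instead of Writable PTypes; everything else stays the same.
import static org.apache.crunch.types.avro.Avros.ints;
import static org.apache.crunch.types.avro.Avros.strings;
import static org.apache.crunch.types.avro.Avros.tableOf;

// Hypothetical class name; the body mirrors the posted program.
public class MaxTemperatureCrunchAvro {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureCrunchAvro <input path> <output path>");
            System.exit(-1);
        }

        Pipeline pipeline = new MRPipeline(MaxTemperatureCrunchAvro.class);

        PCollection<String> records = pipeline.readTextFile(args[0]);

        PTable<String, Integer> yearTemperatures = records
                .parallelDo(MaxTemperatureCrunch.toYearTempPairsFn(), tableOf(strings(), ints()));

        // With Avro types, top(1) should avoid the Writable (TupleWritable)
        // code path that fails above with "Error reloading writable comparable codes".
        PTable<String, Integer> maxTemps = yearTemperatures
                .groupByKey()
                .combineValues(Aggregators.MAX_INTS())
                .top(1);

        maxTemps.write(To.textFile(args[1]));

        PipelineResult result = pipeline.done();
        System.exit(result.succeeded() ? 0 : 1);
    }
}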
