I really should update that blog post. I created a gist (see https://gist.github.com/massie/7224868) which explains a cleaner, more efficient approach.
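A minimal sketch of what a registrator along these lines can look like — an illustration, not the contents of the gist. It registers a hypothetical AvroGenericSerializer that round-trips GenericData.Record through Avro's binary encoding, writing each record's schema alongside it (simple, but wasteful for small records, since the schema dominates the payload):

    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
    import org.apache.avro.io.{DecoderFactory, EncoderFactory}
    import org.apache.spark.serializer.KryoRegistrator

    // Hypothetical serializer: writes each record's schema as a string,
    // then the record itself in Avro binary form.
    class AvroGenericSerializer extends Serializer[GenericData.Record] {
      override def write(kryo: Kryo, output: Output, record: GenericData.Record): Unit = {
        output.writeString(record.getSchema.toString)
        val writer = new GenericDatumWriter[GenericRecord](record.getSchema)
        // Kryo's Output extends java.io.OutputStream, so Avro can encode into it directly.
        val encoder = EncoderFactory.get().binaryEncoder(output, null)
        writer.write(record, encoder)
        encoder.flush()
      }

      override def read(kryo: Kryo, input: Input, cls: Class[GenericData.Record]): GenericData.Record = {
        val schema = new Schema.Parser().parse(input.readString())
        val reader = new GenericDatumReader[GenericRecord](schema)
        val decoder = DecoderFactory.get().binaryDecoder(input, null)
        reader.read(null, decoder).asInstanceOf[GenericData.Record]
      }
    }

    class AvroGenericRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[GenericData.Record], new AvroGenericSerializer)
      }
    }

Both spark.serializer and spark.kryo.registrator have to be set before the SparkContext is created, e.g. SPARK_JAVA_OPTS="-Dspark.serializer=org.apache.spark.serializer.KryoSerializer -Dspark.kryo.registrator=contrail.AvroGenericRegistrator" ./bin/spark-shell — which matches Jeremy's observation below that setting the registrator before launching the console beats restarting the context inside it.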
--
Matt Massie <http://www.linkedin.com/in/mattmassie/> <http://www.twitter.com/matt_massie>
UC Berkeley AMPLab <https://twitter.com/amplab>

On Tue, May 27, 2014 at 7:18 AM, Jeremy Lewi <jer...@lewi.us> wrote:

> I was able to work around this by switching to the SpecificDatum interface
> and following this example:
>
> https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java
>
> As in the example, I defined a subclass of my Avro type which implements
> the Serializable interface using Avro's serialization methods.
>
> I also defined a copy constructor which converts from the actual Avro type
> to my subclass.
>
> In Spark, after reading the Avro file, I ran a map operation to convert
> from the Avro type to my serializable subclass. This worked, although I'm
> not sure it's the most efficient solution.
>
> Here's a gist of what I run in the console:
> https://gist.github.com/jlewi/59b0dec90b639d5d568e#file-avrospecific
>
> I haven't gotten Kryo registration to work yet, but it seems like setting
> the registrator before launching the console via the environment variable
> SPARK_JAVA_OPTS might be better than shutting down and restarting the
> Spark context in the console.
>
> J
>
> On Sat, May 24, 2014 at 11:16 AM, Jeremy Lewi <jer...@lewi.us> wrote:
>
>> Hi Josh,
>>
>> Thanks for the help.
>>
>> The class should be on the path on all nodes. Here's what I did:
>> 1) I built a jar from my Scala code.
>> 2) I copied that jar to a location on all nodes in my cluster
>>    (/usr/local/spark).
>> 3) I edited bin/compute-classpath.sh to add my jar to the classpath.
>> 4) I repeated the process with the Avro mapreduce jar to provide AvroKey.
>>
>> I doubt this is the best way to set the classpath, but it seems to work.
>>
>> J
>>
>> On Sat, May 24, 2014 at 9:26 AM, Josh Marcus <jmar...@meetup.com> wrote:
>>
>>> Jeremy,
>>>
>>> Just to be clear, are you assembling a jar with that class compiled
>>> (with its dependencies) and including the path to that jar on the
>>> command line in an environment variable (e.g.
>>> SPARK_CLASSPATH=path ./spark-shell)?
>>>
>>> --j
>>>
>>> On Saturday, May 24, 2014, Jeremy Lewi <jer...@lewi.us> wrote:
>>>
>>>> Hi Spark Users,
>>>>
>>>> I'm trying to read and process an Avro dataset using the interactive
>>>> Spark Scala shell. When my pipeline executes, I get the
>>>> ClassNotFoundException pasted at the end of this email. I'm using the
>>>> generic Avro API (not the specific API).
>>>>
>>>> Here's a gist of the commands I'm running in the Spark console:
>>>> https://gist.github.com/jlewi/2c853edddd0ceee5f00c
>>>>
>>>> Here's my registrator for Kryo:
>>>> https://github.com/jlewi/cloud/blob/master/spark/src/main/scala/contrail/AvroGenericRegistrator.scala
>>>>
>>>> Any help or suggestions would be greatly appreciated.
>>>>
>>>> Thanks,
>>>> Jeremy
>>>>
>>>> Here's the log message that is spewed out:
>>>>
>>>> 14/05/24 02:00:48 WARN TaskSetManager: Loss was due to java.lang.ClassNotFoundException
>>>> java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>>     at java.lang.Class.forName0(Native Method)
>>>>     at java.lang.Class.forName(Class.java:270)
>>>>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
>>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>>>>     at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
>>>>     at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
>>>>     at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>>>>     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
>>>>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
>>>>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>     at java.lang.Thread.run(Thread.java:744)
>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 2 (task 0.0:2)
>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 1]
>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 4 (task 0.0:4)
>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 2]
>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 3 (task 0.0:3)
>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 3]
>>>> 14/05/24 02:00:48 INFO TaskSetManager: Starting task 0.0:3 as TID 12 on executor 0: spark-slave00.c.biocloudops.internal (PROCESS_LOCAL)
>>>> 14/05/24 02:00:48 INFO TaskSetManager: Serialized task 0.0:3 as 1703 bytes in 1 ms
>>>> 14/05/24 02:00:48 INFO TaskSetManager: Starting task 0.0:4 as TID 13 on executor 0: spark-slave00.c.biocloudops.internal (PROCESS_LOCAL)
>>>> 14/05/24 02:00:48 INFO TaskSetManager: Serialized task 0.0:4 as 1703 bytes in 1 ms
>>>> 14/05/24 02:00:48 INFO TaskSetManager: Starting task 0.0:2 as TID 14 on executor 0: spark-slave00.c.biocloudops.internal (PROCESS_LOCAL)
>>>> 14/05/24 02:00:48 INFO TaskSetManager: Serialized task 0.0:2 as 1703 bytes in 0 ms
>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 7 (task 0.0:7)
>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 4]
>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 6 (task 0.0:6)
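A sketch of the SpecificDatum workaround Jeremy describes above, modeled on the SerializableAminoAcid example he links to; MyRecord is a stand-in for a hypothetical Avro-generated SpecificRecord class:

    import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
    import org.apache.avro.io.{DecoderFactory, EncoderFactory}
    import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter}
    import scala.collection.JavaConverters._

    // The subclass adds Java serialization that delegates to Avro's binary
    // encoding, so Spark's default JavaSerializer can ship records between nodes.
    class SerializableMyRecord() extends MyRecord with Serializable {

      // Copy constructor: promotes a plain MyRecord to the serializable
      // subclass by copying every field, position for position.
      def this(record: MyRecord) = {
        this()
        for (field <- MyRecord.SCHEMA$.getFields.asScala) {
          put(field.pos, record.get(field.pos))
        }
      }

      @throws(classOf[IOException])
      private def writeObject(out: ObjectOutputStream): Unit = {
        val writer = new SpecificDatumWriter[MyRecord](MyRecord.SCHEMA$)
        val encoder = EncoderFactory.get().binaryEncoder(out, null)
        writer.write(this, encoder)
        encoder.flush()
      }

      @throws(classOf[IOException])
      private def readObject(in: ObjectInputStream): Unit = {
        val reader = new SpecificDatumReader[MyRecord](MyRecord.SCHEMA$)
        val decoder = DecoderFactory.get().binaryDecoder(in, null)
        reader.read(this, decoder) // fills in this instance's fields
      }
    }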
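And a sketch of the corresponding console session, with a hypothetical input path: read the Avro file through the new Hadoop API, then run the map operation that converts each record into the serializable subclass.

    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapreduce.AvroKeyInputFormat
    import org.apache.hadoop.io.NullWritable

    // Read (AvroKey[MyRecord], NullWritable) pairs from the Avro file.
    val records = sc.newAPIHadoopFile(
      "hdfs:///path/to/data.avro",            // hypothetical input path
      classOf[AvroKeyInputFormat[MyRecord]],
      classOf[AvroKey[MyRecord]],
      classOf[NullWritable])

    // Promote each record to the Serializable wrapper; this also copies the
    // record out of the object Hadoop's record reader reuses between rows.
    val serializable = records.map { case (key, _) => new SerializableMyRecord(key.datum) }
    serializable.count()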