Also, see this context from February. We started working with Chill to get Avro records automatically registered with Kryo. I'm not sure of the final status, but from Chill PR #172 it looks like this might involve much less friction than before.
Issue we filed: https://github.com/twitter/chill/issues/171
Pull request that adds an AvroSerializer to Chill: https://github.com/twitter/chill/pull/172
Issue on the old Spark tracker: https://spark-project.atlassian.net/browse/SPARK-746

Matt, can you comment on whether this change helps you streamline that gist even further?

Andrew

On Tue, May 27, 2014 at 8:49 AM, Jeremy Lewi <jer...@lewi.us> wrote:
> Thanks, that's super helpful.
>
> J
>
> On Tue, May 27, 2014 at 8:01 AM, Matt Massie <mas...@berkeley.edu> wrote:
>> I really should update that blog post. I created a gist (see
>> https://gist.github.com/massie/7224868) which explains a cleaner, more
>> efficient approach.
>>
>> --
>> Matt Massie <http://www.linkedin.com/in/mattmassie/>
>> <http://www.twitter.com/matt_massie>
>> UC Berkeley AMPLab <https://twitter.com/amplab>
>>
>> On Tue, May 27, 2014 at 7:18 AM, Jeremy Lewi <jer...@lewi.us> wrote:
>>> I was able to work around this by switching to the SpecificDatum
>>> interface and following this example:
>>> https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java
>>>
>>> As in the example, I defined a subclass of my Avro type which
>>> implemented the Serializable interface using Avro serialization methods.
>>> I also defined a copy constructor which converted from the actual Avro
>>> type to my subclass.
>>>
>>> In Spark, after reading the Avro file, I ran a map operation to convert
>>> from the Avro type to my serializable subclass. This worked, although
>>> I'm not sure it's the most efficient solution.
>>>
>>> Here's a gist of what I run in the console:
>>> https://gist.github.com/jlewi/59b0dec90b639d5d568e#file-avrospecific
>>>
>>> I haven't gotten Kryo registration to work yet, but it seems like setting
>>> the registrator before launching the console using the environment variable
>>> SPARK_JAVA_OPTS might be better than shutting down and restarting the Spark
>>> context in the console.
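[Editor's note: the workaround Jeremy describes, a Serializable subclass of the generated Avro class with a copy constructor, converted to via a map over the RDD, might look roughly like the sketch below. `MyRecord` is a stand-in for the actual Avro-generated specific class, and the serialization hooks follow the pattern in the SerializableAminoAcid example linked above; this is an illustration under those assumptions, not code from the thread.]

```scala
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter}
import scala.collection.JavaConverters._

// MyRecord stands in for the Avro-generated specific class (an assumption).
class SerializableMyRecord() extends MyRecord with Serializable {

  // Copy constructor: field-by-field copy from the plain Avro record.
  def this(rec: MyRecord) = {
    this()
    for (f <- MyRecord.SCHEMA$.getFields.asScala) {
      put(f.pos, rec.get(f.pos))
    }
  }

  // Java serialization hooks that delegate to Avro's binary encoding,
  // so Spark's default JavaSerializer can ship these records.
  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    val writer = new SpecificDatumWriter[MyRecord](MyRecord.SCHEMA$)
    val encoder = EncoderFactory.get().directBinaryEncoder(out, null)
    writer.write(this, encoder)
    encoder.flush()
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    val reader = new SpecificDatumReader[MyRecord](MyRecord.SCHEMA$)
    val decoder = DecoderFactory.get().directBinaryDecoder(in, null)
    reader.read(this, decoder)
  }
}

// In the shell, after loading the Avro file as an RDD of (AvroKey[MyRecord], _):
// val usable = rdd.map { case (k, _) => new SerializableMyRecord(k.datum()) }
```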
>>>
>>> J
>>>
>>> On Sat, May 24, 2014 at 11:16 AM, Jeremy Lewi <jer...@lewi.us> wrote:
>>>> Hi Josh,
>>>>
>>>> Thanks for the help.
>>>>
>>>> The class should be on the path on all nodes. Here's what I did:
>>>> 1) I built a jar from my Scala code.
>>>> 2) I copied that jar to a location on all nodes in my cluster
>>>>    (/usr/local/spark).
>>>> 3) I edited bin/compute-classpath.sh to add my jar to the classpath.
>>>> 4) I repeated the process with the Avro mapreduce jar to provide
>>>>    AvroKey.
>>>>
>>>> I doubt this is the best way to set the classpath, but it seems to work.
>>>>
>>>> J
>>>>
>>>> On Sat, May 24, 2014 at 9:26 AM, Josh Marcus <jmar...@meetup.com> wrote:
>>>>> Jeremy,
>>>>>
>>>>> Just to be clear, are you assembling a jar with that class compiled
>>>>> (with its dependencies) and including the path to that jar on the command
>>>>> line in an environment variable (e.g. SPARK_CLASSPATH=path ./spark-shell)?
>>>>>
>>>>> --j
>>>>>
>>>>> On Saturday, May 24, 2014, Jeremy Lewi <jer...@lewi.us> wrote:
>>>>>> Hi Spark Users,
>>>>>>
>>>>>> I'm trying to read and process an Avro dataset using the interactive
>>>>>> Spark Scala shell. When my pipeline executes, I get the
>>>>>> ClassNotFoundException pasted at the end of this email.
>>>>>> I'm trying to use the Generic Avro API (not the Specific API).
>>>>>>
>>>>>> Here's a gist of the commands I'm running in the Spark console:
>>>>>> https://gist.github.com/jlewi/2c853edddd0ceee5f00c
>>>>>>
>>>>>> Here's my registrator for Kryo:
>>>>>> https://github.com/jlewi/cloud/blob/master/spark/src/main/scala/contrail/AvroGenericRegistrator.scala
>>>>>>
>>>>>> Any help or suggestions would be greatly appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>> Jeremy
>>>>>>
>>>>>> Here's the log message that is spewed out.
>>>>>>
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Loss was due to java.lang.ClassNotFoundException
>>>>>> java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>>>>>>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>>>>   at java.lang.Class.forName0(Native Method)
>>>>>>   at java.lang.Class.forName(Class.java:270)
>>>>>>   at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
>>>>>>   at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>>>>>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>>>   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>>>   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>>>>>>   at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
>>>>>>   at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
>>>>>>   at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>>>>>>   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
>>>>>>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
>>>>>>   at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>>>   at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>>>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>>   at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>>>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>   at java.lang.Thread.run(Thread.java:744)
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 2 (task 0.0:2)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 1]
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 4 (task 0.0:4)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 2]
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 3 (task 0.0:3)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 3]
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Starting task 0.0:3 as TID 12 on executor 0: spark-slave00.c.biocloudops.internal (PROCESS_LOCAL)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Serialized task 0.0:3 as 1703 bytes in 1 ms
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Starting task 0.0:4 as TID 13 on executor 0: spark-slave00.c.biocloudops.internal (PROCESS_LOCAL)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Serialized task 0.0:4 as 1703 bytes in 1 ms
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Starting task 0.0:2 as TID 14 on executor 0: spark-slave00.c.biocloudops.internal (PROCESS_LOCAL)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Serialized task 0.0:2 as 1703 bytes in 0 ms
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 7 (task 0.0:7)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 4]
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 6 (task 0.0:6)
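[Editor's note: the classpath setup discussed in the May 24 messages above (staging the jars on every node and editing bin/compute-classpath.sh) can also be done through the environment variables mentioned in the thread. A rough sketch, assuming hypothetical jar names, the registrator class linked earlier, and the Spark 0.9-era SPARK_CLASSPATH / SPARK_JAVA_OPTS settings:]

```shell
# Hypothetical jar paths, staged on every node as described above.
export SPARK_CLASSPATH=/usr/local/spark/myjob.jar:/usr/local/spark/avro-mapred.jar
# Set the Kryo serializer and registrator before the shell starts, instead of
# restarting the SparkContext from inside the console:
export SPARK_JAVA_OPTS="-Dspark.serializer=org.apache.spark.serializer.KryoSerializer -Dspark.kryo.registrator=contrail.AvroGenericRegistrator"
# Then launch the shell: ./bin/spark-shell
```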