Also see this context from February.  We started working with Chill to get
Avro records automatically registered with Kryo.  I'm not sure of the final
status, but from Chill PR #172 it looks like this should involve much less
friction than before.

Issue we filed: https://github.com/twitter/chill/issues/171
Pull request that adds an AvroSerializer to Chill:
https://github.com/twitter/chill/pull/172
Issue on the old Spark tracker:
https://spark-project.atlassian.net/browse/SPARK-746
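
If the serializer in that PR lands, Kryo registration should reduce to
something like this (a rough, untested sketch based on the chill-avro code
in PR #172; MyRecord stands in for an Avro-generated specific class):

  import com.esotericsoftware.kryo.Kryo
  import com.twitter.chill.avro.AvroSerializer
  import org.apache.spark.serializer.KryoRegistrator

  class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
      // chill-avro serializes the record with Avro itself, so no
      // hand-written Serializable wrapper class is needed.
      kryo.register(classOf[MyRecord],
        AvroSerializer.SpecificRecordBinarySerializer[MyRecord])
    }
  }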

Matt, can you comment on whether this change helps you streamline that gist
even further?

Andrew

On Tue, May 27, 2014 at 8:49 AM, Jeremy Lewi <jer...@lewi.us> wrote:

> Thanks, that's super helpful.
>
> J
>
>
> On Tue, May 27, 2014 at 8:01 AM, Matt Massie <mas...@berkeley.edu> wrote:
>
>> I really should update that blog post. I created a gist (see
>> https://gist.github.com/massie/7224868) which explains a cleaner, more
>> efficient approach.
>>
>> --
>> Matt Massie <http://www.linkedin.com/in/mattmassie/> <http://www.twitter.com/matt_massie>
>> UC, Berkeley AMPLab <https://twitter.com/amplab>
>>
>>
>> On Tue, May 27, 2014 at 7:18 AM, Jeremy Lewi <jer...@lewi.us> wrote:
>>
>>> I was able to work around this by switching to the SpecificDatum
>>> interface and following this example:
>>>
>>> https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java
>>>
>>> As in the example, I defined a subclass of my Avro type which
>>> implemented the Serializable interface using Avro serialization methods.
>>>
>>> I also defined a copy constructor which converted from the actual Avro
>>> type to my subclass.
>>>
>>> In Spark, after reading the Avro file, I ran a map operation to convert
>>> from the Avro type to my serializable subclass.
>>> This worked, although I'm not sure it's the most efficient solution.
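>>>
>>> In outline, the subclass looks something like this (a trimmed, untested
>>> sketch; MyRecord stands in for the actual generated type):
>>>
>>>   import java.io.{ObjectInputStream, ObjectOutputStream}
>>>   import org.apache.avro.io.{DecoderFactory, EncoderFactory}
>>>   import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter}
>>>   import scala.collection.JavaConverters._
>>>
>>>   class SerializableMyRecord extends MyRecord with Serializable {
>>>     // Copy constructor: populate the wrapper field-by-field using the
>>>     // positional get/put inherited from SpecificRecordBase.
>>>     def this(rec: MyRecord) = {
>>>       this()
>>>       for (f <- MyRecord.SCHEMA$.getFields.asScala)
>>>         put(f.pos, rec.get(f.pos))
>>>     }
>>>
>>>     // Swap Java serialization of the object graph for Avro binary
>>>     // serialization of just the record's fields.
>>>     private def writeObject(out: ObjectOutputStream): Unit = {
>>>       val writer = new SpecificDatumWriter[MyRecord](MyRecord.SCHEMA$)
>>>       val encoder = EncoderFactory.get.binaryEncoder(out, null)
>>>       writer.write(this, encoder)
>>>       encoder.flush()
>>>     }
>>>
>>>     private def readObject(in: ObjectInputStream): Unit = {
>>>       val reader = new SpecificDatumReader[MyRecord](MyRecord.SCHEMA$)
>>>       reader.read(this, DecoderFactory.get.binaryDecoder(in, null))
>>>     }
>>>   }
>>>
>>> The map step is then roughly
>>> avroRdd.map(pair => new SerializableMyRecord(pair._1.datum())).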
>>>
>>> Here's a gist of what I run in the console:
>>> https://gist.github.com/jlewi/59b0dec90b639d5d568e#file-avrospecific
>>>
>>> I haven't gotten Kryo registration to work yet, but it seems like setting
>>> the registrator before launching the console, via the environment variable
>>> SPARK_JAVA_OPTS, might be better than shutting down and restarting the
>>> Spark context in the console.
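>>>
>>> Something like this is what I have in mind (untested;
>>> AvroGenericRegistrator is the class from my repo):
>>>
>>>   SPARK_JAVA_OPTS="-Dspark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>     -Dspark.kryo.registrator=contrail.AvroGenericRegistrator" ./spark-shell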
>>>
>>> J
>>>
>>>
>>> On Sat, May 24, 2014 at 11:16 AM, Jeremy Lewi <jer...@lewi.us> wrote:
>>>
>>>> Hi Josh,
>>>>
>>>> Thanks for the help.
>>>>
>>>> The class should be on the classpath on all nodes. Here's what I did:
>>>> 1) I built a jar from my scala code.
>>>> 2) I copied that jar to a location on all nodes in my cluster
>>>> (/usr/local/spark)
>>>> 3) I edited bin/compute-classpath.sh to add my jar to the classpath.
>>>> 4) I repeated the process with the Avro mapreduce jar to provide
>>>> AvroKey.
>>>>
>>>> I doubt this is the best way to set the classpath but it seems to work.
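>>>>
>>>> Presumably exporting SPARK_CLASSPATH would do the same thing without
>>>> editing the script; something like this (untested, jar names are
>>>> placeholders):
>>>>
>>>>   SPARK_CLASSPATH=/usr/local/spark/mycode.jar:/usr/local/spark/avro-mapred.jar \
>>>>     ./spark-shell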
>>>>
>>>> J
>>>>
>>>>
>>>> On Sat, May 24, 2014 at 9:26 AM, Josh Marcus <jmar...@meetup.com> wrote:
>>>>
>>>>> Jeremy,
>>>>>
>>>>> Just to be clear, are you assembling a jar with that class compiled
>>>>> (with its dependencies) and including the path to that jar on the command
>>>>> line in an environment variable (e.g. SPARK_CLASSPATH=path ./spark-shell)?
>>>>>
>>>>> --j
>>>>>
>>>>>
>>>>> On Saturday, May 24, 2014, Jeremy Lewi <jer...@lewi.us> wrote:
>>>>>
>>>>>> Hi Spark Users,
>>>>>>
>>>>>> I'm trying to read and process an Avro dataset using the interactive
>>>>>> Spark Scala shell. When my pipeline executes, I get the
>>>>>> ClassNotFoundException pasted at the end of this email.
>>>>>> I'm trying to use the Generic Avro API (not the Specific API).
>>>>>>
>>>>>> Here's a gist of the commands I'm running in the spark console:
>>>>>> https://gist.github.com/jlewi/2c853edddd0ceee5f00c
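>>>>>>
>>>>>> In outline it's roughly the following sketch (input path elided):
>>>>>>
>>>>>>   import org.apache.avro.generic.GenericRecord
>>>>>>   import org.apache.avro.mapred.AvroKey
>>>>>>   import org.apache.avro.mapreduce.AvroKeyInputFormat
>>>>>>   import org.apache.hadoop.io.NullWritable
>>>>>>
>>>>>>   val records = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>>>>>>     AvroKeyInputFormat[GenericRecord]]("...")
>>>>>>   records.map(_._1.datum.toString).take(5)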
>>>>>>
>>>>>> Here's my registrator for Kryo:
>>>>>>
>>>>>> https://github.com/jlewi/cloud/blob/master/spark/src/main/scala/contrail/AvroGenericRegistrator.scala
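>>>>>>
>>>>>> It follows the standard KryoRegistrator skeleton, roughly (the actual
>>>>>> registrations are in the linked file):
>>>>>>
>>>>>>   import com.esotericsoftware.kryo.Kryo
>>>>>>   import org.apache.avro.generic.GenericData
>>>>>>   import org.apache.spark.serializer.KryoRegistrator
>>>>>>
>>>>>>   class AvroGenericRegistrator extends KryoRegistrator {
>>>>>>     override def registerClasses(kryo: Kryo) {
>>>>>>       // Register the Avro generic classes shipped between nodes.
>>>>>>       kryo.register(classOf[GenericData.Record])
>>>>>>     }
>>>>>>   }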
>>>>>>
>>>>>> Any help or suggestions would be greatly appreciated.
>>>>>>
>>>>>> Thanks
>>>>>> Jeremy
>>>>>>
>>>>>> Here's the log message that is spewed out.
>>>>>>
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Loss was due to java.lang.ClassNotFoundException
>>>>>> java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>>>>>>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>>>>   at java.lang.Class.forName0(Native Method)
>>>>>>   at java.lang.Class.forName(Class.java:270)
>>>>>>   at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
>>>>>>   at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>>>>>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>>>   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>>>   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>>>>>>   at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
>>>>>>   at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
>>>>>>   at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>>>>>>   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
>>>>>>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
>>>>>>   at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>>>   at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>>>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>>   at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>>>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>   at java.lang.Thread.run(Thread.java:744)
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 2 (task 0.0:2)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 1]
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 4 (task 0.0:4)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 2]
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 3 (task 0.0:3)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 3]
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Starting task 0.0:3 as TID 12 on executor 0: spark-slave00.c.biocloudops.internal (PROCESS_LOCAL)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Serialized task 0.0:3 as 1703 bytes in 1 ms
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Starting task 0.0:4 as TID 13 on executor 0: spark-slave00.c.biocloudops.internal (PROCESS_LOCAL)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Serialized task 0.0:4 as 1703 bytes in 1 ms
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Starting task 0.0:2 as TID 14 on executor 0: spark-slave00.c.biocloudops.internal (PROCESS_LOCAL)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Serialized task 0.0:2 as 1703 bytes in 0 ms
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 7 (task 0.0:7)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 4]
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 6 (task 0.0:6)
>>>>>>
>>>>>
>>>>
>>>
>>
>
