Hello again,

On the spark-developers mailing list I saw a discussion about the 
v0.8.1-incubating release and decided to try it out. And my problem can no 
longer be reproduced in it!

There were some commits made to the streaming and Kafka code, for example: 
https://github.com/apache/incubator-spark/commit/c85665157afa75caeac3a91adf97a0edc0cac3a5#diff-abb0256a318439745ab0b343d64b9ba0
  
I don’t understand how they relate to my problem, but it is fixed now :)

Maybe this information can be helpful.  

—  
Best regards,
Konstantin Abakumov


On Thursday, 28 November 2013 at 18:49, Konstantin Abakumov wrote:

> Hello!
>  
>  
> I am facing a problem using Spark Streaming with a KafkaStream as the input 
> stream.
>  
>  
> Spark version: 0.8.0-incubating, with the kafka-0.7.2 jar that ships with the 
> Spark distribution. Spark is deployed on Amazon EC2 with 2 slaves using the 
> provided scripts.
>  
>  
> My application gets an error in distributed mode (the stack trace is pasted 
> below). As I understand it, the workers cannot find the class 
> kafka.consumer.ConsumerConnector during job execution. As a result, no data 
> is consumed from the Kafka broker. It's strange, because the kafka jar is on 
> the classpath and packaged with the application, and no such problem occurred 
> in local mode: data were consumed and processed correctly.
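>  
> For reference, this is roughly how the job is wired up (a minimal sketch; 
> the master URL, app name, and jar path here are made up for illustration, 
> and I'm assuming the 0.8.0 kafkaStream(zkQuorum, groupId, topics) overload 
> that the bundled examples use). The jars passed to the StreamingContext 
> constructor are supposed to be shipped to the workers, which is why the 
> missing class surprises me:
>  
>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>  
>     // Constructor form used by the bundled examples: master, app name,
>     // batch interval, Spark home, and jars to ship to the workers.
>     val ssc = new StreamingContext(
>       "spark://ec2-master:7077",          // hypothetical master URL
>       "KafkaConsumerApp",                 // hypothetical app name
>       Seconds(2),
>       System.getenv("SPARK_HOME"),
>       Seq("/path/to/app-assembly.jar"))   // assembly that bundles kafka-0.7.2
>  
>     // kafka.consumer.ConsumerConnector is instantiated behind this call
>     val lines = ssc.kafkaStream("zk-host:2181", "my-group", Map("my-topic" -> 1))
>     lines.print()
>     ssc.start()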
>  
>  
> I then tried to run the KafkaWordCount example, but hit the same problem, 
> again in distributed mode only. Moreover, the kafka jar was not added to the 
> spark-examples assembly jar or the classpath, so I had to add it manually to 
> the classpath in the run-example script; otherwise the example failed to run :)
>  
>  
> Unfortunately, I haven't figured out what causes this error yet, so any 
> advice would be helpful.
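>  
> One thing I plan to try is checking class visibility directly on the workers 
> with a throwaway job, something like this sketch (again, the master URL and 
> jar path are placeholders; the class name is the one from the stack trace):
>  
>     import org.apache.spark.SparkContext
>  
>     // Run Class.forName on each worker to see whether the kafka classes
>     // made it onto the executors' classpath at all. The lookup is by
>     // string name, so the closure itself has no static kafka dependency.
>     val sc = new SparkContext("spark://ec2-master:7077", "ClasspathCheck",
>       System.getenv("SPARK_HOME"), Seq("/path/to/app-assembly.jar"))
>  
>     val report = sc.parallelize(1 to 4, 4).map { _ =>
>       try { Class.forName("kafka.consumer.ConsumerConnector"); "found" }
>       catch { case e: ClassNotFoundException => "missing: " + e.getMessage }
>     }.collect()
>  
>     report.foreach(println)
>     sc.stop()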
>  
>  
> Finally, I tested Spark's master branch, which uses Kafka 0.8.0, and, 
> fortunately, the problem did not occur there. The KafkaWordCount example works 
> correctly in both local and distributed modes. So the problem appears only 
> with kafka 0.7.2.
>  
>  
> We use Kafka 0.7 in our project now, but we're going to upgrade to version 
> 0.8 after the first stable Spark release with 0.8.0 support. Is the master 
> branch in any sense production-ready? Can we use it now, or is that not 
> recommended at the moment?
>  
>  
> Stack trace:
> java.lang.NoClassDefFoundError: Lkafka/consumer/ConsumerConnector;
>         at java.lang.Class.getDeclaredFields0(Native Method)
>         at java.lang.Class.privateGetDeclaredFields(Class.java:2397)
>         at java.lang.Class.getDeclaredField(Class.java:1946)
>         at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
>         at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
>         at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
>         at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
>         at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
>         at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>         at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
>         at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:135)
>         at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
>         at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.ClassNotFoundException: kafka.consumer.ConsumerConnector
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>         ... 43 more
>  
>  
> --  
> Best regards,
> Konstantin Abakumov
>  
