Hello again,

In the spark-developers mailing list I saw a discussion about the v0.8.1-incubating release and decided to try it out, and my problem cannot be reproduced in it!
There were some commits made regarding streaming and kafka, for example: https://github.com/apache/incubator-spark/commit/c85665157afa75caeac3a91adf97a0edc0cac3a5#diff-abb0256a318439745ab0b343d64b9ba0

I don't understand how they relate to my problem, but it is fixed now :) Maybe this information will be helpful.

—
Best regards,
Konstantin Abakumov

On Thursday, 28 November 2013 at 18:49, Konstantin Abakumov wrote:

> Hello!
>
> I am facing a problem using Spark Streaming with a KafkaStream as the input stream.
>
> Spark version: 0.8.0-incubating, with the kafka-0.7.2 jar that ships with the Spark
> distribution. Spark is deployed on Amazon EC2 with 2 slaves using the provided
> scripts.
>
> My application gets an error in distributed mode (stack trace pasted below). As I
> understand it, the workers cannot find the class kafka.consumer.ConsumerConnector
> during job execution, so no data is consumed from the kafka broker. This is strange,
> because the kafka jar is on the classpath and packaged with the application (a sketch
> of the setup is below), and no problem occurred in local mode: data were consumed and
> processed correctly.
>
> Then I tried to execute the KafkaWordCount example, but got the same problem, again
> in distributed mode only. Moreover, no kafka jar was added to the spark-examples
> assembly jar or the classpath, so I had to add it to the classpath manually in the
> run-example script, otherwise the example failed to start :)
>
> Unfortunately, I haven't figured out what leads to this error yet, so any advice
> would be helpful.
>
> After all this, I tested Spark's master branch, which uses kafka 0.8.0, and
> fortunately no problem occurred: the KafkaWordCount example works correctly in both
> local and distributed modes. So the problem appears only with kafka 0.7.2.
>
> We use kafka 0.7 in our project now, but we're going to upgrade to 0.8 after the
> first stable Spark release with kafka 0.8.0 support. Is the master branch in any
> sense production-ready? Can we use it now, or is that not recommended at the moment?
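>
> For reference, here is a minimal sketch of how the stream is wired up in our
> application, as I understand the 0.8.0 streaming API. Names such as
> "spark://ec2-master:7077", "zk-host:2181", "my-group", the topic map and the
> assembly jar path are placeholders, not our real settings:
>
>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>
>     // Streaming context pointing at the EC2 master; the application
>     // assembly jar (which bundles kafka-0.7.2) is shipped to the
>     // workers through the jars argument.
>     val ssc = new StreamingContext(
>       "spark://ec2-master:7077", "KafkaConsumerApp", Seconds(2),
>       System.getenv("SPARK_HOME"), Seq("target/app-assembly.jar"))
>
>     // Map of topic -> number of consumer threads for that topic.
>     val topicMap = Map("events" -> 1)
>
>     // Create the Kafka input stream via the ZooKeeper quorum and
>     // print a sample of each batch.
>     val lines = ssc.kafkaStream("zk-host:2181", "my-group", topicMap)
>     lines.print()
>     ssc.start()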
>
> Stack trace:
> java.lang.NoClassDefFoundError: Lkafka/consumer/ConsumerConnector;
>     at java.lang.Class.getDeclaredFields0(Native Method)
>     at java.lang.Class.privateGetDeclaredFields(Class.java:2397)
>     at java.lang.Class.getDeclaredField(Class.java:1946)
>     at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
>     at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
>     at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
>     at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
>     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>     at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
>     at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:135)
>     at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
>     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.ClassNotFoundException: kafka.consumer.ConsumerConnector
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>     ... 43 more
>
> --
> Best regards,
> Konstantin Abakumov
