Hello!
I am facing a problem using Spark Streaming with a Kafka input stream.
Spark version: 0.8.0-incubating, with the kafka-0.7.2 that ships with the
Spark distribution. Spark is deployed on Amazon EC2 with 2 slaves using the
provided scripts.
My application fails in distributed mode (stack trace pasted below).
As I understand it, the workers cannot find the class
kafka.consumer.ConsumerConnector during job execution, so no data is consumed
from the Kafka broker. This is strange, because the kafka jar is on the
classpath and packaged with the application, and there was no problem in local
mode: data was consumed and processed correctly.
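For context, the stream is set up roughly like this (a minimal sketch against
the 0.8.0 streaming API; the master URL, jar paths, ZooKeeper address, group
and topic names are placeholders, not our real configuration):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object KafkaConsumerSketch {
  def main(args: Array[String]) {
    // Listing the application assembly and the kafka jar here is supposed to
    // ship them to the workers, which is why the NoClassDefFoundError on the
    // executors is surprising.
    val ssc = new StreamingContext("spark://master:7077", "KafkaConsumerSketch",
      Seconds(2), System.getenv("SPARK_HOME"),
      Seq("target/my-app-assembly.jar", "lib/kafka-0.7.2.jar"))

    // topic -> number of consumer threads
    val topics = Map("my-topic" -> 1)
    // kafkaStream yields (key, message) pairs; keep only the message payload
    val lines = ssc.kafkaStream("zk-host:2181", "my-group", topics).map(_._2)
    lines.print()
    ssc.start()
  }
}
```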
I then tried to run the KafkaWordCount example and hit the same problem, again
only in distributed mode. Moreover, the kafka jar was not added to the
spark-examples assembly jar or the classpath, so I had to add it to the
classpath manually in the run-example script, otherwise the example failed to
start :)
Unfortunately, I haven't figured out what causes this error yet, so any advice
would be helpful.
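The manual workaround in run-example was roughly the following (the exact
location of the kafka jar inside the Spark tree is an assumption; adjust the
path to your layout):

```shell
# Append the kafka 0.7.2 jar that ships with Spark to the classpath
# before launching the example (path below is an assumption).
KAFKA_JAR="$SPARK_HOME/streaming/lib/kafka-0.7.2.jar"
CLASSPATH="$CLASSPATH:$KAFKA_JAR"
export CLASSPATH
echo "$CLASSPATH"
```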
Finally, I tested Spark's master branch, which uses kafka 0.8.0, and,
fortunately, the problem did not occur: the KafkaWordCount example works
correctly in both local and distributed modes. So the problem appears only with
kafka 0.7.2.
We use kafka 0.7 in our project now, but we plan to upgrade to 0.8 after the
first stable Spark release with 0.8.0 support. Is the master branch in any way
production-ready? Can we use it now, or is that not recommended at the moment?
Stack trace:
java.lang.NoClassDefFoundError: Lkafka/consumer/ConsumerConnector;
    at java.lang.Class.getDeclaredFields0(Native Method)
    at java.lang.Class.privateGetDeclaredFields(Class.java:2397)
    at java.lang.Class.getDeclaredField(Class.java:1946)
    at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
    at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
    at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
    at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
    at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:135)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.ClassNotFoundException: kafka.consumer.ConsumerConnector
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 43 more
--
Best regards,
Konstantin Abakumov