After searching a bit, I learned that Spark 0.8 supports Kafka 0.7. So I
tried to use it this way:

In my pom.xml, I specified a Spark dependency as follows:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.9.3</artifactId>
      <version>0.8.1-incubating</version>
    </dependency>

And a Kafka dependency as follows:

      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka</artifactId>
          <version>0.7.2-spark</version>
          <scope>provided</scope>
      </dependency>

Since I have declared the dependency as provided, I downloaded the jar
(along with the accompanying SHA1 and MD5 checksum files) from the Cloudera
repository at
https://repository.cloudera.com/artifactory/repo/org/apache/kafka/kafka/0.7.2-spark/
into my local Maven repository.
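For reference, registering a manually downloaded jar in the local Maven
repository can be done with something like the following (the path to the
downloaded jar file is an assumption; the coordinates match the dependency
above):

```
# Sketch: install the downloaded Kafka jar into the local Maven repository.
# The -Dfile path is an assumption; point it at the actual downloaded jar.
mvn install:install-file \
  -Dfile=kafka-0.7.2-spark.jar \
  -DgroupId=org.apache.kafka \
  -DartifactId=kafka \
  -Dversion=0.7.2-spark \
  -Dpackaging=jar
```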

After building my jar file, I included the classes from this Kafka jar in
my jar file, as suggested in this thread:
http://qnalist.com/questions/5008317/spark-streaming-with-kafka-noclassdeffounderror
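For context, the fat jar named in the log below is built with something
like the maven-assembly-plugin's jar-with-dependencies descriptor (a sketch
of my setup; the exact plugin configuration may differ). Note that this
descriptor bundles compile-scope dependencies only, which is why
provided-scope classes have to be added separately:

```
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
</plugin>
```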

I verified that the class files actually exist in the assembled jar using
jar -tf.

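The verification looked something like this (the jar path is taken from the
log below; the exact grep pattern is just an example):

```
jar -tf target/spark-samples-1.0-SNAPSHOT-jar-with-dependencies.jar \
  | grep kafka/serializer
```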
However, when I submit my job, I am getting the following error (same as
mentioned in the thread above):

14/09/05 21:45:58 INFO spark.SparkContext: Added JAR /Users/yhemanth/projects/personal/spark/spark-samples/target/spark-samples-1.0-SNAPSHOT-jar-with-dependencies.jar at http://192.168.1.5:51211/jars/spark-samples-1.0-SNAPSHOT-jar-with-dependencies.jar with timestamp 1409933758392
[WARNING]
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:695)
Caused by: java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
    at org.apache.spark.streaming.StreamingContext.kafkaStream(StreamingContext.scala:258)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.kafkaStream(JavaStreamingContext.scala:146)
    at com.yhemanth.spark.KafkaStreamingSample.main(KafkaStreamingSample.java:31)
    ... 6 more
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    ... 9 more
[WARNING]
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1996)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
    at org.apache.spark.scheduler.SparkListenerBus$$anon$2.run(SparkListenerBus.scala:40)
[WARNING]
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2038)
    at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:496)
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:157)
14/09/05 21:45:58 INFO network.ConnectionManager: Selector thread was interrupted!
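Since the trace shows a ClassNotFoundException at runtime even though jar
-tf finds the class in the jar, one thing I tried as a sanity check is
probing the classpath from inside the program itself. This is a minimal
sketch (the class name is taken from the trace above; the probe class
itself is something I wrote, not part of Spark or Kafka):

```java
// Minimal classpath probe: Class.forName throws ClassNotFoundException
// when the named class is not visible to the current classloader, which
// is exactly the condition behind the NoClassDefFoundError above.
public class ClasspathProbe {
    public static boolean isPresent(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // True only if the Kafka classes really made it onto the classpath.
        System.out.println("kafka.serializer.StringDecoder present: "
                + isPresent("kafka.serializer.StringDecoder"));
        System.out.println("java.lang.String present: "
                + isPresent("java.lang.String"));
    }
}
```

Calling isPresent("kafka.serializer.StringDecoder") from the main method of
the submitted job would show whether the class is visible to the driver's
classloader, independently of what jar -tf reports.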

Can someone please help me debug this further, or point out whether I am
doing something wrong?

Thanks,
Hemanth

On Fri, Sep 5, 2014 at 3:37 PM, Hemanth Yamijala <yhema...@gmail.com> wrote:

> Hi,
>
> Due to some limitations, we have to stick to Kafka 0.7.x. We would like
> to use as recent a version of Spark as possible in streaming mode that
> integrates with Kafka 0.7. The latest version appears to support only
> Kafka 0.8. Has anyone solved such a requirement? Any tips on what can be
> tried?
>
> FWIW, I tried to use the low-level Kafka API and write a custom receiver.
> This fails at compile time due to Scala dependency issues. The Scala
> version I have declared in pom.xml is 2.8.0, and this does not seem to
> work with Spark Streaming version 1.0.2.
>
> Thanks
> Hemanth
>
