I'm running kafka_2.10-0.8.2.1, and loading kafka_2.10-0.8.2.1.jar via the
--jars argument to spark-submit.

sbt:
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.2"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.2.2"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.2.2"

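The stack trace below shows kafka.cluster.Broker.connectionString failing to
load org.apache.kafka.common.utils.Utils, a class that in Kafka 0.8.2.x ships
in the separate kafka-clients artifact rather than in kafka_2.10. A minimal
sbt sketch, assuming that missing artifact is the cause, pins matching Kafka
dependencies instead of relying on the transitive client:

libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.8.2.1"
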
spark-submit --jars
kafka_2.10-0.8.2.1.jar,metrics-core-2.2.0.jar,spark-streaming-kafka_2.10-1.2.2.jar,zkclient-0.1.0.jar
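
A hedged variant of that command, adding the matching kafka-clients jar
(assuming kafka-clients-0.8.2.1.jar is available locally):

spark-submit --jars kafka_2.10-0.8.2.1.jar,kafka-clients-0.8.2.1.jar,metrics-core-2.2.0.jar,spark-streaming-kafka_2.10-1.2.2.jar,zkclient-0.1.0.jar ...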


On Sat, May 9, 2015 at 10:39 AM Will Briggs <wrbri...@gmail.com> wrote:

> There's something wrong with your classpath - are you using Maven to
> handle dependencies? Make sure you are bringing in the correct Kafka client
> library.
>
> See here:
> http://stackoverflow.com/questions/28353316/kafka-utils-wrong-classpath-org-apache-kafka-common-utils-utils
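>
> If in doubt, a throwaway check like the one below shows which jar (if
> any) provides the missing class at runtime; it throws
> ClassNotFoundException when the class is absent:
>
> // prints the location of the jar providing the class (diagnostic sketch)
> println(Class.forName("org.apache.kafka.common.utils.Utils")
>   .getProtectionDomain.getCodeSource.getLocation)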
>
> On May 9, 2015, at 1:15 PM, Marty B <sp...@mjhb.com> wrote:
>
> My simple Spark streaming app is failing:
>
> ConsumerFetcherManager$LeaderFinderThread: [test-group_appdev-1431189419702-9d5d7cbc-leader-finder-thread], Failed to find leader for Set([test-topic,0])
> java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils
>
>
> I created the topic 'test-topic' with replication-factor 1 and 1
> partition, and validated message delivery with the kafka-console-producer
> and kafka-console-consumer utilities. Kafka and ZooKeeper are running with
> default configurations, with everything (including Spark) running locally
> on my Linux laptop.
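>
> For reference, a sketch of that topic creation, assuming the standard
> Kafka 0.8.2.1 distribution layout:
>
> bin/kafka-topics.sh --create --zookeeper localhost:2181 \
>   --replication-factor 1 --partitions 1 --topic test-topic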
>
>
> This warning repeats continuously, and the app consumes no messages:
>
> 15/05/09 09:37:00 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1431189419735] Added fetcher for partitions ArrayBuffer()
> 15/05/09 09:37:00 WARN ConsumerFetcherManager$LeaderFinderThread: [test-group_appdev-1431189419702-9d5d7cbc-leader-finder-thread], Failed to find leader for Set([test-topic,0])
> java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils
> at kafka.cluster.Broker.connectionString(Broker.scala:62)
> at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
> at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
> at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
>
> Spark streaming experiment app:
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka._
>
> object TestApp {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Test App")
>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>
>     val group = "test-group"
>     val topicMap = Map("test-topic" -> 1) // topic -> receiver thread count
>     val zkQuorum = "localhost:2181"
>
>     // createStream yields (key, message) pairs; keep only the messages
>     val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
>
>     messages.foreachRDD { rdd =>
>       println("RDD count: " + rdd.count())
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
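>
> For completeness, a hedged launch sketch (the app jar name is
> hypothetical); a receiver-based stream needs at least two local threads,
> since the Kafka receiver permanently occupies one:
>
> spark-submit --master "local[2]" --class TestApp \
>   --jars kafka_2.10-0.8.2.1.jar,kafka-clients-0.8.2.1.jar,metrics-core-2.2.0.jar,spark-streaming-kafka_2.10-1.2.2.jar,zkclient-0.1.0.jar \
>   test-app_2.10-0.1.jar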
>
>
> Is there anything that I should be doing differently to avoid this error?
>
