There's something wrong with your classpath - are you using Maven to handle 
dependencies? Make sure you are bringing in the correct Kafka client library.

See here: 
http://stackoverflow.com/questions/28353316/kafka-utils-wrong-classpath-org-apache-kafka-common-utils-utils
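
One quick way to confirm it's a classpath problem: try loading the class named in the stack trace from the same JVM the app runs in (e.g. in spark-shell, or temporarily inside the app's main method). A minimal sketch, with the class name taken from the error below:

```scala
// Classpath sanity check for the class the stack trace says is missing.
object ClasspathCheck {
  // Returns true if the current classloader can load the named class.
  def isOnClasspath(className: String): Boolean =
    try {
      Class.forName(className)
      true
    } catch {
      case _: ClassNotFoundException => false
    }

  def main(args: Array[String]): Unit = {
    val missing = "org.apache.kafka.common.utils.Utils"
    if (isOnClasspath(missing))
      println(s"$missing is on the classpath")
    else
      println(s"$missing is NOT on the classpath - check your Kafka client dependency")
  }
}
```

If the class fails to load here, the problem is the submitted classpath (e.g. the Kafka jars aren't in the application's uber-jar or --jars list), not the Spark code itself.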

On May 9, 2015, at 1:15 PM, Marty B <sp...@mjhb.com> wrote:

My simple Spark streaming app is failing:

ConsumerFetcherManager$LeaderFinderThread: [test-group_appdev-1431189419702-9d5d7cbc-leader-finder-thread], Failed to find leader for Set([test-topic,0])
java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils


I created the topic 'test-topic' with replication-factor 1 and 1
partition, and validated message delivery with the kafka-console-producer
and kafka-console-consumer utilities. I am running the default
configurations for Kafka and ZooKeeper, with everything (including Spark)
running locally on my Linux laptop.


This warning repeats continuously, and the app consumes no messages:

15/05/09 09:37:00 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1431189419735] Added fetcher for partitions ArrayBuffer()
15/05/09 09:37:00 WARN ConsumerFetcherManager$LeaderFinderThread: [test-group_appdev-1431189419702-9d5d7cbc-leader-finder-thread], Failed to find leader for Set([test-topic,0])
java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils
        at kafka.cluster.Broker.connectionString(Broker.scala:62)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


Spark streaming experiment app:

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object TestApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Test App")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val group = "test-group"
    val topicMap = Map("test-topic" -> 1)
    val zkQuorum = "localhost:2181"
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    messages.foreachRDD { rdd =>
      println("RDD", rdd.count)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}


Is there anything that I should be doing differently to avoid this error?
