I'm running kafka_2.10-0.8.2.1, and loading kafka_2.10-0.8.2.1.jar via the --jars argument to spark-submit.
sbt:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.2"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.2.2"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.2.2"

spark-submit --jars kafka_2.10-0.8.2.1.jar,metrics-core-2.2.0.jar,spark-streaming-kafka_2.10-1.2.2.jar,zkclient-0.1.0.jar

On Sat, May 9, 2015 at 10:39 AM Will Briggs <wrbri...@gmail.com> wrote:

> There's something wrong with your classpath - are you using Maven to
> handle dependencies? Make sure you are bringing in the correct Kafka
> client library.
>
> See here:
> http://stackoverflow.com/questions/28353316/kafka-utils-wrong-classpath-org-apache-kafka-common-utils-utils
>
> On May 9, 2015, at 1:15 PM, Marty B <sp...@mjhb.com> wrote:
>
> My simple Spark streaming app is failing:
>
> ConsumerFetcherManager$LeaderFinderThread:
> [test-group_appdev-1431189419702-9d5d7cbc-leader-finder-thread], Failed to
> find leader for Set([test-topic,0])
> java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils
>
> I created the topic 'test-topic' with replication-factor 1 and 1
> partition, and validated message delivery with the kafka-console-producer
> and kafka-console-consumer utilities. Kafka and ZooKeeper are running with
> default configurations, and everything (including Spark) is running
> locally on my Linux laptop.
>
> This warning repeats continuously, and the app consumes no messages:
>
> 15/05/09 09:37:00 INFO ConsumerFetcherManager:
> [ConsumerFetcherManager-1431189419735] Added fetcher for partitions ArrayBuffer()
> 15/05/09 09:37:00 WARN ConsumerFetcherManager$LeaderFinderThread:
> [test-group_appdev-1431189419702-9d5d7cbc-leader-finder-thread], Failed to
> find leader for Set([test-topic,0])
> java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils
>   at kafka.cluster.Broker.connectionString(Broker.scala:62)
>   at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
>   at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
>   at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> Spark streaming experiment app:
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka._
>
> object TestApp {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Test App")
>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>
>     val group = "test-group"
>     val topicMap = Map("test-topic" -> 1)
>     val zkQuorum = "localhost:2181"
>
>     // Keep only the message values; createStream yields (key, value) pairs
>     val messages = KafkaUtils.createStream(ssc, zkQuorum, group,
>       topicMap).map(_._2)
>
>     messages.foreachRDD { rdd =>
>       println("RDD", rdd.count)
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> Is there anything that I should be doing differently to avoid this error?
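
For reference, the missing class org.apache.kafka.common.utils.Utils ships in the separate kafka-clients artifact, not in the kafka_2.10 broker jar, so one plausible fix along the lines of Will's classpath suggestion (a sketch, assuming the stock Apache 0.8.2.1 artifacts; not confirmed in this thread) is to add kafka-clients to both the build and the --jars list:

// Sketch: pull in kafka-clients, which contains
// org.apache.kafka.common.utils.Utils in the 0.8.2.x line
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.8.2.1"

# ...and ship the extra jar with the application (file names assumed to
# match the stock artifacts; adjust paths to your layout)
spark-submit --jars kafka_2.10-0.8.2.1.jar,kafka-clients-0.8.2.1.jar,metrics-core-2.2.0.jar,spark-streaming-kafka_2.10-1.2.2.jar,zkclient-0.1.0.jar ...

Building a single assembly (fat) jar with the sbt-assembly plugin is another common way to avoid maintaining the --jars list by hand.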