You should not need to include jars for Kafka; the Spark connectors have the appropriate transitive dependency on the correct version.
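For example, a build.sbt along these lines is usually enough. The version numbers below are illustrative, not taken from the thread; they should match the Spark version on your cluster:

```scala
// build.sbt sketch -- "2.0.1" is illustrative; use your cluster's Spark version.
// spark-streaming-kafka-0-10 pulls in the matching
// org.apache.kafka:kafka-clients jar transitively, so no explicit
// Kafka dependency is needed.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1"
)
```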
On Sat, Feb 4, 2017 at 3:25 PM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Hi,
> not sure if this will help at all, and please take it with a pinch of salt, as
> I don't have your setup and I am not running on a cluster.
>
> I have tried to run a Kafka example, which was originally working on Spark
> 1.6.1, on Spark 2. These are the jars I am using:
>
>   spark-streaming-kafka-0-10_2.11_2.0.1.jar
>   kafka_2.11-0.10.1.1
>
> And here's the code up to the creation of the direct stream. Apparently, with
> the new version of the Kafka libs, some properties have to be specified:
>
>   import org.apache.spark.SparkConf
>   import org.apache.spark.streaming.{Seconds, StreamingContext}
>   import org.apache.spark.storage.StorageLevel
>
>   import java.util.regex.Pattern
>   import java.util.regex.Matcher
>
>   import Utilities._
>
>   import org.apache.spark.streaming.kafka010.KafkaUtils
>   import kafka.serializer.StringDecoder
>   import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>   import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
>   /** Working example of listening for log data from Kafka's testLogs topic on
>     * port 9092.
>     */
>   object KafkaExample {
>
>     def main(args: Array[String]) {
>
>       // Create the context with a 1-second batch size
>       val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))
>
>       setupLogging()
>
>       // Construct a regular expression (regex) to extract fields from raw
>       // Apache log lines
>       val pattern = apacheLogPattern()
>
>       val kafkaParams = Map(
>         "metadata.broker.list" -> "localhost:9092",
>         "bootstrap.servers" -> "localhost:9092",
>         "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
>         "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
>         "group.id" -> "group1")
>       val topics = List("testLogs").toSet
>       val lines = KafkaUtils.createDirectStream[String, String](
>         ssc,
>         PreferConsistent,
>         Subscribe[String, String](topics, kafkaParams)
>       ).map(cr => cr.value())
>
> hth
>
> marco

On Sat, Feb 4, 2017 at 8:33 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

>> I am getting this error with Spark 2, which works with CDH 5.5.1 (Spark
>> 1.5).
>>
>> Admittedly I am messing around with spark-shell.
>> However, I am surprised why this does not work with Spark 2 and is OK with
>> CDH 5.1.
>>
>>   scala> val dstream = KafkaUtils.createDirectStream[String, String,
>>     StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>
>>   java.lang.NoClassDefFoundError: Could not initialize class
>>     kafka.consumer.FetchRequestAndResponseStatsRegistry$
>>     at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>>     at org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:345)
>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>>     at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>     at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>     at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>     at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
>>     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>   ... 74 elided
>>
>> Dr Mich Talebzadeh
>>
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> http://talebzadehmich.wordpress.com
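The stack trace above comes from the old 0-8 style API: the `org.apache.spark.streaming.kafka` package (no `kafka010`) still instantiates `kafka.consumer.SimpleConsumer`, a class that lives in the old Kafka broker jar rather than in `kafka-clients`, so the call fails unless a compatible 0.8-era Kafka jar is on the classpath. A minimal diagnostic sketch for checking which consumer classes a JVM can actually see (class names taken from the stack trace; `ClasspathProbe` is a hypothetical helper, not part of Spark or Kafka):

```scala
// Classpath probe sketch -- a diagnostic aid, not part of the thread's code.
// The probed class names come from the stack trace above.
object ClasspathProbe {

  // Returns true if the named class can be loaded on this JVM's classpath.
  def present(name: String): Boolean =
    try { Class.forName(name); true }
    catch {
      case _: ClassNotFoundException => false
      case _: NoClassDefFoundError   => false
    }

  def main(args: Array[String]): Unit = {
    // Needed by the old 0-8 connector (ships in the kafka_2.11 broker jar):
    println(s"old consumer present: ${present("kafka.consumer.SimpleConsumer")}")
    // Needed by the 0-10 connector (ships in kafka-clients):
    println(s"new consumer present: ${present("org.apache.kafka.clients.consumer.KafkaConsumer")}")
  }
}
```

If the first probe prints false while your code still calls the non-`kafka010` `KafkaUtils`, switching to the 0-10 API as in Marco's example (or adding the matching 0.8-era jars) is the way out.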