Hello, I am trying to use the new Kafka consumer API "KafkaUtils.createDirectStream", but I am having some issues making it work. I have tried two different builds of Spark (v1.4.0 and branch-1.4 at commit #8d6e363), and I still get the same strange exception: "ClassNotFoundException: $line49.$read$$iwC$$i...."
Has anyone else been facing this kind of problem? The following is the code and logs that I have been using to reproduce the issue: spark-shell: script ------------------------------------------ sc.stop() import _root_.kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka.KafkaUtils val sparkConf = new SparkConf().setMaster("spark://localhost:7077").setAppName("KCon").set("spark.ui.port", "4041" ).set("spark.driver.allowMultipleContexts", "true").setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar")) val ssc = new StreamingContext(sparkConf, Seconds(5)) val kafkaParams = Map[String, String]("bootstrap.servers" -> "localhost:9092", "schema.registry.url" -> "http://localhost:8081", "zookeeper.connect" -> "localhost:2181", "group.id" -> "KCon" ) val topic = Set("test") val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic) val raw = messages.map(_._2) val words = raw.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() ------------------------------------------ spark-shell: output ------------------------------------------ sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2 ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@28ec9c23 kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> localhost:9092, schema.registry.url -> http://localhost:8081, zookeeper.connect -> localhost:2181, group.id -> OPC)topic: scala.collection.immutable.Set[String] = Set(test) WARN [main] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d raw: 
org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@578ce232 words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5 wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] = org.apache.spark.streaming.dstream.ShuffledDStream@ae04104 WARN [JobGenerator] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid WARN [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87): java.lang.ClassNotFoundException: $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) .. .. 
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) ------------------------------------------ Best regards and thanks in advance for any help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-createDirectStream-issue-tp23456.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org