The exception class ($line49....) refers to a wrapper class that the spark-shell generates for a line of your REPL session, so the anonymous function only exists inside the shell and is not in any jar on the executors' classpath. Have you tried it from an actual assembled job with spark-submit?
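For comparison, below is a minimal standalone sketch of the same job (the package name, object name, and assembly jar path are placeholders, not taken from your setup), which you could build into a fat jar and run with spark-submit:

------------------------------------------
package example // placeholder package name

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Same word count as the shell script below, but compiled into an
// application jar so the anonymous functions ship to the executors
// with the jar instead of as REPL-generated classes.
object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Let spark-submit supply --master instead of hardcoding it.
    val sparkConf = new SparkConf().setAppName("KCon")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "localhost:9092",
      "group.id" -> "KCon")
    val topics = Set("test")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    messages.map(_._2)
      .flatMap(_.split(" "))
      .map(x => (x, 1L))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
------------------------------------------

spark-submit --master spark://localhost:7077 --class example.KafkaWordCount \
  your-app-assembly.jar

If the same logic works when assembled and submitted, the issue is specific to how the shell ships its generated classes to the executors.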
On Tue, Jun 23, 2015 at 3:48 PM, syepes <sye...@gmail.com> wrote:
> Hello,
>
> I am trying to use the new Kafka consumer "KafkaUtils.createDirectStream",
> but I am having some issues making it work.
> I have tried different versions of Spark (v1.4.0 and branch-1.4 #8d6e363)
> and I am still getting the same strange exception "ClassNotFoundException:
> $line49.$read$$iwC$$i....".
>
> Has anyone else been facing this kind of problem?
>
> The following is the code and logs that I have been using to reproduce the
> issue:
>
> spark-shell: script
> ------------------------------------------
> sc.stop()
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> val sparkConf = new SparkConf()
>   .setMaster("spark://localhost:7077")
>   .setAppName("KCon")
>   .set("spark.ui.port", "4041")
>   .set("spark.driver.allowMultipleContexts", "true")
>   .setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
> val ssc = new StreamingContext(sparkConf, Seconds(5))
>
> val kafkaParams = Map[String, String](
>   "bootstrap.servers" -> "localhost:9092",
>   "schema.registry.url" -> "http://localhost:8081",
>   "zookeeper.connect" -> "localhost:2181",
>   "group.id" -> "KCon")
> val topic = Set("test")
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
>   StringDecoder](ssc, kafkaParams, topic)
>
> val raw = messages.map(_._2)
> val words = raw.flatMap(_.split(" "))
> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
> wordCounts.print()
>
> ssc.start()
> ssc.awaitTermination()
> ------------------------------------------
>
>
> spark-shell: output
> ------------------------------------------
> sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2
> ssc: org.apache.spark.streaming.StreamingContext =
> org.apache.spark.streaming.StreamingContext@28ec9c23
> kafkaParams: scala.collection.immutable.Map[String,String] =
> Map(bootstrap.servers -> localhost:9092, schema.registry.url ->
> http://localhost:8081, zookeeper.connect -> localhost:2181, group.id -> OPC)
> topic: scala.collection.immutable.Set[String] = Set(test)
> WARN [main] kafka.utils.VerifiableProperties - Property schema.registry.url
> is not valid
> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)]
> = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
> raw: org.apache.spark.streaming.dstream.DStream[String] =
> org.apache.spark.streaming.dstream.MappedDStream@578ce232
> words: org.apache.spark.streaming.dstream.DStream[String] =
> org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
> wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] =
> org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
> WARN [JobGenerator] kafka.utils.VerifiableProperties - Property
> schema.registry.url is not valid
> WARN [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager -
> Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87):
> java.lang.ClassNotFoundException:
> $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:348)
>         at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> ..
> ..
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
> ------------------------------------------
>
>
> Best regards and thanks in advance for any help.