Hi syepes,

Are you running the application in standalone mode?

Regards

On 23/06/2015 22:48, "syepes [via Apache Spark User List]" <ml-node+s1001560n23456...@n3.nabble.com> wrote:

> Hello,
>
> I am trying to use the new Kafka consumer "KafkaUtils.createDirectStream",
> but I am having some issues making it work.
> I have tried different versions of Spark (v1.4.0 and branch-1.4 #8d6e363)
> and I am still getting the same strange exception
> "ClassNotFoundException: $line49.$read$$iwC$$i...."
>
> Has anyone else been facing this kind of problem?
>
> The following is the code and logs that I have been using to reproduce the
> issue:
>
> spark-shell: script
> ------------------------------------------
> sc.stop()
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> val sparkConf = new SparkConf().setMaster("spark://localhost:7077").setAppName("KCon").set("spark.ui.port", "4041").set("spark.driver.allowMultipleContexts", "true").setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
>
> val ssc = new StreamingContext(sparkConf, Seconds(5))
>
> val kafkaParams = Map[String, String]("bootstrap.servers" -> "localhost:9092", "schema.registry.url" -> "http://localhost:8081", "zookeeper.connect" -> "localhost:2181", "group.id" -> "KCon")
> val topic = Set("test")
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>
> val raw = messages.map(_._2)
> val words = raw.flatMap(_.split(" "))
> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
> wordCounts.print()
>
> ssc.start()
> ssc.awaitTermination()
> ------------------------------------------
>
>
> spark-shell: output
> ------------------------------------------
> sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2
> ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@28ec9c23
> kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> localhost:9092, schema.registry.url -> http://localhost:8081, zookeeper.connect -> localhost:2181, group.id -> OPC)
> topic: scala.collection.immutable.Set[String] = Set(test)
> WARN [main] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid
> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
> raw: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@578ce232
> words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
> wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] = org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
> WARN [JobGenerator] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid
> WARN [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87): java.lang.ClassNotFoundException: $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> ..
> ..
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
> ------------------------------------------
>
>
> Best regards and thanks in advance for any help.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-createDirectStream-issue-tp23456p23457.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.