Please run it in your own application and not in the spark shell. I see that you are trying to stop the Spark context and create a new StreamingContext; that will lead to unexpected issues like the one you are seeing. The shell compiles your closures into REPL-generated wrapper classes (the "$line49.$read$$iwC..." names in your stack trace), which executors fetch from the shell's original context; once you stop that context, the new one cannot serve them. Please make a standalone SBT/Maven app for Spark Streaming.
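For reference, here is a minimal sketch of such a standalone app, assuming the same brokers and topic as your shell script. The package name, object name, and jar paths below are placeholders you would adapt to your project:

standalone app sketch (Scala)
------------------------------------------
package example // placeholder package

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KConApp { // placeholder name
  def main(args: Array[String]): Unit = {
    // spark-submit supplies the master and no prior context exists,
    // so sc.stop() and spark.driver.allowMultipleContexts are not needed.
    val sparkConf = new SparkConf().setAppName("KCon")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // schema.registry.url is omitted here: your log shows the Kafka
    // consumer rejecting it as "not valid".
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "localhost:9092",
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "KCon")
    val topics = Set("test")

    val messages = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // Same word count as in the shell script. Compiled into the app jar,
    // these closures are ordinary classes that executors can load.
    val wordCounts = messages.map(_._2)
      .flatMap(_.split(" "))
      .map(x => (x, 1L))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
------------------------------------------

With an SBT build along these lines (versions are assumptions based on the 1.4.x builds you mentioned):

build.sbt
------------------------------------------
name := "kcon"
version := "0.1"
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.4.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0")
------------------------------------------

you would then package it and submit, for example:

spark-submit --master spark://localhost:7077 --class example.KConApp \
  --jars /opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar \
  target/scala-2.10/kcon_2.10-0.1.jar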
On Tue, Jun 23, 2015 at 3:43 PM, syepes <sye...@gmail.com> wrote:

> Yes, I have two clusters: one standalone and another using Mesos.
>
> Sebastian YEPES
> http://sebastian-yepes.com
>
> On Wed, Jun 24, 2015 at 12:37 AM, drarse [via Apache Spark User List] wrote:
>
>> Hi syepes,
>> Are you running the application in standalone mode?
>> Regards
>>
>> On 23/06/2015 22:48, "syepes [via Apache Spark User List]" wrote:
>>
>>> Hello,
>>>
>>> I am trying to use the new Kafka consumer
>>> "KafkaUtils.createDirectStream", but I am having some issues making it work.
>>> I have tried different versions of Spark, v1.4.0 and branch-1.4 #8d6e363,
>>> and I am still getting the same strange exception
>>> "ClassNotFoundException: $line49.$read$$iwC$$i....".
>>>
>>> Has anyone else been facing this kind of problem?
>>>
>>> The following is the code and logs that I have been using to reproduce the issue:
>>>
>>> spark-shell: script
>>> ------------------------------------------
>>> sc.stop()
>>> import _root_.kafka.serializer.StringDecoder
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.streaming._
>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>
>>> val sparkConf = new SparkConf()
>>>   .setMaster("spark://localhost:7077")
>>>   .setAppName("KCon")
>>>   .set("spark.ui.port", "4041")
>>>   .set("spark.driver.allowMultipleContexts", "true")
>>>   .setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
>>>
>>> val ssc = new StreamingContext(sparkConf, Seconds(5))
>>>
>>> val kafkaParams = Map[String, String](
>>>   "bootstrap.servers" -> "localhost:9092",
>>>   "schema.registry.url" -> "http://localhost:8081",
>>>   "zookeeper.connect" -> "localhost:2181",
>>>   "group.id" -> "KCon")
>>> val topic = Set("test")
>>> val messages = KafkaUtils.createDirectStream[String, String,
>>>   StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>
>>> val raw = messages.map(_._2)
>>> val words = raw.flatMap(_.split(" "))
>>> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>>> wordCounts.print()
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>> ------------------------------------------
>>>
>>> spark-shell: output
>>> ------------------------------------------
>>> sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2
>>> ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@28ec9c23
>>> kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> localhost:9092, schema.registry.url -> http://localhost:8081, zookeeper.connect -> localhost:2181, group.id -> OPC)
>>> topic: scala.collection.immutable.Set[String] = Set(test)
>>> WARN [main] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid
>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
>>> raw: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@578ce232
>>> words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
>>> wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] = org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
>>> WARN [JobGenerator] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid
>>> WARN [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87):
>>> java.lang.ClassNotFoundException: $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>         at java.lang.Class.forName0(Native Method)
>>>         at java.lang.Class.forName(Class.java:348)
>>>         at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
>>>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>>>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>> ..
>>> ..
>>> Driver stacktrace:
>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>>> ------------------------------------------
>>>
>>> Best regards and thanks in advance for any help.