Please run this in your own application rather than in the spark shell. I see
that you are stopping the SparkContext and creating a new StreamingContext.
That leads to unexpected issues like the one you are seeing.
Please build a standalone SBT/Maven application for Spark Streaming.
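
For reference, here is a minimal sketch of what the same word count could look
like as a standalone application. It reuses the broker address, topic and
group id from the script quoted below. The object name KConApp is made up for
illustration, and I have assumed the master is supplied by spark-submit rather
than hard-coded. I have also left out the "schema.registry.url" and
"zookeeper.connect" entries on the assumption that the direct stream does not
need them; add them back if your setup does. The class name in the exception
looks like a wrapper class generated by the REPL, which a compiled
application would not produce. Treat this as a starting point, not a tested
program.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical object name; pick whatever fits your project.
object KConApp {
  def main(args: Array[String]): Unit = {
    // No setMaster here: assumed to be passed with spark-submit --master.
    val sparkConf = new SparkConf().setAppName("KCon")

    // The StreamingContext creates its own SparkContext, so there is
    // nothing to stop first and no need for allowMultipleContexts.
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Values taken from the quoted script; adjust for your cluster.
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "localhost:9092",
      "group.id" -> "KCon")
    val topics = Set("test")

    val messages = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // Same word count as in the shell script.
    val words = messages.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The build would need spark-streaming and spark-streaming-kafka for the Scala
and Spark versions you run against. Once packaged, it can be launched with
spark-submit using --class KConApp and --master spark://localhost:7077
followed by the path to your application jar.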

On Tue, Jun 23, 2015 at 3:43 PM, syepes <sye...@gmail.com> wrote:

> Yes, I have two clusters: one standalone and another using Mesos.
>
>  Sebastian YEPES
>    http://sebastian-yepes.com
>
> On Wed, Jun 24, 2015 at 12:37 AM, drarse [via Apache Spark User List]
> <[hidden email]> wrote:
>
>> Hi syepes,
>> Are you running the application in standalone mode?
>> Regards
>> On 23/06/2015 22:48, "syepes [via Apache Spark User List]"
>> <[hidden email]> wrote:
>>
>>> Hello,
>>>
>>> I am trying to use the new Kafka consumer
>>> "KafkaUtils.createDirectStream" but I am having some issues making it
>>> work.
>>> I have tried different versions of Spark (v1.4.0 and branch-1.4 #8d6e363)
>>> and I am still getting the same strange exception "ClassNotFoundException:
>>> $line49.$read$$iwC$$i...."
>>>
>>> Has anyone else been facing this kind of problem?
>>>
>>> The following is the code and logs that I have been using to reproduce
>>> the issue:
>>>
>>> spark-shell: script
>>> ------------------------------------------
>>> sc.stop()
>>> import _root_.kafka.serializer.StringDecoder
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.streaming._
>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>
>>> val sparkConf = new
>>> SparkConf().setMaster("spark://localhost:7077").setAppName("KCon").set("spark.ui.port",
>>> "4041" ).set("spark.driver.allowMultipleContexts",
>>> "true").setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
>>>
>>> val ssc = new StreamingContext(sparkConf, Seconds(5))
>>>
>>> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>>> "localhost:9092", "schema.registry.url" -> "http://localhost:8081",
>>> "zookeeper.connect" -> "localhost:2181", "group.id" -> "KCon" )
>>> val topic = Set("test")
>>> val messages = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>
>>> val raw = messages.map(_._2)
>>> val words = raw.flatMap(_.split(" "))
>>> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>>> wordCounts.print()
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>> ------------------------------------------
>>>
>>>
>>> spark-shell: output
>>> ------------------------------------------
>>> sparkConf: org.apache.spark.SparkConf =
>>> org.apache.spark.SparkConf@330e37b2
>>> ssc: org.apache.spark.streaming.StreamingContext =
>>> org.apache.spark.streaming.StreamingContext@28ec9c23
>>> kafkaParams: scala.collection.immutable.Map[String,String] =
>>> Map(bootstrap.servers -> localhost:9092, schema.registry.url ->
>>> http://localhost:8081, zookeeper.connect -> localhost:2181, group.id ->
>>> OPC)
>>> topic: scala.collection.immutable.Set[String] = Set(test)
>>> WARN  [main] kafka.utils.VerifiableProperties - Property
>>> schema.registry.url is not valid
>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
>>> raw: org.apache.spark.streaming.dstream.DStream[String] =
>>> org.apache.spark.streaming.dstream.MappedDStream@578ce232
>>> words: org.apache.spark.streaming.dstream.DStream[String] =
>>> org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
>>> wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] =
>>> org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
>>> WARN  [JobGenerator] kafka.utils.VerifiableProperties - Property
>>> schema.registry.url is not valid
>>> WARN  [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager -
>>> Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87):
>>> java.lang.ClassNotFoundException:
>>> $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>         at java.lang.Class.forName0(Native Method)
>>>         at java.lang.Class.forName(Class.java:348)
>>>         at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
>>>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>>>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>> ..
>>> ..
>>> Driver stacktrace:
>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>>> ------------------------------------------
>>>
>>>
>>> Best regards and thanks in advance for any help.
>>>
>>>
>>> ------------------------------
>>>  If you reply to this email, your message will be added to the
>>> discussion below:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-createDirectStream-issue-tp23456.html
>>>
>>
>>
>>
>
>
> ------------------------------
> View this message in context: Re: Kafka createDirectStream issue
> <http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-createDirectStream-issue-tp23456p23458.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>
