Re: Kafka createDirectStream issue

2015-09-20 Thread Petr Novak
val topics="first"

Shouldn't it be val topics = Set("first")?
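
For reference, a rough sketch of how the call would look with a Set of topics
(this assumes the Spark 1.x spark-streaming-kafka API and reuses the ssc and
kafkaParams already defined in your code):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// topics must be a Set[String], not a comma-separated String
val topics = Set("first")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)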

On Sun, Sep 20, 2015 at 1:01 PM, Petr Novak  wrote:

> val topics="first"
>
> Shouldn't it be val topics = Set("first")?
>
> On Sat, Sep 19, 2015 at 10:07 PM, kali.tumm...@gmail.com <
> kali.tumm...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to develop the same code in IntelliJ IDEA and I am having the
>> same issue. Is there any workaround?
>>
>> Error in IntelliJ: cannot resolve symbol createDirectStream
>>
>


Re: Kafka createDirectStream issue

2015-09-19 Thread kali.tumm...@gmail.com
Hi,

I am trying to develop the same code in IntelliJ IDEA and I am having the same
issue. Is there any workaround?

Error in IntelliJ: cannot resolve symbol createDirectStream

import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.streaming._

object SparkKafkaOffsetTest {

  def main(args: Array[String]): Unit = {
    //Logger.getLogger("org").setLevel(Level.WARN)
    //Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("KafkaOffsetStreaming")
      .set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    val zkQuorm = "localhost:2181"
    val group = "test-group"
    val topics = "first"
    val numThreads = 1
    val broker = "localhost:9091"
    val kafkaParams = Map[String, String]("metadata.broker.list" -> broker)
    //val kafkaParams = Map[String, String]("metadata.broker.list" )
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    //val lineMap = KafkaUtils.createStream(ssc, zkQuorm, group, topicMap)

    //val directKafkaStream = KafkaUtils.createDirectStream[String, String,
    //  StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val messages = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    //val directKafkaStream = KafkaUtils.createDirectStream[
    //  [key class], [value class], [key decoder class], [value decoder class] ](
    //  streamingContext, [map of Kafka parameters], [set of topics to consume])

  }

}

Thanks
Sri







Re: Kafka createDirectStream issue

2015-06-24 Thread syepes
Hello,

Thanks for all the help on resolving this issue, especially to Cody who
guided me to the solution.

For others facing similar issues: the problem was that I was running Spark
Streaming jobs from the spark-shell, and this is not supported. Running the
same job through spark-submit works as expected.
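
For illustration, the submit step looks roughly like the command below. The
class name and application jar are placeholders, not the actual ones from my
job; one way to ship the Kafka classes is to pass the streaming-kafka assembly
jar (the one mentioned earlier in this thread) via --jars:

spark-submit \
  --class com.example.KafkaDirectJob \
  --master spark://localhost:7077 \
  --jars /opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar \
  target/scala-2.10/kafka-direct-job_2.10-0.1.jar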

Does anyone know if there is some way to get around this problem?
The build-jar/submit cycle is a bit cumbersome when trying to debug and test
new jobs.


Best regards,
Sebastian






Re: Kafka createDirectStream issue

2015-06-23 Thread Tathagata Das
49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:348)
>>> at
>>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
>>>
>>> at
>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>>> at
>>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>> ..
>>> ..
>>> Driver stacktrace:
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>>>
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>>>
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>>>
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>
>>> at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>>> --
>>>
>>>
>>> Best regards and thanks in advance for any help.
>>>
>>>
>>
>>
>
>
>


Re: Kafka createDirectStream issue

2015-06-23 Thread syepes
Yes, I have two clusters: one standalone and another using Mesos.

 Sebastian YEPES
   http://sebastian-yepes.com

On Wed, Jun 24, 2015 at 12:37 AM, drarse [via Apache Spark User List] <
ml-node+s1001560n23457...@n3.nabble.com> wrote:

> Hi syepes,
> Are you running the application in standalone mode?
> Regards
> On 23/06/2015 22:48, "syepes [via Apache Spark User List]" <[hidden email]
> > wrote:
>
>> Hello,
>>
>> I am trying to use the new Kafka consumer "KafkaUtils.createDirectStream"
>> but I am having some issues making it work.
>> I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363
>> and I am still getting the same strange exception "ClassNotFoundException:
>> $line49.$read$$iwC$$i"
>>
>> Has anyone else been facing this kind of problem?
>>
>> The following is the code and logs that I have been using to reproduce
>> the issue:
>>
>> spark-shell: script
>> --
>> sc.stop()
>> import _root_.kafka.serializer.StringDecoder
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming._
>> import org.apache.spark.streaming.kafka.KafkaUtils
>>
>> val sparkConf = new
>> SparkConf().setMaster("spark://localhost:7077").setAppName("KCon").set("spark.ui.port",
>> "4041" ).set("spark.driver.allowMultipleContexts",
>> "true").setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
>>
>> val ssc = new StreamingContext(sparkConf, Seconds(5))
>>
>> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>> "localhost:9092", "schema.registry.url" -> "http://localhost:8081";,
>> "zookeeper.connect" -> "localhost:2181", "group.id" -> "KCon" )
>> val topic = Set("test")
>> val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>
>> val raw = messages.map(_._2)
>> val words = raw.flatMap(_.split(" "))
>> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>> wordCounts.print()
>>
>> ssc.start()
>> ssc.awaitTermination()
>> --
>>
>>
>> spark-shell: output
>> --
>> sparkConf: org.apache.spark.SparkConf =
>> org.apache.spark.SparkConf@330e37b2
>> ssc: org.apache.spark.streaming.StreamingContext =
>> org.apache.spark.streaming.StreamingContext@28ec9c23
>> kafkaParams: scala.collection.immutable.Map[String,String] =
>> Map(bootstrap.servers -> localhost:9092, schema.registry.url ->
>> http://localhost:8081, zookeeper.connect -> localhost:2181, group.id ->
>> OPC)topic: scala.collection.immutable.Set[String] = Set(test)
>> WARN  [main] kafka.utils.VerifiableProperties - Property
>> schema.registry.url is not valid
>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
>> raw: org.apache.spark.streaming.dstream.DStream[String] =
>> org.apache.spark.streaming.dstream.MappedDStream@578ce232
>> words: org.apache.spark.streaming.dstream.DStream[String] =
>> org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
>> wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] =
>> org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
>> WARN  [JobGenerator] kafka.utils.VerifiableProperties - Property
>> schema.registry.url is not valid
>> WARN  [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager -
>> Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87):
>> java.lang.ClassNotFoundException:
>> $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
>>
>> at
>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>> at
>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> ..
>> ..
>> Driver stacktrace:
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>>
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scal

Re: Kafka createDirectStream issue

2015-06-23 Thread drarse
Hi syepes,
Are you running the application in standalone mode?
Regards
On 23/06/2015 22:48, "syepes [via Apache Spark User List]" <
ml-node+s1001560n23456...@n3.nabble.com> wrote:

> Hello,
>
> I am trying to use the new Kafka consumer "KafkaUtils.createDirectStream"
> but I am having some issues making it work.
> I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363
> and I am still getting the same strange exception "ClassNotFoundException:
> $line49.$read$$iwC$$i"
>
> Has anyone else been facing this kind of problem?
>
> The following is the code and logs that I have been using to reproduce the
> issue:
>
> spark-shell: script
> --
> sc.stop()
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> val sparkConf = new
> SparkConf().setMaster("spark://localhost:7077").setAppName("KCon").set("spark.ui.port",
> "4041" ).set("spark.driver.allowMultipleContexts",
> "true").setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
>
> val ssc = new StreamingContext(sparkConf, Seconds(5))
>
> val kafkaParams = Map[String, String]("bootstrap.servers" ->
> "localhost:9092", "schema.registry.url" -> "http://localhost:8081";,
> "zookeeper.connect" -> "localhost:2181", "group.id" -> "KCon" )
> val topic = Set("test")
> val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>
> val raw = messages.map(_._2)
> val words = raw.flatMap(_.split(" "))
> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
> wordCounts.print()
>
> ssc.start()
> ssc.awaitTermination()
> --
>
>
> spark-shell: output
> --
> sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2
> ssc: org.apache.spark.streaming.StreamingContext =
> org.apache.spark.streaming.StreamingContext@28ec9c23
> kafkaParams: scala.collection.immutable.Map[String,String] =
> Map(bootstrap.servers -> localhost:9092, schema.registry.url ->
> http://localhost:8081, zookeeper.connect -> localhost:2181, group.id ->
> OPC)topic: scala.collection.immutable.Set[String] = Set(test)
> WARN  [main] kafka.utils.VerifiableProperties - Property
> schema.registry.url is not valid
> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
> raw: org.apache.spark.streaming.dstream.DStream[String] =
> org.apache.spark.streaming.dstream.MappedDStream@578ce232
> words: org.apache.spark.streaming.dstream.DStream[String] =
> org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
> wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] =
> org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
> WARN  [JobGenerator] kafka.utils.VerifiableProperties - Property
> schema.registry.url is not valid
> WARN  [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager -
> Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87):
> java.lang.ClassNotFoundException:
> $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
>
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> ..
> ..
> Driver stacktrace:
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler

Re: Kafka createDirectStream issue

2015-06-23 Thread Cody Koeninger
The $line49 in the exception refers to a line of the spark-shell session.

Have you tried it as an actual assembled job with spark-submit?
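
If it helps, a minimal build.sbt sketch for such an assembled job might look
like the following (the project name and artifact versions are assumptions;
match them to the Spark build you are actually running):

name := "kafka-direct-job"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // spark-streaming is provided by the cluster at runtime
  "org.apache.spark" %% "spark-streaming" % "1.4.0" % "provided",
  // spark-streaming-kafka must be bundled into (or shipped with) the job
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0"
)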

On Tue, Jun 23, 2015 at 3:48 PM, syepes  wrote:

> Hello,
>
> I am trying to use the new Kafka consumer "KafkaUtils.createDirectStream"
> but I am having some issues making it work.
> I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363 and
> I am still getting the same strange exception "ClassNotFoundException:
> $line49.$read$$iwC$$i"
>
> Has anyone else been facing this kind of problem?
>
> The following is the code and logs that I have been using to reproduce the
> issue:
>
> spark-shell: script
> --
> sc.stop()
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> val sparkConf = new
>
> SparkConf().setMaster("spark://localhost:7077").setAppName("KCon").set("spark.ui.port",
> "4041" ).set("spark.driver.allowMultipleContexts",
>
> "true").setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
> val ssc = new StreamingContext(sparkConf, Seconds(5))
>
> val kafkaParams = Map[String, String]("bootstrap.servers" ->
> "localhost:9092", "schema.registry.url" -> "http://localhost:8081";,
> "zookeeper.connect" -> "localhost:2181", "group.id" -> "KCon" )
> val topic = Set("test")
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topic)
>
> val raw = messages.map(_._2)
> val words = raw.flatMap(_.split(" "))
> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
> wordCounts.print()
>
> ssc.start()
> ssc.awaitTermination()
> --
>
>
> spark-shell: output
> --
> sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2
> ssc: org.apache.spark.streaming.StreamingContext =
> org.apache.spark.streaming.StreamingContext@28ec9c23
> kafkaParams: scala.collection.immutable.Map[String,String] =
> Map(bootstrap.servers -> localhost:9092, schema.registry.url ->
> http://localhost:8081, zookeeper.connect -> localhost:2181, group.id ->
> OPC)topic: scala.collection.immutable.Set[String] = Set(test)
> WARN  [main] kafka.utils.VerifiableProperties - Property
> schema.registry.url
> is not valid
> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)]
> = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
> raw: org.apache.spark.streaming.dstream.DStream[String] =
> org.apache.spark.streaming.dstream.MappedDStream@578ce232
> words: org.apache.spark.streaming.dstream.DStream[String] =
> org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
> wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] =
> org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
> WARN  [JobGenerator] kafka.utils.VerifiableProperties - Property
> schema.registry.url is not valid
> WARN  [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager -
> Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87):
> java.lang.ClassNotFoundException:
>
> $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
>
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> ..
> ..
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortSta