Re: ********Spark streaming issue to Elastic data**********

2024-05-06 Thread Mich Talebzadeh
Hi Kartrick,

Unfortunately, materialised views are not available in Spark yet. I have
raised [SPARK-48117] "Spark Materialized Views: Improve Query Performance
and Data Management" in the ASF JIRA as a feature request.

Let me think about another way and revert
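
In the meantime, one common workaround is to emulate a materialized view by
evaluating the view once per refresh cycle and persisting the result as a
table that downstream jobs read instead of re-running the view's joins. A
minimal sketch (the view and table names are placeholders, not from this
thread):

import org.apache.spark.sql.SparkSession

// Placeholder names: "my_view" is the existing multi-table view,
// "my_view_mv" is the table standing in for the materialized view.
val spark = SparkSession.builder()
  .appName("EmulatedMV")
  .enableHiveSupport()
  .getOrCreate()

// Run this on a schedule (cron, Airflow, etc.) to refresh the snapshot.
val snapshot = spark.table("my_view")
snapshot.write.mode("overwrite").saveAsTable("my_view_mv")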

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions" (Werner Von Braun).


On Mon, 6 May 2024 at 07:54, Karthick Nk  wrote:

> Thanks Mich,
>
> Can you please confirm whether my understanding is correct?
>
> First, we have to create the materialized view based on the mapping
> details we have, using multiple tables as the source (since we have join
> conditions across different tables). From the materialized view we can
> then stream the view data into the Elastic index by using CDC?
>
> Thanks in advance.
>
> On Fri, May 3, 2024 at 3:39 PM Mich Talebzadeh 
> wrote:
>
>> My recommendation: using materialized views (MVs) created in Hive,
>> together with Spark Structured Streaming and Change Data Capture (CDC),
>> is a good combination for efficiently streaming view data updates in
>> your scenario.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed. It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions" (Werner Von Braun).
>>
>>
>> On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:
>>
>>> Hi All,
>>>
>>> Requirements:
>>> I am working on a data flow that uses a view definition (already defined
>>> in the schema); multiple tables are used in the view definition. We want
>>> to stream the view data into an Elastic index whenever data changes in
>>> any of the tables used in the view definition.
>>>
>>>
>>> Current flow:
>>> 1. We insert IDs from the tables used in the view definition into a
>>> common table.
>>> 2. From the common table, using those IDs, we stream the view data with
>>> Spark Structured Streaming (checking whether any incoming ID is present
>>> in the collective IDs of all tables used in the view definition).
>>>
>>>
>>> Issue:
>>> 1. For each incoming ID we run the full view definition (so it reads all
>>> the data from all the tables) and check whether the incoming ID is
>>> present in the collective IDs of the view result. Because of this it
>>> uses a lot of memory on the cluster driver and takes a long time to
>>> process.
>>>
>>>
>>> I am looking for an alternative solution that avoids a full scan of the
>>> view definition every time. If you have an alternative design for this
>>> flow, please suggest it.
>>>
>>>
>>> Note: It would also be helpful if you could share details of a community
>>> forum or platform where this kind of design topic can be discussed.
>>>
>>


Re: ********Spark streaming issue to Elastic data**********

2024-05-05 Thread Karthick Nk
Thanks Mich,

Can you please confirm whether my understanding is correct?

First, we have to create the materialized view based on the mapping details
we have, using multiple tables as the source (since we have join conditions
across different tables). From the materialized view we can then stream the
view data into the Elastic index by using CDC?

Thanks in advance.

On Fri, May 3, 2024 at 3:39 PM Mich Talebzadeh 
wrote:

> My recommendation: using materialized views (MVs) created in Hive,
> together with Spark Structured Streaming and Change Data Capture (CDC),
> is a good combination for efficiently streaming view data updates in
> your scenario.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions" (Werner Von Braun).
>
>
> On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:
>
>> Hi All,
>>
>> Requirements:
>> I am working on a data flow that uses a view definition (already defined
>> in the schema); multiple tables are used in the view definition. We want
>> to stream the view data into an Elastic index whenever data changes in
>> any of the tables used in the view definition.
>>
>>
>> Current flow:
>> 1. We insert IDs from the tables used in the view definition into a
>> common table.
>> 2. From the common table, using those IDs, we stream the view data with
>> Spark Structured Streaming (checking whether any incoming ID is present
>> in the collective IDs of all tables used in the view definition).
>>
>>
>> Issue:
>> 1. For each incoming ID we run the full view definition (so it reads all
>> the data from all the tables) and check whether the incoming ID is
>> present in the collective IDs of the view result. Because of this it
>> uses a lot of memory on the cluster driver and takes a long time to
>> process.
>>
>>
>> I am looking for an alternative solution that avoids a full scan of the
>> view definition every time. If you have an alternative design for this
>> flow, please suggest it.
>>
>>
>> Note: It would also be helpful if you could share details of a community
>> forum or platform where this kind of design topic can be discussed.
>>
>


Re: ********Spark streaming issue to Elastic data**********

2024-05-03 Thread Mich Talebzadeh
My recommendation: using materialized views (MVs) created in Hive,
together with Spark Structured Streaming and Change Data Capture (CDC),
is a good combination for efficiently streaming view data updates in your
scenario.
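
For a rough illustration of the shape of that combination, the sketch below
reads change events from a CDC feed on Kafka, joins each micro-batch against
the materialized view (read as a table) and appends the matching rows to an
Elasticsearch index with the elasticsearch-hadoop connector. The broker,
topic, table, index and host names and the connector options are
placeholders/assumptions, not details from this thread.

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder()
  .appName("MVtoElastic")
  .enableHiveSupport()
  .getOrCreate()

// Change events (here, just the changed ids) arriving from a CDC topic.
val changes = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "cdc_topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS id")

changes.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    val mv      = spark.table("my_view_mv")   // the materialized view, exposed as a table
    val changed = mv.join(batch.select("id").distinct(), Seq("id"), "left_semi")
    changed.write
      .format("org.elasticsearch.spark.sql")  // elasticsearch-hadoop connector, assumed on the classpath
      .option("es.nodes", "es-host")
      .option("es.port", "9200")
      .mode("append")
      .save("my_index")                       // target Elasticsearch index
  }
  .start()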

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions" (Werner Von Braun).


On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:

> Hi All,
>
> Requirements:
> I am working on a data flow that uses a view definition (already defined
> in the schema); multiple tables are used in the view definition. We want
> to stream the view data into an Elastic index whenever data changes in
> any of the tables used in the view definition.
>
>
> Current flow:
> 1. We insert IDs from the tables used in the view definition into a
> common table.
> 2. From the common table, using those IDs, we stream the view data with
> Spark Structured Streaming (checking whether any incoming ID is present
> in the collective IDs of all tables used in the view definition).
>
>
> Issue:
> 1. For each incoming ID we run the full view definition (so it reads all
> the data from all the tables) and check whether the incoming ID is
> present in the collective IDs of the view result. Because of this it
> uses a lot of memory on the cluster driver and takes a long time to
> process.
>
>
> I am looking for an alternative solution that avoids a full scan of the
> view definition every time. If you have an alternative design for this
> flow, please suggest it.
>
>
> Note: It would also be helpful if you could share details of a community
> forum or platform where this kind of design topic can be discussed.
>


********Spark streaming issue to Elastic data**********

2024-05-02 Thread Karthick Nk
Hi All,

Requirements:
I am working on a data flow that uses a view definition (already defined
in the schema); multiple tables are used in the view definition. We want
to stream the view data into an Elastic index whenever data changes in any
of the tables used in the view definition.


Current flow:
1. We insert IDs from the tables used in the view definition into a common
table.
2. From the common table, using those IDs, we stream the view data with
Spark Structured Streaming (checking whether any incoming ID is present in
the collective IDs of all tables used in the view definition).


Issue:
1. For each incoming ID we run the full view definition (so it reads all
the data from all the tables) and check whether the incoming ID is present
in the collective IDs of the view result. Because of this it uses a lot of
memory on the cluster driver and takes a long time to process.


I am looking for an alternative solution that avoids a full scan of the
view definition every time. If you have an alternative design for this
flow, please suggest it.


Note: It would also be helpful if you could share details of a community
forum or platform where this kind of design topic can be discussed.
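
For illustration, one way to avoid running the view once per incoming ID is
to evaluate it once per micro-batch and semi-join it on the whole batch of
IDs, for example with foreachBatch. A minimal sketch, assuming "spark" is
the active SparkSession, "idStream" is a streaming DataFrame with a single
"id" column of incoming IDs, and "my_view" is the view already defined in
the schema (all placeholder names):

import org.apache.spark.sql.DataFrame

idStream.writeStream
  .foreachBatch { (ids: DataFrame, batchId: Long) =>
    // Evaluate the view once for the whole micro-batch, not once per ID.
    val view    = spark.table("my_view")
    val matched = view.join(ids.select("id").distinct(), Seq("id"), "left_semi")
    println(s"batch $batchId matched ${matched.count()} view rows")
    // In practice, write "matched" to the Elastic index here instead of counting.
  }
  .start()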


Re: spark streaming: issue with logging with separate log4j properties files for driver and executor

2016-05-24 Thread chandan prakash
Resolved.
I passed the parameters in SparkConf instead of passing them on the
spark-submit command line (I still don't know why passing them to
spark-submit did not work):

 sparkConf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC
-Dlog4j.configuration=file:log4j_RequestLogExecutor.properties ")
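
For reference, a minimal sketch of that approach end to end; the use of
spark.files to ship the properties file into each executor's working
directory is an assumption, not something confirmed in this thread:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf()
  .setAppName("RequestLogStreaming")
  // Picked up when the executor JVMs launch; the relative file: reference assumes
  // log4j_RequestLogExecutor.properties is present in the executor working directory.
  .set("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC -Dlog4j.configuration=file:log4j_RequestLogExecutor.properties")
  // One way (an assumption) to place the file there:
  .set("spark.files",
    "/usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogExecutor.properties")

val ssc = new StreamingContext(sparkConf, Seconds(10))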




On Tue, May 24, 2016 at 10:24 PM, chandan prakash  wrote:

> Any suggestion?
>
> On Mon, May 23, 2016 at 5:18 PM, chandan prakash <
> chandanbaran...@gmail.com> wrote:
>
>> Hi,
>> I am able to do logging for the driver but not for the executor.
>>
>> I am running Spark Streaming under Mesos.
>> I want to do log4j logging separately for the driver and the executor.
>>
>> I used the options below in the spark-submit command:
>>
>> --driver-java-options
>> "-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogDriver.properties"
>> --conf
>> "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogExecutor.properties"
>>
>> Logging for the driver at the path specified in
>> log4j_RequestLogDriver.properties (/tmp/requestLogDriver.log) is
>> happening fine.
>> But for the executor, no logging is happening (it should be at
>> /tmp/requestLogExecutor.log as specified in
>> log4j_RequestLogExecutor.properties on the executor machines).
>>
>> *Any suggestions on how to get logging enabled for the executor?*
>>
>> TIA,
>> Chandan
>>
>> --
>> Chandan Prakash
>>
>>
>
>
> --
> Chandan Prakash
>
>


-- 
Chandan Prakash


Re: spark streaming: issue with logging with separate log4j properties files for driver and executor

2016-05-24 Thread chandan prakash
Any suggestion?

On Mon, May 23, 2016 at 5:18 PM, chandan prakash 
wrote:

> Hi,
> I am able to do logging for the driver but not for the executor.
>
> I am running Spark Streaming under Mesos.
> I want to do log4j logging separately for the driver and the executor.
>
> I used the options below in the spark-submit command:
>
> --driver-java-options
> "-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogDriver.properties"
> --conf
> "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogExecutor.properties"
>
> Logging for the driver at the path specified in
> log4j_RequestLogDriver.properties (/tmp/requestLogDriver.log) is happening
> fine.
> But for the executor, no logging is happening (it should be at
> /tmp/requestLogExecutor.log as specified in
> log4j_RequestLogExecutor.properties on the executor machines).
>
> *Any suggestions on how to get logging enabled for the executor?*
>
> TIA,
> Chandan
>
> --
> Chandan Prakash
>
>


-- 
Chandan Prakash


spark streaming: issue with logging with separate log4j properties files for driver and executor

2016-05-23 Thread chandan prakash
Hi,
I am able to do logging for the driver but not for the executor.

I am running Spark Streaming under Mesos.
I want to do log4j logging separately for the driver and the executor.

I used the options below in the spark-submit command:
--driver-java-options
"-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogDriver.properties"
--conf
"spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogExecutor.properties"

Logging for the driver at the path specified in
log4j_RequestLogDriver.properties (/tmp/requestLogDriver.log) is happening
fine.
But for the executor, no logging is happening (it should be at
/tmp/requestLogExecutor.log as specified in log4j_RequestLogExecutor.properties
on the executor machines).

*Any suggestions on how to get logging enabled for the executor?*

TIA,
Chandan

-- 
Chandan Prakash


Re: Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
Ok I managed to make this work.

All I am interested in is receiving messages from the topic every minute. No
filtering yet, just the full text.

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
val sparkConf = new SparkConf().
 setAppName("StreamTest").
 setMaster("local[12]").
 set("spark.driver.allowMultipleContexts", "true").
 set("spark.hadoop.validateOutputSpecs", "false")
val ssc = new StreamingContext(sparkConf, Seconds(60))
val kafkaParams = Map[String, String]("bootstrap.servers" ->
"rhes564:9092", "schema.registry.url" -> "http://rhes564:8081";,
"zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
val topic = Set("newtopic")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topic)
messages.print()
ssc.start()


---
Time: 145955454 ms
---
(null,Sat Apr 2 00:33:01 BST 2016  === Sending messages from rhes5)
(null,1,'a7UkW5ZRaI_V8oRiPUNx0on6E06Ikr8_ILOxhVpgt6IoXXq2fF9ssYuJYcr49Cj4yp3nY9k8sHtIi_7XjltTVzqJ33beV2hIaqAj',101)
(null,2,'dnFxOkOibbKLR5m3CIeS3rhwn8hCiaZAfEaD7yXi6M7jXcvaFYBjClLDoNMEVgfLZVgJ9tXchqlGX44FmvhnarLFrtJNbTb1C6j4',102)
(null,3,'M9pvIOKMhaI_mSE3ExlovZWIxBE66KNEWGIGtCJF1qr_dGJX5sFKqLLa3Qv8aN2lCLi3lnGnMtqeZYBqE5YD586Vw50WWjL7ncZA',103)
(null,4,'9EROPf_dJZpdAHmBubTRxEUkvC9S_Xnll5bWmX0xcOPk7l4TGXPgEqxpUP52QG6pUIn74mvwWqF9vzZ2ZhsmV6WPOmUAw4Ub_nFU',104)
(null,5,'BLIi9a_n7Pfyc7r3nfzKfaNRa4Hmd9NlHEVDPkQS4xbgUWqU2bJeI6b8b1IMoStnmjMHhYLtFf4TQyJcpn85PSwFksggNVnQl1oL',105)
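
If only the payloads are wanted (the keys above are all null), a minimal next
step is to map the (key, value) tuples down to their values before printing
or filtering; messages here is the DStream returned by createDirectStream
above:

// Drop the (null) Kafka keys and keep just the message text.
val values = messages.map(_._2)
values.print()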



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 1 April 2016 at 23:26, Mich Talebzadeh  wrote:

> I adopted this approach
>
> scala> val conf = new SparkConf().
>  |  setAppName("StreamTest").
>  |  setMaster("local[12]").
>  |  set("spark.driver.allowMultipleContexts", "true").
>  |  set("spark.hadoop.validateOutputSpecs", "false")
> conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@321d96f7
> scala> val ssc = new StreamingContext(conf, Seconds(60))
> ssc: org.apache.spark.streaming.StreamingContext =
> org.apache.spark.streaming.StreamingContext@5dbae9eb
> scala> val kafkaParams = Map("metadata.broker.list" -> "rhes564:9092")
> kafkaParams: scala.collection.immutable.Map[String,String] =
> Map(metadata.broker.list -> rhes564:9092)
> scala> val topics = Set("newtopic")
> topics: scala.collection.immutable.Set[String] = Set(newtopic)
> scala> val stream = KafkaUtils.createDirectStream(ssc, kafkaParams, topics)
> stream: org.apache.spark.streaming.dstream.InputDStream[(Nothing,
> Nothing)] =
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@6d2d3b21
>
> So that opens the data stream. What next?
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 1 April 2016 at 22:37, Mich Talebzadeh 
> wrote:
>
>> yes I noticed that
>>
>> scala> val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
>> "rhes564:9092", "newtopic", 1)
>>
>> :52: error: overloaded method value createStream with
>> alternatives:
>>   (jssc:
>> org.apache.spark.streaming.api.java.JavaStreamingContext,zkQuorum:
>> String,groupId: String,topics: java.util.Map[String,Integer],storageLevel:
>> org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,String]
>> 
>>   (ssc: org.apache.spark.streaming.StreamingContext,zkQuorum:
>> String,groupId: String,topics:
>> scala.collection.immutable.Map[String,Int],storageLevel:
>> org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[(String,
>> String)]
>>  cannot be applied to (org.apache.spark.streaming.StreamingContext,
>> String, String, String, Int)
>>  val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
>> "rhes564:9092", "newtopic", 1)
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 1 April 2016 at 22:25, Cody Koeninger  wrote:
>>
>>> You're not passing valid Scala values.  rhes564:2181  without quotes
>>> isn't a valid literal, newtopic isn't a list of strings, etc.
>>>
>>> On Fri, Apr 1, 2016 at 4:04 PM, Mich Talebzadeh
>>>  wrote:
>>> > Thanks Cody.
>>> >
>>

Re: Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
I adopted this approach

scala> val conf = new SparkConf().
 |  setAppName("StreamTest").
 |  setMaster("local[12]").
 |  set("spark.driver.allowMultipleContexts", "true").
 |  set("spark.hadoop.validateOutputSpecs", "false")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@321d96f7
scala> val ssc = new StreamingContext(conf, Seconds(60))
ssc: org.apache.spark.streaming.StreamingContext =
org.apache.spark.streaming.StreamingContext@5dbae9eb
scala> val kafkaParams = Map("metadata.broker.list" -> "rhes564:9092")
kafkaParams: scala.collection.immutable.Map[String,String] =
Map(metadata.broker.list -> rhes564:9092)
scala> val topics = Set("newtopic")
topics: scala.collection.immutable.Set[String] = Set(newtopic)
scala> val stream = KafkaUtils.createDirectStream(ssc, kafkaParams, topics)
stream: org.apache.spark.streaming.dstream.InputDStream[(Nothing, Nothing)]
= org.apache.spark.streaming.kafka.DirectKafkaInputDStream@6d2d3b21

So that opens the data stream. What next?

Thanks



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 1 April 2016 at 22:37, Mich Talebzadeh  wrote:

> yes I noticed that
>
> scala> val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
> "rhes564:9092", "newtopic", 1)
>
> :52: error: overloaded method value createStream with
> alternatives:
>   (jssc:
> org.apache.spark.streaming.api.java.JavaStreamingContext,zkQuorum:
> String,groupId: String,topics: java.util.Map[String,Integer],storageLevel:
> org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,String]
> 
>   (ssc: org.apache.spark.streaming.StreamingContext,zkQuorum:
> String,groupId: String,topics:
> scala.collection.immutable.Map[String,Int],storageLevel:
> org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[(String,
> String)]
>  cannot be applied to (org.apache.spark.streaming.StreamingContext,
> String, String, String, Int)
>  val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
> "rhes564:9092", "newtopic", 1)
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 1 April 2016 at 22:25, Cody Koeninger  wrote:
>
>> You're not passing valid Scala values.  rhes564:2181  without quotes
>> isn't a valid literal, newtopic isn't a list of strings, etc.
>>
>> On Fri, Apr 1, 2016 at 4:04 PM, Mich Talebzadeh
>>  wrote:
>> > Thanks Cody.
>> >
>> > Can I use Receiver-based Approach here?
>> >
>> > I have created the topic newtopic as below
>> >
>> > ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181
>> > --replication-factor 1 --partitions 1 --topic newtopic
>> >
>> >
>> > This is basically what I am doing in Spark
>> >
>> > val lines = ssc.socketTextStream("rhes564", 2181)
>> >
>> > Which is obviously not working
>> >
>> > This is what is suggested in the doc
>> >
>> > import org.apache.spark.streaming.kafka._
>> >
>> > val kafkaStream = KafkaUtils.createStream(streamingContext,
>> >  [ZK quorum], [consumer group id], [per-topic number of Kafka
>> partitions
>> > to consume])
>> >
>> > [ZK quorum] is a list of one or more zookeeper servers that make quorum
>> > [consumer group id] is the name of the kafka consumer group
>> > [topics] is a list of one or more kafka topics to consume from
>> > [number of threads] is the number of threads the kafka consumer should use
>> >
>> > Now this comes back with an error. Obviously not passing parameters
>> > correctly!
>> >
>> > scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
>> > rhes564:2181, rhes564:9092, newtopic 1)
>> > :1: error: identifier expected but integer literal found.
>> >val kafkaStream = KafkaUtils.createStream(streamingContext,
>> > rhes564:2181, rhes564:9092, newtopic 1)
>> >
>> >
>> >
>> >
>> >
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > LinkedIn
>> >
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >
>> >
>> >
>> > http://talebzadehmich.wordpress.com
>> >
>> >
>> >
>> >
>> > On 1 April 2016 at 21:13, Cody Koeninger  wrote:
>> >>
>> >> It looks like you're using a plain socket stream to connect to a
>> >> zookeeper port, which won't work.
>> >>
>> >>   Look at
>> spark.apache.org/docs/latest/streaming-kafka-integration.html
>> >>
>> >> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
>> >>  wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I am just testing Spark streaming with Kafka.
>> >> >
>> >> > Basically I am broadcasting topic every minute to Host:port ->
>> >> > rhes564:2181.
>> >> > This is sending few lines through a shell script as follow

Re: Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
yes I noticed that

scala> val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
"rhes564:9092", "newtopic", 1)

:52: error: overloaded method value createStream with alternatives:
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,zkQuorum:
String,groupId: String,topics: java.util.Map[String,Integer],storageLevel:
org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,String]

  (ssc: org.apache.spark.streaming.StreamingContext,zkQuorum:
String,groupId: String,topics:
scala.collection.immutable.Map[String,Int],storageLevel:
org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[(String,
String)]
 cannot be applied to (org.apache.spark.streaming.StreamingContext, String,
String, String, Int)
 val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
"rhes564:9092", "newtopic", 1)


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 1 April 2016 at 22:25, Cody Koeninger  wrote:

> You're not passing valid Scala values.  rhes564:2181  without quotes
> isn't a valid literal, newtopic isn't a list of strings, etc.
>
> On Fri, Apr 1, 2016 at 4:04 PM, Mich Talebzadeh
>  wrote:
> > Thanks Cody.
> >
> > Can I use Receiver-based Approach here?
> >
> > I have created the topic newtopic as below
> >
> > ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181
> > --replication-factor 1 --partitions 1 --topic newtopic
> >
> >
> > This is basically what I am doing in Spark
> >
> > val lines = ssc.socketTextStream("rhes564", 2181)
> >
> > Which is obviously not working
> >
> > This is what is suggested in the doc
> >
> > import org.apache.spark.streaming.kafka._
> >
> > val kafkaStream = KafkaUtils.createStream(streamingContext,
> >  [ZK quorum], [consumer group id], [per-topic number of Kafka
> partitions
> > to consume])
> >
> > [ZK quorum] is a list of one or more zookeeper servers that make quorum
> > [consumer group id] is the name of the kafka consumer group
> > [topics] is a list of one or more kafka topics to consume from
> > [number of threads] is the number of threads the kafka consumer should use
> >
> > Now this comes back with an error. Obviously not passing parameters
> > correctly!
> >
> > scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
> > rhes564:2181, rhes564:9092, newtopic 1)
> > :1: error: identifier expected but integer literal found.
> >val kafkaStream = KafkaUtils.createStream(streamingContext,
> > rhes564:2181, rhes564:9092, newtopic 1)
> >
> >
> >
> >
> >
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> >
> >
> > On 1 April 2016 at 21:13, Cody Koeninger  wrote:
> >>
> >> It looks like you're using a plain socket stream to connect to a
> >> zookeeper port, which won't work.
> >>
> >>   Look at spark.apache.org/docs/latest/streaming-kafka-integration.html
> >>
> >> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
> >>  wrote:
> >> >
> >> > Hi,
> >> >
> >> > I am just testing Spark streaming with Kafka.
> >> >
> >> > Basically I am broadcasting topic every minute to Host:port ->
> >> > rhes564:2181.
> >> > This is sending few lines through a shell script as follows:
> >> >
> >> > cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh
> >> > --broker-list
> >> > rhes564:9092 --topic newtopic
> >> >
> >> > That works fine and I can see the messages in
> >> >
> >> > ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
> >> > --topic
> >> > newtopic
> >> >
> >> > Fri Apr 1 21:00:01 BST 2016  === Sending messages from rhes5
> >> >
> >> >
> 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
> >> >
> >> >
> 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
> >> >
> >> >
> 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
> >> >
> >> >
> 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
> >> >
> >> >
> 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
> >> >
> >> > Now I try to see the topic in spark streaming as follows:
> >> >
> >> > val conf = new SparkConf().
> >> >  setAppName("StreamTest").
> >> >  setMaster("local[12]").
> >> >  set("spark.driver.allowMultipleContexts", "true").
> >> >  set("spark.hadoop.validateOutputSpecs", "false")
> >> > val sc = new SparkContext(conf)
> >> > // Create sqlContext based on HiveContext
> >> > val sqlContext = new HiveContext(sc)
> >> > val HiveC

Re: Spark streaming issue

2016-04-01 Thread Cody Koeninger
You're not passing valid Scala values.  rhes564:2181  without quotes
isn't a valid literal, newtopic isn't a list of strings, etc.
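
For illustration, a call with valid literals might look like the sketch
below; ssc is the StreamingContext from the earlier snippet and the group id
"StreamTest" is an assumption:

import org.apache.spark.streaming.kafka.KafkaUtils

// Quoted string arguments, and topics as a Map[String, Int] of topic -> receiver threads.
val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181", "StreamTest", Map("newtopic" -> 1))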

On Fri, Apr 1, 2016 at 4:04 PM, Mich Talebzadeh
 wrote:
> Thanks Cody.
>
> Can I use Receiver-based Approach here?
>
> I have created the topic newtopic as below
>
> ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181
> --replication-factor 1 --partitions 1 --topic newtopic
>
>
> This is basically what I am doing in Spark
>
> val lines = ssc.socketTextStream("rhes564", 2181)
>
> Which is obviously not working
>
> This is what is suggested in the doc
>
> import org.apache.spark.streaming.kafka._
>
> val kafkaStream = KafkaUtils.createStream(streamingContext,
>  [ZK quorum], [consumer group id], [per-topic number of Kafka partitions
> to consume])
>
> [ZK quorum] is a list of one or more zookeeper servers that make quorum
> [consumer group id] is the name of the kafka consumer group
> [topics] is a list of one or more kafka topics to consume from
> [number of threads] is the number of threads the kafka consumer should use
>
> Now this comes back with an error. Obviously not passing parameters correctly!
>
> scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
> rhes564:2181, rhes564:9092, newtopic 1)
> :1: error: identifier expected but integer literal found.
>val kafkaStream = KafkaUtils.createStream(streamingContext,
> rhes564:2181, rhes564:9092, newtopic 1)
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 1 April 2016 at 21:13, Cody Koeninger  wrote:
>>
>> It looks like you're using a plain socket stream to connect to a
>> zookeeper port, which won't work.
>>
>>   Look at spark.apache.org/docs/latest/streaming-kafka-integration.html
>>
>> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
>>  wrote:
>> >
>> > Hi,
>> >
>> > I am just testing Spark streaming with Kafka.
>> >
>> > Basically I am broadcasting topic every minute to Host:port ->
>> > rhes564:2181.
>> > This is sending few lines through a shell script as follows:
>> >
>> > cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh
>> > --broker-list
>> > rhes564:9092 --topic newtopic
>> >
>> > That works fine and I can see the messages in
>> >
>> > ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
>> > --topic
>> > newtopic
>> >
>> > Fri Apr 1 21:00:01 BST 2016  === Sending messages from rhes5
>> >
>> > 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
>> >
>> > 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
>> >
>> > 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
>> >
>> > 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
>> >
>> > 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
>> >
>> > Now I try to see the topic in spark streaming as follows:
>> >
>> > val conf = new SparkConf().
>> >  setAppName("StreamTest").
>> >  setMaster("local[12]").
>> >  set("spark.driver.allowMultipleContexts", "true").
>> >  set("spark.hadoop.validateOutputSpecs", "false")
>> > val sc = new SparkContext(conf)
>> > // Create sqlContext based on HiveContext
>> > val sqlContext = new HiveContext(sc)
>> > val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> > //
>> > // Create a local StreamingContext with two working thread and batch
>> > interval of 1 second.
>> > // The master requires 2 cores to prevent from a starvation scenario.
>> > val ssc = new StreamingContext(conf, Minutes(1))
>> > // Create a DStream that will connect to hostname:port, like
>> > localhost:
>> > //val lines = ssc.socketTextStream("rhes564", 9092)
>> > val lines = ssc.socketTextStream("rhes564", 2181)
>> > // Split each line into words
>> > val words = lines.flatMap(_.split(" "))
>> > val pairs = words.map(word => (word, 1))
>> > val wordCounts = pairs.reduceByKey(_ + _)
>> > // Print the first ten elements of each RDD generated in this DStream to
>> > the
>> > console
>> > wordCounts.print()
>> > ssc.start()
>> >
>> > This is what I am getting:
>> >
>> >
>> > scala> ---
>> > Time: 145954176 ms
>> > ---
>> >
>> > But no values
>> >
>> > Have I got the port wrong in this case or the set up is incorrect?
>> >
>> >
>> > Thanks
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > LinkedIn
>> >
>> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >
>> >
>> >
>> > http://talebzadehmich.wordpress.com
>> >
>> >
>
>


Re: Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
Thanks Cody.

Can I use Receiver-based Approach here?

I have created the topic newtopic as below

${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181
--replication-factor 1 --partitions 1 --topic newtopic

This is basically what I am doing in Spark

val lines = ssc.socketTextStream("rhes564", 2181)

Which is obviously not working

This is what is suggested in the doc

import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,
 [ZK quorum], [consumer group id], [per-topic number of Kafka
partitions to consume])

[ZK quorum] is a list of one or more zookeeper servers that make quorum
[consumer group id] is the name of the kafka consumer group
[topics] is a list of one or more kafka topics to consume from
[number of threads] is the number of threads the kafka consumer should use

Now this comes back with an error. Obviously not passing parameters correctly!

scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
rhes564:2181, rhes564:9092, newtopic 1)
:1: error: identifier expected but integer literal found.
   val kafkaStream = KafkaUtils.createStream(streamingContext,
rhes564:2181, rhes564:9092, newtopic 1)






Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 1 April 2016 at 21:13, Cody Koeninger  wrote:

> It looks like you're using a plain socket stream to connect to a
> zookeeper port, which won't work.
>
>   Look at spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
>  wrote:
> >
> > Hi,
> >
> > I am just testing Spark streaming with Kafka.
> >
> > Basically I am broadcasting topic every minute to Host:port ->
> rhes564:2181.
> > This is sending few lines through a shell script as follows:
> >
> > cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh
> --broker-list
> > rhes564:9092 --topic newtopic
> >
> > That works fine and I can see the messages in
> >
> > ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
> --topic
> > newtopic
> >
> > Fri Apr 1 21:00:01 BST 2016  === Sending messages from rhes5
> >
> 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
> >
> 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
> >
> 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
> >
> 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
> >
> 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
> >
> > Now I try to see the topic in spark streaming as follows:
> >
> > val conf = new SparkConf().
> >  setAppName("StreamTest").
> >  setMaster("local[12]").
> >  set("spark.driver.allowMultipleContexts", "true").
> >  set("spark.hadoop.validateOutputSpecs", "false")
> > val sc = new SparkContext(conf)
> > // Create sqlContext based on HiveContext
> > val sqlContext = new HiveContext(sc)
> > val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> > //
> > // Create a local StreamingContext with two working thread and batch
> > interval of 1 second.
> > // The master requires 2 cores to prevent from a starvation scenario.
> > val ssc = new StreamingContext(conf, Minutes(1))
> > // Create a DStream that will connect to hostname:port, like
> localhost:
> > //val lines = ssc.socketTextStream("rhes564", 9092)
> > val lines = ssc.socketTextStream("rhes564", 2181)
> > // Split each line into words
> > val words = lines.flatMap(_.split(" "))
> > val pairs = words.map(word => (word, 1))
> > val wordCounts = pairs.reduceByKey(_ + _)
> > // Print the first ten elements of each RDD generated in this DStream to
> the
> > console
> > wordCounts.print()
> > ssc.start()
> >
> > This is what I am getting:
> >
> >
> > scala> ---
> > Time: 145954176 ms
> > ---
> >
> > But no values
> >
> > Have I got the port wrong in this case or the set up is incorrect?
> >
> >
> > Thanks
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
>


Re: Spark streaming issue

2016-04-01 Thread Cody Koeninger
It looks like you're using a plain socket stream to connect to a
zookeeper port, which won't work.

  Look at spark.apache.org/docs/latest/streaming-kafka-integration.html

On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
 wrote:
>
> Hi,
>
> I am just testing Spark streaming with Kafka.
>
> Basically I am broadcasting topic every minute to Host:port -> rhes564:2181.
> This is sending few lines through a shell script as follows:
>
> cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list
> rhes564:9092 --topic newtopic
>
> That works fine and I can see the messages in
>
> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 --topic
> newtopic
>
> Fri Apr 1 21:00:01 BST 2016  === Sending messages from rhes5
> 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
> 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
> 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
> 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
> 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
>
> Now I try to see the topic in spark streaming as follows:
>
> val conf = new SparkConf().
>  setAppName("StreamTest").
>  setMaster("local[12]").
>  set("spark.driver.allowMultipleContexts", "true").
>  set("spark.hadoop.validateOutputSpecs", "false")
> val sc = new SparkContext(conf)
> // Create sqlContext based on HiveContext
> val sqlContext = new HiveContext(sc)
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> //
> // Create a local StreamingContext with two working thread and batch
> interval of 1 second.
> // The master requires 2 cores to prevent from a starvation scenario.
> val ssc = new StreamingContext(conf, Minutes(1))
> // Create a DStream that will connect to hostname:port, like localhost:
> //val lines = ssc.socketTextStream("rhes564", 9092)
> val lines = ssc.socketTextStream("rhes564", 2181)
> // Split each line into words
> val words = lines.flatMap(_.split(" "))
> val pairs = words.map(word => (word, 1))
> val wordCounts = pairs.reduceByKey(_ + _)
> // Print the first ten elements of each RDD generated in this DStream to the
> console
> wordCounts.print()
> ssc.start()
>
> This is what I am getting:
>
>
> scala> ---
> Time: 145954176 ms
> ---
>
> But no values
>
> Have I got the port wrong in this case or the set up is incorrect?
>
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>




Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
Hi,

I am just testing Spark streaming with Kafka.

Basically I am broadcasting topic every minute to Host:port
-> rhes564:2181. This is sending few lines through a shell script as
follows:

cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list
rhes564:9092 --topic newtopic

That works fine and I can see the messages in

${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
--topic newtopic

Fri Apr 1 21:00:01 BST 2016  === Sending messages from rhes5
1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105

Now I try to see the topic in spark streaming as follows:

val conf = new SparkConf().
 setAppName("StreamTest").
 setMaster("local[12]").
 set("spark.driver.allowMultipleContexts", "true").
 set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(conf)
// Create sqlContext based on HiveContext
val sqlContext = new HiveContext(sc)
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
//
// Create a local StreamingContext with two working thread and batch
interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val ssc = new StreamingContext(conf, Minutes(1))
// Create a DStream that will connect to hostname:port, like localhost:
//val lines = ssc.socketTextStream("rhes564", 9092)
val lines = ssc.socketTextStream("rhes564", 2181)
// Split each line into words
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to
the console
wordCounts.print()
ssc.start()

This is what I am getting:


scala> ---
Time: 145954176 ms
---

But no values

Have I got the port wrong in this case or the set up is incorrect?


Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Re: Hbase Spark streaming issue.

2015-09-24 Thread Shixiong Zhu
Looks like you have an incompatible hbase-default.xml somewhere on the
classpath. You can use the following code to find the location of
"hbase-default.xml":

println(Thread.currentThread().getContextClassLoader().getResource("hbase-default.xml"))
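
Since the exception is thrown inside executor tasks, it can also help to run
the same check on the executors; "dstream" below is a placeholder for the
DStream being written to HBase:

// Print, from each executor, which hbase-default.xml its classloader resolves.
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { _ =>
    println(Thread.currentThread().getContextClassLoader().getResource("hbase-default.xml"))
  }
}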

Best Regards,
Shixiong Zhu

2015-09-21 15:46 GMT+08:00 Siva :

> Hi,
>
> I am seeing a strange error while inserting data from Spark Streaming into
> HBase.
>
> I am able to write the data from Spark (without streaming) to HBase
> successfully, but when I use the same code to write a DStream I see the
> error below.
>
> I tried setting the parameters below, but it still didn't help. Did anyone
> face a similar issue?
>
> conf.set("hbase.defaults.for.version.skip", "true")
> conf.set("hbase.defaults.for.version", "0.98.4.2.2.4.2-2-hadoop2")
>
> 15/09/20 22:39:10 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
> 16)
> java.lang.RuntimeException: hbase-default.xml file seems to be for and old
> version of HBase (null), this version is 0.98.4.2.2.4.2-2-hadoop2
> at
> org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:73)
> at
> org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:105)
> at
> org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:116)
> at
> org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:125)
> at
> $line51.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$HBaseConn$.hbaseConnection(:49)
> at
> $line52.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$TestHbaseSpark$$anonfun$run$1$$anonfun$apply$1.apply(:73)
> at
> $line52.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$TestHbaseSpark$$anonfun$run$1$$anonfun$apply$1.apply(:73)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 15/09/20 22:39:10 WARN TaskSetManager: Lost task 0.0 in stage 14.0 (TID
> 16, localhost): java.lang.RuntimeException: hbase-default.xml file seems to
> be for and old version of HBase (null), this version is
> 0.98.4.2.2.4.2-2-hadoop2
>
>
> Thanks,
> Siva.
>


Hbase Spark streaming issue.

2015-09-21 Thread Siva
Hi,

I am seeing a strange error while inserting data from Spark Streaming into
HBase.

I am able to write the data from Spark (without streaming) to HBase
successfully, but when I use the same code to write a DStream I see the
error below.

I tried setting the parameters below, but it still didn't help. Did anyone
face a similar issue?

conf.set("hbase.defaults.for.version.skip", "true")
conf.set("hbase.defaults.for.version", "0.98.4.2.2.4.2-2-hadoop2")

15/09/20 22:39:10 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
16)
java.lang.RuntimeException: hbase-default.xml file seems to be for and old
version of HBase (null), this version is 0.98.4.2.2.4.2-2-hadoop2
at
org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:73)
at
org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:105)
at
org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:116)
at
org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:125)
at
$line51.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$HBaseConn$.hbaseConnection(:49)
at
$line52.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$TestHbaseSpark$$anonfun$run$1$$anonfun$apply$1.apply(:73)
at
$line52.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$TestHbaseSpark$$anonfun$run$1$$anonfun$apply$1.apply(:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/09/20 22:39:10 WARN TaskSetManager: Lost task 0.0 in stage 14.0 (TID 16,
localhost): java.lang.RuntimeException: hbase-default.xml file seems to be
for and old version of HBase (null), this version is
0.98.4.2.2.4.2-2-hadoop2


Thanks,
Siva.


Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread Akhil Das
I see you have no worker machines to execute the job

[inline screenshot: Spark master web UI showing no registered workers]

You haven't configured your spark cluster properly.

A quick fix to get it running would be to run it in local mode; for that,
change this line

JavaStreamingContext jssc = new JavaStreamingContext(
"spark://192.168.88.130:7077", "SparkStream", new Duration(3000));

to this

JavaStreamingContext jssc = new JavaStreamingContext("local[4]",
"SparkStream", new Duration(3000));


Thanks
Best Regards

On Mon, Dec 1, 2014 at 4:18 PM,  wrote:

>  Hi,
>
>
>
> The spark master is working, and I have given the same url in the code:
>
>
>
> The warning is gone, and the new log is:
>
> ---
>
> Time: 141742785 ms
>
> ---
>
>
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Starting job streaming job 141742785 ms.0
> from job set of time 141742785 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 141742785 ms.0
> from job set of time 141742785 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 141742785
> ms (execution: 0.001 s)
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Added jobs for time 141742785 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 25
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 24
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5]
> kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of
> RDD BlockRDD[24] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 141742785 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4]
> scheduler.ReceiverTracker (Logging.scala:logInfo(59)) *- Stream 0
> received 0 blocks*
>
> ---
>
> Time: 1417427853000 ms
>
> ---
>
>
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0
> from job set of time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0
> from job set of time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000
> ms (execution: 0.001 s)
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 27
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 26
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4]
> kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of
> RDD BlockRDD[26] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6]
> scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - *Stream 0
> received 0 blocks*
>
>
>
> What should be my approach now ?
>
> Need urgent help.
>
>
>
> Regards,
>
> Aiman
>
>
>
> *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
> *Sent:* Monday, December 01, 2014 3:56 PM
> *To:* Sarosh, M.
> *Cc:* user@spark.apache.org
> *Subject:* Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
>
>
>
> It says:
>
>
>
>  14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not
> 

RE: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread m.sarosh
Hi,

The spark master is working, and I have given the same url in the code:
[inline screenshot: Spark master web UI]

The warning is gone, and the new log is:
---
Time: 141742785 ms
---

INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Starting job streaming job 141742785 ms.0 
from job set of time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Finished job streaming job 141742785 ms.0 
from job set of time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 141742785 ms 
(execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Added jobs for time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD 
(Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 25
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD 
(Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 24
INFO  [sparkDriver-akka.actor.default-dispatcher-5] kafka.KafkaInputDStream 
(Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[24] at BlockRDD 
at ReceiverInputDStream.scala:69 of time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker 
(Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
---
Time: 1417427853000 ms
---

INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0 
from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0 
from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000 ms 
(execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD 
(Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 27
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD 
(Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 26
INFO  [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream 
(Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[26] at BlockRDD 
at ReceiverInputDStream.scala:69 of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-6] scheduler.ReceiverTracker 
(Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

What should be my approach now ?
Need urgent help.

Regards,
Aiman

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 3:56 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

It says:

 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory

A quick guess would be that you are giving the wrong master url
(spark://192.168.88.130:7077). Open the web UI running on port 8080 and use
the master url listed there in the top left corner of the page.

Thanks
Best Regards

On Mon, Dec 1, 2014 at 3:42 PM, m.sar...@accenture.com wrote:
Hi,

I am integrating Kafka and Spark, using spark-streaming. I have created a topic 
as a kafka producer:

bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test


I am publishing messages in kafka and trying to read them using spark-streaming 
java code and displaying them on screen.
The daemons are all up: Spark master, worker; ZooKeeper; Kafka.
I am writing a java code for doing it, using KafkaUtils.createStream
code is below:

package com.spark;

import scala.Tuple2;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spa

Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread Akhil Das
It says:

 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient memory

A quick guess would be that you are giving the wrong master URL
(spark://192.168.88.130:7077). Open the web UI running on port 8080 and use the
master URL listed at the top left corner of the page.
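For illustration, here is a minimal sketch (the StreamingContextFactory class name
is hypothetical) of building the streaming context from a SparkConf, so that the
master URL is supplied by spark-submit via --master, and therefore always matches
the one shown on the web UI, instead of being hard-coded in the source:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingContextFactory {
   // Build the context from a SparkConf so the master URL comes from
   // spark-submit (--master spark://<host>:7077, exactly as shown on the
   // master web UI at port 8080) rather than from the source code.
   public static JavaStreamingContext create(String appName, long batchMs) {
      SparkConf conf = new SparkConf().setAppName(appName);
      return new JavaStreamingContext(conf, new Duration(batchMs));
   }
}

The job can then be launched with, for example,
spark-submit --class com.spark.SparkStream --master spark://<master-host>:7077
target/SparkStream-with-dependencies.jar <zkQuorum> <group> <topics>
without touching the code.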

Thanks
Best Regards

On Mon, Dec 1, 2014 at 3:42 PM, <m.sar...@accenture.com> wrote:

>  Hi,
>
>
>
> I am integrating Kafka and Spark, using spark-streaming. I have created a
> topic as a kafka producer:
>
>
>
> bin/kafka-topics.sh --create --zookeeper localhost:2181
> --replication-factor 1 --partitions 1 --topic test
>
>
>
>
>
> I am publishing messages in kafka and trying to read them using
> spark-streaming java code and displaying them on screen.
>
> The daemons are all up: Spark master, worker; ZooKeeper; Kafka.
>
> I am writing a java code for doing it, using KafkaUtils.createStream
>
> code is below:
>
>
>
> package com.spark;
>
> import scala.Tuple2;
> import kafka.serializer.Decoder;
> import kafka.serializer.Encoder;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.*;
> import org.apache.spark.api.java.function.*;
> import org.apache.spark.api.java.*;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import org.apache.spark.streaming.kafka.*;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
> import java.util.Map;
> import java.util.HashMap;
>
> public class SparkStream {
>    public static void main(String args[])
>    {
>       if(args.length != 3)
>       {
>          System.out.println("Usage: spark-submit --class com.spark.SparkStream target/SparkStream-with-dependencies.jar <zkQuorum> <group> <topics>");
>          System.exit(1);
>       }
>
>       Map<String, Integer> topicMap = new HashMap<String, Integer>();
>       String[] topic = args[2].split(",");
>       for(String t: topic)
>       {
>          topicMap.put(t, new Integer(1));
>       }
>
>       JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
>       JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>
>       System.out.println("Connection done");
>       JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
>       {
>          public String call(Tuple2<String, String> message)
>          {
>             System.out.println("NewMessage: " + message._2()); // for debugging
>             return message._2();
>          }
>       });
>
>       data.print();
>
>       jssc.start();
>       jssc.awaitTermination();
>    }
> }
>
>
>
>
>
> I am running the job, and in another terminal I am running the kafka producer
> to publish messages:
>
> #bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>
> >Hi kafka
>
> >second message
>
> >another message
>
>
>
> But the output logs at the spark-streaming console don't show the
> messages; they show zero blocks received:
>
>
>
>
>
> ---
>
> Time: 1417107363000 ms
>
> ---
>
>
>
> 14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming
> job 1417107363000 ms.0 from job set of time 1417107363000 ms
>
> 14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming
> job 1417107363000 ms.0 from job set of time 1417107363000 ms
>
> 14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s
> for time 1417107363000 ms (execution: 0.000 s)
>
> 14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time
> 1417107363000 ms
>
> 14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence
> list
>
> 14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
>
> 14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD
> BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 1417107363000 ms
>
> 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has
> not accepted any resources; check your cluster UI to ensure that workers
> are registered and have sufficient memory

Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread m.sarosh
Hi,

I am integrating Kafka and Spark, using spark-streaming. I have created a topic 
as a kafka producer:

bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test


I am publishing messages in kafka and trying to read them using spark-streaming 
java code and displaying them on screen.
The daemons are all up: Spark master, worker; ZooKeeper; Kafka.
I am writing a java code for doing it, using KafkaUtils.createStream
code is below:

package com.spark;

import scala.Tuple2;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;

public class SparkStream {
   public static void main(String args[])
   {
      if(args.length != 3)
      {
         System.out.println("Usage: spark-submit --class com.spark.SparkStream target/SparkStream-with-dependencies.jar <zkQuorum> <group> <topics>");
         System.exit(1);
      }

      Map<String, Integer> topicMap = new HashMap<String, Integer>();
      String[] topic = args[2].split(",");
      for(String t: topic)
      {
         topicMap.put(t, new Integer(1));
      }

      JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
      JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

      System.out.println("Connection done");
      JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
      {
         public String call(Tuple2<String, String> message)
         {
            System.out.println("NewMessage: " + message._2()); // for debugging
            return message._2();
         }
      });

      data.print();

      jssc.start();
      jssc.awaitTermination();
   }
}


I am running the job, and in another terminal I am running the kafka producer to 
publish messages:
#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>Hi kafka
>second message
>another message

But the output logs at the spark-streaming console don't show the messages; 
they show zero blocks received:


---
Time: 1417107363000 ms
---

14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 
1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 
1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for 
time 1417107363000 ms (execution: 0.000 s)
14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 
1417107363000 ms
14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD 
BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 
ms
14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks


Why isn't the data block getting received? I have tried the console producer and 
consumer (bin/kafka-console-producer and bin/kafka-console-consumer) and they work 
perfectly, so why not the code above? Please help me.


Regards,
Aiman Sarosh





Re: Spark Streaming Issue not running 24/7

2014-10-31 Thread Akhil Das
It says: Exception failure in TID 478548 on host 172.18.152.36:
java.lang.ArrayIndexOutOfBoundsException.
Can you try putting a try {} catch block around all the operations you are
doing on the DStream? That way a corrupt record/field will not stop the
entire application.
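For illustration, a minimal sketch of that idea; the SafeParse class name, the
comma-separated parsing and the field index are hypothetical stand-ins for
whatever the real per-record logic is:

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;

public class SafeParse {
   // Wrap the per-record work in try/catch so a single corrupt record
   // (for example a short line that triggers ArrayIndexOutOfBoundsException)
   // is skipped instead of failing the task and aborting the whole job.
   public static JavaDStream<String> extractField(JavaDStream<String> lines) {
      return lines.map(new Function<String, String>() {
         public String call(String line) {
            try {
               String[] parts = line.split(",");
               return parts[3];           // may throw ArrayIndexOutOfBoundsException
            } catch (Exception e) {
               return null;               // mark the record as bad
            }
         }
      }).filter(new Function<String, Boolean>() {
         public Boolean call(String value) {
            return value != null;         // drop the bad records
         }
      });
   }
}

Logging or counting inside the catch block also makes it easy to see how many
records are being dropped.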

Thanks
Best Regards

On Fri, Oct 31, 2014 at 10:09 AM, sivarani wrote:

> The problem is simple.
>
> I want to stream data 24/7, do some calculations and save the result in a
> csv/json file so that I can use it for visualization using dc.js/d3.js.
>
> I opted for Spark streaming on a YARN cluster with Kafka and tried running
> it 24/7.
>
> I am using groupByKey and updateStateByKey to keep the computed historical
> data.
>
> Initially streaming works fine, but after a few hours I am getting:
>
> 14/10/30 23:48:49 ERROR TaskSetManager: Task 2485162.0:3 failed 4 times;
> aborting job
> 14/10/30 23:48:50 ERROR JobScheduler: Error running job streaming job
> 141469227 ms.1
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 2485162.0:3 failed 4 times, most recent failure: Exception failure in TID
> 478548 on host 172.18.152.36: java.lang.ArrayIndexOutOfBoundsException
>
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
> I guess it is due to the groupByKey and updateStateByKey; I tried
> groupByKey(100) to increase the number of partitions.
>
> Also, when data is in state (say at the 10th second 1,000 records are in
> state and at the 100th second 20,000 records are in state, of which 19,000
> are no longer being updated), how do I remove them from state?
> updateStateByKey(None): how and when should I do that, how will we know
> when to send None, and how do I save the data before setting None?
>
> I also tried not sending any data for a few hours, but when I check the
> web UI I see the application marked FINISHED:
>
> app-20141030203943-  NewApp  0  6.0 GB  2014/10/30 20:39:43  hadoop
> FINISHED  4.2 h
>
> This confuses me. The code calls awaitTermination and I did not terminate
> the application myself. Will streaming stop if no data is received for a
> significant amount of time? Is there any documentation on how long Spark
> will keep running when no data is streamed?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Issue-not-running-24-7-tp17791.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark Streaming Issue not running 24/7

2014-10-30 Thread sivarani
The problem is simple.

I want to stream data 24/7, do some calculations and save the result in a
csv/json file so that I can use it for visualization using dc.js/d3.js.

I opted for Spark streaming on a YARN cluster with Kafka and tried running
it 24/7.

I am using groupByKey and updateStateByKey to keep the computed historical
data.

Initially streaming works fine, but after a few hours I am getting:

14/10/30 23:48:49 ERROR TaskSetManager: Task 2485162.0:3 failed 4 times;
aborting job
14/10/30 23:48:50 ERROR JobScheduler: Error running job streaming job
141469227 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task
2485162.0:3 failed 4 times, most recent failure: Exception failure in TID
478548 on host 172.18.152.36: java.lang.ArrayIndexOutOfBoundsException

Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
I guess it is due to the groupByKey and updateStateByKey; I tried
groupByKey(100) to increase the number of partitions.

Also, when data is in state (say at the 10th second 1,000 records are in
state and at the 100th second 20,000 records are in state, of which 19,000
are no longer being updated), how do I remove them from state?
updateStateByKey(None): how and when should I do that, how will we know
when to send None, and how do I save the data before setting None?
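For reference, a minimal sketch of one common pattern, assuming the Spark 1.x
Java API in which updateStateByKey takes a Function2 over Guava's Optional; the
DropStaleState and TimestampedCount class names and the one-hour idle cutoff are
made up for illustration. Returning Optional.absent() from the update function
is what removes a key from the state:

import java.util.List;
import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function2;

// Running count plus the time it was last updated (hypothetical value class).
class TimestampedCount implements java.io.Serializable {
   final long count;
   final long lastUpdated;
   TimestampedCount(long count, long lastUpdated) {
      this.count = count;
      this.lastUpdated = lastUpdated;
   }
}

// Update function for updateStateByKey: returning Optional.absent()
// removes the key from the state, which is how stale entries get dropped.
public class DropStaleState
      implements Function2<List<Long>, Optional<TimestampedCount>, Optional<TimestampedCount>> {

   private static final long MAX_IDLE_MS = 60 * 60 * 1000;   // hypothetical idle cutoff

   public Optional<TimestampedCount> call(List<Long> newValues, Optional<TimestampedCount> state) {
      long now = System.currentTimeMillis();
      TimestampedCount current = state.isPresent() ? state.get() : new TimestampedCount(0L, now);

      if (newValues.isEmpty()) {
         if (now - current.lastUpdated > MAX_IDLE_MS) {
            return Optional.absent();      // idle too long: drop the key
         }
         return Optional.of(current);      // keep it for now
      }

      long sum = current.count;
      for (Long v : newValues) {
         sum += v;
      }
      return Optional.of(new TimestampedCount(sum, now));
   }
}

It would be wired in with something like counts.updateStateByKey(new
DropStaleState()), where counts is a JavaPairDStream of key to Long.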

I also tried not sending any data for a few hours, but when I check the
web UI I see the application marked FINISHED:

app-20141030203943-  NewApp  0  6.0 GB  2014/10/30 20:39:43  hadoop  FINISHED  4.2 h

This confuses me. The code calls awaitTermination and I did not terminate
the application myself. Will streaming stop if no data is received for a
significant amount of time? Is there any documentation on how long Spark
will keep running when no data is streamed?
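For the earlier point about saving the results to csv/json before keys are
dropped from the state, a minimal sketch of writing each batch out with
foreachRDD, assuming the Spark 1.x Java API where foreachRDD takes a
Function<JavaRDD<T>, Void>; the SaveBatches class name and the output path
naming are hypothetical:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;

public class SaveBatches {
   // Write every non-empty batch out as text files so the computed results
   // survive even after their keys are later removed from the state.
   public static void saveAsCsv(JavaDStream<String> csvLines, final String outputDir) {
      csvLines.foreachRDD(new Function<JavaRDD<String>, Void>() {
         public Void call(JavaRDD<String> rdd) {
            if (rdd.count() > 0) {
               // one output directory per batch
               rdd.saveAsTextFile(outputDir + "-" + System.currentTimeMillis());
            }
            return null;
         }
      });
   }
}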



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Issue-not-running-24-7-tp17791.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark streaming issue

2014-05-27 Thread Sourav Chandra
Hi,

I am facing a weird issue. I am using spark 0.9 and running a streaming
application.

In the UI, the duration is shown on the order of seconds, but if I dig into
that particular stage's details, the total time taken across all tasks for
the stage is much less (in milliseconds).

I am using Fair scheduling policy and pool name is counter-metric-persistor.

What could be the reason for this?

Stage screenshot (Stage 97):

Stage Id: 97    Pool: counter-metric-persistor
Description: foreach at RealTimeAnalyticsApplication.scala:33
Submitted: 2014/05/27 07:22:23    Duration: 14.5 s    Tasks (succeeded/total): 6/6

Stage details screenshot (Stage 97):

Details for Stage 97

Total task time across all tasks: 154 ms

Summary Metrics for 6 Completed Tasks
Metric                             Min     25th    Median  75th    Max
Result serialization time          0 ms    0 ms    0 ms    0 ms    0 ms
Duration                           12 ms   13 ms   23 ms   30 ms   54 ms
Time spent fetching task results   0 ms    0 ms    0 ms    0 ms    0 ms
Scheduler delay                    7 ms    7 ms    8 ms    8 ms    8 ms

Aggregated Metrics by Executor
Executor ID: 0    Address: ls230-127-p.nyc0.ls.local:53463    Task Time: 199 ms
Total Tasks: 6    Failed Tasks: 0    Succeeded Tasks: 6
Shuffle Read: 0.0 B    Shuffle Write: 0.0 B
Shuffle Spill (Memory): 0.0 B    Shuffle Spill (Disk): 0.0 B

Tasks
Index  Task ID  Status   Locality Level  Executor                   Launch Time          Duration
0      408      SUCCESS  PROCESS_LOCAL   ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  30 ms
1      411      SUCCESS  PROCESS_LOCAL   ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  22 ms
2      412      SUCCESS  PROCESS_LOCAL   ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  23 ms
3      414      SUCCESS  PROCESS_LOCAL   ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  13 ms
4      415      SUCCESS  PROCESS_LOCAL   ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  12 ms
5      416      SUCCESS  PROCESS_LOCAL   ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  54 ms


Thanks,
-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com