Re: ********Spark streaming issue to Elastic data**********

2024-05-06 Thread Mich Talebzadeh
Hi Karthick,

Unfortunately, materialized views are not available in Spark as yet. I
raised Jira [SPARK-48117] "Spark Materialized Views: Improve Query
Performance and Data Management" as a feature request.

Let me think about another way and revert.


Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
United Kingdom

   view my Linkedin profile

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


Re: ********Spark streaming issue to Elastic data**********

2024-05-05 Thread Karthick Nk
Thanks Mich,

Can you please confirm whether my understanding is correct?

First, we have to create the materialized view from the mapping details we
have, using multiple tables as the source (since we have multiple join
conditions across different tables). From the materialized view we can then
stream the view data into the Elastic index by using CDC?

Thanks in advance.


Re: ********Spark streaming issue to Elastic data**********

2024-05-03 Thread Mich Talebzadeh
My recommendation: materialized views (MVs) created in Hive, combined with
Spark Structured Streaming and Change Data Capture (CDC), are a good
combination for efficiently streaming view data updates in your scenario.


Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
United Kingdom


On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:

> Hi All,
> Requirements:
> I am working on a data flow that uses a view definition (already defined
> in the schema); multiple tables are used in the view definition. We want to
> stream the view data into an Elastic index whenever the data in any of the
> tables used in the view definition changes.
> Current flow:
> 1. We insert the ids from the tables used in the view definition into a
> common table.
> 2. From the common table, using the id, we stream the view data with Spark
> Structured Streaming (by checking whether any incoming id is present in the
> collective ids of all tables used in the view definition).
> Issue:
> 1. For each incoming id we run the view definition (so it reads all the data
> from all the tables) and check whether any incoming id is present in the
> collective ids of the view result. Because of this it takes more memory on
> the cluster driver and more time to process.
> I am looking for an alternative solution that avoids a full scan of the view
> definition every time. If you have an alternative design flow to achieve
> this, please suggest it.
> Note: it would also be helpful if you could share details of a community
> forum or platform for discussing this kind of design-related topic.
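
One possible way to avoid the full scan described in the question above is to filter the base tables by the changed ids inside each micro-batch, and only then apply the view's joins. The sketch below is an illustration under stated assumptions, not a solution proposed in this thread: the table names (common_ids, orders, customers), the column names, the index name and the checkpoint path are all hypothetical, the common table is assumed to be readable as a stream, and the elasticsearch-hadoop Spark connector is assumed to be on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object IncrementalViewToElastic {
  // Connector settings: use the row id as the document id and upsert on it.
  val esOptions: Map[String, String] =
    Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("IncrementalViewToElastic").getOrCreate()

    // Handle one micro-batch of changed ids: restrict the base table to those
    // ids first, then apply the view's join logic to the reduced input only.
    def processBatch(batch: DataFrame, batchId: Long): Unit = {
      val changedIds = batch.select("id").distinct()
      // Hypothetical base tables standing in for the tables behind the view.
      val orders = spark.table("orders").join(changedIds, Seq("id"), "left_semi")
      val customers = spark.table("customers")
      val viewRows = orders.join(customers, Seq("customer_id"))

      // The write runs on the executors; nothing is collected on the driver.
      viewRows.write
        .format("org.elasticsearch.spark.sql")
        .options(esOptions)
        .mode("append")
        .save("view_index")
    }

    val query = spark.readStream
      .table("common_ids") // hypothetical streaming-capable table of changed ids
      .writeStream
      .option("checkpointLocation", "/tmp/checkpoints/view_to_es")
      .foreachBatch(processBatch _)
      .start()

    query.awaitTermination()
  }
}
```

The left semi join keeps only the rows whose id arrived in the batch, so the cost of each batch should follow the number of changed ids rather than the size of the whole view, provided the underlying storage can prune on the join key.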

Re: spark streaming: issue with logging with separate log4j properties files for driver and executor

2016-05-24 Thread chandan prakash
It worked when I passed the parameters in sparkConf instead of on the
spark-submit command line (I still don't know why passing them to
spark-submit did not work):

 sparkConf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC ")
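
A minimal sketch of how the executor log4j setting could be passed the same way, through SparkConf. The properties file path and the application name are hypothetical, and the file is assumed to exist at that path on every executor host, since the file: URL is resolved locally by each executor JVM.

```scala
import org.apache.spark.SparkConf

object ExecutorLogging {
  // Hypothetical path; it must be readable on each executor machine.
  val executorLog4j = "file:/path/to/log4j-executor.properties"

  // Build a SparkConf that hands the log4j configuration to the executor JVMs.
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("StreamingWithExecutorLogging")
      .set("spark.executor.extraJavaOptions",
        s"-XX:+UseG1GC -Dlog4j.configuration=$executorLog4j")

  def main(args: Array[String]): Unit = {
    // Print the resulting executor options to check what will be sent.
    println(buildConf().get("spark.executor.extraJavaOptions"))
  }
}
```

In client mode the driver JVM has already started by the time SparkConf is built, so the driver-side option still has to go on the command line (for example with --driver-java-options).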


Chandan Prakash

Re: spark streaming: issue with logging with separate log4j properties files for driver and executor

2016-05-24 Thread chandan prakash
Any suggestion?

On Mon, May 23, 2016 at 5:18 PM, chandan prakash 

> Hi,
> I am able to do logging for the driver but not for the executors.
> I am running Spark Streaming under Mesos and
> want to do log4j logging separately for driver and executor.
> I used the options below in the spark-submit command:
> --driver-java-options "-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/"
> --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/"
> Logging for the driver, at the path set in its log4j properties file, is
> happening fine.
> But for the executors no logging is happening (it should be at
> /tmp/requestLogExecutor.log, as set in the executor's log4j properties file,
> on the executor machines).
> *Any suggestions on how to get logging enabled for the executors?*
> TIA,
> Chandan
> --
> Chandan Prakash

Chandan Prakash

Re: Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
Ok I managed to make this work.

All I am interested in is receiving messages from the topic every minute. No
filtering yet, just the full text:

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().
  set("spark.driver.allowMultipleContexts", "true").
  set("spark.hadoop.validateOutputSpecs", "false")
// 60-second batch interval: one batch of messages per minute
val ssc = new StreamingContext(sparkConf, Seconds(60))
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "rhes564:9092",
  "schema.registry.url" -> "http://rhes564:8081",
  "zookeeper.connect" -> "rhes564:2181",
  "" -> "StreamTest")
val topic = Set("newtopic")
// Explicit key/value types and decoders give a DStream of (String, String) pairs
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topic)

Time: 145955454 ms
(null,Sat Apr 2 00:33:01 BST 2016  === Sending messages from rhes5)
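
Batch output like the lines above only appears once the stream has an output operation and the context is started, neither of which is shown in the snippet. A minimal sketch of a complete job, assuming the Spark 1.x spark-streaming-kafka (Kafka 0.8) integration used in this thread; the object name, the application name and the local[2] master are assumptions, and only the broker address and topic come from the message. Without the explicit type parameters, the element type is inferred as (Nothing, Nothing), as the earlier REPL session in this thread shows.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamTest {
  // Broker and topic are taken from the thread; everything else is assumed.
  val kafkaParams: Map[String, String] = Map("bootstrap.servers" -> "rhes564:9092")
  val topics: Set[String] = Set("newtopic")

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StreamTest").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Each record is a (key, value) pair; keep only the message text.
    messages.map(_._2).print()

    // Nothing is consumed until the context is started.
    ssc.start()
    ssc.awaitTermination()
  }
}
```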

Dr Mich Talebzadeh

Re: Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
I adopted this approach

scala> val conf = new SparkConf().
 |  setAppName("StreamTest").
 |  setMaster("local[12]").
 |  set("spark.driver.allowMultipleContexts", "true").
 |  set("spark.hadoop.validateOutputSpecs", "false")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@321d96f7
scala> val ssc = new StreamingContext(conf, Seconds(60))
ssc: org.apache.spark.streaming.StreamingContext =
org.apache.spark.streaming.StreamingContext@5dbae9eb
scala> val kafkaParams = Map("" -> "rhes564:9092")
kafkaParams: scala.collection.immutable.Map[String,String] =
Map( -> rhes564:9092)
scala> val topics = Set("newtopic")
topics: scala.collection.immutable.Set[String] = Set(newtopic)
scala> val stream = KafkaUtils.createDirectStream(ssc, kafkaParams, topics)
stream: org.apache.spark.streaming.dstream.InputDStream[(Nothing, Nothing)]
= org.apache.spark.streaming.kafka.DirectKafkaInputDStream@6d2d3b21

So that opens data stream. What next?


Dr Mich Talebzadeh

Re: Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
yes I noticed that

scala> val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
"rhes564:9092", "newtopic", 1)

:52: error: overloaded method value createStream with alternatives:
String,groupId: String,topics: java.util.Map[String,Integer],storageLevel:[String,String]

  (ssc: org.apache.spark.streaming.StreamingContext,zkQuorum:
String,groupId: String,topics:
 cannot be applied to (org.apache.spark.streaming.StreamingContext, String,
String, String, Int)
 val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
"rhes564:9092", "newtopic", 1)

Dr Mich Talebzadeh


Re: Spark streaming issue

2016-04-01 Thread Cody Koeninger
You're not passing valid Scala values.  rhes564:2181  without quotes
isn't a valid literal, newtopic isn't a list of strings, etc.
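
For illustration, a sketch of what valid values could look like for the receiver-based call in the Spark 1.x spark-streaming-kafka (Kafka 0.8) integration: the ZooKeeper quorum and the group id are quoted strings, and the topics argument is a Map from topic name to the number of consumer threads. The ZooKeeper address and topic come from this thread; the group id, object name and local[2] master are hypothetical.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverStreamTest {
  val zkQuorum = "rhes564:2181"                       // ZooKeeper, not the broker
  val groupId = "stream-test-group"                   // hypothetical consumer group
  val topics: Map[String, Int] = Map("newtopic" -> 1) // topic -> consumer threads

  def main(args: Array[String]): Unit = {
    // At least two local threads: the receiver permanently occupies one.
    val conf = new SparkConf().setAppName("ReceiverStreamTest").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(60))

    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)

    // Records arrive as (key, value) pairs; print only the message text.
    kafkaStream.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```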


Re: Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
Thanks Cody.

Can I use Receiver-based Approach here?

I have created the topic newtopic as below

${KAFKA_HOME}/bin/ --create --zookeeper rhes564:2181
--replication-factor 1 --partitions 1 --topic newtopic

This is basically what I am doing in Spark:

val lines = ssc.socketTextStream("rhes564", 2181)

which is obviously not working.

This is what is suggested in the doc:

import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,
 [ZK quorum], [consumer group id], [per-topic number of Kafka
partitions to consume])

The arguments, in the order the doc describes them:

* a list of one or more zookeeper servers that make up the quorum
* the name of the Kafka consumer group
* a list of one or more Kafka topics to consume from
* the number of threads the Kafka consumer should use

Now this comes back with an error. Obviously I am not passing the parameters
correctly!

scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
rhes564:2181, rhes564:9092, newtopic 1)
:1: error: identifier expected but integer literal found.
   val kafkaStream = KafkaUtils.createStream(streamingContext,
rhes564:2181, rhes564:9092, newtopic 1)

Dr Mich Talebzadeh


Re: Spark streaming issue

2016-04-01 Thread Cody Koeninger
It looks like you're using a plain socket stream to connect to a
zookeeper port, which won't work.

  Look at

On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
> Hi,
> I am just testing Spark streaming with Kafka.
> Basically I am broadcasting topic every minute to Host:port -> rhes564:2181.
> This is sending a few lines through a shell script as follows:
> cat ${IN_FILE} | ${KAFKA_HOME}/bin/ --broker-list
> rhes564:9092 --topic newtopic
> That works fine and I can see the messages in
> ${KAFKA_HOME}/bin/ --zookeeper rhes564:2181 --topic
> newtopic
> Fri Apr 1 21:00:01 BST 2016  === Sending messages from rhes5
> 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
> 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
> 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
> 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
> 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
> Now I try to see the topic in spark streaming as follows:
> val conf = new SparkConf().
>  setAppName("StreamTest").
>  setMaster("local[12]").
>  set("spark.driver.allowMultipleContexts", "true").
>  set("spark.hadoop.validateOutputSpecs", "false")
> val sc = new SparkContext(conf)
> // Create sqlContext based on HiveContext
> val sqlContext = new HiveContext(sc)
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> //
> // Create a local StreamingContext with a batch interval of 1 minute.
> // The master requires at least 2 cores to prevent a starvation scenario.
> val ssc = new StreamingContext(conf, Minutes(1))
> // Create a DStream that will connect to hostname:port, like localhost:
> //val lines = ssc.socketTextStream("rhes564", 9092)
> val lines = ssc.socketTextStream("rhes564", 2181)
> // Split each line into words
> val words = lines.flatMap(_.split(" "))
> // Pair each word with a count of 1
> val pairs = words.map(word => (word, 1))
> val wordCounts = pairs.reduceByKey(_ + _)
> // Print the first ten elements of each RDD generated in this DStream to the console
> wordCounts.print()
> ssc.start()
> This is what I am getting:
> scala> ---
> Time: 145954176 ms
> ---
> But no values
> Have I got the port wrong in this case or the set up is incorrect?
> Thanks
> Dr Mich Talebzadeh
> LinkedIn


Re: Spark Streaming Issue not running 24/7

2014-10-31 Thread Akhil Das
The error says TID 478548 failed on the host with
java.lang.ArrayIndexOutOfBoundsException.
Can you try putting a try { } catch around all the operations that you are
doing on the DStream? That way corrupt data, a bad field and so on will not
stop the entire application.
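
A minimal sketch of that idea, using scala.util.Try instead of an explicit try/catch so that a malformed record is dropped rather than failing the task. The record layout (three comma-separated fields with a numeric third field) and all names are hypothetical; the real job's parsing is not shown in this thread.

```scala
import scala.util.Try

object SafeParse {
  final case class Reading(key: String, value: Double)

  // Returns None for any line that cannot be parsed, including lines with
  // too few fields (ArrayIndexOutOfBoundsException) or a non-numeric value.
  def parse(line: String): Option[Reading] = Try {
    val fields = line.split(",")
    Reading(fields(0), fields(2).toDouble)
  }.toOption

  def main(args: Array[String]): Unit = {
    val lines = Seq("a,x,1.5", "broken", "b,y,not-a-number", "c,z,2.0")
    // Malformed lines are skipped; only the two valid readings remain.
    println(lines.flatMap(line => parse(line).toList))
  }
}
```

On a DStream the same function could be applied as lines.flatMap(line => SafeParse.parse(line).toList), so a bad record costs one skipped element instead of four failed task attempts and an aborted job.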

Best Regards

On Fri, Oct 31, 2014 at 10:09 AM, sivarani 

> The problem is simple.
> I want to stream data 24/7, do some calculations and save the result in a
> csv/json file so that I can use it for visualization with dc.js/d3.js.
> I opted for Spark Streaming on a YARN cluster with Kafka and tried running it
> 24/7,
> using groupByKey and updateStateByKey to keep the computed historical data.
> Initially streaming works fine, but after a few hours I get:
> 14/10/30 23:48:49 ERROR TaskSetManager: Task 2485162.0:3 failed 4 times;
> aborting job
> 14/10/30 23:48:50 ERROR JobScheduler: Error running job streaming job
> 141469227 ms.1
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 2485162.0:3 failed 4 times, most recent failure: Exception failure in TID
> 478548 on host java.lang.ArrayIndexOutOfBoundsException
> Driver stacktrace:
> at
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
> I guess it is due to groupByKey and updateStateByKey. I tried
> groupByKey(100) to increase the number of partitions.
> Also, about data held in state: say at the 10th second 1,000 records are in
> state, and at the 100th second 20,000 records are in state, of which 19,000
> are no longer updated. How do I remove them from state? With
> updateStateByKey returning None? How and when should that be done, how do we
> know when to return None, and how do we save the data before returning None?
> I also tried not sending any data for a few hours, but when I check the web
> UI the application shows as FINISHED:
> app-20141030203943- NewApp  0   6.0 GB  2014/10/30 20:39:43
>  hadoop  FINISHED
> 4.2 h
> This confuses me. The code calls awaitTermination, but I did not
> terminate the task. Will streaming stop if no data is received for a
> significant amount of time? Is there any documentation on how long
> Spark will keep running when no data is streamed?
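
On the question above about removing stale keys: updateStateByKey takes a function of type (Seq[V], Option[S]) => Option[S], calls it for every key held in state on every batch, and drops a key when the function returns None. One way to decide when to return None is to count the batches in which a key received no new values. The sketch below shows only that update function in plain Scala; the state type, the running total and the threshold of 10 idle batches are hypothetical.

```scala
object StateExpiry {
  // Running total for a key plus the number of consecutive batches without new data.
  final case class Tracked(total: Long, idleBatches: Int)

  // Hypothetical threshold: evict a key after this many idle batches.
  val maxIdleBatches = 10

  // Same shape as the function expected by updateStateByKey.
  def update(newValues: Seq[Long], state: Option[Tracked]): Option[Tracked] = {
    val previous = state.getOrElse(Tracked(0L, 0))
    if (newValues.nonEmpty) {
      // Fresh data: add it to the total and reset the idle counter.
      Some(Tracked(previous.total + newValues.sum, 0))
    } else if (previous.idleBatches + 1 >= maxIdleBatches) {
      // Idle for too long: returning None removes the key from state.
      None
    } else {
      Some(previous.copy(idleBatches = previous.idleBatches + 1))
    }
  }
}
```

It would be passed as pairs.updateStateByKey(StateExpiry.update _). If the state stream is written out on every batch, the last value of a key has already been saved in the batch before it is evicted.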