[Structured Streaming] How to replay data and overwrite using FileSink

2017-09-20 Thread Bandish Chheda
Hi,

We are using StructuredStreaming (Spark 2.2.0) for processing data from
Kafka. We read from a Kafka topic, do some conversions, computation and
then use FileSink to store data to partitioned path in HDFS. We have
enabled checkpoint (using a dir in HDFS).

For cases where a bad code push reaches the streaming job, we want to replay
data from Kafka (I was able to do that using a custom starting offset).
During the replay, how do I make FileSink overwrite the existing data?
From the code (
https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L99)
it looks like it checks for the latest batchId and skips the processing. Is
there a recommended way to avoid that? I am thinking of deleting files and the
corresponding entries in _spark_metadata based on last-modified time (and the
time from which I want to replay).
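
For reference, a minimal PySpark sketch of such a replay, assuming the Kafka
source's startingOffsets option and the spark-sql-kafka package; the broker,
topic, partition offsets and paths below are placeholders. Writing to a fresh
checkpoint and output directory (or cleaning _spark_metadata, as suggested
above) sidesteps the batchId check that skips already-committed batches.

# Hedged sketch of a replay job: re-read the topic from explicit offsets.
# Requires the spark-sql-kafka-0-10 package; all names below are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-replay").getOrCreate()

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "my_topic")
      # JSON map of topic -> {partition: offset}; -2 means "earliest"
      .option("startingOffsets", """{"my_topic": {"0": 123456, "1": 123456}}""")
      .load()
      .selectExpr("CAST(value AS STRING) AS value"))

# ... same conversions / computations as the normal job ...

query = (df.writeStream
         .format("parquet")
         .option("path", "hdfs:///data/replayed")              # fresh output dir
         .option("checkpointLocation", "hdfs:///chk/replay")   # fresh checkpoint
         .start())
query.awaitTermination()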

Any other better solutions?

Thank you


How to get time slice or the batch time for which the current micro batch is running in Spark Streaming

2017-09-20 Thread SRK
Hi,

How do I get the time slice or the batch time for which the current micro
batch is running in Spark Streaming? Currently I am using the system time,
which is causing the key-clearing feature of reduceByKeyAndWindow to not work
properly.

Thanks,
Swetha
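
For reference, the batch time is handed directly to the foreachRDD (and
transform) callbacks in PySpark, so system time isn't needed; a minimal
sketch, where the socket source and names are placeholders:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="batch-time-demo")
ssc = StreamingContext(sc, batchDuration=10)

lines = ssc.socketTextStream("localhost", 9999)  # placeholder source

def handle(batch_time, rdd):
    # batch_time identifies the micro-batch this RDD belongs to
    print("batch time:", batch_time, "count:", rdd.count())

lines.foreachRDD(handle)

ssc.start()
ssc.awaitTermination()

Using that batch_time (rather than system time) as the reference for clearing
keys should keep reduceByKeyAndWindow aligned with the micro-batch boundaries.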






Re: Is there a SparkILoop for Java?

2017-09-20 Thread kant kodali
Is there an API like SparkILoop for Python?

On Wed, Sep 20, 2017 at 7:17 AM, Jean Georges Perrin  wrote:

> It all depends on what you want to do :) - we've implemented some dynamic
> interpretation of Java code through dynamic class loading; it's not as
> powerful and requires some work on your end, but you can build a Java-based
> interpreter.
>
> jg
>
>
> On Sep 20, 2017, at 08:06, kant kodali  wrote:
>
> Got it! That's what I thought. Java 9 is going to be released tomorrow:
> http://www.java9countdown.xyz/
>
> It has the REPL called JShell.
>
>
>
> On Wed, Sep 20, 2017 at 5:45 AM, Weichen Xu 
> wrote:
>
>> I haven't heard of one. It seems that Java does not have an official REPL.
>>
>> On Wed, Sep 20, 2017 at 8:38 PM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I am wondering if there is a SparkILoop for Java, so I can pass Java code
>>> as a string to a REPL?
>>>
>>> Thanks!
>>>
>>
>>
>


Re: Spark code to get select fields from ES

2017-09-20 Thread Jean Georges Perrin
Same issue with RDBMS ingestion (I think). I solved it with views. Can you do 
views on ES?

jg


> On Sep 20, 2017, at 09:22, Kedarnath Dixit  
> wrote:
> 
> Hi,
> 
> I want to get only select fields from ES using Spark ES connector.
> 
> I have done some code which fetches all the documents matching a given
> index, as below:
> 
> JavaPairRDD> esRDD = JavaEsSpark.esRDD(jsc, 
> searchIndex);
> 
> However, is there a way to get only specific fields from the documents for
> every index in ES, rather than getting everything?
> 
> Example: Let's say I have many fields in the documents, and @timestamp is one
> of the fields in the response { .., @timestamp=Fri Jul 07 01:36:00 IST 2017, ..}.
> How can I get only the field @timestamp for all my indexes?
> 
> I could see something here but was unable to correlate. Can someone help me,
> please?
> 
> 
> Many Thanks!
> ~KD
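
For reference, a hedged PySpark sketch of pushing the field projection into
the connector, assuming the elasticsearch-hadoop option es.read.field.include
(host, port and index names are placeholders); as far as I know the same es.*
settings can also be placed on the SparkConf used with JavaEsSpark.esRDD:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("es-select-fields").getOrCreate()

df = (spark.read
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "es-host")
      .option("es.port", "9200")
      .option("es.read.field.include", "@timestamp")  # comma-separated field list
      .load("my_index/my_type"))

df.select("@timestamp").show()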





Re: graphframes on cluster

2017-09-20 Thread Felix Cheung
Could you include the code where it fails?
Generally the best way to use GraphFrames is to pass the --packages option to
the spark-submit command.
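
For example, a hedged sketch of the two usual ways to make the package visible
to both the driver and the executors; the coordinates below are illustrative
and should be matched to your Spark/Scala version:

# On the command line:
#   spark-submit --packages graphframes:graphframes:0.5.0-spark2.1-s_2.11 my_job.py
#
# Or, from PySpark, before the SparkContext is created:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("graphframes-job")
         .config("spark.jars.packages",
                 "graphframes:graphframes:0.5.0-spark2.1-s_2.11")
         .getOrCreate())

Either way the jar is distributed to the executors, which is usually what is
missing when an action on a GraphFrame crashes while only the driver has the
library.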


From: Imran Rajjad 
Sent: Wednesday, September 20, 2017 5:47:27 AM
To: user @spark
Subject: graphframes on cluster

Trying to run GraphFrames on a Spark cluster. Do I need to include the package
in the Spark context settings, or is only the driver program supposed to have
the GraphFrames libraries on its classpath? Currently the job crashes when an
action is invoked on GraphFrame classes.

regards,
Imran

--
I.R


Re: Pyspark define UDF for windows

2017-09-20 Thread Weichen Xu
A UDF cannot be used as a window function. You can use a built-in window
function or a UDAF.
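
One common workaround, sketched below in PySpark with placeholder column
names: aggregate the window into an array with the built-in collect_list, then
apply an ordinary UDF to that array column (the sort-based summary stands in
for the real median-polish logic):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("window-udf-workaround").getOrCreate()
df = spark.createDataFrame([("a", 1.0), ("a", 3.0), ("b", 5.0)], ["col", "value"])

w = (Window.partitionBy("col")
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

def summarize(values):
    # stand-in for the real median-polish computation
    return float(sorted(values)[len(values) // 2])

summarize_udf = F.udf(summarize, DoubleType())

df = (df.withColumn("window_values", F.collect_list("value").over(w))
        .withColumn("value_summary", summarize_udf("window_values")))
df.show()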

On Wed, Sep 20, 2017 at 7:23 PM, Simon Dirmeier 
wrote:

> Dear all,
> I am trying to partition a DataFrame into windows and then for every
> column and window use a custom function (udf) using Spark's Python
> interface.
> Within that function I cast a column of a window into an m x n matrix to do a
> median polish and afterwards return a list again.
>
> This doesn't work:
>
> w = Window().partitionBy(["col"]).rowsBetween(-sys.maxsize, sys.maxsize)
> def median_polish(rows, cols, values):
>     # shape values as a matrix defined by rows/cols
>     # compute the median polish
>     # cast the matrix back to a vector
>     return values
> med_pol_udf = func.udf(median_polish, DoubleType())
> for x in df.columns:
>     if x.startswith("some string"):
>         df = df.withColumn(x, med_pol_udf("rows", "cols", x).over(w))
>
>
>
> The issue seems to be the windowing. Can you actually do that in Pyspark?
> Or would I need to change to Scala?
> Thanks for your help.
>
> Best,
> Simon
>


Re: Is there a SparkILoop for Java?

2017-09-20 Thread kant kodali
Got it! That's what I thought. Java 9 is going to be released tomorrow:
http://www.java9countdown.xyz/

It has the REPL called JShell.



On Wed, Sep 20, 2017 at 5:45 AM, Weichen Xu 
wrote:

> I haven't heard of one. It seems that Java does not have an official REPL.
>
> On Wed, Sep 20, 2017 at 8:38 PM, kant kodali  wrote:
>
>> Hi All,
>>
>> I am wondering if there is a SparkILoop for Java, so I can pass Java code
>> as a string to a REPL?
>>
>> Thanks!
>>
>
>


graphframes on cluster

2017-09-20 Thread Imran Rajjad
Trying to run GraphFrames on a Spark cluster. Do I need to include the package
in the Spark context settings, or is only the driver program supposed to have
the GraphFrames libraries on its classpath? Currently the job crashes when an
action is invoked on GraphFrame classes.

regards,
Imran

-- 
I.R


Re: Is there a SparkILoop for Java?

2017-09-20 Thread Weichen Xu
I haven't heard of one. It seems that Java does not have an official REPL.

On Wed, Sep 20, 2017 at 8:38 PM, kant kodali  wrote:

> Hi All,
>
> I am wondering if there is a SparkILoop for Java, so I can pass Java code
> as a string to a REPL?
>
> Thanks!
>


Is there a SparkILoop for Java?

2017-09-20 Thread kant kodali
Hi All,

I am wondering if there is a SparkILoop for Java, so I can pass Java code as a
string to a REPL?

Thanks!


Re: for loops in pyspark

2017-09-20 Thread Weichen Xu
Spark manages memory allocation and release automatically. Can you post the
complete program? That would help in checking what is going wrong.
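
A hedged sketch of what usually helps here: unpersist whatever each iteration
cached and drop the Python references so nothing accumulates across
iterations (chunks, somefunc and make_df mirror the names from the mail and
are defined with placeholder data; the per-chunk output path avoids colliding
writes):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("chunked-writes").getOrCreate()
sc = spark.sparkContext

chunks = [[1, 2, 3], [4, 5, 6]]                    # placeholder data
somefunc = lambda x: [x, x * 10]                   # placeholder flatMap function
make_df = lambda rdd: rdd.map(lambda v: (v,)).toDF(["value"])   # placeholder

for i, chunk in enumerate(chunks):
    my_rdd = sc.parallelize(chunk).flatMap(somefunc)
    my_df = make_df(my_rdd)
    my_df.write.parquet('./some/path/chunk=%d' % i)

    my_df.unpersist()      # release any cached blocks from this iteration
    my_rdd.unpersist()
    del my_df, my_rdd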

On Wed, Sep 20, 2017 at 8:12 PM, Alexander Czech <
alexander.cz...@googlemail.com> wrote:

> Hello all,
>
> I'm running a pyspark script that makes use of a for loop to create smaller
> chunks of my main dataset.
>
> some example code:
>
> for chunk in chunks:
> my_rdd = sc.parallelize(chunk).flatMap(somefunc)
> # do some stuff with my_rdd
>
> my_df = make_df(my_rdd)
> # do some stuff with my_df
> my_df.write.parquet('./some/path')
>
> After a couple of loops I always start to lose executors because of
> out-of-memory errors. Is there a way to free up memory after a loop? Do I
> have to do it in Python or with Spark?
>
> Thanks
>


for loops in pyspark

2017-09-20 Thread Alexander Czech
Hello all,

I'm running a pyspark script that makes use of a for loop to create smaller
chunks of my main dataset.

some example code:

for chunk in chunks:
my_rdd = sc.parallelize(chunk).flatMap(somefunc)
# do some stuff with my_rdd

my_df = make_df(my_rdd)
# do some stuff with my_df
my_df.write.parquet('./some/path')

After a couple of loops I always start to lose executors because of
out-of-memory errors. Is there a way to free up memory after a loop? Do I have
to do it in Python or with Spark?

Thanks


Pyspark define UDF for windows

2017-09-20 Thread Simon Dirmeier

Dear all,

I am trying to partition a DataFrame into windows and then, for every
column and window, use a custom function (UDF) via Spark's Python interface.
Within that function I cast a column of a window into an m x n matrix, do a
median polish, and afterwards return a list again.


This doesn't work:

w = Window().partitionBy(["col"]).rowsBetween(-sys.maxsize, sys.maxsize)
def median_polish(rows, cols, values):
    # shape values as a matrix defined by rows/cols
    # compute the median polish
    # cast the matrix back to a vector
    return values
med_pol_udf = func.udf(median_polish, DoubleType())
for x in df.columns:
    if x.startswith("some string"):
        df = df.withColumn(x, med_pol_udf("rows", "cols", x).over(w))


The issue seems to be the windowing. Can you actually do that in 
Pyspark? Or would I need to change to Scala?

Thanks for your help.

Best,
Simon


Spark Streaming + Kafka + Hive: delayed

2017-09-20 Thread toletum
Hello.

I have a Python process that reads a Kafka queue and, for each record, checks
whether it exists in a table.

# Load the table in memory
table = sqlContext.sql("select id from table")
table.cache()

def processForeach(time, rdd):
    print(time)
    for k in rdd.collect():
        if table.filter("id = '%s'" % k["id"]).count() > 0:
            print(k)

kafkaTopic.foreachRDD(processForeach)

The problem is that, little by little, Spark falls behind the batch time; I can
see it in the "print(time)" output. The Kafka topic carries at most 3 messages
per second.
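
A hedged rewrite of the loop that avoids launching a Spark job (filter +
count) for every single record, reusing the sqlContext and kafkaTopic names
from the message above and assuming the id column fits in driver memory:

# Collect the ids once into a plain Python set; membership tests are then local.
ids = set(row["id"] for row in sqlContext.sql("select id from table").collect())

def process_foreach(time, rdd):
    print(time)
    for k in rdd.collect():
        if k["id"] in ids:        # no Spark job per record
            print(k)

kafkaTopic.foreachRDD(process_foreach)

If the table is too large to collect, joining each micro-batch against the
cached table (one job per batch rather than one per record) is the usual
alternative.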


Re: Structured streaming coding question

2017-09-20 Thread kant kodali
Just tried with sparkSession.streams().awaitAnyTermination(); and that's
the only await* call I had, and it works! But what if I don't want all my
queries to fail or stop making progress if one of them fails?
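
One possible pattern for that follow-up (a hedged, PySpark-flavoured sketch,
since the thread's code is Java): loop on awaitAnyTermination(), inspect which
query stopped, and restart just that one. build_query() below uses the
built-in rate source purely as a placeholder for the real Kafka pipelines.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("multi-query-monitor").getOrCreate()

def build_query(name):
    # placeholder query; substitute the real Kafka sink pipelines here
    return (spark.readStream.format("rate").option("rowsPerSecond", 1).load()
            .writeStream.format("memory").queryName(name).start())

queries = {name: build_query(name) for name in ("hello1", "hello2")}

while queries:
    spark.streams.awaitAnyTermination()    # returns when any query terminates
    spark.streams.resetTerminated()        # forget queries that already ended
    for name, q in list(queries.items()):
        if not q.isActive:
            if q.exception() is not None:
                print("query %s failed: %s" % (name, q.exception()))
                queries[name] = build_query(name)   # restart only the failed one
            else:
                del queries[name]                   # stopped cleanly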

On Wed, Sep 20, 2017 at 2:26 AM, kant kodali  wrote:

> Hi Burak,
>
> Are you saying get rid of both
>
> query1.awaitTermination();
> query2.awaitTermination();
>
> and just have the line below?
>
> sparkSession.streams().awaitAnyTermination();
>
> Thanks!
>
>
> On Wed, Sep 20, 2017 at 12:51 AM, kant kodali  wrote:
>
>> If I don't get anywhere after query1.awaitTermination();
>>
>> Then I cannot put this sparkSession.streams().awaitAnyTermination(); as
>> the last line of code right? Like below
>>
>> query1.awaitTermination();
>> sparkSession.streams().awaitAnyTermination();
>>
>>
>> On Wed, Sep 20, 2017 at 12:07 AM, Burak Yavuz  wrote:
>>
>>> Please remove
>>>
>>> query1.awaitTermination();
>>> query2.awaitTermination();
>>>
>>> once
>>>
>>> query1.awaitTermination();
>>>
>>> is called, you don't even get to query2.awaitTermination().
>>>
>>>
>>> On Tue, Sep 19, 2017 at 11:59 PM, kant kodali 
>>> wrote:
>>>
 Hi Burak,

 Thanks much! had no clue that existed. Now, I changed it to this.

 StreamingQuery query1 = 
 outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new 
 KafkaSink("hello1")).start();
 StreamingQuery query2 = 
 outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new 
 KafkaSink("hello2")).start();

 query1.awaitTermination();
 query2.awaitTermination();
 sparkSession.streams().awaitAnyTermination();





 On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz  wrote:

> Hey Kant,
>
> That won't work either. Your second query may fail, and as long as
> your first query is running, you will not know. Put this as the last line
> instead:
>
> spark.streams.awaitAnyTermination()
>
> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali 
> wrote:
>
>> Looks like my problem was the order of awaitTermination() for some
>> reason.
>>
>> *Doesn't work *
>>
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello1")).start().awaitTermination()
>>
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello2")).start().awaitTermination()
>>
>> *Works*
>>
>> StreamingQuery query1 = outputDS1.writeStream().trigge
>> r(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello1")).start();
>>
>> query1.awaitTermination()
>>
>> StreamingQuery query2 =outputDS2.writeStream().trigg
>> er(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello2")).start();
>>
>> query2.awaitTermination()
>>
>>
>>
>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali 
>> wrote:
>>
>>> Looks like my problem was the order of awaitTermination() for some
>>> reason.
>>>
>>> Doesn't work
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali 
>>> wrote:
>>>
 Hi All,

 I have the following pseudo code (I could paste the real code
 however it is pretty long and involves Database calls inside 
 dataset.map
 operation and so on) so I am just trying to simplify my question. would
 like to know if there is something wrong with the following pseudo 
 code?

 DataSet inputDS = readFromKaka(topicName)

 DataSet mongoDS = inputDS.map(insertIntoDatabase); //
 Works Since I can see data getting populated

 DataSet outputDS1 = mongoDS.map(readFromDatabase); // Works
 as well

 DataSet outputDS2 = mongoDS.map( readFromDatabase); //
 Doesn't work

 outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
 KafkaSink("hello1")).start().awaitTermination()

 outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
 KafkaSink("hello2")).start().awaitTermination()


 *So what's happening with above code is that I can see data coming
 out of hello1 topic but not from hello2 topic.* I thought there is
 something wrong with "outputDS2" so I switched the order  so now the 
 code
 looks like this

 DataSet inputDS = readFromKaka(topicName)

 DataSet mongoDS = inputDS.map(insertIntoDatabase); //
 Works Since I can see data getting populated

 DataSet outputDS2 = mongoDS.map( readFromDatabase); // This
 Works

 DataSet outputDS1 = 

Re: Structured streaming coding question

2017-09-20 Thread kant kodali
Hi Burak,

Are you saying get rid of both

query1.awaitTermination();
query2.awaitTermination();

and just have the line below?

sparkSession.streams().awaitAnyTermination();

Thanks!


On Wed, Sep 20, 2017 at 12:51 AM, kant kodali  wrote:

> If I don't get anywhere after query1.awaitTermination();
>
> Then I cannot put this sparkSession.streams().awaitAnyTermination(); as
> the last line of code right? Like below
>
> query1.awaitTermination();
> sparkSession.streams().awaitAnyTermination();
>
>
> On Wed, Sep 20, 2017 at 12:07 AM, Burak Yavuz  wrote:
>
>> Please remove
>>
>> query1.awaitTermination();
>> query2.awaitTermination();
>>
>> once
>>
>> query1.awaitTermination();
>>
>> is called, you don't even get to query2.awaitTermination().
>>
>>
>> On Tue, Sep 19, 2017 at 11:59 PM, kant kodali  wrote:
>>
>>> Hi Burak,
>>>
>>> Thanks much! had no clue that existed. Now, I changed it to this.
>>>
>>> StreamingQuery query1 = 
>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new 
>>> KafkaSink("hello1")).start();
>>> StreamingQuery query2 = 
>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new 
>>> KafkaSink("hello2")).start();
>>>
>>> query1.awaitTermination();
>>> query2.awaitTermination();
>>> sparkSession.streams().awaitAnyTermination();
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz  wrote:
>>>
 Hey Kant,

 That won't work either. Your second query may fail, and as long as your
 first query is running, you will not know. Put this as the last line
 instead:

 spark.streams.awaitAnyTermination()

 On Tue, Sep 19, 2017 at 10:11 PM, kant kodali 
 wrote:

> Looks like my problem was the order of awaitTermination() for some
> reason.
>
> *Doesn't work *
>
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello1")).start().awaitTermination()
>
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello2")).start().awaitTermination()
>
> *Works*
>
> StreamingQuery query1 = outputDS1.writeStream().trigge
> r(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello1")).start();
>
> query1.awaitTermination()
>
> StreamingQuery query2 =outputDS2.writeStream().trigg
> er(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello2")).start();
>
> query2.awaitTermination()
>
>
>
> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali 
> wrote:
>
>> Looks like my problem was the order of awaitTermination() for some
>> reason.
>>
>> Doesn't work
>>
>>
>>
>>
>>
>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali 
>> wrote:
>>
>>> Hi All,
>>>
>>> I have the following pseudo code (I could paste the real code
>>> however it is pretty long and involves Database calls inside dataset.map
>>> operation and so on) so I am just trying to simplify my question. would
>>> like to know if there is something wrong with the following pseudo code?
>>>
>>> DataSet inputDS = readFromKaka(topicName)
>>>
>>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
>>> Since I can see data getting populated
>>>
>>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Works
>>> as well
>>>
>>> DataSet outputDS2 = mongoDS.map( readFromDatabase); //
>>> Doesn't work
>>>
>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello1")).start().awaitTermination()
>>>
>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello2")).start().awaitTermination()
>>>
>>>
>>> *So what's happening with above code is that I can see data coming
>>> out of hello1 topic but not from hello2 topic.* I thought there is
>>> something wrong with "outputDS2" so I switched the order  so now the 
>>> code
>>> looks like this
>>>
>>> DataSet inputDS = readFromKaka(topicName)
>>>
>>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
>>> Since I can see data getting populated
>>>
>>> DataSet outputDS2 = mongoDS.map( readFromDatabase); // This
>>> Works
>>>
>>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Doesn't
>>> work
>>>
>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello1")).start().awaitTermination()
>>>
>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello2")).start().awaitTermination()
>>>
>>> *Now I can see data coming out from hello2 kafka topic but not from
>>> hello1 topic*. *In  short, I can only 

Re: Structured streaming coding question

2017-09-20 Thread kant kodali
If I don't get past query1.awaitTermination(), then I cannot put
sparkSession.streams().awaitAnyTermination(); as the last line of code, right?
Like below:

query1.awaitTermination();
sparkSession.streams().awaitAnyTermination();


On Wed, Sep 20, 2017 at 12:07 AM, Burak Yavuz  wrote:

> Please remove
>
> query1.awaitTermination();
> query2.awaitTermination();
>
> once
>
> query1.awaitTermination();
>
> is called, you don't even get to query2.awaitTermination().
>
>
> On Tue, Sep 19, 2017 at 11:59 PM, kant kodali  wrote:
>
>> Hi Burak,
>>
>> Thanks much! had no clue that existed. Now, I changed it to this.
>>
>> StreamingQuery query1 = 
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new 
>> KafkaSink("hello1")).start();
>> StreamingQuery query2 = 
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new 
>> KafkaSink("hello2")).start();
>>
>> query1.awaitTermination();
>> query2.awaitTermination();
>> sparkSession.streams().awaitAnyTermination();
>>
>>
>>
>>
>>
>> On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz  wrote:
>>
>>> Hey Kant,
>>>
>>> That won't work either. Your second query may fail, and as long as your
>>> first query is running, you will not know. Put this as the last line
>>> instead:
>>>
>>> spark.streams.awaitAnyTermination()
>>>
>>> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali 
>>> wrote:
>>>
 Looks like my problem was the order of awaitTermination() for some
 reason.

 *Doesn't work *

 outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
 KafkaSink("hello1")).start().awaitTermination()

 outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
 KafkaSink("hello2")).start().awaitTermination()

 *Works*

 StreamingQuery query1 = outputDS1.writeStream().trigge
 r(Trigger.processingTime(1000)).foreach(new
 KafkaSink("hello1")).start();

 query1.awaitTermination()

 StreamingQuery query2 =outputDS2.writeStream().trigg
 er(Trigger.processingTime(1000)).foreach(new
 KafkaSink("hello2")).start();

 query2.awaitTermination()



 On Tue, Sep 19, 2017 at 10:09 PM, kant kodali 
 wrote:

> Looks like my problem was the order of awaitTermination() for some
> reason.
>
> Doesn't work
>
>
>
>
>
> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali 
> wrote:
>
>> Hi All,
>>
>> I have the following pseudo code (I could paste the real code however
>> it is pretty long and involves Database calls inside dataset.map 
>> operation
>> and so on) so I am just trying to simplify my question. would like to 
>> know
>> if there is something wrong with the following pseudo code?
>>
>> DataSet inputDS = readFromKaka(topicName)
>>
>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
>> Since I can see data getting populated
>>
>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Works
>> as well
>>
>> DataSet outputDS2 = mongoDS.map( readFromDatabase); //
>> Doesn't work
>>
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello1")).start().awaitTermination()
>>
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello2")).start().awaitTermination()
>>
>>
>> *So what's happening with above code is that I can see data coming
>> out of hello1 topic but not from hello2 topic.* I thought there is
>> something wrong with "outputDS2" so I switched the order  so now the code
>> looks like this
>>
>> DataSet inputDS = readFromKaka(topicName)
>>
>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
>> Since I can see data getting populated
>>
>> DataSet outputDS2 = mongoDS.map( readFromDatabase); // This
>> Works
>>
>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Doesn't
>> work
>>
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello1")).start().awaitTermination()
>>
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello2")).start().awaitTermination()
>>
>> *Now I can see data coming out from hello2 kafka topic but not from
>> hello1 topic*. *In  short, I can only see data from outputDS1 or
>> outputDS2 but not both. * At this point I am not sure what is going
>> on?
>>
>> Thanks!
>>
>>
>>
>

>>>
>>
>


Re: Structured streaming coding question

2017-09-20 Thread Burak Yavuz
Please remove

query1.awaitTermination();
query2.awaitTermination();

once

query1.awaitTermination();

is called, you don't even get to query2.awaitTermination().


On Tue, Sep 19, 2017 at 11:59 PM, kant kodali  wrote:

> Hi Burak,
>
> Thanks much! had no clue that existed. Now, I changed it to this.
>
> StreamingQuery query1 = 
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new 
> KafkaSink("hello1")).start();
> StreamingQuery query2 = 
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new 
> KafkaSink("hello2")).start();
>
> query1.awaitTermination();
> query2.awaitTermination();
> sparkSession.streams().awaitAnyTermination();
>
>
>
>
>
> On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz  wrote:
>
>> Hey Kant,
>>
>> That won't work either. Your second query may fail, and as long as your
>> first query is running, you will not know. Put this as the last line
>> instead:
>>
>> spark.streams.awaitAnyTermination()
>>
>> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali  wrote:
>>
>>> Looks like my problem was the order of awaitTermination() for some
>>> reason.
>>>
>>> *Doesn't work *
>>>
>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello1")).start().awaitTermination()
>>>
>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello2")).start().awaitTermination()
>>>
>>> *Works*
>>>
>>> StreamingQuery query1 = outputDS1.writeStream().trigge
>>> r(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello1")).start();
>>>
>>> query1.awaitTermination()
>>>
>>> StreamingQuery query2 =outputDS2.writeStream().trigg
>>> er(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello2")).start();
>>>
>>> query2.awaitTermination()
>>>
>>>
>>>
>>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali 
>>> wrote:
>>>
 Looks like my problem was the order of awaitTermination() for some
 reason.

 Doesn't work





 On Tue, Sep 19, 2017 at 1:54 PM, kant kodali 
 wrote:

> Hi All,
>
> I have the following pseudo code (I could paste the real code however
> it is pretty long and involves Database calls inside dataset.map operation
> and so on) so I am just trying to simplify my question. would like to know
> if there is something wrong with the following pseudo code?
>
> DataSet inputDS = readFromKaka(topicName)
>
> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
> Since I can see data getting populated
>
> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Works as
> well
>
> DataSet outputDS2 = mongoDS.map( readFromDatabase); // Doesn't
> work
>
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello1")).start().awaitTermination()
>
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello2")).start().awaitTermination()
>
>
> *So what's happening with above code is that I can see data coming out
> of hello1 topic but not from hello2 topic.* I thought there is
> something wrong with "outputDS2" so I switched the order  so now the code
> looks like this
>
> DataSet inputDS = readFromKaka(topicName)
>
> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
> Since I can see data getting populated
>
> DataSet outputDS2 = mongoDS.map( readFromDatabase); // This
> Works
>
> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Doesn't
> work
>
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello1")).start().awaitTermination()
>
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello2")).start().awaitTermination()
>
> *Now I can see data coming out from hello2 kafka topic but not from
> hello1 topic*. *In  short, I can only see data from outputDS1 or
> outputDS2 but not both. * At this point I am not sure what is going
> on?
>
> Thanks!
>
>
>

>>>
>>
>


Re: Structured streaming coding question

2017-09-20 Thread kant kodali
Hi Burak,

Thanks much! had no clue that existed. Now, I changed it to this.

StreamingQuery query1 =
outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
KafkaSink("hello1")).start();
StreamingQuery query2 =
outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
KafkaSink("hello2")).start();

query1.awaitTermination();
query2.awaitTermination();
sparkSession.streams().awaitAnyTermination();





On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz  wrote:

> Hey Kant,
>
> That won't work either. Your second query may fail, and as long as your
> first query is running, you will not know. Put this as the last line
> instead:
>
> spark.streams.awaitAnyTermination()
>
> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali  wrote:
>
>> Looks like my problem was the order of awaitTermination() for some reason.
>>
>> *Doesn't work *
>>
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello1")).start().awaitTermination()
>>
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello2")).start().awaitTermination()
>>
>> *Works*
>>
>> StreamingQuery query1 = outputDS1.writeStream().trigge
>> r(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello1")).start();
>>
>> query1.awaitTermination()
>>
>> StreamingQuery query2 =outputDS2.writeStream().trigg
>> er(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello2")).start();
>>
>> query2.awaitTermination()
>>
>>
>>
>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali  wrote:
>>
>>> Looks like my problem was the order of awaitTermination() for some
>>> reason.
>>>
>>> Doesn't work
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali  wrote:
>>>
 Hi All,

 I have the following pseudo code (I could paste the real code however
 it is pretty long and involves Database calls inside dataset.map operation
 and so on) so I am just trying to simplify my question. would like to know
 if there is something wrong with the following pseudo code?

 DataSet inputDS = readFromKaka(topicName)

 DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
 Since I can see data getting populated

 DataSet outputDS1 = mongoDS.map(readFromDatabase); // Works as
 well

 DataSet outputDS2 = mongoDS.map( readFromDatabase); // Doesn't
 work

 outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
 KafkaSink("hello1")).start().awaitTermination()

 outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
 KafkaSink("hello2")).start().awaitTermination()


 *So what's happening with above code is that I can see data coming out
 of hello1 topic but not from hello2 topic.* I thought there is
 something wrong with "outputDS2" so I switched the order  so now the code
 looks like this

 DataSet inputDS = readFromKaka(topicName)

 DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
 Since I can see data getting populated

 DataSet outputDS2 = mongoDS.map( readFromDatabase); // This
 Works

 DataSet outputDS1 = mongoDS.map(readFromDatabase); // Doesn't
 work

 outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
 KafkaSink("hello1")).start().awaitTermination()

 outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
 KafkaSink("hello2")).start().awaitTermination()

 *Now I can see data coming out from hello2 kafka topic but not from
 hello1 topic*. *In  short, I can only see data from outputDS1 or
 outputDS2 but not both. * At this point I am not sure what is going on?

 Thanks!



>>>
>>
>


Re: Structured streaming coding question

2017-09-20 Thread Burak Yavuz
Hey Kant,

That won't work either. Your second query may fail, and as long as your
first query is running, you will not know. Put this as the last line
instead:

spark.streams.awaitAnyTermination()

On Tue, Sep 19, 2017 at 10:11 PM, kant kodali  wrote:

> Looks like my problem was the order of awaitTermination() for some reason.
>
> *Doesn't work *
>
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello1")).start().awaitTermination()
>
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello2")).start().awaitTermination()
>
> *Works*
>
> StreamingQuery query1 = outputDS1.writeStream().trigge
> r(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start();
>
> query1.awaitTermination()
>
> StreamingQuery query2 =outputDS2.writeStream().trigg
> er(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start();
>
> query2.awaitTermination()
>
>
>
> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali  wrote:
>
>> Looks like my problem was the order of awaitTermination() for some reason.
>>
>> Doesn't work
>>
>>
>>
>>
>>
>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I have the following pseudo code (I could paste the real code however it
>>> is pretty long and involves Database calls inside dataset.map operation and
>>> so on) so I am just trying to simplify my question. would like to know if
>>> there is something wrong with the following pseudo code?
>>>
>>> DataSet inputDS = readFromKaka(topicName)
>>>
>>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
>>> Since I can see data getting populated
>>>
>>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Works as
>>> well
>>>
>>> DataSet outputDS2 = mongoDS.map( readFromDatabase); // Doesn't
>>> work
>>>
>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello1")).start().awaitTermination()
>>>
>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello2")).start().awaitTermination()
>>>
>>>
>>> *So what's happening with above code is that I can see data coming out
>>> of hello1 topic but not from hello2 topic.* I thought there is
>>> something wrong with "outputDS2" so I switched the order  so now the code
>>> looks like this
>>>
>>> DataSet inputDS = readFromKaka(topicName)
>>>
>>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
>>> Since I can see data getting populated
>>>
>>> DataSet outputDS2 = mongoDS.map( readFromDatabase); // This Works
>>>
>>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>>>
>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello1")).start().awaitTermination()
>>>
>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>> KafkaSink("hello2")).start().awaitTermination()
>>>
>>> *Now I can see data coming out from hello2 kafka topic but not from
>>> hello1 topic*. *In  short, I can only see data from outputDS1 or
>>> outputDS2 but not both. * At this point I am not sure what is going on?
>>>
>>> Thanks!
>>>
>>>
>>>
>>
>