Re: Structured streaming coding question
Just tried with sparkSession.streams().awaitAnyTermination(); and that's the only await* call I had, and it works! But what if I don't want all my queries to fail or stop making progress when one of them fails?

On Wed, Sep 20, 2017 at 2:26 AM, kant kodali wrote:

> Hi Burak,
>
> Are you saying get rid of both
>
> query1.awaitTermination();
> query2.awaitTermination();
>
> and just have sparkSession.streams().awaitAnyTermination()?
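In Spark itself, "keep the other queries running when one fails" is usually handled by looping over awaitAnyTermination(), calling sparkSession.streams().resetTerminated(), and restarting whichever query failed. The supervision idea can be sketched in plain Java with no Spark at all; the Callable below just stands in for a streaming query, and all names here are made up for illustration:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class RestartDemo {
    // Re-run a failing "query" up to maxRestarts times instead of letting
    // one failure take the whole application down.
    static String superviseQuery(Callable<String> query, int maxRestarts) {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRestarts; attempt++) {
            try {
                return query.call();   // "start" the query again
            } catch (Exception e) {
                last = e;              // log the failure, then restart
            }
        }
        throw new IllegalStateException("gave up after restarts", last);
    }

    public static void main(String[] args) {
        AtomicInteger failures = new AtomicInteger();
        // A query that fails twice before succeeding.
        Callable<String> flaky = () -> {
            if (failures.incrementAndGet() <= 2) throw new RuntimeException("boom");
            return "ok";
        };
        System.out.println(superviseQuery(flaky, 3)); // prints "ok"
    }
}
```

This is only the control-flow skeleton: in a real Spark driver the restart branch would call resetTerminated() and writeStream().start() again for the failed query.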
Re: Structured streaming coding question
Hi Burak,

Are you saying get rid of both

query1.awaitTermination();
query2.awaitTermination();

and just have the line below?

sparkSession.streams().awaitAnyTermination();

Thanks!

On Wed, Sep 20, 2017 at 12:51 AM, kant kodali wrote:

> If I don't get anywhere after query1.awaitTermination(); then I cannot put
> sparkSession.streams().awaitAnyTermination(); as the last line of code, right?
Re: Structured streaming coding question
If I don't get anywhere after query1.awaitTermination(); then I cannot put sparkSession.streams().awaitAnyTermination(); as the last line of code, right? Like below:

query1.awaitTermination();
sparkSession.streams().awaitAnyTermination();

On Wed, Sep 20, 2017 at 12:07 AM, Burak Yavuz wrote:

> Please remove
>
> query1.awaitTermination();
> query2.awaitTermination();
>
> Once query1.awaitTermination() is called, you don't even get to
> query2.awaitTermination().
Re: Structured streaming coding question
Please remove

query1.awaitTermination();
query2.awaitTermination();

Once

query1.awaitTermination();

is called, you don't even get to query2.awaitTermination().

On Tue, Sep 19, 2017 at 11:59 PM, kant kodali wrote:

> Thanks much! I had no clue that existed. Now I changed it to this:
>
> StreamingQuery query1 = outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start();
> StreamingQuery query2 = outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start();
>
> query1.awaitTermination();
> query2.awaitTermination();
> sparkSession.streams().awaitAnyTermination();
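Burak's point, that the second awaitTermination() is never even reached, is ordinary blocking-call behavior rather than anything Spark-specific. A minimal plain-Java sketch (no Spark; the thread below merely stands in for a never-ending streaming query):

```java
public class BlockingDemo {
    // Returns whether the never-ending "query" thread is still alive after a
    // bounded wait, showing why an unbounded join() would block forever.
    static boolean queryOutlivesBoundedWait(long waitMillis) throws InterruptedException {
        Thread query1 = new Thread(() -> {
            try {
                while (true) Thread.sleep(50); // stand-in for a streaming query
            } catch (InterruptedException e) {
                // interrupted: shut down
            }
        });
        query1.setDaemon(true);
        query1.start();

        // An unbounded query1.join() -- the analogue of query1.awaitTermination() --
        // would never return here, so any code that starts query2 afterwards
        // would never run. A bounded wait lets us observe that instead of hanging.
        query1.join(waitMillis);
        boolean alive = query1.isAlive();
        query1.interrupt();
        return alive;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("query1 still running: " + queryOutlivesBoundedWait(200));
        // prints "query1 still running: true"
    }
}
```

The same logic explains the earlier "order of awaitTermination()" symptom: whichever query's awaitTermination() ran first simply blocked the driver before the other query was ever started.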
Re: Structured streaming coding question
Hi Burak,

Thanks much! I had no clue that existed. Now I changed it to this:

StreamingQuery query1 = outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start();
StreamingQuery query2 = outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start();

query1.awaitTermination();
query2.awaitTermination();
sparkSession.streams().awaitAnyTermination();

On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz wrote:

> Hey Kant,
>
> That won't work either. Your second query may fail, and as long as your
> first query is running, you will not know. Put this as the last line instead:
>
> spark.streams.awaitAnyTermination()
Re: Structured streaming coding question
Hey Kant,

That won't work either. Your second query may fail, and as long as your first query is running, you will not know. Put this as the last line instead:

spark.streams.awaitAnyTermination()

On Tue, Sep 19, 2017 at 10:11 PM, kant kodali wrote:

> Looks like my problem was the order of awaitTermination() for some reason.
>
> *Doesn't work*
>
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination()
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination()
>
> *Works*
>
> StreamingQuery query1 = outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start();
> query1.awaitTermination()
>
> StreamingQuery query2 = outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start();
> query2.awaitTermination()
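The behavior Burak describes for awaitAnyTermination(), returning as soon as any query terminates, whether it finished cleanly or failed, can be mimicked with the JDK's ExecutorCompletionService. This is plain Java with no Spark; the callables are stand-ins for streaming queries, and the method name awaitAny is invented for this sketch:

```java
import java.util.concurrent.*;

public class AwaitAnyDemo {
    // Blocks until ANY submitted "query" terminates and reports how it ended,
    // loosely analogous to sparkSession.streams().awaitAnyTermination().
    static String awaitAny(Callable<String> q1, Callable<String> q2) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> queries = new ExecutorCompletionService<>(pool);
            queries.submit(q1);
            queries.submit(q2);
            Future<String> first = queries.take(); // whichever query terminates first
            try {
                return "finished: " + first.get();
            } catch (ExecutionException e) {
                return "failed: " + e.getCause().getMessage();
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        String outcome = awaitAny(
                () -> { Thread.sleep(5_000); return "query1"; },             // long-running query
                () -> { throw new IllegalStateException("query2 broke"); }); // fails fast
        System.out.println(outcome); // prints "failed: query2 broke"
    }
}
```

Note how the failure of the second "query" is observed even though the first one is still running, which is exactly what the sequential query1.awaitTermination() version cannot do.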
Re: Structured streaming coding question
Looks like my problem was the order of awaitTermination() for some reason.

*Doesn't work*

outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination()

outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination()

*Works*

StreamingQuery query1 = outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start();

query1.awaitTermination()

StreamingQuery query2 = outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start();

query2.awaitTermination()

On Tue, Sep 19, 2017 at 10:09 PM, kant kodali wrote:

> Looks like my problem was the order of awaitTermination() for some reason.
>
> Doesn't work
Re: Structured streaming coding question
Hi,

Ah, right! Start the queries and, once they are running, awaitTermination them.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+): https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2: https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Wed, Sep 20, 2017 at 7:09 AM, kant kodali wrote:

> Looks like my problem was the order of awaitTermination() for some reason.
Re: Structured streaming coding question
Hi,

What's the code in readFromKafka to read from hello2 and hello1?

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+): https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2: https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Tue, Sep 19, 2017 at 10:54 PM, kant kodali wrote:

> Hi All,
>
> I have the following pseudo code; I would like to know if there is
> something wrong with it.
Re: Structured streaming coding question
Looks like my problem was the order of awaitTermination() for some reason.

Doesn't work

On Tue, Sep 19, 2017 at 1:54 PM, kant kodali wrote:

> Hi All,
>
> I have the following pseudo code (I could paste the real code, however it is
> pretty long and involves database calls inside dataset.map operations and so
> on), so I am just trying to simplify my question. I would like to know if
> there is something wrong with the following pseudo code:
>
> DataSet inputDS = readFromKafka(topicName)
>
> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>
> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Works as well
>
> DataSet outputDS2 = mongoDS.map(readFromDatabase); // Doesn't work
>
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination()
>
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination()
>
> *So what's happening with the above code is that I can see data coming out
> of the hello1 topic but not from the hello2 topic.* I thought there was
> something wrong with "outputDS2", so I switched the order; now the code
> looks like this:
>
> DataSet inputDS = readFromKafka(topicName)
>
> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>
> DataSet outputDS2 = mongoDS.map(readFromDatabase); // This works
>
> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination()
>
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination()
>
> *Now I can see data coming out of the hello2 Kafka topic but not from the
> hello1 topic. In short, I can only see data from outputDS1 or outputDS2, but
> not both.* At this point I am not sure what is going on.
>
> Thanks!