Hi,

What's the code in readFromKafka to read from hello2 and hello1?
Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+) https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Tue, Sep 19, 2017 at 10:54 PM, kant kodali <kanth...@gmail.com> wrote:
> Hi All,
>
> I have the following pseudocode (I could paste the real code, but it is
> pretty long and involves database calls inside Dataset.map operations and
> so on), so I am trying to simplify my question. I would like to know
> whether there is something wrong with the following pseudocode:
>
> Dataset<String> inputDS = readFromKafka(topicName);
>
> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>
> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Works as well
>
> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // Doesn't work
>
> outputDS1.writeStream().trigger(Trigger.ProcessingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination();
>
> outputDS2.writeStream().trigger(Trigger.ProcessingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination();
>
> *What happens with the above code is that I can see data coming out of the
> hello1 topic but not the hello2 topic.* I thought there was something
> wrong with outputDS2, so I switched the order, and now the code looks like
> this:
>
> Dataset<String> inputDS = readFromKafka(topicName);
>
> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>
> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // This works
>
> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>
> outputDS1.writeStream().trigger(Trigger.ProcessingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination();
>
> outputDS2.writeStream().trigger(Trigger.ProcessingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination();
>
> *Now I can see data coming out of the hello2 Kafka topic but not the hello1
> topic.* *In short, I can only see data from outputDS1 or outputDS2, but
> not both.* At this point I am not sure what is going on.
>
> Thanks!
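One thing worth ruling out in the pseudocode as written (an observation, not a confirmed diagnosis of the real code): `StreamingQuery.awaitTermination()` blocks the calling thread until that query stops, so the first `...start().awaitTermination()` line does not return while the hello1 query is running, and the hello2 query is never started. A common pattern is to call `start()` on every query first and block only once at the end, for example with `spark.streams().awaitAnyTermination()`. This alone would not explain the swapped-order experiment unless the two `writeStream()` lines were also swapped in the real code. Because a Spark job is not self-contained, the sketch below models only the control flow with plain Java threads; `FakeQuery`, `blockingPattern`, and `startAllThenAwait` are hypothetical stand-ins, not Spark API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class StartThenAwait {

    // Hypothetical stand-in for a long-running streaming query: a background
    // thread that records its sink name once, then runs until stop() is called.
    // It models control flow only; it is not Spark's StreamingQuery.
    static final class FakeQuery {
        private final CountDownLatch stopSignal = new CountDownLatch(1);
        private final Thread worker;

        FakeQuery(String sink, List<String> delivered) {
            worker = new Thread(() -> {
                delivered.add(sink); // "data reached this sink"
                try {
                    stopSignal.await(); // keep "streaming" until stopped
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        FakeQuery start() {
            worker.start();
            return this;
        }

        void stop() {
            stopSignal.countDown();
        }

        // Like awaitTermination(): blocks the caller until this query ends.
        void awaitTermination() throws InterruptedException {
            worker.join();
        }
    }

    // Mirrors the pseudocode: start().awaitTermination() for each query in turn.
    // Returns the sinks that had received data after the queries ran for a while.
    public static List<String> blockingPattern() throws InterruptedException {
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        FakeQuery q1 = new FakeQuery("hello1", delivered);
        FakeQuery q2 = new FakeQuery("hello2", delivered);
        Thread driver = new Thread(() -> {
            try {
                q1.start().awaitTermination(); // blocks here while q1 runs...
                q2.start().awaitTermination(); // ...so q2 is not started yet
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        driver.start();
        Thread.sleep(300); // let the "streams" run for a while
        List<String> snapshot = sortedCopy(delivered);
        q1.stop(); // unblocks the driver, which then starts q2
        q2.stop(); // latch already released, so q2 exits right after starting
        driver.join();
        return snapshot;
    }

    // Starts every query first; only then blocks the driver
    // (in Spark, e.g. via spark.streams().awaitAnyTermination()).
    public static List<String> startAllThenAwait() throws InterruptedException {
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        FakeQuery q1 = new FakeQuery("hello1", delivered).start();
        FakeQuery q2 = new FakeQuery("hello2", delivered).start();
        Thread.sleep(300); // both "streams" run concurrently here
        List<String> snapshot = sortedCopy(delivered);
        q1.stop();
        q2.stop();
        q1.awaitTermination();
        q2.awaitTermination();
        return snapshot;
    }

    private static List<String> sortedCopy(List<String> list) {
        synchronized (list) {
            List<String> copy = new ArrayList<>(list);
            Collections.sort(copy);
            return copy;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("start().awaitTermination() per query: " + blockingPattern());
        System.out.println("start all, then await: " + startAllThenAwait());
    }
}
```

Running `main` should list only `hello1` for the first pattern and both sinks for the second, which matches the "one or the other, but not both" symptom described in the thread.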