Hi,

What's the code in readFromKafka to read from hello2 and hello1?

Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+)
https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Tue, Sep 19, 2017 at 10:54 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> I have the following pseudo code (I could paste the real code, however it
> is pretty long and involves database calls inside a dataset.map operation
> and so on), so I am just trying to simplify my question. I would like to know
> if there is something wrong with the following pseudo code.
>
> Dataset<String> inputDS = readFromKafka(topicName);
>
> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>
> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Works as well
>
> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // Doesn't work
>
> outputDS1.writeStream().trigger(Trigger.ProcessingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination();
>
> outputDS2.writeStream().trigger(Trigger.ProcessingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination();
>
>
> *With the above code, I can see data coming out of the hello1 topic but not
> the hello2 topic.* I thought something was wrong with "outputDS2", so I
> switched the order, and now the code looks like this:
>
> Dataset<String> inputDS = readFromKafka(topicName);
>
> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>
> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // This works
>
> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>
> outputDS1.writeStream().trigger(Trigger.ProcessingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination();
>
> outputDS2.writeStream().trigger(Trigger.ProcessingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination();
>
> *Now I can see data coming out of the hello2 Kafka topic but not the hello1
> topic.* *In short, I can only see data from outputDS1 or outputDS2, but
> not both.* At this point I am not sure what is going on.
>
> Thanks!
>
>
>
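In the pseudo code as written, each `start()` is immediately followed by `awaitTermination()`, which blocks the calling (driver) thread until that query stops, so the second query would not be started while the first one is still running. The plain-Java sketch below models only that control flow and does not use Spark at all: `FakeQuery` and `TwoSinksDemo` are hypothetical names, with `FakeQuery` standing in for Spark's `StreamingQuery` (its `start` records the sink and returns immediately, its `awaitTermination` blocks until a shared latch opens). It compares "start and await, one at a time" against "start both, then await".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical driver (not Spark code) that runs either pattern and reports
// which sinks were started while the fake queries were still "running".
final class TwoSinksDemo {
    static List<String> observe(boolean startBothBeforeWaiting, long observeMs) {
        List<String> started = new CopyOnWriteArrayList<>();
        CountDownLatch terminated = new CountDownLatch(1);

        Thread driver = new Thread(() -> {
            try {
                if (startBothBeforeWaiting) {
                    // Start both sinks first, then block on them.
                    FakeQuery q1 = FakeQuery.start("hello1", started, terminated);
                    FakeQuery q2 = FakeQuery.start("hello2", started, terminated);
                    q1.awaitTermination();
                    q2.awaitTermination();
                } else {
                    // Same shape as the pseudo code: the first awaitTermination()
                    // blocks, so the second start() is not reached meanwhile.
                    FakeQuery.start("hello1", started, terminated).awaitTermination();
                    FakeQuery.start("hello2", started, terminated).awaitTermination();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        driver.setDaemon(true);
        driver.start();

        try {
            Thread.sleep(observeMs);                          // fake queries are "running"
            List<String> snapshot = new ArrayList<>(started); // sinks started so far
            terminated.countDown();                           // stop the fake queries
            driver.join();
            return snapshot;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("start + await, one at a time: " + observe(false, 200));
        System.out.println("start both, then await:       " + observe(true, 200));
    }
}

// Hypothetical stand-in for Spark's StreamingQuery (not a Spark class).
// All fake queries share one latch and "run" until it is opened.
final class FakeQuery {
    private final CountDownLatch terminated;

    private FakeQuery(CountDownLatch terminated) {
        this.terminated = terminated;
    }

    // Loosely like DataStreamWriter.start(): records the sink, returns immediately.
    static FakeQuery start(String sinkTopic, List<String> startedSinks, CountDownLatch terminated) {
        startedSinks.add(sinkTopic);
        return new FakeQuery(terminated);
    }

    // Loosely like StreamingQuery.awaitTermination(): blocks the caller
    // until the query stops.
    void awaitTermination() throws InterruptedException {
        terminated.await();
    }
}
```

In Spark itself, the analogous arrangement would be to keep the `StreamingQuery` returned by each `start()` and block only once after both have started, for example with `spark.streams().awaitAnyTermination()` (assuming `spark` is the `SparkSession`). This sketch only models the driver-side blocking in the pseudo code; it does not by itself explain why swapping the two definitions changed which topic received data.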
