Just don't call .awaitTermination() on each query, because it blocks
execution of the next line of code, so the second query never starts.
Instead, assign the result of each .start() to its own variable, or put
the results into a list/array.

Then, to wait until one of the streams finishes, call
spark.streams.awaitAnyTermination() once, after all the queries have been
started
(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries)
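
A rough sketch of what I mean, in Scala. It is untested against your job and
makes some assumptions: the object and method names, the writers map, the
checkpointRoot path and the "1 second" continuous interval are all made up for
illustration, and the ForeachWriter instances stand in for your own Kafka and
Mongo writer logic. Each query gets its own checkpoint location.

```scala
import org.apache.spark.sql.{Dataset, ForeachWriter, Row}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object MultiSinkSketch {

  // Starts one continuous query per sink and returns the handles.
  // Nothing here blocks: .start() returns as soon as the query is launched.
  def startAll(
      dataset1: Dataset[Row],
      writers: Map[String, ForeachWriter[Row]], // hypothetical: sink name -> your writer
      checkpointRoot: String                    // hypothetical base path for checkpoints
  ): Seq[StreamingQuery] =
    writers.toSeq.map { case (name, writer) =>
      dataset1.writeStream
        .queryName(name)
        .foreach(writer)
        // every query needs its own checkpoint directory
        .option("checkpointLocation", s"$checkpointRoot/$name")
        .trigger(Trigger.Continuous("1 second")) // interval is an assumption
        .start()
    }

  // Blocks once, after all queries are running, until any one of them stops.
  def runUntilAnyStops(
      dataset1: Dataset[Row],
      writers: Map[String, ForeachWriter[Row]],
      checkpointRoot: String
  ): Unit = {
    val queries = startAll(dataset1, writers, checkpointRoot)
    try {
      dataset1.sparkSession.streams.awaitAnyTermination()
    } finally {
      // stop whatever is still running so the application can exit cleanly
      queries.filter(_.isActive).foreach(_.stop())
    }
  }
}
```

You would call it as something like
MultiSinkSketch.runUntilAnyStops(dataset1, Map("kafka" -> kafkaWriter,
"mongo" -> mongoWriter), "/tmp/checkpoints"), where kafkaWriter and
mongoWriter are your existing ForeachWriter implementations. Stopping the
remaining queries in the finally block is my choice here; you may prefer to
restart the failed one instead.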
 

S  at "Wed, 25 Aug 2021 14:14:48 +0530" wrote:
 S> Hello,

 S> I have a structured streaming job that needs to be able to write to multiple sinks. We are using Continuous Trigger and not Microbatch Trigger.

 S> 1. When we use the foreach method using:
 S> dataset1.writeStream.foreach(kafka ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination()
 S> dataset1.writeStream.foreach(mongo ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination()
 S> The first statement blocks the second one for obvious reasons. So this does not serve our purpose.
 S> 2. The next step for this problem would be to use the foreachbatch. That is not supported in the ContinuousMode.
 S> 3. The next step was to use something like this
 S> dataset1.writeStream.format("kafka").trigger(ContinuousMode).start().awaitTermination()
 S> dataset1.writeStream.format("mongo").trigger(ContinuousMode).start().awaitTermination()
 S> for both the sinks. This does not work either. Only the 1st query works. The second one does not.

 S> Is there any solution to the problem of being able to write to multiple sinks in Continuous Trigger Mode using Structured Streaming?



-- 
With best wishes,                    Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)
