Just don't call .awaitTermination() right after .start(), because it blocks execution of the next line of code. You can assign the result of each .start() call to its own variable, or put the results into a list/array.
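A minimal sketch of that pattern in Scala, in case it helps. The built-in rate source and the console and memory sinks are only stand-ins I picked so that the example runs without Kafka or MongoDB; the object name, query name, and local master setting are likewise hypothetical. Swap in your own dataset1 and your foreach/kafka writers.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object MultiSinkContinuous {

  // Starts two continuous queries on the same streaming Dataset and
  // returns their handles without blocking on either of them.
  def startQueries(spark: SparkSession): Seq[StreamingQuery] = {
    // Stand-in for dataset1 (assumption): the built-in rate source,
    // limited to one partition so each query needs only one task slot.
    val dataset1 = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .option("numPartitions", "1")
      .load()

    // start() returns a StreamingQuery handle immediately.
    val firstQuery = dataset1.writeStream
      .format("console") // stand-in for the Kafka writer
      .trigger(Trigger.Continuous("1 second"))
      .start()

    val secondQuery = dataset1.writeStream
      .format("memory") // stand-in for the Mongo writer
      .queryName("rate_copy") // hypothetical name, required by the memory sink
      .trigger(Trigger.Continuous("1 second"))
      .start()

    Seq(firstQuery, secondQuery)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("multi-sink-continuous")
      // Assumption: local run. Continuous queries hold long-running tasks,
      // so there must be enough cores for all queries at the same time.
      .master("local[4]")
      .getOrCreate()

    val queries = startQueries(spark)
    queries.foreach(q => println(s"started query ${q.id}, active=${q.isActive}"))

    // Block only here, after every query has been started.
    spark.streams.awaitAnyTermination()
  }
}
```

Keeping the handles around also lets you stop or inspect each query individually later (for example q.stop() or q.status).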
And to wait until one of the streams finishes, use spark.streams.awaitAnyTermination() or something like this (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries)

S at "Wed, 25 Aug 2021 14:14:48 +0530" wrote:
 S> Hello,

 S> I have a structured streaming job that needs to be able to write to multiple sinks. We are using Continuous Trigger and not Microbatch Trigger.

 S> 1. When we use the foreach method using:
 S>    dataset1.writeStream.foreach(kafka ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination()
 S>    dataset1.writeStream.foreach(mongo ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination()
 S>    The first statement blocks the second one for obvious reasons. So this does not serve our purpose.
 S> 2. The next step for this problem would be to use the foreachbatch. That is not supported in the ContinuousMode.
 S> 3. The next step was to use something like this
 S>    dataset1.writeStream.format("kafka").trigger(ContinuousMode).start().awaitTermination()
 S>    dataset1.writeStream.format("mongo").trigger(ContinuousMode).start().awaitTermination()
 S>    for both the sinks. This does not work either. Only the 1st query works. The second one does not.

 S> Is there any solution to the problem of being able to write to multiple sinks in Continuous Trigger Mode using Structured Streaming?

--
With best wishes,
Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)