you means sparkSession.streams.awaitAnyTermination()? May i have your code ? or you can see the following:
my demo code: ```` val hourDevice = beginTimeDevice.groupBy($"subsId",$"eventBeginHour",$"serviceType") .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForHour") hourDevice.writeStream .outputMode("update") .option("truncate", "false") .format("console") .start() beginTimeDevice.groupBy($"subsId",$"eventBeginDay",$"serviceType") .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForDay") dayDevice.writeStream .outputMode("update") .option("truncate", "false") .format("console") .start() sparkSession.streams.awaitAnyTermination() ````` sparkSession.streams.awaitAnyTermination() is ok, maybe its wrong somewhere else in your code. act_coder <acct...@gmail.com> 于2020年11月5日周四 上午11:45写道: > If I use for each function, then I may need to use custom Kafka stream > writer > right ?! > > And I might not be able to use default writestream.format(Kafka) method ?! > > > > -- > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ > > --------------------------------------------------------------------- > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best, Kevin Pis