Do you mean sparkSession.streams.awaitAnyTermination()? May I see your
code? Or you can look at the following.


My demo code:

```
val hourDevice = beginTimeDevice
  .groupBy($"subsId", $"eventBeginHour", $"serviceType")
  .agg("duration" -> "sum")
  .withColumnRenamed("sum(duration)", "durationForHour")

hourDevice.writeStream
  .outputMode("update")
  .option("truncate", "false")
  .format("console")
  .start()

// The aggregation must be assigned, otherwise dayDevice below is undefined.
val dayDevice = beginTimeDevice
  .groupBy($"subsId", $"eventBeginDay", $"serviceType")
  .agg("duration" -> "sum")
  .withColumnRenamed("sum(duration)", "durationForDay")

dayDevice.writeStream
  .outputMode("update")
  .option("truncate", "false")
  .format("console")
  .start()

// Blocks until one of the active queries terminates.
sparkSession.streams.awaitAnyTermination()
```
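One thing to note about the demo: awaitAnyTermination() returns as soon as
any single query stops or fails. If you want the driver to stay up until
every query has finished, a minimal sketch (assuming sparkSession is in
scope as above) is:

```
// Keep waiting while there are still active queries; resetTerminated()
// clears already-terminated queries so the next call blocks again.
while (sparkSession.streams.active.nonEmpty) {
  sparkSession.streams.awaitAnyTermination()
  sparkSession.streams.resetTerminated()
}
```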

In any case, sparkSession.streams.awaitAnyTermination() itself is OK; the
problem is probably somewhere else in your code.
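On your earlier question about foreach (quoted below): as far as I know you
don't need a custom writer. Structured Streaming ships with a built-in Kafka
sink, so writeStream.format("kafka") works as long as the spark-sql-kafka
connector is on the classpath and the output DataFrame has the columns the
sink expects (a string/binary "value", optionally a "key"). A rough sketch;
the servers, topic name, and checkpoint path here are placeholders:

```
import org.apache.spark.sql.functions.{col, struct, to_json}

// The Kafka sink expects "key" and "value" columns.
val toKafka = hourDevice.select(
  col("subsId").cast("string").as("key"),
  to_json(struct(col("*"))).as("value"))

toKafka.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092") // placeholder
  .option("topic", "hour-durations")               // placeholder topic
  .option("checkpointLocation", "/tmp/ckpt/hour")  // placeholder path
  .outputMode("update")
  .start()
```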

act_coder <acct...@gmail.com> wrote on Thu, Nov 5, 2020 at 11:45 AM:

> If I use the foreach function, then I may need to use a custom Kafka
> stream writer, right?
>
> And I might not be able to use the default writeStream.format("kafka")
> method?

-- 

Best,

Kevin Pis
