jerrypeng commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1119299951
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala:
##########
@@ -90,19 +90,24 @@ trait FlatMapGroupsWithStateExecBase
override def shortName: String = "flatMapGroupsWithState"
- override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
+ override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
timeoutConf match {
case ProcessingTimeTimeout =>
true // Always run batches to process timeouts
case EventTimeTimeout =>
// Process another non-data batch only if the watermark has changed in this executed plan
eventTimeWatermarkForEviction.isDefined &&
- newMetadata.batchWatermarkMs > eventTimeWatermarkForEviction.get
+ newInputWatermark > eventTimeWatermarkForEviction.get
case _ =>
false
}
}
+ // There is no guarantee that any of the column in the output is bound to the watermark. The
+ // user function is quite flexible. Hence Spark does not support the stateful operator(s) after
+ // (flat)MapGroupsWithState.
+ override def produceOutputWatermark(inputWatermarkMs: Long): Option[Long] = None
Review Comment:
I think this is OK for now, but what is the future plan for emitting
watermarks from flatMapGroupsWithState? That we cannot support stateful
operators after a UDF is only an implementation limitation in Structured
Streaming (SS). I would imagine we could select the first event-time column,
or let the user pick the event-time column, and use that for watermarking.
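
To make the suggestion concrete, here is a rough, self-contained sketch of what
"let the user pick the event-time column" might look like. It is not Spark code:
`StatefulOpSketch`, `outputEventTimeColumn`, and the local `TimeoutConf` stand-ins
are hypothetical names used only for illustration. It also assumes the user
function never emits rows whose event time is older than the input watermark,
which Spark cannot verify today.

```scala
object OutputWatermarkSketch {
  // Hypothetical stand-ins for the timeout configurations matched in the diff.
  sealed trait TimeoutConf
  case object NoTimeout extends TimeoutConf
  case object ProcessingTimeTimeout extends TimeoutConf
  case object EventTimeTimeout extends TimeoutConf

  // Simplified model of the operator; not the real FlatMapGroupsWithStateExec.
  final case class StatefulOpSketch(
      timeoutConf: TimeoutConf,
      eventTimeWatermarkForEviction: Option[Long],
      // Hypothetical: the output column the user declares as carrying event time.
      outputEventTimeColumn: Option[String]) {

    // Same decision logic as the diff above, written with Option.exists.
    def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = timeoutConf match {
      case ProcessingTimeTimeout => true // always run batches to process timeouts
      case EventTimeTimeout =>
        eventTimeWatermarkForEviction.exists(newInputWatermark > _)
      case _ => false
    }

    // Assumption: if the user designates an event-time column, forward the input
    // watermark unchanged; otherwise emit no watermark, as in the PR.
    def produceOutputWatermark(inputWatermarkMs: Long): Option[Long] =
      outputEventTimeColumn.map(_ => inputWatermarkMs)
  }
}
```

With no designated column the sketch behaves like the PR and returns `None`;
with one, downstream stateful operators would receive a watermark. Whether
forwarding the input watermark unchanged is safe depends entirely on the
assumption about the user function stated above.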
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]