jerrypeng commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1119299951


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala:
##########
@@ -90,19 +90,24 @@ trait FlatMapGroupsWithStateExecBase
 
   override def shortName: String = "flatMapGroupsWithState"
 
-  override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
+  override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
     timeoutConf match {
       case ProcessingTimeTimeout =>
         true  // Always run batches to process timeouts
       case EventTimeTimeout =>
        // Process another non-data batch only if the watermark has changed in this executed plan
         eventTimeWatermarkForEviction.isDefined &&
-          newMetadata.batchWatermarkMs > eventTimeWatermarkForEviction.get
+          newInputWatermark > eventTimeWatermarkForEviction.get
       case _ =>
         false
     }
   }
 
+  // There is no guarantee that any of the column in the output is bound to the watermark. The
+  // user function is quite flexible. Hence Spark does not support the stateful operator(s) after
+  // (flat)MapGroupsWithState.
+  override def produceOutputWatermark(inputWatermarkMs: Long): Option[Long] = None

Review Comment:
   I think this is ok for now, but what is the future plan to support emitting watermarks from flatMapGroupsWithState? It is only an implementation limitation in SS that we cannot support stateful operators after a UDF. I would imagine we could select the first event time column, or let the user pick the event time column, and use that for watermarking.
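
   A purely illustrative sketch of that second idea, outside the scope of this PR: if the user could designate an event-time column in the output of the user function, `produceOutputWatermark` could derive a value from the input watermark instead of returning `None`. The names `userEventTimeCol` and `allowedLatenessMs` below are assumptions made for the sketch, not existing Spark APIs.

   ```scala
   // Standalone, hypothetical sketch; not part of this PR or of Spark's API.
   object OutputWatermarkSketch {
     // If the user designated an event-time column in the user function's output,
     // the operator could derive an output watermark instead of returning None.
     def produceOutputWatermark(
         inputWatermarkMs: Long,
         userEventTimeCol: Option[String],
         allowedLatenessMs: Long = 0L): Option[Long] = {
       userEventTimeCol.map { _ =>
         // The user function may emit rows with event times behind the input
         // watermark, so a conservative choice subtracts a lateness bound
         // (assumed, user-provided) rather than passing the watermark through.
         inputWatermarkMs - allowedLatenessMs
       }
     }

     def main(args: Array[String]): Unit = {
       println(produceOutputWatermark(10000L, Some("eventTime"), 2000L)) // Some(8000)
       println(produceOutputWatermark(10000L, None))                     // None
     }
   }
   ```

   Subtracting a lateness bound is only one conservative option; the real difficulty is that nothing constrains what event times the user function emits, which is why the PR returns `None` for now.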



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

