HeartSaVioR commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1119401204


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala:
##########
@@ -90,19 +90,24 @@ trait FlatMapGroupsWithStateExecBase
 
   override def shortName: String = "flatMapGroupsWithState"
 
-  override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
+  override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
     timeoutConf match {
       case ProcessingTimeTimeout =>
         true  // Always run batches to process timeouts
       case EventTimeTimeout =>
         // Process another non-data batch only if the watermark has changed in this executed plan
         eventTimeWatermarkForEviction.isDefined &&
-          newMetadata.batchWatermarkMs > eventTimeWatermarkForEviction.get
+          newInputWatermark > eventTimeWatermarkForEviction.get
       case _ =>
         false
     }
   }
 
+  // There is no guarantee that any of the column in the output is bound to the watermark. The
+  // user function is quite flexible. Hence Spark does not support the stateful operator(s) after
+  // (flat)MapGroupsWithState.
+  override def produceOutputWatermark(inputWatermarkMs: Long): Option[Long] = None

Review Comment:
   Spark has no knowledge of the event-time values in the output. That means either the user would have to provide the logic to calculate the output watermark (I don't think that would work), or the operator would have to track the event-time values of its output itself (delayed by one batch).
   
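To make the second option concrete, here is a minimal sketch, assuming a hypothetical `OutputEventTimeTracker` helper (not a Spark class) and a fixed, hypothetical `delayMs` threshold. The operator records the largest event time among the rows it emits in batch N and only exposes `max - delayMs` as its output watermark from batch N + 1 onward, never letting it move backwards.

```scala
// Hypothetical helper, NOT part of Spark: models an operator that tracks the
// event times of its own output and derives a watermark from them,
// effective one batch later.
final class OutputEventTimeTracker(delayMs: Long) {
  // Watermark committed at the end of the previous batch; None before the first commit.
  private var committedWatermarkMs: Option[Long] = None
  // Largest event time among rows emitted so far in the current batch.
  private var currentBatchMaxMs: Option[Long] = None

  // Record the event time of one emitted output row.
  def observe(eventTimeMs: Long): Unit =
    currentBatchMaxMs = Some(currentBatchMaxMs.fold(eventTimeMs)(m => math.max(m, eventTimeMs)))

  // Watermark that downstream operators would see during the current batch.
  def outputWatermarkMs: Option[Long] = committedWatermarkMs

  // End of batch: this batch's max (minus the delay) becomes the next batch's
  // watermark; the committed watermark never moves backwards.
  def commitBatch(): Unit = {
    currentBatchMaxMs.foreach { maxMs =>
      val candidate = maxMs - delayMs
      committedWatermarkMs = Some(committedWatermarkMs.fold(candidate)(w => math.max(w, candidate)))
    }
    currentBatchMaxMs = None
  }
}
```

For example, with `delayMs = 1000`, emitting rows at 5000 and 7000 in batch 0 yields no output watermark during batch 0 and 6000 from batch 1 onward. Even with such tracking, nothing stops the user function from emitting rows older than the committed watermark, which downstream stateful operators would presumably treat as late; this is consistent with the diff returning `None` from `produceOutputWatermark`.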
   But even before we get to that, there are many issues with event time in the typed API. When converting between untyped and typed representations, Spark loses the event-time metadata in some cases. Furthermore, the output of flatMapGroupsWithState is also typed, so it has to be converted back to untyped to apply further stateful operations, which again risks losing the metadata. In addition, the typed API provides no way to specify a new event-time column in the output.
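For reference, the `shouldRunAnotherBatch` change in the diff above compares against the operator's new input watermark directly instead of reading `newMetadata.batchWatermarkMs`. A minimal standalone model of that decision is sketched below; `NoDataBatchDecision`, `TimeoutConf`, and its case objects are hypothetical local stand-ins (not Spark's `GroupStateTimeout` classes), so the snippet runs without Spark on the classpath.

```scala
// Stand-alone model of the diff's shouldRunAnotherBatch logic. The timeout
// types below are local stand-ins, NOT Spark's GroupStateTimeout classes.
object NoDataBatchDecision {
  sealed trait TimeoutConf
  case object NoTimeout extends TimeoutConf
  case object ProcessingTimeTimeout extends TimeoutConf
  case object EventTimeTimeout extends TimeoutConf

  // eventTimeWatermarkForEviction: watermark the executed plan used for eviction, if any.
  // newInputWatermark: the input watermark the next batch would run with.
  def shouldRunAnotherBatch(
      timeoutConf: TimeoutConf,
      eventTimeWatermarkForEviction: Option[Long],
      newInputWatermark: Long): Boolean =
    timeoutConf match {
      // Always run batches so processing-time timeouts can fire.
      case ProcessingTimeTimeout => true
      // Run a no-data batch only if the watermark advanced past the one used in the
      // executed plan; equivalent to `isDefined && newInputWatermark > get`.
      case EventTimeTimeout => eventTimeWatermarkForEviction.exists(newInputWatermark > _)
      case _ => false
    }
}
```

Using `Option.exists` folds the diff's `isDefined && ... > get` pair into a single call with the same result, including returning `false` when no eviction watermark has been set yet.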



-- 
This is an automated message from the Apache Git Service.