jingz-db commented on code in PR #48124:
URL: https://github.com/apache/spark/pull/48124#discussion_r1842906748


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -439,6 +439,25 @@ class IncrementalExecution(
               eventTimeWatermarkForEviction = iwEviction)
           ))
 
+      // UpdateEventTimeColumnExec is used to tag the eventTime column, and 
validate
+      // emitted rows adhere to watermark in the output of 
transformWithStateInp.
+      // Hence, this node shares the same watermark value as 
TransformWithStateInPandasExec.
+      // This is the same as above in TransformWithStateExec.
+      // The only difference is TransformWithStateInPandasExec is analysed 
slightly different
+      // in the physical plan with `Project` and it is not 
`SerializeFromObjectExec`.
+      case UpdateEventTimeColumnExec(eventTime, delay, None,
+      ProjectExec(projectList, t: TransformWithStateInPandasExec))

Review Comment:
   Thanks for all the code pointers! I added a line in the `Optimizer` and the 
the column is now not pruned so UpdateEventTimeColumnExec and 
TransformWithStateInPandasExec are now placed together. I leave the rule inside 
Optimizer and `IncrementalExecution` will only match with only the case where 
UpdateEventTimeColumnExec and TransformWithStateInPandasExec are placed 
together with no extra layer in between.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to