jingz-db commented on code in PR #48124:
URL: https://github.com/apache/spark/pull/48124#discussion_r1837405539


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -439,6 +439,25 @@ class IncrementalExecution(
               eventTimeWatermarkForEviction = iwEviction)
           ))
 
+      // UpdateEventTimeColumnExec is used to tag the eventTime column, and validate
+      // emitted rows adhere to watermark in the output of transformWithStateInp.
+      // Hence, this node shares the same watermark value as TransformWithStateInPandasExec.
+      // This is the same as above in TransformWithStateExec.
+      // The only difference is TransformWithStateInPandasExec is analysed slightly different
+      // in the physical plan with `Project` and it is not `SerializeFromObjectExec`.
+      case UpdateEventTimeColumnExec(eventTime, delay, None,
+      ProjectExec(projectList, t: TransformWithStateInPandasExec))

Review Comment:
   The `projectList` variable here holds the output schema of the output event
time column:
   > projectList: List(outputTimestamp#15)
   
   I really appreciate you bringing up the optimization question here. After
reading your comments, I tried to figure out where this extra projection comes
from, and it took me a long time: we did not add this projection anywhere in
any of the event-time-related changes. I found that it probably comes from a
projection pushed down from the chained operator. In the test case, `groupBy`
is chained after TWS, and this `groupBy` only aggregates on some of the TWS
output columns. I am not very familiar with how the analyzer and optimizer
work, but I think this projection ultimately comes from the filter pushed down
from the aggregate operator. That is why we see the extra `Project` layer here.
That being said, a match case that only handles the `ProjectExec` wrapper is
not sufficient.
   I added another match case above, and I also added another test case that
covers the situation without the `Project`.
   I did a quick test on the Scala side and it doesn't have this issue. That is
because there the TWS node is wrapped by `SerializeFromObjectExec`, which
probably does not match some optimizer rule.
   
   But I think the exact match on `ProjectExec` is a bit hacky: there are
probably more optimization rules that I am not aware of. Do you think we should
do a recursive match here, in case the TWS node is (recursively) wrapped by
other operators besides `Project`?
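
   To make the recursive-match idea concrete, here is a minimal sketch. It
uses toy stand-in node types, not the real Spark physical plan classes, and the
names `WrapsTws`, `findTws`, and the toy case classes are all hypothetical. It
assumes that only pass-through unary wrappers (here `Project` and `Filter`)
are safe to look through.

```scala
// Toy stand-ins for physical plan nodes. These are NOT Spark classes;
// every name in this object is an assumption made for illustration.
object RecursiveMatchSketch {
  sealed trait Plan
  case class TwsInPandas(id: String) extends Plan
  case class Project(columns: List[String], child: Plan) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Aggregate(keys: List[String], child: Plan) extends Plan
  case class UpdateEventTime(eventTime: String, child: Plan) extends Plan

  // Hypothetical extractor: walks down through Project/Filter wrappers
  // until it reaches the TWS node, and gives up on any other operator.
  object WrapsTws {
    def unapply(plan: Plan): Option[TwsInPandas] = plan match {
      case t: TwsInPandas    => Some(t)
      case Project(_, child) => unapply(child)
      case Filter(_, child)  => unapply(child)
      case _                 => None
    }
  }

  // One case covers both the unwrapped and the (recursively) wrapped shape.
  def findTws(plan: Plan): Option[String] = plan match {
    case UpdateEventTime(_, WrapsTws(t)) => Some(t.id)
    case _                               => None
  }
}
```

   With this shape, a single case would replace the separate "with `Project`"
and "without `Project`" cases. The open question is which wrappers are safe to
look through; the sketch deliberately stops at `Aggregate`, since that operator
changes the rows rather than just passing them along.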



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.