jingz-db commented on code in PR #48124:
URL: https://github.com/apache/spark/pull/48124#discussion_r1837405539
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -439,6 +439,25 @@ class IncrementalExecution(
eventTimeWatermarkForEviction = iwEviction)
))
+ // UpdateEventTimeColumnExec is used to tag the eventTime column, and validate
+ // emitted rows adhere to watermark in the output of transformWithStateInp.
+ // Hence, this node shares the same watermark value as TransformWithStateInPandasExec.
+ // This is the same as above in TransformWithStateExec.
+ // The only difference is TransformWithStateInPandasExec is analysed slightly different
+ // in the physical plan with `Project` and it is not `SerializeFromObjectExec`.
+ case UpdateEventTimeColumnExec(eventTime, delay, None,
+ ProjectExec(projectList, t: TransformWithStateInPandasExec))
Review Comment:
Inspecting the `projectList` variable here shows that it contains only the output event-time column:
> projectList: List(outputTimestamp#15)
I really appreciate you bringing up the optimization question here. After reading your comments, I tried to figure out where this extra projection comes from. It took me a long time, because we did not add this projection anywhere in the event-time-related changes. It turns out to be a projection pushed down from the chained operator. In my test I chained a `groupBy` that groups on only some of the TWS output columns. I am not very familiar with the optimizer rules, but I think this projection is ultimately pushed down from the aggregation operator, which is why we see the extra `Project` layer here. That said, a match case that only covers the plan with the `ProjectExec` wrapper is not sufficient.
I added another case above to handle the plan without the `Project`. I also added a test case that covers that situation.
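To make the two plan shapes concrete, here is a minimal sketch on a toy plan tree. The case classes (`Tws`, `Project`, `UpdateEventTime`, `Other`) are hypothetical stand-ins for illustration. They are not Spark's `SparkPlan` operators and not the actual code in this PR. The point is only that an exact pattern needs one explicit case per wrapper shape:

```scala
// Hypothetical toy plan tree for illustration only; these are NOT Spark's
// SparkPlan classes, and they keep just enough fields to show the shapes.
sealed trait Plan
case class Tws(name: String) extends Plan                            // stand-in for TransformWithStateInPandasExec
case class Project(columns: Seq[String], child: Plan) extends Plan   // stand-in for ProjectExec
case class UpdateEventTime(column: String, child: Plan) extends Plan // stand-in for UpdateEventTimeColumnExec
case class Other(child: Plan) extends Plan                           // any other (hypothetical) wrapper

object ExactMatch {
  // Returns the TWS node whose watermark the UpdateEventTime node should share,
  // but only for the two plan shapes listed explicitly below.
  def findTws(plan: Plan): Option[Tws] = plan match {
    // Shape 1: UpdateEventTime sits directly on top of the TWS node.
    case UpdateEventTime(_, t: Tws) => Some(t)
    // Shape 2: a pushed-down Project sits in between.
    case UpdateEventTime(_, Project(_, t: Tws)) => Some(t)
    // Any other wrapper (e.g. Other) is not recognized.
    case _ => None
  }
}
```

For example, `ExactMatch.findTws(UpdateEventTime("ts", Project(Seq("ts"), Tws("a"))))` yields `Some(Tws(a))`. Wrapping the TWS node in `Other` instead yields `None`, which is the brittleness of exact matching.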
I did a quick test on the Scala side and it does not have this issue. This is because there the TWS node is wrapped by `SerializeFromObjectExec`, so presumably the optimizer rule does not apply.
But I think an exact match on `ProjectExec` is a bit hacky, since there are probably other optimizer rules I am not aware of. Do you think we should match recursively here, in case the TWS node is wrapped (possibly several levels deep) by operators other than `Project`?
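One possible shape for the recursive match, again on a hypothetical toy plan tree rather than Spark's real operators, is sketched below. `Filter` and `Aggregate` are illustrative placeholders for "another wrapper" and "an operator that changes rows". This is an assumption about how such a helper could look, not a proposal for the exact Spark code:

```scala
// Hypothetical toy plan tree for illustration only; NOT Spark's SparkPlan classes.
sealed trait Plan
case class Tws(name: String) extends Plan
case class Project(columns: Seq[String], child: Plan) extends Plan
case class Filter(condition: String, child: Plan) extends Plan
case class Aggregate(keys: Seq[String], child: Plan) extends Plan
case class UpdateEventTime(column: String, child: Plan) extends Plan

object RecursiveMatch {
  // Walks down through an allowlist of wrappers (Project, Filter) until it
  // reaches a TWS node. Any other operator stops the search, so the watermark
  // is never shared across, e.g., an Aggregate that changes the rows.
  @annotation.tailrec
  def unwrapToTws(plan: Plan): Option[Tws] = plan match {
    case t: Tws            => Some(t)
    case Project(_, child) => unwrapToTws(child)
    case Filter(_, child)  => unwrapToTws(child)
    case _                 => None
  }

  // Entry point: only an UpdateEventTime node triggers the lookup.
  def findTws(plan: Plan): Option[Tws] = plan match {
    case UpdateEventTime(_, child) => unwrapToTws(child)
    case _                         => None
  }
}
```

Descending through any single-child node would be simpler. However, it could silently skip over an operator that changes the rows or the event-time column. An explicit allowlist tolerates nested wrappers while keeping the set of accepted operators reviewable. Which real operators are safe to look through would still need to be checked against their semantics.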