jingz-db commented on code in PR #48124:
URL: https://github.com/apache/spark/pull/48124#discussion_r1837405539
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -439,6 +439,25 @@ class IncrementalExecution(
eventTimeWatermarkForEviction = iwEviction)
))
+ // UpdateEventTimeColumnExec is used to tag the eventTime column, and validate
+ // emitted rows adhere to watermark in the output of transformWithStateInp.
+ // Hence, this node shares the same watermark value as TransformWithStateInPandasExec.
+ // This is the same as above in TransformWithStateExec.
+ // The only difference is TransformWithStateInPandasExec is analysed slightly different
+ // in the physical plan with `Project` and it is not `SerializeFromObjectExec`.
+ case UpdateEventTimeColumnExec(eventTime, delay, None,
+   ProjectExec(projectList, t: TransformWithStateInPandasExec))
Review Comment:
The output we get from the `projectList` variable here contains just the output event time column:
> projectList: List(outputTimestamp#15)
I really appreciate you bringing the optimization question up here. After reading
your comments, I tried to figure out where this extra projection comes from, and
it took me quite a while - we did not add this projection anywhere in the event
time related changes. It most likely comes from a projection pushed down by the
chained operator. In the test case, `groupBy` is chained after TWS, and that
groupBy only aggregates on some of the TWS output columns. I am not super
familiar with the analyzer/optimizer rules, but I think this projection
ultimately comes from the projection pushed down from the aggregate operator,
which is why we see the extra layer of `Project` here. That being said, a match
case that only handles the `Project`-wrapped plan is not sufficient.
I added another match case above, and another test case to cover the situation
without the `Project`.
I did a quick test on the Scala side and it doesn't have this issue, because
there the TWS node is wrapped by `SerializeFromObjectExec`, which probably does
not match whatever optimizer rule introduces the extra projection.
But I think the exact match on `ProjectExec` is a bit hacky - there are probably
more optimizer rules I am not aware of. Do you think we should do a recursive
match here, in case the TWS node is (recursively) wrapped by operators other
than `Project`?
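For illustration only, a minimal sketch of what such a recursive match could look
like, using `TreeNode.collectFirst` to locate the TWS node under arbitrary
wrapping. The helper name is hypothetical and it assumes
`TransformWithStateInPandasExec` lives in `org.apache.spark.sql.execution.python`;
this is not code from the PR:

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python.TransformWithStateInPandasExec

// Hypothetical helper: search the subtree under UpdateEventTimeColumnExec for the
// TWS-in-Pandas node, regardless of which operators the optimizer inserted in between.
def findTransformWithStateInPandas(
    plan: SparkPlan): Option[TransformWithStateInPandasExec] = {
  plan.collectFirst { case t: TransformWithStateInPandasExec => t }
}
```

A pattern guard could then call this helper instead of enumerating each possible
wrapper, at the cost of being less explicit about which plan shapes are expected.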
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]