jingz-db commented on code in PR #48124:
URL: https://github.com/apache/spark/pull/48124#discussion_r1837303580
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -439,6 +439,25 @@ class IncrementalExecution(
eventTimeWatermarkForEviction = iwEviction)
))
+ // UpdateEventTimeColumnExec is used to tag the eventTime column, and validate
+ // emitted rows adhere to watermark in the output of transformWithStateInp.
+ // Hence, this node shares the same watermark value as TransformWithStateInPandasExec.
+ // This is the same as above in TransformWithStateExec.
+ // The only difference is TransformWithStateInPandasExec is analysed slightly different
+ // in the physical plan with `Project` and it is not `SerializeFromObjectExec`.
+ case UpdateEventTimeColumnExec(eventTime, delay, None,
+ ProjectExec(projectList, t: TransformWithStateInPandasExec))
Review Comment:
This is a minor difference in how the logical operator is constructed.
For Scala, the logical plan node is defined and constructed here as
`CatalystSerde.serialize[U](mapped)`:
https://github.com/apache/spark/blob/5e49665ac39b49b875d6970f93df59aedd830fa5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala#L632
For Python, however, the logical plan node is constructed without
`CatalystSerde`, because the data was not originally typed when it got to
`RelationalGroupedDataset`. We therefore need to resolve the output type
explicitly by projecting to the user-defined output schema:
https://github.com/apache/spark/blob/d1544907a1cf94ebd6bd70e78def0efa4d6d549d/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala#L499
The `projectList` variable here holds the output schema defined by the
user in `outputStructType`:
> projectList: List(outputTimestamp#15)
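
To make the two plan shapes concrete, here is a minimal, self-contained sketch of the pattern match. The case classes below are toy stand-ins written only for illustration; they are not Spark's real physical operators, and the `WatermarkRule` object, the `Leaf` node, and the simplified fields are all hypothetical. The sketch only shows that the Scala path is wrapped in a `SerializeFromObjectExec`-like node while the Python path is wrapped in a `ProjectExec`-like node, so each needs its own `case`:

```scala
// Toy stand-ins for physical plan nodes. These are NOT Spark's real classes;
// names mirror the PR only to make the two plan shapes easy to compare.
sealed trait Plan
case class Leaf(name: String) extends Plan
case class TransformWithStateExec(child: Plan) extends Plan
case class TransformWithStateInPandasExec(child: Plan) extends Plan
case class SerializeFromObjectExec(serializer: Seq[String], child: Plan) extends Plan
case class ProjectExec(projectList: Seq[String], child: Plan) extends Plan
case class UpdateEventTimeColumnExec(
    eventTime: String,
    delayMs: Long,
    eventTimeWatermarkForEviction: Option[Long],
    child: Plan) extends Plan

// Hypothetical rule: fill in the eviction watermark for either plan shape.
object WatermarkRule {
  def apply(plan: Plan, watermark: Long): Plan = plan match {
    // Scala shape: the stateful operator sits under a serializer node.
    case u @ UpdateEventTimeColumnExec(_, _, None,
        SerializeFromObjectExec(_, _: TransformWithStateExec)) =>
      u.copy(eventTimeWatermarkForEviction = Some(watermark))

    // Python shape: the stateful operator sits under a projection to the
    // user-defined output schema instead.
    case u @ UpdateEventTimeColumnExec(_, _, None,
        ProjectExec(_, _: TransformWithStateInPandasExec)) =>
      u.copy(eventTimeWatermarkForEviction = Some(watermark))

    // Any other shape is left untouched.
    case other => other
  }
}
```

In this simplified model, a node whose watermark is already set (`Some(...)`) or whose child has a different wrapper falls through to the last case unchanged, which is why the Python shape would be skipped if only the `SerializeFromObjectExec` case existed.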
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]