HeartSaVioR commented on code in PR #48124:
URL: https://github.com/apache/spark/pull/48124#discussion_r1837434759
##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -241,11 +235,15 @@ def check_results(batch_df, _):
# test list state with ttl has the same behavior as list state when state doesn't expire.
def test_transform_with_state_in_pandas_list_state_large_ttl(self):
- def check_results(batch_df, _):
- assert set(batch_df.sort("id").collect()) == {
- Row(id="0", countAsString="2"),
- Row(id="1", countAsString="2"),
- }
+ def check_results(batch_df, batch_id):
+ if batch_id == 0:
+ assert set(batch_df.sort("id").collect()) == {
+ Row(id="0", countAsString="2"),
+ Row(id="1", countAsString="2"),
+ }
+ else:
Review Comment:
This only happens with the processing-time timer mode, and I don't expect many tests to use that.
But I'm fine either way; I'm OK with using the same approach for all timer modes.
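
Purely for illustration, here is a hedged Scala sketch of the per-batch-id assertion pattern being discussed (the actual test is the pyspark `check_results` helper shown in the diff; the object name, column values, and the empty else-branch are placeholders, not the PR's real expectations):

```scala
import org.apache.spark.sql.{DataFrame, Row}

object CheckResultsSketch {
  // Branch the assertion on the micro-batch id: with a processing-time timer,
  // batches after the first can carry timer-emitted rows rather than the
  // initial counts, so a single expected set no longer fits every batch.
  def checkResults(batchDf: DataFrame, batchId: Long): Unit = {
    val rows = batchDf.sort("id").collect().toSet
    if (batchId == 0) {
      assert(rows == Set(Row("0", "2"), Row("1", "2")))
    } else {
      // Placeholder: assert whatever the timer handler is expected to emit
      // in later batches for the mode under test.
    }
  }
  // Illustrative wiring: df.writeStream.foreachBatch(checkResults _).start()
}
```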
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -439,6 +439,25 @@ class IncrementalExecution(
eventTimeWatermarkForEviction = iwEviction)
))
+ // UpdateEventTimeColumnExec is used to tag the eventTime column, and validate
+ // emitted rows adhere to watermark in the output of transformWithStateInp.
+ // Hence, this node shares the same watermark value as TransformWithStateInPandasExec.
+ // This is the same as above in TransformWithStateExec.
+ // The only difference is TransformWithStateInPandasExec is analysed slightly different
+ // in the physical plan with `Project` and it is not `SerializeFromObjectExec`.
+ case UpdateEventTimeColumnExec(eventTime, delay, None,
+ ProjectExec(projectList, t: TransformWithStateInPandasExec))
Review Comment:
It's probably easier to not allow any pushdown between UpdateEventTimeColumnExec
and TransformWithStateExec.
I see that filters are already not allowed to be pushed down below
UpdateEventTimeColumnExec, so that part is OK. The extra Project might be coming
from column pruning -
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1036-L1045
which can insert a Project down into the child nodes; we can disallow this
for UpdateEventTimeColumnExec, like the following:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1031-L1032
Could you give this a try and check that the Project no longer shows up? If that
works, we can just consider UpdateEventTimeColumnExec and
TransformWithStateExec to always be adjacent to each other.
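
To make the suggestion concrete, here is a minimal sketch of the kind of exclusion case the linked Optimizer.scala lines point at, adapted for the watermark-tagging node. It is shown as a standalone rule purely for illustration; in practice it would be a case inside the existing ColumnPruning rule, and `UpdateEventTimeWatermarkColumn` is assumed here to be the logical counterpart of `UpdateEventTimeColumnExec`. The actual cases at the linked lines may look different.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, UpdateEventTimeWatermarkColumn}
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: a case like this, placed before the generic Project-pushdown
// cases in ColumnPruning, leaves a Project sitting directly on top of
// UpdateEventTimeWatermarkColumn untouched, so column pruning never inserts an
// extra Project between the watermark-tagging node and the transformWithState
// operator below it.
object KeepUpdateEventTimeAdjacent extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    case p @ Project(_, _: UpdateEventTimeWatermarkColumn) => p
  }
}
```

With such an exclusion in place, the physical planner would only ever see UpdateEventTimeColumnExec directly above the transformWithState operator, which is what makes the "always adjacent" assumption safe.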