HeartSaVioR commented on code in PR #48124:
URL: https://github.com/apache/spark/pull/48124#discussion_r1837434759


##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -241,11 +235,15 @@ def check_results(batch_df, _):
 
     # test list state with ttl has the same behavior as list state when state doesn't expire.
     def test_transform_with_state_in_pandas_list_state_large_ttl(self):
-        def check_results(batch_df, _):
-            assert set(batch_df.sort("id").collect()) == {
-                Row(id="0", countAsString="2"),
-                Row(id="1", countAsString="2"),
-            }
+        def check_results(batch_df, batch_id):
+            if batch_id == 0:
+                assert set(batch_df.sort("id").collect()) == {
+                    Row(id="0", countAsString="2"),
+                    Row(id="1", countAsString="2"),
+                }
+            else:

Review Comment:
   This only happens in processing-time timer mode, and I don't expect many tests to use that mode.
   
   But I'm fine either way; I'm OK with using the same approach for all timer modes.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -439,6 +439,25 @@ class IncrementalExecution(
               eventTimeWatermarkForEviction = iwEviction)
           ))
 
+      // UpdateEventTimeColumnExec is used to tag the eventTime column, and validate
+      // emitted rows adhere to watermark in the output of transformWithStateInp.
+      // Hence, this node shares the same watermark value as TransformWithStateInPandasExec.
+      // This is the same as above in TransformWithStateExec.
+      // The only difference is TransformWithStateInPandasExec is analysed slightly different
+      // in the physical plan with `Project` and it is not `SerializeFromObjectExec`.
+      case UpdateEventTimeColumnExec(eventTime, delay, None,
+      ProjectExec(projectList, t: TransformWithStateInPandasExec))

Review Comment:
   It's probably easier to disallow pushdown between UpdateEventTimeColumnExec and TransformWithStateExec.
   
   I see that a filter is not allowed to be pushed down below UpdateEventTimeColumnExec, so that part is OK. This is probably a case of column pruning:
   
   
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1036-L1045
   
   This rule could insert a Project below the child nodes; we can disallow this for UpdateEventTimeColumnExec, as in the following:
   
   
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1031-L1032
   
   Could you give this a try and check that the Project no longer appears? If that works, we can simply assume UpdateEventTimeColumnExec and TransformWithStateExec are always adjacent to each other.
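
   As a rough illustration of the guard-case idea, here is a toy model in Python. It is not Spark's optimizer code: `Node`, `NO_PUSHDOWN_BELOW`, `push_project_down`, and `chain` are hypothetical names, and the pruning logic is heavily simplified (it ignores the columns the intermediate node itself references). It only shows how an early "return the plan unchanged" case keeps a Project from being inserted between two nodes.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Node:
    """Toy single-child plan node (hypothetical, not a Spark class)."""
    name: str
    output: Tuple[str, ...]
    child: Optional["Node"] = None


# Assumption for illustration: nodes a Project must never be pushed beneath.
NO_PUSHDOWN_BELOW = {"UpdateEventTimeColumn"}


def push_project_down(project: Node) -> Node:
    """Toy column pruning: insert a narrowing Project beneath the child."""
    child = project.child
    if child is None or child.child is None:
        return project  # nothing to push beneath
    if child.name in NO_PUSHDOWN_BELOW:
        return project  # guard case: leave the plan untouched
    grandchild = child.child
    needed = tuple(c for c in grandchild.output if c in project.output)
    if needed == grandchild.output:
        return project  # no columns to prune
    pruned = Node("Project", needed, grandchild)
    return Node("Project", project.output, Node(child.name, needed, pruned))


def chain(node: Optional[Node]) -> List[str]:
    """Return node names from the top of the plan to the leaf."""
    names = []
    while node is not None:
        names.append(node.name)
        node = node.child
    return names


cols = ("id", "count", "ts")
tws = Node("TransformWithState", cols)
guarded = Node("Project", ("id", "count"), Node("UpdateEventTimeColumn", cols, tws))
unguarded = Node("Project", ("id", "count"), Node("Other", cols, tws))
```

   With the guard in place, `push_project_down(guarded)` returns the plan unchanged, so the two stateful nodes stay adjacent, while `push_project_down(unguarded)` inserts an extra Project directly above `TransformWithState`.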



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

