HeartSaVioR commented on code in PR #48124:
URL: https://github.com/apache/spark/pull/48124#discussion_r1836131863


##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -542,15 +554,85 @@ def check_results(batch_df, batch_id):
                     Row(id="a", timestamp="20"),
                     Row(id="a-expired", timestamp="0"),
                 }
-            else:
+            elif batch_id == 2:
                 # watermark has not progressed, so timer registered in batch 
1(watermark = 10)
                 # has not yet expired
                 assert set(batch_df.sort("id").collect()) == {Row(id="a", 
timestamp="15")}
+            else:
+                for q in self.spark.streams.active:
+                    q.stop()
 
         self._test_transform_with_state_in_pandas_event_time(
             EventTimeStatefulProcessor(), check_results
         )
 
+    def _test_transform_with_state_in_pandas_chaining_ops(

Review Comment:
   I don't see any caller for this. Have you missed to add caller?



##########
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -529,7 +530,31 @@ class RelationalGroupedDataset protected[sql](
         initialStateSchema = initialState.df.schema
       )
     }
-    Dataset.ofRows(df.sparkSession, plan)
+
+    if (eventTimeColumnName.isEmpty) {
+      Dataset.ofRows(df.sparkSession, plan)
+    } else {
+      updateEventTimeColumnAfterTransformWithState(plan, eventTimeColumnName)
+    }
+  }
+
+  /**
+   * Creates a new dataset with updated eventTimeColumn after the 
transformWithState
+   * logical node.
+   */
+  private def updateEventTimeColumnAfterTransformWithState(
+      transformWithStateInPandas: LogicalPlan,
+      eventTimeColumnName: String): DataFrame = {
+    val transformWithStateDataset = Dataset.ofRows(
+      df.sparkSession,
+      transformWithStateInPandas
+    )
+
+    Dataset.ofRows(df.sparkSession, EliminateEventTimeWatermark(

Review Comment:
   EliminateEventTimeWatermark is no longer needed as we now have this in 
Analyzer rule.
   
   But maybe we could deal with this in separate PR to address other cases as 
well.



##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -26,13 +26,7 @@
 
 from pyspark import SparkConf
 from pyspark.sql.functions import split
-from pyspark.sql.types import (
-    StringType,
-    StructType,
-    StructField,
-    Row,
-    IntegerType,
-)
+from pyspark.sql.types import StringType, StructType, StructField, Row, 
IntegerType, TimestampType

Review Comment:
   nit: probably linter may complain?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala:
##########
@@ -191,8 +223,15 @@ case class TransformWithStateInPandasExec(
     val updatesStartTimeNs = currentTimeNs
 
     val (dedupAttributes, argOffsets) = resolveArgOffsets(child.output, 
groupingAttributes)
-    val data =
-      groupAndProject(dataIterator, groupingAttributes, child.output, 
dedupAttributes)
+    // If timeout is based on event time, then filter late data based on 
watermark
+    val filteredIter = watermarkPredicateForDataForLateEvents match {

Review Comment:
   Wow, I wonder how we could miss this... Nice catch! I was about to ask for a 
new test covering this, but I'll revisit once you update the test as your new 
test seems to also contain this part.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -439,6 +439,25 @@ class IncrementalExecution(
               eventTimeWatermarkForEviction = iwEviction)
           ))
 
+      // UpdateEventTimeColumnExec is used to tag the eventTime column, and 
validate
+      // emitted rows adhere to watermark in the output of 
transformWithStateInp.
+      // Hence, this node shares the same watermark value as 
TransformWithStateInPandasExec.
+      // This is the same as above in TransformWithStateExec.
+      // The only difference is TransformWithStateInPandasExec is analysed 
slightly different
+      // in the physical plan with `Project` and it is not 
`SerializeFromObjectExec`.
+      case UpdateEventTimeColumnExec(eventTime, delay, None,
+      ProjectExec(projectList, t: TransformWithStateInPandasExec))

Review Comment:
   I wonder why Project is necessary between `UpdateEventTimeColumn` and 
`TransformWithStateInPandas` - could you please check the value of projectList? 
If that value is the same with output of TransformWithStateInPandasExec, 
optimizer might be able to remove out Project (in future). If that's the case, 
we may want to address the pattern of not having a Project in between as well.



##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -241,11 +235,15 @@ def check_results(batch_df, _):
 
     # test list state with ttl has the same behavior as list state when state 
doesn't expire.
     def test_transform_with_state_in_pandas_list_state_large_ttl(self):
-        def check_results(batch_df, _):
-            assert set(batch_df.sort("id").collect()) == {
-                Row(id="0", countAsString="2"),
-                Row(id="1", countAsString="2"),
-            }
+        def check_results(batch_df, batch_id):
+            if batch_id == 0:
+                assert set(batch_df.sort("id").collect()) == {
+                    Row(id="0", countAsString="2"),
+                    Row(id="1", countAsString="2"),
+                }
+            else:

Review Comment:
   Instead of doing this in all places, shall we find the way for pytest to 
perform the same with before() / after()?
   
   Also, since we are reading from files, why not just use 
Trigger.AvailableNow() which will terminate the query automatically instead of 
doing this? sleep(10) won't be necessary as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to