HeartSaVioR commented on code in PR #48149:
URL: https://github.com/apache/spark/pull/48149#discussion_r1766393298


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala:
##########
@@ -524,4 +524,66 @@ class StreamingQueryOptimizationCorrectnessSuite extends 
StreamTest {
       doTest(numExpectedStatefulOperatorsForOneEmptySource = 1)
     }
   }
+
+  test("SPARK-49699: observe node is not pruned out from PruneFilters") {
+    val input1 = MemoryStream[Int]
+    val df = input1.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .observe("observation", count(lit(1)).as("rows"))
+      // Enforce PruneFilters to come into play and prune subtree. We could do 
the same
+      // with the reproducer of SPARK-48267, but let's just be simpler.
+      .filter(expr("false"))
+
+    testStream(df)(
+      AddData(input1, 1, 2, 3),
+      CheckNewAnswer(),
+      Execute { qe =>
+        val observeRow = qe.lastExecution.observedMetrics.get("observation")
+        assert(observeRow.get.getAs[Long]("rows") == 3L)
+      }
+    )
+  }
+
+  test("SPARK-49699: watermark node is not pruned out from PruneFilters") {
+    // NOTE: The test actually passes without SPARK-49699, because of the 
trickiness of
+    // filter pushdown and PruneFilters. Unlike observe node, the `false` 
filter is pushed down
+    // below to watermark node, hence PruneFilters rule does not prune out 
watermark node even
+    // before SPARK-49699. Propagate empty relation does not also propagate 
emptiness into
+    // watermark node, so the node is retained. The test is added for 
preventing regression.
+
+    val input1 = MemoryStream[Int]
+    val df = input1.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "0 second")
+      // Enforce PruneFilter to come into play and prune subtree. We could do 
the same
+      // with the reproducer of SPARK-48267, but let's just be simpler.
+      .filter(expr("false"))
+
+    testStream(df)(
+      AddData(input1, 1, 2, 3),
+      CheckNewAnswer(),
+      Execute { qe =>
+        // If the watermark node is pruned out, this would be null.
+        assert(qe.lastProgress.eventTime.get("watermark") != null)
+      }
+    )
+  }
+
+  test("SPARK-49699: stateful operator node is not pruned out from 
PruneFilters") {
+    val input1 = MemoryStream[Int]
+    val df = input1.toDF()
+      .groupBy("value")
+      .count()
+      // Enforce PruneFilter to come into play and prune subtree. We could do 
the same
+      // with the reproducer of SPARK-48267, but let's just be simpler.
+      .filter(expr("false"))
+
+    testStream(df, OutputMode.Complete())(
+      AddData(input1, 1, 2, 3),
+      CheckNewAnswer(),
+      Execute { qe =>
+        assert(qe.lastProgress.stateOperators.length == 1)

Review Comment:
   Without SPARK-49699, the (stateful) streaming aggregation operator is lost.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala:
##########
@@ -524,4 +524,66 @@ class StreamingQueryOptimizationCorrectnessSuite extends 
StreamTest {
       doTest(numExpectedStatefulOperatorsForOneEmptySource = 1)
     }
   }
+
+  test("SPARK-49699: observe node is not pruned out from PruneFilters") {
+    val input1 = MemoryStream[Int]
+    val df = input1.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .observe("observation", count(lit(1)).as("rows"))
+      // Enforce PruneFilters to come into play and prune subtree. We could do 
the same
+      // with the reproducer of SPARK-48267, but let's just be simpler.
+      .filter(expr("false"))
+
+    testStream(df)(
+      AddData(input1, 1, 2, 3),
+      CheckNewAnswer(),
+      Execute { qe =>
+        val observeRow = qe.lastExecution.observedMetrics.get("observation")

Review Comment:
   Without [SPARK-49699](https://issues.apache.org/jira/browse/SPARK-49699), we 
couldn't find the observed metrics named "observation".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to