HeartSaVioR commented on code in PR #48149:
URL: https://github.com/apache/spark/pull/48149#discussion_r1766393298
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala:
##########
@@ -524,4 +524,66 @@ class StreamingQueryOptimizationCorrectnessSuite extends StreamTest {
doTest(numExpectedStatefulOperatorsForOneEmptySource = 1)
}
}
+
+ test("SPARK-49699: observe node is not pruned out from PruneFilters") {
+ val input1 = MemoryStream[Int]
+ val df = input1.toDF()
+ .withColumn("eventTime", timestamp_seconds($"value"))
+ .observe("observation", count(lit(1)).as("rows"))
+ // Enforce PruneFilters to come into play and prune subtree. We could do the same
+ // with the reproducer of SPARK-48267, but let's just be simpler.
+ .filter(expr("false"))
+
+ testStream(df)(
+ AddData(input1, 1, 2, 3),
+ CheckNewAnswer(),
+ Execute { qe =>
+ val observeRow = qe.lastExecution.observedMetrics.get("observation")
+ assert(observeRow.get.getAs[Long]("rows") == 3L)
+ }
+ )
+ }
+
+ test("SPARK-49699: watermark node is not pruned out from PruneFilters") {
+ // NOTE: The test actually passes without SPARK-49699, because of the trickiness of
+ // filter pushdown and PruneFilters. Unlike observe node, the `false` filter is pushed down
+ // below to watermark node, hence PruneFilters rule does not prune out watermark node even
+ // before SPARK-49699. Propagate empty relation does not also propagate emptiness into
+ // watermark node, so the node is retained. The test is added for preventing regression.
+
+ val input1 = MemoryStream[Int]
+ val df = input1.toDF()
+ .withColumn("eventTime", timestamp_seconds($"value"))
+ .withWatermark("eventTime", "0 second")
+ // Enforce PruneFilter to come into play and prune subtree. We could do the same
+ // with the reproducer of SPARK-48267, but let's just be simpler.
+ .filter(expr("false"))
+
+ testStream(df)(
+ AddData(input1, 1, 2, 3),
+ CheckNewAnswer(),
+ Execute { qe =>
+ // If the watermark node is pruned out, this would be null.
+ assert(qe.lastProgress.eventTime.get("watermark") != null)
+ }
+ )
+ }
+
+ test("SPARK-49699: stateful operator node is not pruned out from PruneFilters") {
+ val input1 = MemoryStream[Int]
+ val df = input1.toDF()
+ .groupBy("value")
+ .count()
+ // Enforce PruneFilter to come into play and prune subtree. We could do the same
+ // with the reproducer of SPARK-48267, but let's just be simpler.
+ .filter(expr("false"))
+
+ testStream(df, OutputMode.Complete())(
+ AddData(input1, 1, 2, 3),
+ CheckNewAnswer(),
+ Execute { qe =>
+ assert(qe.lastProgress.stateOperators.length == 1)
Review Comment:
Without SPARK-49699, the stateful streaming aggregation operator is pruned out of the plan, so no state operator would be reported and this assertion would fail.
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala:
##########
@@ -524,4 +524,66 @@ class StreamingQueryOptimizationCorrectnessSuite extends StreamTest {
doTest(numExpectedStatefulOperatorsForOneEmptySource = 1)
}
}
+
+ test("SPARK-49699: observe node is not pruned out from PruneFilters") {
+ val input1 = MemoryStream[Int]
+ val df = input1.toDF()
+ .withColumn("eventTime", timestamp_seconds($"value"))
+ .observe("observation", count(lit(1)).as("rows"))
+ // Enforce PruneFilters to come into play and prune subtree. We could do the same
+ // with the reproducer of SPARK-48267, but let's just be simpler.
+ .filter(expr("false"))
+
+ testStream(df)(
+ AddData(input1, 1, 2, 3),
+ CheckNewAnswer(),
+ Execute { qe =>
+ val observeRow = qe.lastExecution.observedMetrics.get("observation")
Review Comment:
Without [SPARK-49699](https://issues.apache.org/jira/browse/SPARK-49699), the observe node is pruned out, so the observed metrics named "observation" could not be found.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]