This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c3c819a68a [SPARK-42525][SQL] Collapse two adjacent windows with the same partition/order in subquery
0c3c819a68a is described below

commit 0c3c819a68a8fceedcb5974b2f7e30121cd464e6
Author: zml1206 <zhuml1...@gmail.com>
AuthorDate: Sun Feb 26 11:21:31 2023 +0800

    [SPARK-42525][SQL] Collapse two adjacent windows with the same partition/order in subquery
    
    ### What changes were proposed in this pull request?
    Extend the CollapseWindow rule so that two adjacent Window nodes are collapsed even when
    one of them sits inside a subquery. Partition/order specs are now compared with
    semanticEquals, so attributes that differ only in their qualifier (as happens across a
    subquery boundary) no longer block the collapse.
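
    For intuition, a minimal sketch of why the previous `==` comparison rejected this case
    (the attribute and qualifier below are illustrative, not taken from the patch):
    ```
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.IntegerType

    val a1 = AttributeReference("a", IntegerType)()
    val a2 = a1.withQualifier(Seq("t2"))  // same column, seen through a subquery alias

    assert(a1 != a2)              // structural equality also compares qualifiers
    assert(a1.semanticEquals(a2)) // semantic equality ignores such cosmetic differences
    ```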
    
    ### Why are the changes needed?
    Collapsing the two Window nodes removes a redundant Window operator from the optimized
    plan, as the before/after comparison shows:
    ```
    select a, b, c, row_number() over (partition by a order by b) as d from
    ( select a, b, rank() over (partition by a order by b) as c from t1) t2
    
    == Optimized Logical Plan ==
    before
    Window [row_number() windowspecdefinition(a#11, b#12 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#26], [a#11], [b#12 ASC NULLS FIRST]
    +- Window [rank(b#12) windowspecdefinition(a#11, b#12 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS c#25], [a#11], [b#12 ASC NULLS FIRST]
       +- InMemoryRelation [a#11, b#12], StorageLevel(disk, memory, deserialized, 1 replicas)
             +- *(1) Project [_1#6 AS a#11, _2#7 AS b#12]
                +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#6, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#7]
                   +- *(1) MapElements org.apache.spark.sql.DataFrameSuite$$Lambda$1517/16288483683a479fda, obj#5: scala.Tuple2
                      +- *(1) DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#0L, true, false, true), obj#4: java.lang.Long
                         +- *(1) Range (0, 10, step=1, splits=2)
    
    after
    Window [rank(b#12) windowspecdefinition(a#11, b#12 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS c#25, row_number() windowspecdefinition(a#11, b#12 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#26], [a#11], [b#12 ASC NULLS FIRST]
    +- InMemoryRelation [a#11, b#12], StorageLevel(disk, memory, deserialized, 1 replicas)
          +- *(1) Project [_1#6 AS a#11, _2#7 AS b#12]
             +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#6, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#7]
                +- *(1) MapElements org.apache.spark.sql.DataFrameSuite$$Lambda$1518/19280286724d7a64ca, obj#5: scala.Tuple2
                   +- *(1) DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#0L, true, false, true), obj#4: java.lang.Long
                      +- *(1) Range (0, 10, step=1, splits=2)
    ```
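
    The same shape can be reproduced with the DataFrame API; a sketch, assuming an active
    `SparkSession` in `spark` and a table `t1` with columns `a` and `b`:
    ```
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{rank, row_number}
    import spark.implicits._

    val w = Window.partitionBy("a").orderBy("b")

    // Two windows over the same partition/order, separated by an intermediate
    // select (the subquery boundary); with this change they collapse into one.
    val df = spark.table("t1")
      .select($"a", $"b", rank().over(w).as("c"))
      .select($"a", $"b", $"c", row_number().over(w).as("d"))
    ```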
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT
    
    Closes #40115 from zml1206/SPARK-42525.
    
    Authored-by: zml1206 <zhuml1...@gmail.com>
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala     |  9 +++++++--
 .../sql/catalyst/optimizer/CollapseWindowSuite.scala | 20 ++++++++++++++++++++
 .../spark/sql/DataFrameWindowFunctionsSuite.scala    | 16 ++++++++++++++++
 3 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1233f2207f5..a0d49c29470 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1251,9 +1251,14 @@ object OptimizeWindowFunctions extends Rule[LogicalPlan] {
  *   independent and are of the same window function type, collapse into the parent.
  */
 object CollapseWindow extends Rule[LogicalPlan] {
+  private def specCompatible(s1: Seq[Expression], s2: Seq[Expression]): Boolean = {
+    s1.length == s2.length &&
+      s1.zip(s2).forall(e => e._1.semanticEquals(e._2))
+  }
+
   private def windowsCompatible(w1: Window, w2: Window): Boolean = {
-    w1.partitionSpec == w2.partitionSpec &&
-      w1.orderSpec == w2.orderSpec &&
+    specCompatible(w1.partitionSpec, w2.partitionSpec) &&
+      specCompatible(w1.orderSpec, w2.orderSpec) &&
       w1.references.intersect(w2.windowOutputSet).isEmpty &&
       w1.windowExpressions.nonEmpty && w2.windowExpressions.nonEmpty &&
       // This assumes Window contains the same type of window expressions. This is ensured
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
index 63cc3554564..515203da7ca 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
@@ -148,4 +148,24 @@ class CollapseWindowSuite extends PlanTest {
 
     comparePlans(optimized, query)
   }
+
+  test("SPARK-42525: collapse two adjacent windows with the same 
partition/order " +
+    "but qualifiers are different ") {
+
+    val query = testRelation
+      .window(Seq(min(a).as("_we0")), Seq(c.withQualifier(Seq("0"))), Seq(c.asc))
+      .select($"a", $"b", $"c", $"_we0" as "min_a")
+      .window(Seq(max(a).as("_we1")), Seq(c.withQualifier(Seq("1"))), Seq(c.asc))
+      .select($"a", $"b", $"c", $"min_a", $"_we1" as "max_a")
+      .analyze
+
+    val optimized = Optimize.execute(query)
+
+    val correctAnswer = testRelation
+      .window(Seq(min(a).as("_we0"), max(a).as("_we1")), Seq(c), Seq(c.asc))
+      .select(a, b, c, $"_we0" as "min_a", $"_we1" as "max_a")
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index b9421f8b13d..1fb937e93b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.matchers.must.Matchers.the
 import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Lag, Literal, NonFoldableLiteral}
 import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
@@ -1429,4 +1430,19 @@ class DataFrameWindowFunctionsSuite extends QueryTest
       }
     }
   }
+
+  test("SPARK-42525: collapse two adjacent windows with the same 
partition/order in subquery") {
+    withTempView("t1") {
+      Seq((1, 1), (2, 2)).toDF("a", "b").createOrReplaceTempView("t1")
+      val df = sql(
+        """
+          |SELECT a, b, rk, row_number() OVER (PARTITION BY a ORDER BY b) AS rn
+          |FROM   (SELECT a, b, rank() OVER (PARTITION BY a ORDER BY b) AS rk
+          |        FROM t1) t2
+          |""".stripMargin)
+
+      val windows = df.queryExecution.optimizedPlan.collect { case w: LogicalWindow => w }
+      assert(windows.size === 1)
+    }
+  }
 }
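
A quick way to reproduce the new end-to-end test in a spark-shell (a sketch; with this
change the optimized logical plan should contain a single Window node):

```
Seq((1, 1), (2, 2)).toDF("a", "b").createOrReplaceTempView("t1")
sql("""
  SELECT a, b, rk, row_number() OVER (PARTITION BY a ORDER BY b) AS rn
  FROM (SELECT a, b, rank() OVER (PARTITION BY a ORDER BY b) AS rk FROM t1) t2
""").explain(true)
```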


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
