peter-toth commented on code in PR #39887:
URL: https://github.com/apache/spark/pull/39887#discussion_r1096707745


##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -2598,4 +2598,29 @@ class SubquerySuite extends QueryTest
         Row("aa"))
     }
   }
+
+  test("SPARK-42346: Rewrite distinct aggregates after merging subqueries") {
+    withTempView("t1") {
+      Seq((1, 2), (3, 4)).toDF("c1", "c2").createOrReplaceTempView("t1")
+
+      checkAnswer(sql(
+        """
+          |SELECT
+          |  (SELECT count(distinct c1) FROM t1),
+          |  (SELECT count(distinct c2) FROM t1)
+          |""".stripMargin),
+        Row(2, 2))

Review Comment:
   No, the 2 plans differ as expected. After `MergeScalarSubqueries` the 2 
subqueries became identical and both contain `count(DISTINCT c1)` and 
`count(DISTINCT c2)` in `mergedValue`.
   The final physical plan with the reuse makes the improvement more visible:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      *(1) Project [Subquery subquery#251, [id=#117].count(DISTINCT c1) AS 
scalarsubquery()#259L, ReusedSubquery Subquery subquery#251, 
[id=#117].count(DISTINCT c2) AS scalarsubquery()#260L]
      :  :- Subquery subquery#251, [id=#117]
      :  :  +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               *(3) Project [named_struct(count(DISTINCT c1), count(DISTINCT 
c1)#254L, count(DISTINCT c2), count(DISTINCT c2)#256L) AS mergedValue#265]
               +- *(3) HashAggregate(keys=[], functions=[count(t1.c1#267), 
count(c2#268)], output=[count(DISTINCT c1)#254L, count(DISTINCT c2)#256L])
                  +- ShuffleQueryStage 1
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, 
[plan_id=228]
                        +- *(2) HashAggregate(keys=[], 
functions=[partial_count(t1.c1#267) FILTER (WHERE (gid#266 = 1)), 
partial_count(c2#268) FILTER (WHERE (gid#266 = 2))], output=[count#271L, 
count#272L])
                           +- *(2) HashAggregate(keys=[t1.c1#267, c2#268, 
gid#266], functions=[], output=[t1.c1#267, c2#268, gid#266])
                              +- AQEShuffleRead coalesced
                                 +- ShuffleQueryStage 0
                                    +- Exchange hashpartitioning(t1.c1#267, 
c2#268, gid#266, 5), ENSURE_REQUIREMENTS, [plan_id=193]
                                       +- *(1) HashAggregate(keys=[t1.c1#267, 
c2#268, gid#266], functions=[], output=[t1.c1#267, c2#268, gid#266])
                                          +- *(1) Expand [[c1#247, null, 1], 
[null, c2#248, 2]], [t1.c1#267, c2#268, gid#266]
                                             +- *(1) Project [_1#242 AS c1#247, 
_2#243 AS c2#248]
                                                +- *(1) LocalTableScan [_1#242, 
_2#243]
            +- == Initial Plan ==
               Project [named_struct(count(DISTINCT c1), count(DISTINCT 
c1)#254L, count(DISTINCT c2), count(DISTINCT c2)#256L) AS mergedValue#265]
               +- HashAggregate(keys=[], functions=[count(t1.c1#267), 
count(c2#268)], output=[count(DISTINCT c1)#254L, count(DISTINCT c2)#256L])
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, 
[plan_id=114]
                     +- HashAggregate(keys=[], 
functions=[partial_count(t1.c1#267) FILTER (WHERE (gid#266 = 1)), 
partial_count(c2#268) FILTER (WHERE (gid#266 = 2))], output=[count#271L, 
count#272L])
                        +- HashAggregate(keys=[t1.c1#267, c2#268, gid#266], 
functions=[], output=[t1.c1#267, c2#268, gid#266])
                           +- Exchange hashpartitioning(t1.c1#267, c2#268, 
gid#266, 5), ENSURE_REQUIREMENTS, [plan_id=110]
                              +- HashAggregate(keys=[t1.c1#267, c2#268, 
gid#266], functions=[], output=[t1.c1#267, c2#268, gid#266])
                                 +- Expand [[c1#247, null, 1], [null, c2#248, 
2]], [t1.c1#267, c2#268, gid#266]
                                    +- Project [_1#242 AS c1#247, _2#243 AS 
c2#248]
                                       +- LocalTableScan [_1#242, _2#243]
      :  +- ReusedSubquery Subquery subquery#251, [id=#117]
      +- *(1) Scan OneRowRelation[]
   +- == Initial Plan ==
   ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to