wangyum commented on code in PR #39887:
URL: https://github.com/apache/spark/pull/39887#discussion_r1096671489


##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -2598,4 +2598,29 @@ class SubquerySuite extends QueryTest
         Row("aa"))
     }
   }
+
+  test("SPARK-42346: Rewrite distinct aggregates after merging subqueries") {
+    withTempView("t1") {
+      Seq((1, 2), (3, 4)).toDF("c1", "c2").createOrReplaceTempView("t1")
+
+      checkAnswer(sql(
+        """
+          |SELECT
+          |  (SELECT count(distinct c1) FROM t1),
+          |  (SELECT count(distinct c2) FROM t1)
+          |""".stripMargin),
+        Row(2, 2))

Review Comment:
   It seems `MergeScalarSubqueries` cannot improve this case:
   Spark 3.2:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Project [Subquery subquery#185, [id=#223] AS scalarsubquery()#198L, Subquery subquery#186, [id=#242] AS scalarsubquery()#199L]
      :  :- Subquery subquery#185, [id=#223]
      :  :  +- AdaptiveSparkPlan isFinalPlan=false
      :  :     +- HashAggregate(keys=[], functions=[count(distinct c1#153)], output=[count(DISTINCT c1)#193L])
      :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=221]
      :  :           +- HashAggregate(keys=[], functions=[partial_count(distinct c1#153)], output=[count#202L])
      :  :              +- HashAggregate(keys=[c1#153], functions=[], output=[c1#153])
      :  :                 +- Exchange hashpartitioning(c1#153, 200), ENSURE_REQUIREMENTS, [plan_id=217]
      :  :                    +- HashAggregate(keys=[c1#153], functions=[], output=[c1#153])
      :  :                       +- FileScan parquet default.t1[c1#153] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/Downloads/spark-3.2.3-bin-hadoop2.7/spark-warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int>
      :  +- Subquery subquery#186, [id=#242]
      :     +- AdaptiveSparkPlan isFinalPlan=false
      :        +- HashAggregate(keys=[], functions=[count(distinct c2#197)], output=[count(DISTINCT c2)#195L])
      :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=240]
      :              +- HashAggregate(keys=[], functions=[partial_count(distinct c2#197)], output=[count#206L])
      :                 +- HashAggregate(keys=[c2#197], functions=[], output=[c2#197])
      :                    +- Exchange hashpartitioning(c2#197, 200), ENSURE_REQUIREMENTS, [plan_id=236]
      :                       +- HashAggregate(keys=[c2#197], functions=[], output=[c2#197])
      :                          +- FileScan parquet default.t1[c2#197] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/Downloads/spark-3.2.3-bin-hadoop2.7/spark-warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c2:int>
      +- Scan OneRowRelation[]
   ```
   
   After `MergeScalarSubqueries` and this PR:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Project [Subquery subquery#0, [id=#36].count(DISTINCT c1) AS scalarsubquery()#11L, Subquery subquery#1, [id=#63].count(DISTINCT c2) AS scalarsubquery()#12L]
      :  :- Subquery subquery#0, [id=#36]
      :  :  +- AdaptiveSparkPlan isFinalPlan=false
      :  :     +- Project [named_struct(count(DISTINCT c1), count(DISTINCT c1)#5L, count(DISTINCT c2), count(DISTINCT c2)#8L) AS mergedValue#17]
      :  :        +- HashAggregate(keys=[], functions=[count(spark_catalog.default.t1.c1#19), count(c2#20)], output=[count(DISTINCT c1)#5L, count(DISTINCT c2)#8L])
      :  :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=33]
      :  :              +- HashAggregate(keys=[], functions=[partial_count(spark_catalog.default.t1.c1#19) FILTER (WHERE (gid#18 = 1)), partial_count(c2#20) FILTER (WHERE (gid#18 = 2))], output=[count#23L, count#24L])
      :  :                 +- HashAggregate(keys=[spark_catalog.default.t1.c1#19, c2#20, gid#18], functions=[], output=[spark_catalog.default.t1.c1#19, c2#20, gid#18])
      :  :                    +- Exchange hashpartitioning(spark_catalog.default.t1.c1#19, c2#20, gid#18, 5), ENSURE_REQUIREMENTS, [plan_id=29]
      :  :                       +- HashAggregate(keys=[spark_catalog.default.t1.c1#19, c2#20, gid#18], functions=[], output=[spark_catalog.default.t1.c1#19, c2#20, gid#18])
      :  :                          +- Expand [[c1#2, null, 1], [null, c2#3, 2]], [spark_catalog.default.t1.c1#19, c2#20, gid#18]
      :  :                             +- FileScan parquet spark_catalog.default.t1[c1#2,c2#3] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int,c2:int>
      :  +- Subquery subquery#1, [id=#63]
      :     +- AdaptiveSparkPlan isFinalPlan=false
      :        +- Project [named_struct(count(DISTINCT c1), count(DISTINCT c1)#5L, count(DISTINCT c2), count(DISTINCT c2)#8L) AS mergedValue#17]
      :           +- HashAggregate(keys=[], functions=[count(spark_catalog.default.t1.c1#19), count(c2#20)], output=[count(DISTINCT c1)#5L, count(DISTINCT c2)#8L])
      :              +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=60]
      :                 +- HashAggregate(keys=[], functions=[partial_count(spark_catalog.default.t1.c1#19) FILTER (WHERE (gid#18 = 1)), partial_count(c2#20) FILTER (WHERE (gid#18 = 2))], output=[count#23L, count#24L])
      :                    +- HashAggregate(keys=[spark_catalog.default.t1.c1#19, c2#20, gid#18], functions=[], output=[spark_catalog.default.t1.c1#19, c2#20, gid#18])
      :                       +- Exchange hashpartitioning(spark_catalog.default.t1.c1#19, c2#20, gid#18, 5), ENSURE_REQUIREMENTS, [plan_id=56]
      :                          +- HashAggregate(keys=[spark_catalog.default.t1.c1#19, c2#20, gid#18], functions=[], output=[spark_catalog.default.t1.c1#19, c2#20, gid#18])
      :                             +- Expand [[c1#2, null, 1], [null, c2#3, 2]], [spark_catalog.default.t1.c1#19, c2#20, gid#18]
      :                                +- FileScan parquet spark_catalog.default.t1[c1#2,c2#3] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int,c2:int>
      +- Scan OneRowRelation[]
   ```
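
   For readers unfamiliar with the `Expand` / `gid` shape in the merged plan, here is a rough plain-Scala model of what that pipeline computes. It is only a sketch under my own assumptions: `ExpandDistinctSketch`, `expand`, and `countDistinctBoth` are hypothetical names, `None` stands in for SQL `NULL`, and nothing here is Spark's actual implementation.

   ```scala
   // Plain-Scala model of the Expand + gid rewrite seen in the merged plan; no Spark dependency.
   object ExpandDistinctSketch {
     // One expanded row: (c1, c2, gid). None models SQL NULL.
     type Expanded = (Option[Int], Option[Int], Int)

     // Models `Expand [[c1, null, 1], [null, c2, 2]]`: one output row per distinct group.
     def expand(rows: Seq[(Int, Int)]): Seq[Expanded] =
       rows.flatMap { case (c1, c2) =>
         Seq[Expanded]((Some(c1), None, 1), (None, Some(c2), 2))
       }

     def countDistinctBoth(rows: Seq[(Int, Int)]): (Long, Long) = {
       // Models the HashAggregate keyed on (c1, c2, gid) with no functions: deduplicate.
       val deduped = expand(rows).distinct
       // Models count(...) FILTER (WHERE gid = n): count non-null values within each group.
       val distinctC1 = deduped.count { case (c1, _, gid) => gid == 1 && c1.isDefined }.toLong
       val distinctC2 = deduped.count { case (_, c2, gid) => gid == 2 && c2.isDefined }.toLong
       (distinctC1, distinctC2)
     }

     def main(args: Array[String]): Unit = {
       // Same data as the test's t1: Seq((1, 2), (3, 4)).
       println(countDistinctBoth(Seq((1, 2), (3, 4)))) // prints (2,2)
     }
   }
   ```

   The result matches the `Row(2, 2)` expected by the test. The model covers only the rewrite inside one subquery; it says nothing about whether the two `Subquery` nodes end up sharing work at runtime.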



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
