sunchao commented on a change in pull request #34445:
URL: https://github.com/apache/spark/pull/34445#discussion_r750528925



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -81,19 +81,26 @@ object AggregatePushDownUtils {
       }
     }
 
-    if (aggregation.groupByColumns.nonEmpty || dataFilters.nonEmpty) {
+    if (dataFilters.nonEmpty) {
       // Parquet/ORC footer has max/min/count for columns
       // e.g. SELECT COUNT(col1) FROM t
       // but footer doesn't have max/min/count for a column if max/min/count
       // are combined with filter or group by
       // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
       //      SELECT COUNT(col1) FROM t GROUP BY col2
       // However, if the filter is on partition column, max/min/count can 
still be pushed down
-      // Todo:  add support if groupby column is partition col
-      //        (https://issues.apache.org/jira/browse/SPARK-36646)
       return None
     }
 
+    if (aggregation.groupByColumns.nonEmpty &&

Review comment:
       nit: maybe add some comments explaining the reasoning why we have this 
check and only support the case when group by columns is the same as partition 
columns. What if the number of group by columns is smaller than that of 
partition columns?

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala
##########
@@ -261,6 +261,65 @@ trait FileSourceAggregatePushDownSuite
     }
   }
 
+  test("aggregate with partition group by can be pushed down") {
+    withTempPath { dir =>
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").format(format).save(dir.getCanonicalPath)
+      withTempView("tmp") {
+        
spark.read.format(format).load(dir.getCanonicalPath).createOrReplaceTempView("tmp");
+        Seq("false", "true").foreach { enableVectorizedReader =>
+          withSQLConf(aggPushDownEnabledKey -> "true",
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
+            val df = sql("SELECT count(*), count(id), p, max(id), p, count(p), 
max(id)," +
+              "  min(id), p FROM tmp group by p")
+            df.queryExecution.optimizedPlan.collect {
+              case _: DataSourceV2ScanRelation =>
+                val expected_plan_fragment =
+                  "PushedAggregation: [COUNT(*), COUNT(id), MAX(id), COUNT(p), 
MIN(id)], " +
+                    "PushedFilters: [], PushedGroupBy: [p]"
+                checkKeywordsExistsInExplain(df, expected_plan_fragment)
+            }
+            checkAnswer(df, Seq(Row(3, 3, 1, 7, 1, 3, 7, 1, 1), Row(3, 3, 2, 
8, 2, 3, 8, 2, 2),
+              Row(4, 4, 0, 9, 0, 4, 9, 0, 0)))
+          }
+        }
+      }
+    }
+  }
+
+  test("aggregate with multi partition group by columns can be pushed down") {
+    withTempPath { dir =>
+      Seq((10, 1, 2, 5, 6), (2, 1, 2, 5, 6), (3, 2, 1, 4, 8), (4, 2, 1, 4, 9),
+        (5, 2, 1, 5, 8), (6, 2, 1, 4, 8), (1, 1, 2, 5, 6), (4, 1, 2, 5, 6),
+        (3, 2, 2, 9, 10), (-4, 2, 2, 9, 10), (6, 2, 2, 9, 10))
+        .toDF("value", "p1", "p2", "p3", "p4")
+        .write
+        .partitionBy("p2", "p1", "p4", "p3")
+        .format(format)
+        .save(dir.getCanonicalPath)
+      withTempView("tmp") {
+        
spark.read.format(format).load(dir.getCanonicalPath).createOrReplaceTempView("tmp");
+        Seq("false", "true").foreach { enableVectorizedReader =>
+          withSQLConf(aggPushDownEnabledKey -> "true",
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
+            val df = sql("SELECT count(*), count(value), max(value), 
min(value)," +
+              " p4, p2, p3, p1 FROM tmp GROUP BY p1, p2, p3, p4")
+            df.queryExecution.optimizedPlan.collect {
+              case _: DataSourceV2ScanRelation =>
+                val expected_plan_fragment =
+                  "PushedAggregation: [COUNT(*), COUNT(value), MAX(value), 
MIN(value)]," +
+                    " PushedFilters: [], PushedGroupBy: [p1, p2, p3, p4]"
+                // checkKeywordsExistsInExplain(df, expected_plan_fragment)

Review comment:
       nit: remove this?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -81,19 +81,26 @@ object AggregatePushDownUtils {
       }
     }
 
-    if (aggregation.groupByColumns.nonEmpty || dataFilters.nonEmpty) {
+    if (dataFilters.nonEmpty) {
       // Parquet/ORC footer has max/min/count for columns
       // e.g. SELECT COUNT(col1) FROM t
       // but footer doesn't have max/min/count for a column if max/min/count
       // are combined with filter or group by
       // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
       //      SELECT COUNT(col1) FROM t GROUP BY col2
       // However, if the filter is on partition column, max/min/count can 
still be pushed down
-      // Todo:  add support if groupby column is partition col
-      //        (https://issues.apache.org/jira/browse/SPARK-36646)
       return None
     }
 
+    if (aggregation.groupByColumns.nonEmpty &&
+      partitionNames.size != aggregation.groupByColumns.length) {
+      return None
+    }
+    aggregation.groupByColumns.foreach { col =>

Review comment:
       nit: maybe also add some comments here - it's not that easy to 
understand and can help the maintenance of this code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to