c21 commented on a change in pull request #34445:
URL: https://github.com/apache/spark/pull/34445#discussion_r740549841



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -457,17 +457,22 @@ object OrcUtils extends Logging {
       }
     }
 
+    // if there are group by columns, we will build result set row first,
+    // and then append group by columns values (partition col values) to the 
result set row.

Review comment:
       nit: `partition col values` -> `partition columns values`
   nit: `result set row ` -> `result row`? What's `result set row`?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -490,7 +495,12 @@ object OrcUtils extends Logging {
             s"createAggInternalRowFromFooter should not take $x as the 
aggregate expression")
       }
 
-    val orcValuesDeserializer = new OrcDeserializer(aggSchema, (0 until 
aggSchema.length).toArray)
-    orcValuesDeserializer.deserializeFromValues(aggORCValues)
+    val orcValuesDeserializer = new OrcDeserializer(schemaWithoutGroupby,
+      (0 until schemaWithoutGroupby.length).toArray)
+    if (aggregation.groupByColumns.length > 0) {
+      new JoinedRow(partitionValues, 
orcValuesDeserializer.deserializeFromValues(aggORCValues))

Review comment:
       nit: `orcValuesDeserializer.deserializeFromValues(aggORCValues)` can be 
called only once before the if-else branch.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala
##########
@@ -261,6 +261,63 @@ trait FileSourceAggregatePushDownSuite
     }
   }
 
+  test("aggregate with partition group by can be pushed down") {
+    withTempPath { dir =>
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").format(format).save(dir.getCanonicalPath)
+      withTempView("tmp") {
+        
spark.read.format(format).load(dir.getCanonicalPath).createOrReplaceTempView("tmp");
+        Seq("false", "true").foreach { enableVectorizedReader =>
+          withSQLConf(aggPushDownEnabledKey -> "true",
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
+            val df = sql("SELECT count(*), count(id), p, max(id), p, p, 
max(id), min(id), p" +

Review comment:
       shall we also add test for aggregate on group-by column? e.g. 
`count(p)`, `max(p)` etc. 

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -138,4 +141,14 @@ object AggregatePushDownUtils {
     converter.convert(aggregatesAsRow, columnVectors.toArray)
     new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
   }
+
+  def aggSchemaWithOutGroupBy(aggregation: Aggregation, aggSchema: 
StructType): StructType = {

Review comment:
       Shall we add some comments for the public method?
   
   Also nit: shall we call it `getSchemaWithoutGroupingExpression`? To be 
aligned with all aggregate operators, e.g. 
`BaseAggregateExec.groupingExpressions`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to