sunchao commented on a change in pull request #34445:
URL: https://github.com/apache/spark/pull/34445#discussion_r741361243
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -457,17 +457,22 @@ object OrcUtils extends Logging {
}
}
+ // if there are group by columns, we will build result row first,
+ // and then append group by columns values (partition columns values) to the result row.
+ val schemaWithoutGroupby =
Review comment:
nit: `schemaWithoutGroupBy`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -138,4 +141,19 @@ object AggregatePushDownUtils {
converter.convert(aggregatesAsRow, columnVectors.toArray)
new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
}
+
+ /**
+ * Return the schema for aggregates only (exclude group by columns)
+ */
+ def getSchemaWithoutGroupingExpression(
+ aggregation: Aggregation,
+ aggSchema: StructType): StructType = {
+ val groupByColNums = aggregation.groupByColumns.length
Review comment:
nit: `groupByColumns`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -195,7 +201,12 @@ object ParquetUtils {
case (_, i) =>
throw new SparkException("Unexpected parquet type name: " +
primitiveTypeNames(i))
}
- converter.currentRecord
+
+ if (aggregation.groupByColumns.length > 0) {
+ new JoinedRow(partitionValues, converter.currentRecord)
Review comment:
Just curious: how do we make sure `partitionValues` is always consistent
with `aggregation.groupByColumns` in terms of ordering?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -195,7 +201,12 @@ object ParquetUtils {
case (_, i) =>
throw new SparkException("Unexpected parquet type name: " +
primitiveTypeNames(i))
}
- converter.currentRecord
+
+ if (aggregation.groupByColumns.length > 0) {
Review comment:
nit: maybe use `aggregation.groupByColumns.nonEmpty`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -138,4 +141,19 @@ object AggregatePushDownUtils {
converter.convert(aggregatesAsRow, columnVectors.toArray)
new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
}
+
+ /**
+ * Return the schema for aggregates only (exclude group by columns)
+ */
+ def getSchemaWithoutGroupingExpression(
Review comment:
nit: maybe `getSchemaWithoutGroupColumns`?
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala
##########
@@ -261,6 +261,63 @@ trait FileSourceAggregatePushDownSuite
}
}
+ test("aggregate with partition group by can be pushed down") {
Review comment:
nit: add JIRA number
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]