maropu commented on a change in pull request #29695:
URL: https://github.com/apache/spark/pull/29695#discussion_r496713408
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -643,6 +647,34 @@ object DataSourceStrategy {
(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters,
handledFilters)
}
+ def translateAggregate(aggregates: AggregateExpression):
Option[AggregateFunc] = {
+
+ def columnAsString(e: Expression): String = e match {
+ case AttributeReference(name, _, _, _) => name
+ case Cast(child, _, _) => child match {
Review comment:
Is this safe? Aggregating casted values and casting aggregated values
look like different operations, so they may produce different results.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -643,6 +647,34 @@ object DataSourceStrategy {
(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters,
handledFilters)
}
+ def translateAggregate(aggregates: AggregateExpression):
Option[AggregateFunc] = {
+
+ def columnAsString(e: Expression): String = e match {
+ case AttributeReference(name, _, _, _) => name
+ case Cast(child, _, _) => child match {
+ case AttributeReference(name, _, _, _) => name
+ case _ => ""
+ }
+ case _ => ""
+ }
+
+ aggregates.aggregateFunction match {
Review comment:
What if `aggregates` has `isDistinct=true` or a `filter` clause?
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -109,6 +120,111 @@ class JDBCV2Suite extends QueryTest with
SharedSparkSession {
checkAnswer(df, Row("mary"))
}
+ test("scan with aggregate push-down") {
+ val df1 = sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where
dept > 0" +
+ " group by DEPT")
+ // df1.explain(true)
+ // scalastyle:off line.size.limit
+ // == Parsed Logical Plan ==
+ // 'Aggregate ['DEPT], [unresolvedalias('MAX('SALARY), None),
unresolvedalias('MIN('BONUS), None)]
+ // +- 'Filter ('dept > 0)
+ // +- 'UnresolvedRelation [h2, test, employee], []
+ //
+ // == Analyzed Logical Plan ==
+ // max(SALARY): int, min(BONUS): int
+ // Aggregate [DEPT#0], [max(SALARY#2) AS max(SALARY)#6, min(BONUS#3) AS
min(BONUS)#7]
+ // +- Filter (dept#0 > 0)
+ // +- SubqueryAlias h2.test.employee
+ // +- RelationV2[DEPT#0, NAME#1, SALARY#2, BONUS#3] test.employee
+ //
+ // == Optimized Logical Plan ==
+ // Aggregate [DEPT#0], [max(max(SALARY)#13) AS max(SALARY)#6,
min(min(BONUS)#14) AS min(BONUS)#7]
+ // +- RelationV2[DEPT#0, max(SALARY)#13, min(BONUS)#14] test.employee
+ //
+ // == Physical Plan ==
+ // *(2) HashAggregate(keys=[DEPT#0], functions=[max(max(SALARY)#13),
min(min(BONUS)#14)], output=[max(SALARY)#6, min(BONUS)#7])
+ // +- Exchange hashpartitioning(DEPT#0, 5), true, [id=#13]
+ // +- *(1) HashAggregate(keys=[DEPT#0],
functions=[partial_max(max(SALARY)#13), partial_min(min(BONUS)#14)],
output=[DEPT#0, max#17, min#18])
Review comment:
Do we need this partial aggregate in this case? Could we replace it
with a `Project` instead?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]