rf972 commented on a change in pull request #29695:
URL: https://github.com/apache/spark/pull/29695#discussion_r525272170
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -73,33 +77,25 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {
postScanFilters)
Aggregate(groupingExpressions, resultExpressions, plan)
} else {
- val resultAttributes = resultExpressions.map(_.toAttribute)
- .map ( e => e match { case a: AttributeReference => a })
- var index = 0
val aggOutputBuilder = ArrayBuilder.make[AttributeReference]
- for (a <- resultAttributes) {
- val newName = if (a.name.contains("FILTER")) {
- a.name.substring(0, a.name.indexOf("FILTER") - 1)
- } else if (a.name.contains("DISTINCT")) {
- a.name.replace("DISTINCT ", "")
- } else {
- a.name
- }
-
- aggOutputBuilder +=
- a.copy(name = newName,
- dataType = aggregates(index).dataType)(exprId =
NamedExpression.newExprId,
- qualifier = a.qualifier)
- index += 1
+ for (a <- aggregates) {
+ aggOutputBuilder += AttributeReference(toPrettySQL(a),
a.dataType)()
}
val aggOutput = aggOutputBuilder.result
- var newOutput = aggOutput
- for (col <- output) {
- if (!aggOutput.exists(_.name.contains(col.name))) {
- newOutput = col +: newOutput
+ val newOutputBuilder = ArrayBuilder.make[AttributeReference]
+ for (col1 <- output) {
+ var found = false
+ for (col2 <- aggOutput) {
+ if (contains(col2.name, col1.name)) {
Review comment:
Thanks @huaxingao for incorporating our changes into the patch!
Here is a brief overview of our TPC-H testing with this patch. We initially
picked two TPC-H queries that should benefit immediately from aggregate
pushdown. Q01 was our first choice, but we have set it aside for now because
it involves UDFs and the count operator, so we are focusing on Q06. Our
simulations show that filter pushdown in Q06 significantly reduces the data
transferred from storage/database (in our simulated case, from 700 MB down to
about 1.5 MB). They also show that aggregate pushdown reduces the transfer
further, from that 1.5 MB down to just 34 bytes.
Q06 requires support for an expression under an aggregate, something like
sum(x * y). We are actively working on this now, and once we finish adding
this support (within a week or so, we hope), we will share the code and our
actual TPC-H results. If you have any thoughts on the direction for adding
this support, we would appreciate them too. Thanks!
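
To make the sum(x * y) case concrete, here is a minimal, self-contained
sketch of the two pushdown levels for a Q06-shaped query. It is plain Scala
over an in-memory Seq, not Spark or DataSource V2 code, and every name in it
(LineItem, Q06PushdownSketch, scanWithFilter, scanWithAggregate) is
hypothetical. The predicate uses the commonly cited default Q06 substitution
values, which is an assumption about the configuration being tested.

```scala
import java.time.LocalDate

// Hypothetical, simplified lineitem row: only the columns Q06 touches.
final case class LineItem(
    quantity: BigDecimal,
    extendedPrice: BigDecimal,
    discount: BigDecimal,
    shipDate: LocalDate)

object Q06PushdownSketch {
  // Q06-style predicate (assumed default parameters):
  //   shipdate in [1994-01-01, 1995-01-01), discount in [0.05, 0.07], quantity < 24
  def q06Filter(r: LineItem): Boolean =
    !r.shipDate.isBefore(LocalDate.of(1994, 1, 1)) &&
      r.shipDate.isBefore(LocalDate.of(1995, 1, 1)) &&
      r.discount >= BigDecimal("0.05") &&
      r.discount <= BigDecimal("0.07") &&
      r.quantity < BigDecimal(24)

  // Filter pushdown only: the source returns every matching row and the
  // engine still has to compute the sum itself.
  def scanWithFilter(table: Seq[LineItem]): Seq[LineItem] =
    table.filter(q06Filter)

  // Filter + aggregate pushdown: the source evaluates
  // sum(extendedPrice * discount) and returns a single value.
  def scanWithAggregate(table: Seq[LineItem]): BigDecimal =
    table.iterator.filter(q06Filter).map(r => r.extendedPrice * r.discount).sum
}
```

In this model the first method transfers one row per match while the second
transfers a single number regardless of table size, which is the effect
described above. The sketch does not model how the expression x * y would be
handed to a real data source.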