beliefer commented on a change in pull request #34904:
URL: https://github.com/apache/spark/pull/34904#discussion_r772219016



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -147,40 +148,56 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
 
                 val scanRelation = DataSourceV2ScanRelation(sHolder.relation, 
wrappedScan, output)
 
-                val plan = Aggregate(
-                  output.take(groupingExpressions.length), resultExpressions, 
scanRelation)
-
-                // scalastyle:off
-                // Change the optimized logical plan to reflect the pushed 
down aggregate
-                // e.g. TABLE t (c1 INT, c2 INT, c3 INT)
-                // SELECT min(c1), max(c1) FROM t GROUP BY c2;
-                // The original logical plan is
-                // Aggregate [c2#10],[min(c1#9) AS min(c1)#17, max(c1#9) AS 
max(c1)#18]
-                // +- RelationV2[c1#9, c2#10] ...
-                //
-                // After change the V2ScanRelation output to [c2#10, 
min(c1)#21, max(c1)#22]
-                // we have the following
-                // !Aggregate [c2#10], [min(c1#9) AS min(c1)#17, max(c1#9) AS 
max(c1)#18]
-                // +- RelationV2[c2#10, min(c1)#21, max(c1)#22] ...
-                //
-                // We want to change it to
-                // == Optimized Logical Plan ==
-                // Aggregate [c2#10], [min(min(c1)#21) AS min(c1)#17, 
max(max(c1)#22) AS max(c1)#18]
-                // +- RelationV2[c2#10, min(c1)#21, max(c1)#22] ...
-                // scalastyle:on
-                val aggOutput = output.drop(groupAttrs.length)
-                plan.transformExpressions {
-                  case agg: AggregateExpression =>
-                    val ordinal = aggExprToOutputOrdinal(agg.canonicalized)
-                    val aggFunction: aggregate.AggregateFunction =
-                      agg.aggregateFunction match {
-                        case max: aggregate.Max => max.copy(child = 
aggOutput(ordinal))
-                        case min: aggregate.Min => min.copy(child = 
aggOutput(ordinal))
-                        case sum: aggregate.Sum => sum.copy(child = 
aggOutput(ordinal))
-                        case _: aggregate.Count => 
aggregate.Sum(aggOutput(ordinal))
-                        case other => other
-                      }
-                    agg.copy(aggregateFunction = aggFunction)
+                if (r.supportCompletePushDown()) {
+                  val projectExpressions = resultExpressions.map { expr =>
+                    expr.transform {
+                      case agg: AggregateExpression =>
+                        val ordinal = aggExprToOutputOrdinal(agg.canonicalized)
+                        val aggAttribute = aggOutput(ordinal)
+                        val child = if (aggAttribute.dataType == 
agg.resultAttribute.dataType) {
+                          aggAttribute
+                        } else {
+                          Cast(aggAttribute, agg.resultAttribute.dataType)

Review comment:
       The cast is needed because the JDBC protocol returns decimal(20, 2),
but Spark needs decimal(32, 2).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to