maropu commented on a change in pull request #29695:
URL: https://github.com/apache/spark/pull/29695#discussion_r602060646



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
##########
@@ -191,6 +191,9 @@ class JDBCOptions(
   // An option to allow/disallow pushing down predicate into JDBC data source
   val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, 
"true").toBoolean
 
+  // An option to allow/disallow pushing down aggregate into JDBC data source
+  val pushDownAggregate = parameters.getOrElse(JDBC_PUSHDOWN_AGGREGATE, 
"false").toBoolean

Review comment:
       I think this should be documented in 
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -700,6 +704,51 @@ object DataSourceStrategy
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, 
handledFilters)
   }
 
+  private def columnAsString(e: Expression): String = e match {
+    case AttributeReference(name, _, _, _) => name
+    case Cast(child, _, _) => columnAsString (child)
+    case Add(left, right, _) =>
+      columnAsString(left) + " + " + columnAsString(right)
+    case Subtract(left, right, _) =>
+      columnAsString(left) + " - " + columnAsString(right)
+    case Multiply(left, right, _) =>
+      columnAsString(left) + " * " + columnAsString(right)
+    case Divide(left, right, _) =>
+      columnAsString(left) + " / " + columnAsString(right)
+    case CheckOverflow(child, _, _) => columnAsString (child)
+    case PromotePrecision(child) => columnAsString (child)
+    case _ => ""
+  }
+
+  protected[sql] def translateAggregate(aggregates: AggregateExpression): 
Option[AggregateFunc] = {
+

Review comment:
       nit: unnecessary blank line.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -700,6 +704,41 @@ object DataSourceStrategy
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, 
handledFilters)
   }
 
+  private def columnAsString(e: Expression): String = e match {

Review comment:
       > This covers a lot of cases but also makes it easy to break. We can 
begin with simplest case and add more supports later.
   
   +1
   
   

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -181,21 +248,53 @@ private[jdbc] class JDBCRDD(
     filters: Array[Filter],
     partitions: Array[Partition],
     url: String,
-    options: JDBCOptions)
+    options: JDBCOptions,
+    aggregation: Aggregation = Aggregation.empty)
   extends RDD[InternalRow](sc, Nil) {
 
   /**
    * Retrieve the list of partitions corresponding to this RDD.
    */
   override def getPartitions: Array[Partition] = partitions
 
+  private var updatedSchema: StructType = new StructType()

Review comment:
       Could we avoid using `var` here?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
##########
@@ -77,15 +77,25 @@ case class Count(children: Seq[Expression]) extends 
DeclarativeAggregate {
 
   override def defaultResult: Option[Literal] = Option(Literal(0L))
 
+  private[sql] var pushDown: Boolean = false

Review comment:
       I feel that adding this variable just for the pushdown feature does not 
look like a good design. Can't we extend `Count` for pushdown-specific counting?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -700,6 +704,51 @@ object DataSourceStrategy
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, 
handledFilters)
   }
 
+  private def columnAsString(e: Expression): String = e match {
+    case AttributeReference(name, _, _, _) => name
+    case Cast(child, _, _) => columnAsString (child)
+    case Add(left, right, _) =>
+      columnAsString(left) + " + " + columnAsString(right)
+    case Subtract(left, right, _) =>
+      columnAsString(left) + " - " + columnAsString(right)
+    case Multiply(left, right, _) =>
+      columnAsString(left) + " * " + columnAsString(right)
+    case Divide(left, right, _) =>
+      columnAsString(left) + " / " + columnAsString(right)
+    case CheckOverflow(child, _, _) => columnAsString (child)
+    case PromotePrecision(child) => columnAsString (child)
+    case _ => ""
+  }
+
+  protected[sql] def translateAggregate(aggregates: AggregateExpression): 
Option[AggregateFunc] = {
+
+    aggregates.aggregateFunction match {
+      case min: aggregate.Min =>
+        val colName = columnAsString(min.child)
+        if (colName.nonEmpty) Some(Min(colName, min.dataType)) else None
+      case max: aggregate.Max =>
+        val colName = columnAsString(max.child)
+        if (colName.nonEmpty) Some(Max(colName, max.dataType)) else None
+      case avg: aggregate.Average =>
+        val colName = columnAsString(avg.child)
+        if (colName.nonEmpty) Some(Avg(colName, avg.dataType, 
aggregates.isDistinct)) else None
+      case sum: aggregate.Sum =>
+        val colName = columnAsString(sum.child)
+        if (colName.nonEmpty) Some(Sum(colName, sum.dataType, 
aggregates.isDistinct)) else None
+      case count: aggregate.Count =>
+        val columnName = count.children.head match {
+          case Literal(_, _) =>
+            "1"

Review comment:
       nit: `          case Literal(_, _) => "1"`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to