[ 
https://issues.apache.org/jira/browse/SPARK-38288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luis Lozano Coira updated SPARK-38288:
--------------------------------------
    Description: 
I am connecting to PostgreSQL using the Spark SQL JDBC data source. I started 
the Spark shell with the PostgreSQL driver included, and I can connect and 
execute queries without problems. I am using this statement:
{code:java}
val df = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://host:port/")
  .option("driver", "org.postgresql.Driver")
  .option("dbtable", "test")
  .option("user", "postgres")
  .option("password", "*******")
  .option("pushDownAggregate", true)
  .load()
{code}
I am adding the pushDownAggregate option because I would like aggregations to 
be delegated to the source, but for some reason this is not happening.

Judging by this pull request, the feature should have been merged into 3.2: 
[https://github.com/apache/spark/pull/29695]

My aggregations take the limitations mentioned there into account. An example 
case where I don't see pushdown happening is this one:
{code:java}
df.groupBy("name").max("age").show()
{code}

The output of queryExecution is shown below:

{code:java}
scala> df.groupBy("name").max("age").queryExecution.executedPlan
res19: org.apache.spark.sql.execution.SparkPlan =
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[name#274], functions=[max(age#246)], output=[name#274, max(age)#544])
   +- Exchange hashpartitioning(name#274, 200), ENSURE_REQUIREMENTS, [id=#205]
      +- HashAggregate(keys=[name#274], functions=[partial_max(age#246)], output=[name#274, max#548])
         +- Scan JDBCRelation(test) [numPartitions=1] [age#246,name#274] PushedAggregates: [], PushedFilters: [], PushedGroupby: [], ReadSchema: struct<age:int,name:string>

scala> dfp.groupBy("name").max("age").queryExecution.toString
res20: String =
"== Parsed Logical Plan ==
Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
+- Relation [age#246] JDBCRelation(test) [numPartitions=1]

== Analyzed Logical Plan ==
name: string, max(age): int
Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
+- Relation [age#24...
{code}
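
The plan above can also be checked programmatically. The following is a minimal sketch, not part of Spark: `PushdownCheck` and its methods are hypothetical names, and it assumes the scan node prints a `PushedAggregates: [...]` entry as in the output above.

```scala
// Hypothetical helper (not a Spark API): inspects the text of a physical plan
// and reports which aggregates, if any, the scan node lists as pushed down.
object PushdownCheck {
  // Matches "PushedAggregates: [ ... ]" and captures the bracket contents.
  private val Pushed = """PushedAggregates:\s*\[([^\]]*)\]""".r

  /** Returns the entries listed under PushedAggregates (empty if none). */
  def pushedAggregates(planText: String): List[String] =
    Pushed.findAllMatchIn(planText)
      .flatMap(m => m.group(1).split(",").map(_.trim).filter(_.nonEmpty).toList)
      .toList

  /** True when at least one aggregate is listed as pushed down. */
  def isAggregatePushedDown(planText: String): Boolean =
    pushedAggregates(planText).nonEmpty
}
```

In the Spark shell this could be called as `PushdownCheck.isAggregatePushedDown(df.groupBy("name").max("age").queryExecution.executedPlan.toString)`. For the plan shown above, where the scan reports `PushedAggregates: []`, it returns `false`. Splitting on commas is a simplification and would mis-split entries that themselves contain commas.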

What could be the problem? Should pushDownAggregate work in this case?
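
One possible explanation, offered as an assumption rather than a confirmed diagnosis: as far as I understand the Spark documentation, pushDownAggregate is described as an option of the V2 JDBC data source, whereas `spark.read.format("jdbc")` produces the `JDBCRelation` scan seen in the plan above. A minimal sketch of reading the same table through a JDBC catalog follows. The catalog name `pg` and the schema name `public` are hypothetical, and the connection values are the placeholders from the statement above.

```scala
// Sketch for the Spark shell (`spark` already in scope), assuming the
// PostgreSQL driver is on the classpath. "pg" is a hypothetical catalog name.
spark.conf.set("spark.sql.catalog.pg",
  "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set("spark.sql.catalog.pg.url", "jdbc:postgresql://host:port/")
spark.conf.set("spark.sql.catalog.pg.driver", "org.postgresql.Driver")
spark.conf.set("spark.sql.catalog.pg.user", "postgres")
spark.conf.set("spark.sql.catalog.pg.password", "*******")
spark.conf.set("spark.sql.catalog.pg.pushDownAggregate", "true")

// "public" is an assumed schema name; adjust it to where the table lives.
val dfV2 = spark.table("pg.public.test")

// Print the physical plan and look at the PushedAggregates entry of the scan.
dfV2.groupBy("name").max("age").explain()
```

Whether the aggregate is actually pushed down in a given Spark version still has to be confirmed from the printed plan.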


> Aggregate push down doesn't work using Spark SQL jdbc datasource with 
> postgresql
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-38288
>                 URL: https://issues.apache.org/jira/browse/SPARK-38288
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: Luis Lozano Coira
>            Priority: Major
>              Labels: DataSource, Spark-SQL


