[jira] [Updated] (SPARK-21034) Allow filter pushdown filters through non deterministic functions for columns involved in groupby / join

2017-08-09 Thread Abhijit Bhole (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Bhole updated SPARK-21034:
--
Description: 
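If a filter references only columns that are grouping keys or join keys, 
pushing it below the aggregation or join should not change the results, even 
when the aggregation uses a non-deterministic function such as first().

Here is some sample code: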


{code:java}
>>> from pyspark.sql import functions as F
>>> df = spark.createDataFrame([
...     {"a": 1, "b": 2, "c": 7},
...     {"a": 3, "b": 4, "c": 8},
...     {"a": 1, "b": 5, "c": 7},
...     {"a": 1, "b": 6, "c": 7},
... ])
>>> df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain()
== Physical Plan ==
*HashAggregate(keys=[a#15L], functions=[sum(b#16L)])
+- Exchange hashpartitioning(a#15L, 4)
   +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)])
      +- *Project [a#15L, b#16L]
         +- *Filter (isnotnull(a#15L) && (a#15L = 1))
            +- Scan ExistingRDD[a#15L,b#16L,c#17L]
>>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
== Physical Plan ==
*Filter (isnotnull(a#15L) && (a#15L = 1))
+- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)])
   +- Exchange hashpartitioning(a#15L, 4)
      +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), partial_first(c#17L, false)])
         +- Scan ExistingRDD[a#15L,b#16L,c#17L]
{code}


As the second plan shows, the filter is not pushed down below the aggregation 
when the F.first aggregate function is used; it runs only after all groups 
have been aggregated.
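
The claim that the pushdown is safe can be checked without Spark. The sketch below is a plain-Python model of the two plans, not Spark's optimizer: `aggregate` and `keep` are hypothetical helpers introduced only for illustration, and first(c) is modeled as "the first value in encounter order". Because the predicate reads only the grouping key, it keeps or drops whole groups, so every surviving group sees the same rows in the same relative order whether the filter runs before or after the aggregation.

```python
# Plain-Python model of the two plans; no Spark required.
# The rows mirror the DataFrame in the report.
rows = [
    {"a": 1, "b": 2, "c": 7},
    {"a": 3, "b": 4, "c": 8},
    {"a": 1, "b": 5, "c": 7},
    {"a": 1, "b": 6, "c": 7},
]


def aggregate(rows):
    """Group by "a", computing sum(b) and first(c) in encounter order."""
    groups = {}
    for row in rows:
        key = row["a"]
        if key not in groups:
            # first(c) is order dependent: it keeps the first value seen.
            groups[key] = {"a": key, "sum_b": 0, "first_c": row["c"]}
        groups[key]["sum_b"] += row["b"]
    return list(groups.values())


def keep(row):
    """Model of where("a = 1"); it reads only the grouping key."""
    return row["a"] == 1


# Plan reported for first(): aggregate every group, then filter.
filter_after = [g for g in aggregate(rows) if keep(g)]

# Plan requested: filter the input rows, then aggregate.
filter_before = aggregate([r for r in rows if keep(r)])

assert filter_after == filter_before
print(filter_before)
```

The equivalence relies on the predicate touching nothing but the grouping key; a filter on an aggregated column such as the sum could not be moved this way.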


> Allow filter pushdown filters through non deterministic functions for columns 
> involved in groupby / join
> 
>
>                 Key: SPARK-21034
>                 URL: https://issues.apache.org/jira/browse/SPARK-21034
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 2.1.1, 2.2.0
>            Reporter: Abhijit Bhole
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21034) Allow filter pushdown filters through non deterministic functions for columns involved in groupby / join

2017-08-09 Thread Abhijit Bhole (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Bhole updated SPARK-21034:
--
Summary: Allow filter pushdown filters through non deterministic functions 
for columns involved in groupby / join  (was: Filter not getting pushed down 
the groupBy clause when first() or last() aggregate function is used)



