[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-09-24 Thread Yuming Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625876#comment-16625876
 ] 

Yuming Wang commented on SPARK-23985:
-

Thanks [~uzadude], I will dig into it.

> predicate push down doesn't work with simple compound partition spec
> 
>
> Key: SPARK-23985
> URL: https://issues.apache.org/jira/browse/SPARK-23985
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Ohad Raviv
>Priority: Minor
>
> while predicate push down works with this query: 
> {code:sql}
> select * from (
>select *, row_number() over (partition by a order by b) from t1
> )z 
> where a>1
> {code}
> it doesn't work with:
> {code:sql}
> select * from (
> select *, row_number() over (partition by concat(a,'lit') order by b) from t1
> )z 
> where a>1
> {code}
>  
>  I added a test to FilterPushdownSuite which I think recreates the problem:
> {code:scala}
>   test("Window: predicate push down -- ohad") {
> val winExpr = windowExpr(count('b),
>   windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
> val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
> val correctAnswer = testRelation
>   .where('a > 1).select('a, 'b, 'c)
>   .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
>   .select('a, 'b, 'c, 'window).analyze
> comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
>   }
> {code}
> will try to create a PR with a correction



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-09-24 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625448#comment-16625448
 ] 

Ohad Raviv commented on SPARK-23985:


{quote}You should move where("a>'1'") before withColumn:{quote}

this is exactly the issue I've opened.
the Optimizer should understand this on its own.




[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-09-24 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625436#comment-16625436
 ] 

Ohad Raviv commented on SPARK-23985:


the same is true for Spark 2.4:
{code:scala}
sparkSession.range(10).selectExpr("cast(id as string) as a", "id as b", "id")
  .write.saveAsTable("t1")
val w = sparkSession.sql(
  "select *, row_number() over (partition by concat(a,'lit') order by b) from t1 where a>'1'")
w.explain

val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")
sparkSession.table("t1").withColumn("d", row_number() over windowSpec)
  .where("a>'1'")
  .explain
{code}
plans:
{code}
== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22]
+- Window [row_number() windowspecdefinition(_w0#23, b#12L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22], [_w0#23], [b#12L ASC NULLS FIRST]
   +- *(2) Sort [_w0#23 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_w0#23, 1)
         +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#23]
            +- *(1) Filter (isnotnull(a#11) && (a#11 > 1))
               +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct


== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, d#28]
+- *(3) Filter (isnotnull(a#11) && (a#11 > 1))
   +- Window [row_number() windowspecdefinition(_w0#29, b#12L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#28], [_w0#29], [b#12L ASC NULLS FIRST]
      +- *(2) Sort [_w0#29 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(_w0#29, 1)
            +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#29]
               +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{code}




[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-09-24 Thread Yuming Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625432#comment-16625432
 ] 

Yuming Wang commented on SPARK-23985:
-

This works:
{code:scala}
import org.apache.spark.sql.functions._
spark.range(10).selectExpr(
  "cast(id as string) a",
  "id as b").write.saveAsTable("t1")
val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")
spark.table("t1").where("a>'1'").withColumn("d", row_number() over windowSpec).explain
{code}

{noformat}
== Physical Plan ==
*(3) Project [a#8, b#9L, d#13]
+- Window [row_number() windowspecdefinition(_w0#19, b#9L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#13], [_w0#19], [b#9L ASC NULLS FIRST]
   +- *(2) Sort [_w0#19 ASC NULLS FIRST, b#9L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_w0#19, 5)
         +- *(1) Project [a#8, b#9L, concat(a#8, lit) AS _w0#19]
            +- *(1) Filter (isnotnull(a#8) && (a#8 > 1))
               +- *(1) FileScan parquet default.t1[a#8,b#9L] Batched: true, DataFilters: [isnotnull(a#8), (a#8 > 1)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/core/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct
{noformat}





[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-09-24 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625422#comment-16625422
 ] 

Ohad Raviv commented on SPARK-23985:


You're right, that is very strange; it looks like something got lost in translation.
When I run your example (which is actually mine..) I indeed get the right plan. However, my original code still produces the un-optimized plan (with Spark 2.3):
{code:scala}
import org.apache.spark.sql.functions._
spark.range(10).selectExpr(
  "cast(id as string) a",
  "id as b").write.saveAsTable("t1")
val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")
spark.table("t1").withColumn("d", row_number() over windowSpec)
  .where("a>'1'")
  .explain
{code}
{code}
== Physical Plan ==
*(3) Project [a#8, b#9L, d#13]
+- *(3) Filter (isnotnull(a#8) && (a#8 > 1))
   +- Window [row_number() windowspecdefinition(_w0#14, b#9L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#13], [_w0#14], [b#9L ASC NULLS FIRST]
      +- *(2) Sort [_w0#14 ASC NULLS FIRST, b#9L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(_w0#14, 2)
            +- *(1) Project [a#8, b#9L, concat(a#8, lit) AS _w0#14]
               +- *(1) FileScan parquet unitest.t1[a#8,b#9L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[../t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{code}
Can you make sense of the difference?
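For readers comparing the two plans: the difference appears to come from where each API places the filter in the logical plan. By SQL semantics a WHERE clause is evaluated before window functions, so in the SQL query the Filter is created below the Window and reaches the scan without any optimizer help; the DataFrame version applies `.where` after `.withColumn`, so the Filter starts above the Window and only a pushdown rule can move it. A toy sketch of the two plan shapes (hypothetical classes, not Spark's Catalyst API):

```scala
// Toy model of the two logical plans (hypothetical classes, not Spark's API).
sealed trait Plan
case class Scan(table: String) extends Plan
case class Filter(condition: String, child: Plan) extends Plan
case class Window(partitionBy: String, child: Plan) extends Plan

// SQL form: by SQL semantics, WHERE runs before window functions, so the
// Filter is born below the Window and reaches the scan without any pushdown.
val sqlPlan = Window("concat(a,'lit')", Filter("a > '1'", Scan("t1")))

// DataFrame form: .withColumn(window).where(...) puts the Filter above the
// Window; without a pushdown rule for compound partition keys it stays there.
val dfPlan = Filter("a > '1'", Window("concat(a,'lit')", Scan("t1")))
```

Rewriting `dfPlan` into the shape of `sqlPlan` is exactly the pushdown this issue asks the optimizer to perform when the partition key is a compound expression such as concat(a,'lit').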

 




[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-09-23 Thread Yuming Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625305#comment-16625305
 ] 

Yuming Wang commented on SPARK-23985:
-

[~uzadude] It seems to already work:
{code:scala}
withTable("t1") {
  withSQLConf(SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "warn") {
    spark.range(10).selectExpr("cast(id as string) as a", "id as b", "id")
      .write.saveAsTable("t1")
    val w = spark.sql(
      "select *, row_number() over (partition by concat(a,'lit') order by b) from t1 where a>'1'")
    w.explain()
  }
}
{code}

{noformat}
== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22]
+- Window [row_number() windowspecdefinition(_w0#23, b#12L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22], [_w0#23], [b#12L ASC NULLS FIRST]
   +- *(2) Sort [_w0#23 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_w0#23, 5)
         +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#23]
            +- *(1) Filter (isnotnull(a#11) && (a#11 > 1))
               +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: true, DataFilters: [isnotnull(a#11), (a#11 > 1)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/core/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct
17:58:56.582 WARN org.apache.spark.sql.DataFrameSuite: 
{noformat}





[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-04-16 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439114#comment-16439114
 ] 

Ohad Raviv commented on SPARK-23985:


I see in the Optimizer that filters are pushed down only if the filtered attributes appear in the partitionSpec as-is.
It looks like we need to add to Expression some kind of property that indicates whether we can push a predicate through it.
An even more trivial example than Concat would be Struct.
[~cloud_fan] - I see you worked on this code about a year ago, could you please take a look?

Ohad.
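The property described above could be sketched roughly as follows (a hypothetical model, not Spark's actual Expression or Catalyst API; names like `pushableThrough` are invented for illustration): each expression reports whether a predicate on its input attributes may move through it, and the Window pushdown check then accepts deterministic compound partition expressions instead of only bare attributes:

```scala
// Hypothetical sketch of the proposed idea, not Spark's actual classes.
sealed trait Expr {
  def references: Set[String]
  // The property suggested above: can a predicate on this expression's input
  // attributes be pushed through it? Deterministic wrappers such as
  // Concat-with-a-literal qualify; non-deterministic expressions must not.
  def pushableThrough: Boolean
}
case class Attr(name: String) extends Expr {
  val references = Set(name)
  val pushableThrough = true
}
case class Concat(children: Seq[Expr]) extends Expr {
  val references = children.flatMap(_.references).toSet
  val pushableThrough = children.forall(_.pushableThrough)
}
case class Rand() extends Expr {
  val references = Set.empty[String]
  val pushableThrough = false // non-deterministic
}

// Current behavior (per the comment above): only bare attributes match.
def pushableToday(filterAttr: String, partitionSpec: Seq[Expr]): Boolean =
  partitionSpec.exists { case Attr(n) => n == filterAttr; case _ => false }

// Proposed behavior: also accept compound, pushable expressions over the attr.
def pushableProposed(filterAttr: String, partitionSpec: Seq[Expr]): Boolean =
  partitionSpec.exists(e => e.pushableThrough && e.references.contains(filterAttr))
```

With a partition spec of `Concat(Seq(Attr("a")))`, the bare-attribute check fails (the behavior reported in this issue) while the property-based check succeeds. Whether pushing through a given expression preserves window results still needs case-by-case justification: all rows of a partition must agree on the filtered attribute, which holds for concat with a literal but not for arbitrary functions.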
