[jira] [Comment Edited] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625876#comment-16625876 ]

Yuming Wang edited comment on SPARK-23985 at 9/26/18 1:54 AM:
--------------------------------------------------------------

[~uzadude] It seems we should not push down the predicate. Please see these test cases:
[https://github.com/apache/spark/blob/2c73d2a948bdde798aaf0f87c18846281deb05fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala#L1086-L1144]

Here is an example:
{code:scala}
spark.range(10).selectExpr("cast(id % 5 as string) as a", "id as b").write.saveAsTable("t1")

val w1 = spark.sql(
  "select * from (select *, row_number() over (partition by alit order by b) as rn from " +
    "(select *, a % 4 as alit from t1) x) y where a > 2 order by a")
w1.show

val w2 = spark.sql(
  "select * from (select *, row_number() over (partition by alit order by b) as rn from " +
    "(select *, a % 4 as alit from t1 where a > 2) x) y order by a")
w2.show
{code}
output:
{noformat}
+---+---+----+---+
|  a|  b|alit| rn|
+---+---+----+---+
|  3|  3| 3.0|  1|
|  3|  8| 3.0|  2|
|  4|  4| 0.0|  2|
|  4|  9| 0.0|  4|
+---+---+----+---+

+---+---+----+---+
|  a|  b|alit| rn|
+---+---+----+---+
|  3|  3| 3.0|  1|
|  3|  8| 3.0|  2|
|  4|  4| 0.0|  1|
|  4|  9| 0.0|  2|
+---+---+----+---+
{noformat}

was (Author: q79969786):
[~uzadude] It seems we should not push down the predicate. Please see these test cases:
https://github.com/apache/spark/blob/2c73d2a948bdde798aaf0f87c18846281deb05fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala#L1086-L1144

> predicate push down doesn't work with simple compound partition spec
> ---------------------------------------------------------------------
>
>                 Key: SPARK-23985
>                 URL: https://issues.apache.org/jira/browse/SPARK-23985
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Ohad Raviv
>            Priority: Minor
>
> While predicate push down works with this query:
> {code:sql}
> select * from (
>   select *, row_number() over (partition by a order by b) from t1
> ) z
> where a > 1
> {code}
> it doesn't work with:
> {code:sql}
> select * from (
>   select *, row_number() over (partition by concat(a,'lit') order by b) from t1
> ) z
> where a > 1
> {code}
>
> I added a test to FilterPushdownSuite which I think recreates the problem:
> {code:scala}
> test("Window: predicate push down -- ohad") {
>   val winExpr = windowExpr(count('b),
>     windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
>
>   val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
>   val correctAnswer = testRelation
>     .where('a > 1).select('a, 'b, 'c)
>     .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
>     .select('a, 'b, 'c, 'window).analyze
>
>   comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
> }
> {code}
> Will try to create a PR with a correction.
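For contrast, a minimal sketch (assuming the same spark session and t1 table as in the example above) of the situation where pushing the filter down would not change the result: when the predicate is on the partition expression alit itself, whole partitions are kept or dropped, so row_number() is unaffected.
{code:scala}
// Hypothetical contrast case, not from the original comment: filtering on the
// partition expression alit removes entire partitions, so row_number() assigns
// the same values whether the filter runs before or after the window.
val safe1 = spark.sql(
  "select * from (select *, row_number() over (partition by alit order by b) as rn from " +
    "(select *, a % 4 as alit from t1) x) y where alit = 3.0 order by a, b")
val safe2 = spark.sql(
  "select * from (select *, row_number() over (partition by alit order by b) as rn from " +
    "(select *, a % 4 as alit from t1 where a % 4 = 3.0) x) y order by a, b")

safe1.show()
safe2.show()  // same rows and the same rn values as safe1
{code}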
[jira] [Comment Edited] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625876#comment-16625876 ]

Yuming Wang edited comment on SPARK-23985 at 9/25/18 2:19 PM:
--------------------------------------------------------------

[~uzadude] It seems we should not push down the predicate. Please see these test cases:
https://github.com/apache/spark/blob/2c73d2a948bdde798aaf0f87c18846281deb05fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala#L1086-L1144

was (Author: q79969786):
Thanks [~uzadude], I will dig into it.
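Roughly, the condition those tests exercise is that a deterministic predicate can move below a Window only when every partition expression is a plain column reference and the predicate touches only those columns. A small, self-contained sketch of that check (a hypothetical helper written for illustration, not the actual Spark optimizer source):
{code:scala}
// Hypothetical helper, not Spark code: models the pushdown condition that the
// FilterPushdownSuite cases referenced above exercise.
object WindowPushdownCheck {
  // partitionCols: the partition expressions that are bare column names. A compound
  // expression such as concat(a,'lit') is aliased by the analyzer (e.g. to _w0),
  // so a filter on a no longer references any partition column.
  def canPushThroughWindow(partitionCols: Set[String], predicateRefs: Set[String]): Boolean =
    predicateRefs.nonEmpty && predicateRefs.subsetOf(partitionCols)

  def main(args: Array[String]): Unit = {
    println(canPushThroughWindow(Set("a"), Set("a")))    // true  -> filter can move below the Window
    println(canPushThroughWindow(Set("_w0"), Set("a")))  // false -> filter stays above the Window
  }
}
{code}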
[jira] [Comment Edited] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625448#comment-16625448 ]

Ohad Raviv edited comment on SPARK-23985 at 9/24/18 7:15 AM:
-------------------------------------------------------------

{quote}You should move where("a>'1'") before withColumn:{quote}
This is exactly the issue I've opened; the Optimizer should understand this on its own. I understand my original mistake in the example and have changed it. Try now.

was (Author: uzadude):
{quote}You should move where("a>'1'") before withColumn:{quote}
This is exactly the issue I've opened; the Optimizer should understand this on its own.
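For reference, the rewrite the optimizer is being asked to perform would look roughly like the sketch below (assuming a SparkSession named spark and the t1 table used elsewhere in this ticket). Because the filter is on a, and concat(a,'lit') is determined by a alone, whole partitions are kept or dropped, so both queries return the same rows.
{code:scala}
// Sketch only: the second query is the hand-pushed-down form of the first.
val original = spark.sql(
  "select * from (select *, row_number() over " +
    "(partition by concat(a,'lit') order by b) as rn from t1) z where a > 1")

val handPushedDown = spark.sql(
  "select * from (select *, row_number() over " +
    "(partition by concat(a,'lit') order by b) as rn from t1 where a > 1) z")

// Both return the same rows here, which is why the optimizer could,
// in principle, perform this rewrite automatically.
original.show()
handPushedDown.show()
{code}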
[jira] [Comment Edited] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625436#comment-16625436 ]

Ohad Raviv edited comment on SPARK-23985 at 9/24/18 7:07 AM:
-------------------------------------------------------------

The same is true for Spark 2.4:
{code:scala}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

sparkSession.range(10).selectExpr("cast(id as string) as a", "id as b", "id").write.saveAsTable("t1")

// SQL version: the filter is pushed below the window and into the scan
val w = sparkSession.sql(
  "select *, row_number() over (partition by concat(a,'lit') order by b) from t1 where a>'1'")
w.explain

// DataFrame version: the filter stays above the Window
val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")
sparkSession.table("t1").withColumn("d", row_number() over windowSpec)
  .where("a>'1'")
  .explain
{code}
plans:
{noformat}
== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22]
+- Window [row_number() windowspecdefinition(_w0#23, b#12L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22], [_w0#23], [b#12L ASC NULLS FIRST]
   +- *(2) Sort [_w0#23 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_w0#23, 1)
         +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#23]
            +- *(1) Filter (isnotnull(a#11) && (a#11 > 1))
               +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct

== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, d#28]
+- *(3) Filter (isnotnull(a#11) && (a#11 > 1))
   +- Window [row_number() windowspecdefinition(_w0#29, b#12L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#28], [_w0#29], [b#12L ASC NULLS FIRST]
      +- *(2) Sort [_w0#29 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(_w0#29, 1)
            +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#29]
               +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{noformat}
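One way to see the same thing without scanning the full physical plan is to check where the Filter sits in the optimized logical plan. A small sketch, assuming the sparkSession, windowSpec and t1 defined in the code above; it relies on internal catalyst classes, so it is only meant for ad-hoc inspection:
{code:scala}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Window => LogicalWindow}
import org.apache.spark.sql.functions._

val optimized = sparkSession.table("t1")
  .withColumn("d", row_number() over windowSpec)
  .where("a>'1'")
  .queryExecution.optimizedPlan

// If the predicate had been pushed down, the Filter would sit below the Window.
// Here we find a Filter whose subtree still contains the Window node, i.e. the
// filter was *not* pushed through.
val filterStillAboveWindow = optimized.collectFirst {
  case f: Filter if f.find(_.isInstanceOf[LogicalWindow]).isDefined => f
}.isDefined

println(s"Filter still above Window: $filterStillAboveWindow")
{code}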
[jira] [Comment Edited] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625432#comment-16625432 ]

Yuming Wang edited comment on SPARK-23985 at 9/24/18 7:07 AM:
--------------------------------------------------------------

You should move {{where("a>'1'")}} before {{withColumn}}:
{code:scala}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

spark.range(10).selectExpr("cast(id as string) a", "id as b").write.saveAsTable("t1")

val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")

// Filtering before the window: the predicate is pushed all the way into the scan
spark.table("t1").where("a>'1'").withColumn("d", row_number() over windowSpec).explain
{code}
{noformat}
== Physical Plan ==
*(3) Project [a#8, b#9L, d#13]
+- Window [row_number() windowspecdefinition(_w0#19, b#9L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#13], [_w0#19], [b#9L ASC NULLS FIRST]
   +- *(2) Sort [_w0#19 ASC NULLS FIRST, b#9L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_w0#19, 5)
         +- *(1) Project [a#8, b#9L, concat(a#8, lit) AS _w0#19]
            +- *(1) Filter (isnotnull(a#8) && (a#8 > 1))
               +- *(1) FileScan parquet default.t1[a#8,b#9L] Batched: true, DataFilters: [isnotnull(a#8), (a#8 > 1)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/core/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct
{noformat}
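An alternative sketch, assuming the same spark session and t1 table as above: since the 'lit' suffix is a constant, partitioning by the bare column a produces exactly the same partitions as partitioning by concat(a,'lit'), and with a plain-column partition spec the existing rule can push the filter down without reordering the user code.
{code:scala}
// Sketch only: equivalent here because the 'lit' suffix is a constant, so
// partitioning by a and by concat(a, 'lit') yields identical partitions.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val plainSpec = Window.partitionBy(col("a")).orderBy("b")

spark.table("t1")
  .withColumn("d", row_number() over plainSpec)
  .where("a>'1'")
  .explain
// Expected: the plan shows PushedFilters: [IsNotNull(a), GreaterThan(a,1)],
// matching the working (plain partition column) case in the issue description.
{code}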