[jira] [Commented] (SPARK-23500) Filters on named_structs could be pushed into scans

2018-03-27 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415141#comment-16415141 ]

Apache Spark commented on SPARK-23500:
--

User 'gatorsmile' has created a pull request for this issue:
https://github.com/apache/spark/pull/20911

> Filters on named_structs could be pushed into scans
> ---
>
> Key: SPARK-23500
> URL: https://issues.apache.org/jira/browse/SPARK-23500
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Henry Robinson
>Assignee: Henry Robinson
>Priority: Major
> Fix For: 2.4.0
>
>
> Simple filters on dataframes joined with {{joinWith()}} miss an
> opportunity to be pushed into the scan, because they are written in terms
> of {{named_struct}} expressions that could be removed by the optimizer.
> Given the following simple query over two dataframes:
> {code:java}
> scala> val df = spark.read.parquet("one_million")
> df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = spark.read.parquet("one_million")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 30").explain
> == Physical Plan ==
> *(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
> :- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
> :  +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,id2:bigint>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:bigint,id2:bigint>, false].id))
>    +- *(1) Project [named_struct(id, id#90L, id2, id2#91L) AS _2#95]
>       +- *(1) Filter (named_struct(id, id#90L, id2, id2#91L).id > 30)
>          +- *(1) FileScan parquet [id#90L,id2#91L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,id2:bigint>
> {code}
> Using {{joinWith}} means that the filter is placed on a {{named_struct}},
> and is then not pushed down (note the empty {{PushedFilters}} above). When
> the filter is just above the scan, the wrapping-and-projection of
> {{named_struct(id...).id}} is a no-op and could be removed; the filter
> could then be pushed down to Parquet.
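As a sketch of a workaround (not part of the original report; assumes the same "one_million" Parquet data and a spark-shell session as above), applying the filter before {{joinWith}} wraps each row in a struct lets the predicate be planned against the bare column:

{code:java}
// Hypothetical workaround sketch: filter df2 before the join, so the
// predicate never sees a named_struct wrapper.
val df  = spark.read.parquet("one_million")
val df2 = spark.read.parquet("one_million").filter("id > 30")

// df2's FileScan should now list the predicate under PushedFilters.
df.joinWith(df2, df2.col("id") === df.col("id2")).explain
{code}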



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23500) Filters on named_structs could be pushed into scans

2018-03-05 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386604#comment-16386604 ]

Apache Spark commented on SPARK-23500:
--

User 'henryr' has created a pull request for this issue:
https://github.com/apache/spark/pull/20687




[jira] [Commented] (SPARK-23500) Filters on named_structs could be pushed into scans

2018-02-27 Thread Henry Robinson (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379201#comment-16379201 ]

Henry Robinson commented on SPARK-23500:


OK, I figured it out! {{SimplifyCreateStructOps}} is not applied recursively
across the whole plan, only to the expressions reachable from the plan's root
node. So even the following:

{{df.filter("named_struct('id', id, 'id2', id2).id > 1").select("id2")}}

doesn't trigger the rule, because the {{named_struct}} is never seen.

Changing the rule to walk the plan, and then walk the expression trees rooted
at each node, caused the optimization to trigger:

{code}
object SimplifyCreateStructOps extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case p =>
    p.transformExpressionsUp {
      case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) =>
        createNamedStructLike.valExprs(ordinal)
    }
  }
}
{code}
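(Sketch, not part of the original comment: a plain-Scala toy analogue of the rewrite above, with no Spark dependency, showing how a field access over a freshly built struct collapses to the underlying value expression. The {{Expr}}/{{Attr}} names are invented for illustration.)

{code}
// Toy expression tree mirroring Catalyst's CreateNamedStruct / GetStructField.
sealed trait Expr
case class Attr(name: String) extends Expr
case class CreateStruct(fields: Seq[(String, Expr)]) extends Expr
case class GetField(child: Expr, ordinal: Int) extends Expr

// A field access over a struct built in place is a no-op wrapper:
// replace it with the field's value expression.
def simplify(e: Expr): Expr = e match {
  case GetField(CreateStruct(fields), ord) => simplify(fields(ord)._2)
  case GetField(child, ord)                => GetField(simplify(child), ord)
  case CreateStruct(fields)                => CreateStruct(fields.map { case (n, v) => (n, simplify(v)) })
  case other                               => other
}

// simplify(GetField(CreateStruct(Seq("id" -> Attr("id"), "id2" -> Attr("id2"))), 0))
// collapses to Attr("id"), just as named_struct(id, id#90L, ...).id rewrites to id#90L.
{code}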






[jira] [Commented] (SPARK-23500) Filters on named_structs could be pushed into scans

2018-02-26 Thread Henry Robinson (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377802#comment-16377802 ]

Henry Robinson commented on SPARK-23500:


There's an optimizer rule, {{SimplifyCreateStructOps}}, that should be kicking 
in here. In a simpler plan, it correctly unnests the {{named_struct}}:

{code:java}
scala> df.filter("named_struct('id', id).id > 1").explain
== Physical Plan ==
*(1) Project [id#0L, id2#1L]
+- *(1) Filter (isnotnull(id#0L) && (id#0L > 1))
   +- *(1) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,1)], ReadSchema: struct<id:bigint,id2:bigint>

scala> df.joinWith(df2, df.col("id") === df2.col("2id")).project("_1.id2").filter("_1.id2 + _2.2id2 > 30").explain
<console>:32: error: value project is not a member of org.apache.spark.sql.Dataset[(org.apache.spark.sql.Row, org.apache.spark.sql.Row)]
       df.joinWith(df2, df.col("id") === df2.col("2id")).project("_1.id2").filter("_1.id2 + _2.2id2 > 30").explain
{code}
