[ https://issues.apache.org/jira/browse/SPARK-23500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Henry Robinson updated SPARK-23500:
-----------------------------------
Description:
Simple filters on dataframes joined with {{joinWith()}} miss an opportunity to be pushed into the scan, because they are written in terms of a {{named_struct}} that the optimizer could remove.

Given the following simple query over two dataframes:
{code:java}
scala> val df = spark.read.parquet("one_million")
df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> val df2 = spark.read.parquet("one_million")
df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 30").explain
== Physical Plan ==
*(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
:- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
:  +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,id2:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:bigint,id2:bigint>, false].id))
   +- *(1) Project [named_struct(id, id#90L, id2, id2#91L) AS _2#95]
      +- *(1) Filter (named_struct(id, id#90L, id2, id2#91L).id > 30)
         +- *(1) FileScan parquet [id#90L,id2#91L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,id2:bigint>
{code}

Using {{joinWith}} means that the filter is written against a field of a {{named_struct}}. The filter is then pushed down below the {{Project}}, but not into the scan: both scans still report {{PushedFilters: []}}. When the filter sits directly above the scan, the wrapping-and-projection in {{named_struct(id...).id}} is a no-op and could be removed. The filter would then compare a plain column with a literal, and could be pushed down to Parquet.
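The no-op rewrite described above can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions, not Catalyst's API and not the fix for this issue: the case classes {{Attr}}, {{Lit}}, {{NamedStruct}}, {{GetField}} and {{GreaterThan}} and the objects {{NamedStructSimplifier}} and {{NamedStructDemo}} are hypothetical stand-ins for Catalyst expressions such as {{CreateNamedStruct}} and {{GetStructField}}. {{toSourceFilter}} is a deliberately simplified pushability check that only accepts a bare column compared with a literal, loosely mirroring a data source filter such as {{org.apache.spark.sql.sources.GreaterThan}}.

{code:scala}
// Toy model of the rewrite discussed above. These types are hypothetical
// stand-ins, NOT Spark's Catalyst expression classes.
sealed trait Expr
final case class Attr(name: String) extends Expr                         // a column, e.g. id#90L
final case class Lit(value: Long) extends Expr                           // a literal, e.g. 30
final case class NamedStruct(fields: List[(String, Expr)]) extends Expr  // named_struct(id, ..., id2, ...)
final case class GetField(child: Expr, field: String) extends Expr       // struct.field (by name here; an assumption)
final case class GreaterThan(left: Expr, right: Expr) extends Expr       // left > right

object NamedStructSimplifier {
  // Bottom-up rewrite: extracting field f from a named_struct built in place
  // is a no-op, so GetField(NamedStruct(..., f -> e, ...), f) becomes e.
  def simplify(e: Expr): Expr = e match {
    case GetField(child, field) =>
      simplify(child) match {
        case s @ NamedStruct(fields) =>
          fields.find(_._1 == field).map(_._2).getOrElse(GetField(s, field))
        case other => GetField(other, field)
      }
    case NamedStruct(fields) => NamedStruct(fields.map { case (n, v) => (n, simplify(v)) })
    case GreaterThan(l, r)   => GreaterThan(simplify(l), simplify(r))
    case leaf                => leaf
  }

  // Simplified pushability check (assumption of this sketch): only
  // "column > literal" is translated into a source-style filter.
  def toSourceFilter(e: Expr): Option[String] = e match {
    case GreaterThan(Attr(col), Lit(v)) => Some(s"GreaterThan($col,$v)")
    case _                              => None
  }
}

object NamedStructDemo {
  def main(args: Array[String]): Unit = {
    // Shape of the filter above the scan: named_struct(id, id, id2, id2).id > 30
    val filter = GreaterThan(
      GetField(NamedStruct(List("id" -> Attr("id"), "id2" -> Attr("id2"))), "id"),
      Lit(30L))

    println(NamedStructSimplifier.toSourceFilter(filter))     // None: not pushable as written
    val simplified = NamedStructSimplifier.simplify(filter)
    println(simplified)                                       // GreaterThan(Attr(id),Lit(30))
    println(NamedStructSimplifier.toSourceFilter(simplified)) // Some(GreaterThan(id,30))
  }
}
{code}

In this toy model, folding the field extraction away leaves a plain {{id > 30}} comparison, which is the shape the simplified check accepts; whether and where such a rule should run in Spark's optimizer is the open question of this issue.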
> Filters on named_structs could be pushed into scans
> ---------------------------------------------------
>
>                 Key: SPARK-23500
>                 URL: https://issues.apache.org/jira/browse/SPARK-23500
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Henry Robinson
>            Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)