[ https://issues.apache.org/jira/browse/SPARK-17698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Reynold Xin resolved SPARK-17698.
---------------------------------
    Resolution: Fixed
      Assignee: Tejas Patil
 Fix Version/s: 2.1.0

> Join predicates should not contain filter clauses
> -------------------------------------------------
>
>                 Key: SPARK-17698
>                 URL: https://issues.apache.org/jira/browse/SPARK-17698
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Tejas Patil
>            Assignee: Tejas Patil
>            Priority: Minor
>             Fix For: 2.1.0
>
>
> `ExtractEquiJoinKeys` incorrectly uses filter predicates as part of the join
> condition. While this does not lead to incorrect results, in the case of
> bucketed + sorted tables it can miss the chance to avoid an unnecessary
> shuffle + sort. e.g.
> {code}
> val df = (1 until 10).toDF("id").coalesce(1)
> hc.sql("DROP TABLE IF EXISTS table1").collect
> df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table1")
> hc.sql("DROP TABLE IF EXISTS table2").collect
> df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table2")
> sqlContext.sql("""
>   SELECT a.id, b.id
>   FROM table1 a
>   FULL OUTER JOIN table2 b
>   ON a.id = b.id AND a.id='1' AND b.id='1'
> """).explain(true)
> {code}
> This does a shuffle + sort over the table scan outputs, which is not needed
> since both tables are bucketed and sorted on the same columns and have the
> same number of buckets. This should be a single-stage job.
> {code}
> SortMergeJoin [id#38, cast(id#38 as double), 1.0], [id#39, 1.0, cast(id#39 as double)], FullOuter
> :- *Sort [id#38 ASC NULLS FIRST, cast(id#38 as double) ASC NULLS FIRST, 1.0 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(id#38, cast(id#38 as double), 1.0, 200)
> :     +- *FileScan parquet default.table1[id#38] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
> +- *Sort [id#39 ASC NULLS FIRST, 1.0 ASC NULLS FIRST, cast(id#39 as double) ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#39, 1.0, cast(id#39 as double), 200)
>       +- *FileScan parquet default.table2[id#39] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
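
The distinction at the heart of the bug can be sketched with a toy model (this is illustrative only — `Pred` and `split` are hypothetical names, not Catalyst's actual `ExtractEquiJoinKeys` API): a conjunct qualifies as an equi-join key only if it references attributes from *both* join sides, while a one-sided conjunct such as `a.id='1'` is a filter and should stay out of the join keys, leaving the output partitioning of the bucketed scan intact.

{code}
// Toy model of join-condition classification; not Spark's internal API.
// Each Pred records which attributes of the left/right relation it touches.
case class Pred(left: Set[String], right: Set[String])

// Equi-join keys reference both sides; one-sided conjuncts are filters
// and belong below the join, not in the join keys.
def split(conjuncts: Seq[Pred]): (Seq[Pred], Seq[Pred]) =
  conjuncts.partition(p => p.left.nonEmpty && p.right.nonEmpty)

val conjuncts = Seq(
  Pred(Set("a.id"), Set("b.id")), // a.id = b.id  -> join key
  Pred(Set("a.id"), Set.empty),   // a.id = '1'   -> filter
  Pred(Set.empty, Set("b.id"))    // b.id = '1'   -> filter
)
val (joinKeys, filters) = split(conjuncts)
{code}

With only `a.id = b.id` as the join key, the children's bucketed + sorted layout already satisfies the required distribution and ordering, so no Exchange or Sort is needed.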