[ https://issues.apache.org/jira/browse/SPARK-24025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jacek Laskowski updated SPARK-24025:
------------------------------------
    Attachment: join-jira.png

> Join of bucketed and non-bucketed tables can give two exchanges and sorts for non-bucketed side
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24025
>                 URL: https://issues.apache.org/jira/browse/SPARK-24025
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 2.3.1
>         Environment: {code:java}
> ./bin/spark-shell --version
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
>       /_/
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_162
> Branch master
> Compiled by user sameera on 2018-02-22T19:24:29Z
> Revision a0d7949896e70f427e7f3942ff340c9484ff0aab
> Url g...@github.com:sameeragarwal/spark.git
> Type --help for more information.{code}
>            Reporter: Jacek Laskowski
>            Priority: Major
>         Attachments: join-jira.png
>
>
> While exploring bucketing I found the following join query of non-bucketed and bucketed tables that ends up with two exchanges and two sorts in the physical plan for the non-bucketed join side.
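A toy model can help explain the shape of the non-bucketed side in the physical plan of the reproduction. It is loosely patterned on what Spark's `EnsureRequirements` physical-planning rule does for a sort-merge join, but it is a hypothetical simplification, not Spark's code. `RangeScan`, `Exchange`, `Sort` and `ensure` are made-up names. The model assumes, as the plan suggests, that a shuffle reports no output ordering and that the non-bucketed side is re-partitioned to match the 4 buckets of the bucketed side.

```scala
// Hypothetical toy model, NOT Spark's implementation: it only tracks the two
// properties a sort-merge join side needs (hash partitioning and local order).
object ToyEnsureRequirements {

  sealed trait Plan {
    // Number of hash partitions on the join key, if the output is hash-partitioned.
    def hashPartitions: Option[Int]
    // Whether every partition is known to be sorted by the join key.
    def sortedWithinPartitions: Boolean
  }

  // Leaf scan (like spark.range): no known partitioning or ordering.
  final case class RangeScan(splits: Int) extends Plan {
    def hashPartitions: Option[Int] = None
    def sortedWithinPartitions: Boolean = false
  }

  // Shuffle into n hash partitions; assumed to report no output ordering.
  final case class Exchange(n: Int, child: Plan) extends Plan {
    def hashPartitions: Option[Int] = Some(n)
    def sortedWithinPartitions: Boolean = false
  }

  // Per-partition sort (like sortWithinPartitions): keeps the child's partitioning.
  final case class Sort(child: Plan) extends Plan {
    def hashPartitions: Option[Int] = child.hashPartitions
    def sortedWithinPartitions: Boolean = true
  }

  // Prepare one join side: co-partition it with a side that has `required`
  // hash partitions, then make sure it is sorted within partitions.
  def ensure(child: Plan, required: Int): Plan = {
    val partitioned =
      if (child.hashPartitions.contains(required)) child
      else Exchange(required, child)
    if (partitioned.sortedWithinPartitions) partitioned else Sort(partitioned)
  }

  def main(args: Array[String]): Unit = {
    // Non-bucketed side of the report: range -> repartition(2) -> sortWithinPartitions
    val t1 = Sort(Exchange(2, RangeScan(8)))
    // The bucketed side has 4 buckets, so the toy rule asks for 4 partitions.
    println(ensure(t1, 4)) // Sort(Exchange(4,Sort(Exchange(2,RangeScan(8)))))

    // Hypothetical variant: repartitioning into 4 up front matches the bucket count.
    println(ensure(Sort(Exchange(4, RangeScan(8))), 4)) // Sort(Exchange(4,RangeScan(8)))
  }
}
```

Under this model, the existing 2-partition exchange does not match the required 4 partitions, so a second exchange is stacked on top of it. Because that exchange drops the per-partition ordering, a second sort follows. The second call suggests, without having been verified against Spark 2.3, that using `repartition(4, $"id")` for `t1` would avoid both the extra exchange and the extra sort.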
> {code}
> // Make sure that you don't end up with a BroadcastHashJoin and a BroadcastExchange
> // Disable auto broadcasting
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>
> val bucketedTableName = "bucketed_4_id"
> val large = spark.range(1000000)
> large.write
>   .bucketBy(4, "id")
>   .sortBy("id")
>   .mode("overwrite")
>   .saveAsTable(bucketedTableName)
>
> // Describe the table and include bucketing spec only
> val descSQL = sql(s"DESC FORMATTED $bucketedTableName").filter($"col_name".contains("Bucket") || $"col_name" === "Sort Columns")
> scala> descSQL.show(truncate = false)
> +--------------+---------+-------+
> |col_name      |data_type|comment|
> +--------------+---------+-------+
> |Num Buckets   |4        |       |
> |Bucket Columns|[`id`]   |       |
> |Sort Columns  |[`id`]   |       |
> +--------------+---------+-------+
>
> val bucketedTable = spark.table(bucketedTableName)
> val t1 = spark.range(4)
>   .repartition(2, $"id")  // Use just 2 partitions
>   .sortWithinPartitions("id") // sort partitions
>
> val q = t1.join(bucketedTable, "id")
> // Note two exchanges and sorts
> scala> q.explain
> == Physical Plan ==
> *(5) Project [id#79L]
> +- *(5) SortMergeJoin [id#79L], [id#77L], Inner
>    :- *(3) Sort [id#79L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(id#79L, 4)
>    :     +- *(2) Sort [id#79L ASC NULLS FIRST], false, 0
>    :        +- Exchange hashpartitioning(id#79L, 2)
>    :           +- *(1) Range (0, 4, step=1, splits=8)
>    +- *(4) Sort [id#77L ASC NULLS FIRST], false, 0
>       +- *(4) Project [id#77L]
>          +- *(4) Filter isnotnull(id#77L)
>             +- *(4) FileScan parquet default.bucketed_4_id[id#77L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
>
> q.foreach(_ => ())
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)