Ohad Raviv created SPARK-24410:
----------------------------------

             Summary: Missing optimization for Union on bucketed tables
                 Key: SPARK-24410
                 URL: https://issues.apache.org/jira/browse/SPARK-24410
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.3.0
            Reporter: Ohad Raviv


A common use case we have is a partially aggregated table plus daily 
increments that we need to aggregate further. We do this by unioning the two 
tables and aggregating again.
We tried to optimize this process by bucketing the tables, but currently it 
seems that the Union operator doesn't leverage the bucketing the way the 
join operator does.

For example, for two bucketed tables a1 and a2:

{code}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// N is the number of rows to generate.
sparkSession.range(N).selectExpr(
    "id as key",
    "id % 2 as t1",
    "id % 3 as t2")
  .repartition(col("key"))
  .write
  .mode(SaveMode.Overwrite)
  .bucketBy(3, "key")
  .sortBy("t1")
  .saveAsTable("a1")

sparkSession.range(N).selectExpr(
    "id as key",
    "id % 2 as t1",
    "id % 3 as t2")
  .repartition(col("key"))
  .write
  .mode(SaveMode.Overwrite)
  .bucketBy(3, "key")
  .sortBy("t1")
  .saveAsTable("a2")

{code}
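As a quick sanity check (not strictly part of the repro), the bucketing spec recorded in the metastore can be inspected with DESCRIBE FORMATTED; it should report 3 buckets on key for both tables:

{code}
// Expect rows like "Num Buckets" = 3 and "Bucket Columns" = key in the output.
sparkSession.sql("DESCRIBE FORMATTED a1").show(100, truncate = false)
sparkSession.sql("DESCRIBE FORMATTED a2").show(100, truncate = false)
{code}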
For the join query we get a SortMergeJoin with no shuffle (Exchange):
{code}
select * from a1 join a2 on (a1.key=a2.key)

== Physical Plan ==
*(3) SortMergeJoin [key#24L], [key#27L], Inner
:- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
:  +- *(1) Project [key#24L, t1#25L, t2#26L]
:     +- *(1) Filter isnotnull(key#24L)
:        +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>
+- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
   +- *(2) Project [key#27L, t1#28L, t2#29L]
      +- *(2) Filter isnotnull(key#27L)
         +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>
{code}
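For reference, the plans in this ticket can be reproduced with explain(), e.g.:

{code}
sparkSession.sql("select * from a1 join a2 on (a1.key=a2.key)").explain()
{code}

Note there is no Exchange on either side of the join - presumably because each bucketed scan already reports a hash partitioning on key with 3 buckets, so only a local sort is needed.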

But for aggregation after a union we get a shuffle (Exchange):
{code}
select key, count(*) from (select * from a1 union all select * from a2) z group by key

== Physical Plan ==
*(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, count(1)#36L])
+- Exchange hashpartitioning(key#25L, 1)
   +- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], output=[key#25L, count#38L])
      +- Union
         :- *(1) Project [key#25L]
         :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>
         +- *(2) Project [key#28L]
            +- *(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>
{code}
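For contrast, grouping just one of the bucketed tables by its bucket column does avoid the Exchange in our runs, which suggests the bucketing information is available to the planner but simply isn't propagated through Union:

{code}
// In our runs this plan has no Exchange - the bucketed scan of a1 already
// satisfies the distribution required by the aggregate on key.
sparkSession.sql("select key, count(*) from a1 group by key").explain()
{code}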




