nvander1 opened a new pull request #24563: [SPARK-27359] [OPTIMIZER] [SQL]
URL: https://github.com/apache/spark/pull/24563
 
 
   ## What changes were proposed in this pull request?
   
An optimization for joins whose condition is `arrays_overlap`. I believe this is worthwhile to integrate into Spark given the several new array functions released in Spark 2.4. The optimization lets users make better use of `arrays_overlap` in join conditions, and the technique
proposed in the patch can also be trivially extended to joins with a condition
involving `array_contains`.
   
   The following code produces a cartesian product in the physical plan.
   ```scala
   import spark.implicits._
   import org.apache.spark.sql.functions._
   
   val a = Seq((Seq(1, 2, 3), "one")).toDF("num", "name")
   val b = Seq((Seq(1, 5), "two")).toDF("num", "name")
   val j = a.join(b, arrays_overlap(b("num"), a("num")))
   j.explain(true)
   ```
   ```
   == Parsed Logical Plan ==
   Join Inner, arrays_overlap(num#158, num#149)
   :- Project [_1#146 AS num#149, _2#147 AS name#150]
   :  +- LocalRelation [_1#146, _2#147]
   +- Project [_1#155 AS num#158, _2#156 AS name#159]
      +- LocalRelation [_1#155, _2#156]
   
   == Analyzed Logical Plan ==
   num: array<int>, name: string, num: array<int>, name: string
   Join Inner, arrays_overlap(num#158, num#149)
   :- Project [_1#146 AS num#149, _2#147 AS name#150]
   :  +- LocalRelation [_1#146, _2#147]
   +- Project [_1#155 AS num#158, _2#156 AS name#159]
      +- LocalRelation [_1#155, _2#156]
   
   == Optimized Logical Plan ==
   Join Inner, arrays_overlap(num#158, num#149)
   :- LocalRelation [num#149, name#150]
   +- LocalRelation [num#158, name#159]
   
   == Physical Plan ==
   CartesianProduct arrays_overlap(num#158, num#149)
   :- LocalTableScan [num#149, name#150]
   +- LocalTableScan [num#158, name#159]
   ```
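   To see why the planner has no better option here, note that `arrays_overlap` provides no equality key to hash or sort on, so the naive evaluation must compare every pair of rows. A minimal sketch of that semantics in plain Python, not Spark internals (the function and data below are illustrative, not from the patch):
   ```python
   # Hypothetical illustration: an arrays_overlap join has no equality key,
   # so the naive evaluation is a cartesian product with a per-pair check.

   def arrays_overlap(a, b):
       """True if the two arrays share at least one element."""
       return not set(a).isdisjoint(b)

   left = [([1, 2, 3], "one")]
   right = [([1, 5], "two")]

   # Naive plan: compare every left row with every right row -- O(|L| * |R|).
   joined = [(l, r) for l in left for r in right if arrays_overlap(l[0], r[0])]
   print(joined)  # the single matching pair
   ```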
   
   This is unacceptable for joins on large datasets.
   The query can be rewritten as an equivalent equijoin by:
   1. exploding the arrays
   2. joining on the exploded columns
   3. dropping the exploded columns from the joined data
   4. removing duplicates from the result of step 3
   
   Doing so can bring a query that might otherwise never complete down to a reasonable runtime.
   ```
   == Parsed Logical Plan ==
   Join Inner, arrays_overlap(num#158, num#149)
   :- Project [_1#146 AS num#149, _2#147 AS name#150]
   :  +- LocalRelation [_1#146, _2#147]
   +- Project [_1#155 AS num#158, _2#156 AS name#159]
      +- LocalRelation [_1#155, _2#156]
   
   == Analyzed Logical Plan ==
   num: array<int>, name: string, num: array<int>, name: string
   Join Inner, arrays_overlap(num#158, num#149)
   :- Project [_1#146 AS num#149, _2#147 AS name#150]
   :  +- LocalRelation [_1#146, _2#147]
   +- Project [_1#155 AS num#158, _2#156 AS name#159]
      +- LocalRelation [_1#155, _2#156]
   
   == Optimized Logical Plan ==
   Aggregate [1], [first(num#149, false) AS num#149, first(name#150, false) AS 
name#150, first(num#158, false) AS num#158, first(name#159, false) AS name#159]
   +- Project [num#149, name#150, num#158, name#159]
      +- Join Inner, (explode_larr#178 = explode_rarr#180)
         :- Project [num#149, name#150, explode_larr#178]
         :  +- Generate explode(num#149), false, [explode_larr#178]
         :     +- LocalRelation [num#149, name#150]
         +- Project [num#158, name#159, explode_rarr#180]
            +- Generate explode(num#158), false, [explode_rarr#180]
               +- LocalRelation [num#158, name#159]
   
   == Physical Plan ==
   SortAggregate(key=[1#185], functions=[finalmerge_first(merge first#188, 
valueSet#189) AS first(num#149)()#181, finalmerge_first(merge first#192, 
valueSet#193) AS first(name#150)()#182, finalmerge_first(merge first#196, 
valueSet#197) AS first(num#158)()#183, finalmerge_first(merge first#200, 
valueSet#201) AS first(name#159)()#184], output=[num#149, name#150, num#158, 
name#159])
   +- Sort [1#185 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(1#185, 200)
         +- SortAggregate(key=[1 AS 1#185], functions=[partial_first(num#149, 
false) AS (first#188, valueSet#189), partial_first(name#150, false) AS 
(first#192, valueSet#193), partial_first(num#158, false) AS (first#196, 
valueSet#197), partial_first(name#159, false) AS (first#200, valueSet#201)], 
output=[1#185, first#188, valueSet#189, first#192, valueSet#193, first#196, 
valueSet#197, first#200, valueSet#201])
            +- *(3) Sort [1 AS 1#185 ASC NULLS FIRST], false, 0
               +- *(3) Project [num#149, name#150, num#158, name#159]
                  +- *(3) SortMergeJoin [explode_larr#178], [explode_rarr#180], 
Inner
                     :- Sort [explode_larr#178 ASC NULLS FIRST], false, 0
                     :  +- Exchange hashpartitioning(explode_larr#178, 200)
                     :     +- *(1) Project [num#149, name#150, explode_larr#178]
                     :        +- *(1) Generate explode(num#149), [num#149, 
name#150], false, [explode_larr#178]
                     :           +- LocalTableScan [num#149, name#150]
                     +- Sort [explode_rarr#180 ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(explode_rarr#180, 200)
                           +- *(2) Project [num#158, name#159, explode_rarr#180]
                              +- *(2) Generate explode(num#158), [num#158, 
name#159], false, [explode_rarr#180]
                                 +- LocalTableScan [num#158, name#159]
   ```
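   The four rewrite steps can also be sketched outside of Spark. The following is a plain-Python illustration of the transformation's semantics (the helper and data are hypothetical, not code from the patch):
   ```python
   # Hypothetical sketch of the four-step rewrite: explode both arrays,
   # equijoin on the exploded element, drop the exploded columns, and
   # de-duplicate the result.

   def explode(rows, array_idx):
       # One output row per array element, with the element appended as a new column.
       return [row + (elem,) for row in rows for elem in row[array_idx]]

   left = [((1, 2, 3), "one")]
   right = [((1, 5), "two")]

   # Steps 1-2: explode, then equijoin on the exploded (last) columns.
   el = explode(left, 0)
   er = explode(right, 0)
   equijoined = [(l[:-1], r[:-1]) for l in el for r in er if l[-1] == r[-1]]

   # Steps 3-4: the exploded columns are already dropped above; remove duplicates.
   result = sorted(set(equijoined))
   print(result)
   ```
   The payoff is that the equijoin key lets Spark hash- or sort-partition both sides, instead of shipping every row to every other row as the cartesian product does.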
   
   ## How was this patch tested?
   
   So far this patch has only been tested manually on large datasets.
   I've used the technique it implements to perform similar joins
   with ~300 million records on each side of the join. If you agree that this is
   a worthwhile optimization, I'll happily contribute unit tests to ensure
   its robustness.
