nvander1 opened a new pull request #24563: [SPARK-27359] [OPTIMIZER] [SQL]
URL: https://github.com/apache/spark/pull/24563

## What changes were proposed in this pull request?

An optimization for joins whose condition is `arrays_overlap`. I believe this is worthwhile to integrate into Spark given the several new array functions released in Spark 2.4; the optimization lets users make better use of `arrays_overlap`. The technique proposed in this patch can also be trivially extended to joins with a condition involving `array_contains`.

The following code produces a cartesian product in the physical plan:

```scala
import spark.implicits._
import org.apache.spark.sql.functions._

val a = Seq((Seq(1, 2, 3), "one")).toDF("num", "name")
val b = Seq((Seq(1, 5), "two")).toDF("num", "name")
val j = a.join(b, arrays_overlap(b("num"), a("num")))
j.explain(true)
```

```
== Parsed Logical Plan ==
Join Inner, arrays_overlap(num#158, num#149)
:- Project [_1#146 AS num#149, _2#147 AS name#150]
:  +- LocalRelation [_1#146, _2#147]
+- Project [_1#155 AS num#158, _2#156 AS name#159]
   +- LocalRelation [_1#155, _2#156]

== Analyzed Logical Plan ==
num: array<int>, name: string, num: array<int>, name: string
Join Inner, arrays_overlap(num#158, num#149)
:- Project [_1#146 AS num#149, _2#147 AS name#150]
:  +- LocalRelation [_1#146, _2#147]
+- Project [_1#155 AS num#158, _2#156 AS name#159]
   +- LocalRelation [_1#155, _2#156]

== Optimized Logical Plan ==
Join Inner, arrays_overlap(num#158, num#149)
:- LocalRelation [num#149, name#150]
+- LocalRelation [num#158, name#159]

== Physical Plan ==
CartesianProduct arrays_overlap(num#158, num#149)
:- LocalTableScan [num#149, name#150]
+- LocalTableScan [num#158, name#159]
```

This is unacceptable for joins on large datasets. The query can be rewritten into an equivalent equijoin by:

1. exploding the arrays,
2. joining on the exploded columns,
3. dropping the exploded columns from the joined data, and
4. removing duplicates from the result of step 3.

Doing so can bring a query that might otherwise never complete down to a reasonable running time. A manual sketch of the rewrite follows.
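For illustration, here is a minimal manual version of that rewrite using the DataFrame API, reusing `a` and `b` from the snippet above. This is only a sketch of the four steps, not the patch's rule itself; the helper column names `explode_larr`/`explode_rarr` simply mirror the aliases that appear in the optimized plan below:

```scala
import org.apache.spark.sql.functions._

// 1. Explode each side's array into one row per element.
val aExploded = a.withColumn("explode_larr", explode(a("num")))
val bExploded = b.withColumn("explode_rarr", explode(b("num")))

// 2. Equijoin on the exploded element columns,
// 3. drop the helper columns, and
// 4. remove the duplicates produced by rows sharing more than one element.
val rewritten = aExploded
  .join(bExploded, col("explode_larr") === col("explode_rarr"))
  .drop("explode_larr", "explode_rarr")
  .distinct()
```

The same pattern extends to `array_contains(arr, x)`: only the array side needs exploding before equijoining the exploded column against `x`. With the rule in this patch applied, the original `arrays_overlap` join is planned as follows: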
```
== Parsed Logical Plan ==
Join Inner, arrays_overlap(num#158, num#149)
:- Project [_1#146 AS num#149, _2#147 AS name#150]
:  +- LocalRelation [_1#146, _2#147]
+- Project [_1#155 AS num#158, _2#156 AS name#159]
   +- LocalRelation [_1#155, _2#156]

== Analyzed Logical Plan ==
num: array<int>, name: string, num: array<int>, name: string
Join Inner, arrays_overlap(num#158, num#149)
:- Project [_1#146 AS num#149, _2#147 AS name#150]
:  +- LocalRelation [_1#146, _2#147]
+- Project [_1#155 AS num#158, _2#156 AS name#159]
   +- LocalRelation [_1#155, _2#156]

== Optimized Logical Plan ==
Aggregate [1], [first(num#149, false) AS num#149, first(name#150, false) AS name#150, first(num#158, false) AS num#158, first(name#159, false) AS name#159]
+- Project [num#149, name#150, num#158, name#159]
   +- Join Inner, (explode_larr#178 = explode_rarr#180)
      :- Project [num#149, name#150, explode_larr#178]
      :  +- Generate explode(num#149), false, [explode_larr#178]
      :     +- LocalRelation [num#149, name#150]
      +- Project [num#158, name#159, explode_rarr#180]
         +- Generate explode(num#158), false, [explode_rarr#180]
            +- LocalRelation [num#158, name#159]

== Physical Plan ==
SortAggregate(key=[1#185], functions=[finalmerge_first(merge first#188, valueSet#189) AS first(num#149)()#181, finalmerge_first(merge first#192, valueSet#193) AS first(name#150)()#182, finalmerge_first(merge first#196, valueSet#197) AS first(num#158)()#183, finalmerge_first(merge first#200, valueSet#201) AS first(name#159)()#184], output=[num#149, name#150, num#158, name#159])
+- Sort [1#185 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(1#185, 200)
      +- SortAggregate(key=[1 AS 1#185], functions=[partial_first(num#149, false) AS (first#188, valueSet#189), partial_first(name#150, false) AS (first#192, valueSet#193), partial_first(num#158, false) AS (first#196, valueSet#197), partial_first(name#159, false) AS (first#200, valueSet#201)], output=[1#185, first#188, valueSet#189, first#192, valueSet#193, first#196, valueSet#197, first#200, valueSet#201])
         +- *(3) Sort [1 AS 1#185 ASC NULLS FIRST], false, 0
            +- *(3) Project [num#149, name#150, num#158, name#159]
               +- *(3) SortMergeJoin [explode_larr#178], [explode_rarr#180], Inner
                  :- Sort [explode_larr#178 ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(explode_larr#178, 200)
                  :     +- *(1) Project [num#149, name#150, explode_larr#178]
                  :        +- *(1) Generate explode(num#149), [num#149, name#150], false, [explode_larr#178]
                  :           +- LocalTableScan [num#149, name#150]
                  +- Sort [explode_rarr#180 ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(explode_rarr#180, 200)
                        +- *(2) Project [num#158, name#159, explode_rarr#180]
                           +- *(2) Generate explode(num#158), [num#158, name#159], false, [explode_rarr#180]
                              +- LocalTableScan [num#158, name#159]
```

## How was this patch tested?

So far this patch has been tested only manually, on a large dataset: I have used the technique it implements to perform similar joins with roughly 300 million records on either side of the join. If you agree that this is a worthwhile optimization, I'll happily contribute unit tests to ensure its robustness.
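In the meantime, a minimal smoke test might assert that the planner no longer falls back to a cartesian product. This is a sketch only, assuming the rule is enabled in the session; it is not part of the patch:

```scala
// Sketch: with the rule active, the executed plan should use the
// sort-merge join on the exploded columns rather than a CartesianProduct.
val j = a.join(b, arrays_overlap(b("num"), a("num")))
assert(!j.queryExecution.executedPlan.toString.contains("CartesianProduct"))
```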
