GitHub user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/19714#discussion_r153865264
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -149,10 +147,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case _ => false
}
+  /**
+   * Chooses which side of a join to build/broadcast.
+   *
+   * A side counts as hinted only when it may be built (`canBuildLeft` /
+   * `canBuildRight`) AND its plan statistics carry a broadcast hint. If both
+   * sides qualify, the one with the smaller estimated size wins; if exactly
+   * one qualifies, that side is chosen; otherwise the smaller side is used.
+   *
+   * @param canBuildLeft  whether the left side may be the build side
+   *                      (presumably `canBuildLeft(joinType)` at the call site)
+   * @param canBuildRight whether the right side may be the build side
+   *                      (presumably `canBuildRight(joinType)` at the call site)
+   * @param left  the left child plan
+   * @param right the right child plan
+   * @return `BuildLeft` or `BuildRight`
+   */
+  private def broadcastSide(
+      canBuildLeft: Boolean,
+      canBuildRight: Boolean,
+      left: LogicalPlan,
+      right: LogicalPlan): BuildSide = {
+
+    // Side with the smaller estimated size in bytes; ties go to the right side.
+    def smallerSide =
+      if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
+
+    val buildRight = canBuildRight && right.stats.hints.broadcast
+    val buildLeft = canBuildLeft && left.stats.hints.broadcast
+
+    if (buildRight && buildLeft) {
+      // Both sides have a broadcast hint: broadcast the smaller side, based on
+      // its estimated physical size.
+      smallerSide
+    } else if (buildRight) {
+      BuildRight
+    } else if (buildLeft) {
+      BuildLeft
+    } else {
+      // No hinted, buildable side. This fallback is used for
+      // `case logical.Join(left, right, joinType, condition)`.
+      // NOTE(review): this path ignores canBuildLeft/canBuildRight; confirm the
+      // caller only reaches it when either side is an acceptable build side.
+      smallerSide
+    }
+  }
+
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// --- BroadcastHashJoin
--------------------------------------------------------------------
+ case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition,
left, right)
+ if (canBuildRight(joinType) && right.stats.hints.broadcast)
+ || (canBuildLeft(joinType) && left.stats.hints.broadcast) =>
--- End diff --
Move the `||` to the end of line 180, so the continuation line does not start with an operator.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]