[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19714 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user wzhfy commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153969180
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -149,10 +147,47 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           case _ => false
         }
    +    private def broadcastSide(
    +        canBuildLeft: Boolean,
    +        canBuildRight: Boolean,
    +        left: LogicalPlan,
    +        right: LogicalPlan): BuildSide = {
    +
    +      def smallerSide =
    +        if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
    +
    +      val buildRight = canBuildRight && right.stats.hints.broadcast
    +      val buildLeft = canBuildLeft && left.stats.hints.broadcast
    +
    +
    +      if (buildRight && buildLeft) {
    +        // Broadcast smaller side base on its estimated physical size
    +        // if both sides have broadcast hint
    +        smallerSide
    +      } else if (buildRight) {
    +        BuildRight
    +      } else if (buildLeft) {
    +        BuildLeft
    +      } else if (canBuildRight && canBuildLeft) {
    +        // for the last default broadcast nested loop join
    +        smallerSide
    +      } else {
    +        throw new AnalysisException(
    +          "Can not decide to use which side for BuildSide for this join")
    --- End diff --
    To be consistent with the method name, how about: `Can not decide which side to broadcast for this join`
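For readers following the thread, the decision order in the quoted `broadcastSide` can be sketched without Spark on the classpath. This is only an illustrative model, not the patch itself: `BroadcastSideSketch`, `SideInfo`, and the use of `IllegalStateException` in place of Spark's `AnalysisException` are assumptions made so the snippet is self-contained.

```scala
object BroadcastSideSketch {
  // Simplified stand-ins for Spark's BuildSide values (assumption, not the real classes).
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Hypothetical summary of one join input: estimated size plus whether it has a broadcast hint.
  final case class SideInfo(sizeInBytes: BigInt, hasBroadcastHint: Boolean)

  def broadcastSide(
      canBuildLeft: Boolean,
      canBuildRight: Boolean,
      left: SideInfo,
      right: SideInfo): BuildSide = {
    // Ties go to the right side, mirroring the `<=` in the quoted diff.
    def smallerSide: BuildSide =
      if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft

    val buildRight = canBuildRight && right.hasBroadcastHint
    val buildLeft = canBuildLeft && left.hasBroadcastHint

    if (buildRight && buildLeft) smallerSide           // both hinted: pick the smaller side
    else if (buildRight) BuildRight                    // only the right side is hinted and buildable
    else if (buildLeft) BuildLeft                      // only the left side is hinted and buildable
    else if (canBuildRight && canBuildLeft) smallerSide // no usable hint: fall back to size
    else throw new IllegalStateException(
      "Can not decide which side to broadcast for this join")
  }
}
```

In this model a hint on a side that cannot be built for the join type is simply ignored, which is why the hint flags are combined with `canBuildLeft` / `canBuildRight` before any branch is taken.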
Github user wzhfy commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153969298
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1492,6 +1492,64 @@ that these options will be deprecated in future release as more optimizations ar
    +## Broadcast Hint for SQL Queries
    +
    +The `BROADCAST` hint guides Spark to broadcast each specified table when joining them with another table or view.
    +When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred,
    +even if the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`.
    +When both sides of a join are specified, Spark broadcasts the one having the lower statistics.
    +Note Spark does not guaranttee BHJ is always chosen, since not all cases (e.g. full outer join)
    --- End diff --
    typo: guarantee
Github user gatorsmile commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153867013
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1492,6 +1492,19 @@ that these options will be deprecated in future release as more optimizations ar
    +## Broadcast Hint for SQL Queries
    --- End diff --
    +1
Github user gatorsmile commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153865403
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -189,6 +219,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           // --- Without joining keys
           // Pick BroadcastNestedLoopJoin if one side could be broadcasted
    +      case j @ logical.Join(left, right, joinType, condition)
    +        if (canBuildRight(joinType) && right.stats.hints.broadcast)
    +          || (canBuildLeft(joinType) && left.stats.hints.broadcast) =>
    --- End diff --
    The same here
Github user gatorsmile commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153865264
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -149,10 +147,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           case _ => false
         }
    +    private def broadcastSide(
    +        canBuildLeft: Boolean,
    +        canBuildRight: Boolean,
    +        left: LogicalPlan,
    +        right: LogicalPlan): BuildSide = {
    +
    +      def smallerSide =
    +        if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
    +
    +      val buildRight = canBuildRight && right.stats.hints.broadcast
    +      val buildLeft = canBuildLeft && left.stats.hints.broadcast
    +
    +      // Both sides have broadcast hint, broadcast smaller side base on its estimated physical size.
    +      if (buildRight && buildLeft) {
    +        smallerSide
    +      } else if (buildRight) {
    +        BuildRight
    +      } else if (buildLeft) {
    +        BuildLeft
    +      // This used for `case logical.Join(left, right, joinType, condition)`
    +      } else {
    +        smallerSide
    +      }
    +    }
    +
         def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
           // --- BroadcastHashJoin
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    +        if (canBuildRight(joinType) && right.stats.hints.broadcast)
    +          || (canBuildLeft(joinType) && left.stats.hints.broadcast) =>
    --- End diff --
    move `||` to line 180
Github user gatorsmile commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153864916
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -149,10 +147,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           case _ => false
         }
    +    private def broadcastSide(
    +        canBuildLeft: Boolean,
    +        canBuildRight: Boolean,
    +        left: LogicalPlan,
    +        right: LogicalPlan): BuildSide = {
    +
    +      def smallerSide =
    +        if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
    +
    +      val buildRight = canBuildRight && right.stats.hints.broadcast
    +      val buildLeft = canBuildLeft && left.stats.hints.broadcast
    +
    +      // Both sides have broadcast hint, broadcast smaller side base on its estimated physical size.
    +      if (buildRight && buildLeft) {
    +        smallerSide
    +      } else if (buildRight) {
    +        BuildRight
    +      } else if (buildLeft) {
    +        BuildLeft
    +      // This used for `case logical.Join(left, right, joinType, condition)`
    +      } else {
    +        smallerSide
    --- End diff --
    ```
    else if (canBuildRight && canBuildLeft) {
      smallerSide
    } else {
      // throw exception
    }
    ```
Github user gatorsmile commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153864513
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -149,10 +147,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           case _ => false
         }
    +    private def broadcastSide(
    +        canBuildLeft: Boolean,
    +        canBuildRight: Boolean,
    +        left: LogicalPlan,
    +        right: LogicalPlan): BuildSide = {
    +
    +      def smallerSide =
    +        if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
    +
    +      val buildRight = canBuildRight && right.stats.hints.broadcast
    +      val buildLeft = canBuildLeft && left.stats.hints.broadcast
    +
    +      // Both sides have broadcast hint, broadcast smaller side base on its estimated physical size.
    +      if (buildRight && buildLeft) {
    +        smallerSide
    +      } else if (buildRight) {
    +        BuildRight
    +      } else if (buildLeft) {
    +        BuildLeft
    +      // This used for `case logical.Join(left, right, joinType, condition)`
    --- End diff --
    remove this line.
Github user gatorsmile commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153864221
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -203,12 +240,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
           case logical.Join(left, right, joinType, condition) =>
    -        val buildSide =
    -          if (right.stats.sizeInBytes <= left.stats.sizeInBytes) {
    -            BuildRight
    -          } else {
    -            BuildLeft
    -          }
    +        val buildSide = broadcastSide(true, true, left, right)
    --- End diff --
    `canBuildLeft = true, canBuildRight = true`
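The request for named arguments can be illustrated with a small stand-alone sketch. `NamedArgsSketch` and `describe` are hypothetical names used only for this example; the point is that two adjacent `Boolean` parameters are easy to swap when passed positionally, while named arguments make the call site self-describing and order-independent.

```scala
object NamedArgsSketch {
  // Hypothetical helper with two adjacent Boolean parameters, as in the reviewed call.
  def describe(canBuildLeft: Boolean, canBuildRight: Boolean): String =
    s"left=$canBuildLeft, right=$canBuildRight"

  // Positional call: the reader has to look up the signature to know which flag is which.
  val positional: String = describe(true, false)

  // Named call: the meaning is visible at the call site, and the order no longer matters.
  val named: String = describe(canBuildRight = false, canBuildLeft = true)
}
```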
Github user gatorsmile commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153863330
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -149,10 +147,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           case _ => false
         }
    +    private def broadcastSide(
    +        canBuildLeft: Boolean,
    +        canBuildRight: Boolean,
    +        left: LogicalPlan,
    +        right: LogicalPlan): BuildSide = {
    +
    +      def smallerSide =
    +        if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
    +
    +      val buildRight = canBuildRight && right.stats.hints.broadcast
    +      val buildLeft = canBuildLeft && left.stats.hints.broadcast
    +
    +      // Both sides have broadcast hint, broadcast smaller side base on its estimated physical size.
    +      if (buildRight && buildLeft) {
    +        smallerSide
    +      } else if (buildRight) {
    +        BuildRight
    +      } else if (buildLeft) {
    +        BuildLeft
    +      // This used for `case logical.Join(left, right, joinType, condition)`
    --- End diff --
    ?
Github user cloud-fan commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153771297
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala ---
    @@ -223,4 +223,69 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
         assert(HashJoin.rewriteKeyExpr(l :: ss :: Nil) === l :: ss :: Nil)
         assert(HashJoin.rewriteKeyExpr(i :: ss :: Nil) === i :: ss :: Nil)
       }
    +
    +  test("Shouldn't change broadcast join buildSide if user clearly specified") {
    +    def assertJoinBuildSide(pair: (String, String, BuildSide)): Any = {
    +      val (sqlString, joinMethod, buildSide) = pair
    +      val executedPlan = sql(sqlString).queryExecution.executedPlan
    +      executedPlan match {
    +        case b: BroadcastNestedLoopJoinExec =>
    +          assert(b.getClass.getSimpleName === joinMethod)
    +          assert(b.buildSide === buildSide)
    +        case w: WholeStageCodegenExec =>
    +          assert(w.children.head.getClass.getSimpleName === joinMethod)
    +          assert(w.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide === buildSide)
    +      }
    +    }
    +
    +    withTempView("t1", "t2") {
    +      spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("t1")
    +      spark.createDataFrame(Seq((1, "1"), (2, "12.3"), (2, "123"))).toDF("key", "value")
    +        .createTempView("t2")
    +
    +      val t1Size = spark.table("t1").queryExecution.analyzed.children.head.stats.sizeInBytes
    +      val t2Size = spark.table("t2").queryExecution.analyzed.children.head.stats.sizeInBytes
    +      assert(t1Size < t2Size)
    +
    +      val bh = BroadcastHashJoinExec.toString
    +      val bl = BroadcastNestedLoopJoinExec.toString
    +
    +      Seq(
    +        // INNER JOIN && t1Size < t2Size => BuildLeft
    +        ("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2 ON t1.key = t2.key", bh, BuildLeft),
    +        // LEFT JOIN => BuildRight
    +        ("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN t2 ON t1.key = t2.key", bh, BuildRight),
    +        // RIGHT JOIN => BuildLeft
    +        ("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 RIGHT JOIN t2 ON t1.key = t2.key", bh, BuildLeft),
    +        // INNER JOIN && broadcast(t1) => BuildLeft
    +        ("SELECT /*+ MAPJOIN(t1) */ * FROM t1 JOIN t2 ON t1.key = t2.key", bh, BuildLeft),
    +        // INNER JOIN && broadcast(t2) => BuildRight
    +        ("SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2 ON t1.key = t2.key", bh, BuildRight)
    +      ).foreach(assertJoinBuildSide)
    --- End diff --
    I think it's more readable to write
    ```
    assertJoinBuildSide(...)
    assertJoinBuildSide(...)
    ...
    ```
    than
    ```
    Seq(
      ...
    ).foreach(assertJoinBuildSide)
    ```
Github user cloud-fan commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153770711
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -149,10 +147,43 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           case _ => false
         }
    +    private def broadcastBuildSide(
    +        canBuildLeft: Boolean,
    +        canBuildRight: Boolean,
    +        left: LogicalPlan,
    +        right: LogicalPlan): BuildSide = {
    +
    +      // Both sides have broadcast hint, broadcast smaller side base on its estimated physical size.
    +      def buildSideBaseSize =
    --- End diff --
    nit: `smallerSide`
Github user cloud-fan commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153770517
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -149,10 +147,43 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           case _ => false
         }
    +    private def broadcastBuildSide(
    --- End diff --
    nit: `broadcastSide`
Github user wangyum commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153736381
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -153,6 +151,27 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           // --- BroadcastHashJoin
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    +        if canBuildRight(joinType) && canBuildLeft(joinType)
    +          && left.stats.hints.broadcast && right.stats.hints.broadcast =>
    --- End diff --
    I have an idea: combine these three cases into a `broadcastBuildSide` function: https://github.com/apache/spark/blob/a2cf0ced6615e342e84e270cf060ace69eb3ba0f/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L150-L185
Github user cloud-fan commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153680715
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -153,6 +151,27 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           // --- BroadcastHashJoin
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    +        if canBuildRight(joinType) && canBuildLeft(joinType)
    +          && left.stats.hints.broadcast && right.stats.hints.broadcast =>
    --- End diff --
    I think we can create new methods for it
    ```
    def shouldBuildLeft(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean = {
      if (left.stats.hints.broadcast) {
        if (canBuildRight(joinType) && right.stats.hints.broadcast) {
          // if both sides have broadcast hint, only broadcast left side if its estimated
          // physical size is smaller than right side
          left.stats.sizeInBytes <= right.stats.sizeInBytes
        } else {
          // if only left side has the broadcast hint, broadcast the left side.
          true
        }
      } else {
        if (canBuildRight(joinType) && right.stats.hints.broadcast) {
          // if only right side has the broadcast hint, do not broadcast the left side.
          false
        } else {
          // if neither one of the sides has broadcast hint, only broadcast the left side if its
          // estimated physical size is smaller than the threshold and smaller than right side.
          canBroadcast(left) && left.stats.sizeInBytes <= right.stats.sizeInBytes
        }
      }
    }
    def shouldBuildRight...
    ```
    and use it like
    ```
    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
      if canBuildRight(joinType) && shouldBuildRight(joinType, left, right)
    ```
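The four branches of the proposed `shouldBuildLeft` can be checked in isolation with a stand-alone sketch. Everything here is an assumption made for illustration: `ShouldBuildLeftSketch` and `PlanStats` are hypothetical, `canBuildRight` is passed in as a plain flag instead of being derived from the join type, and `canBroadcast` is modelled as a simple comparison against a threshold parameter.

```scala
object ShouldBuildLeftSketch {
  // Hypothetical stand-in for the statistics of one join input.
  final case class PlanStats(sizeInBytes: BigInt, broadcastHint: Boolean)

  def shouldBuildLeft(
      canBuildRight: Boolean,          // stands in for canBuildRight(joinType)
      autoBroadcastThreshold: BigInt,  // stands in for the canBroadcast(left) check
      left: PlanStats,
      right: PlanStats): Boolean = {
    // The right side only competes with the left if it is both hinted and buildable.
    val rightHinted = canBuildRight && right.broadcastHint
    (left.broadcastHint, rightHinted) match {
      case (true, true)   => left.sizeInBytes <= right.sizeInBytes // both hinted: smaller wins
      case (true, false)  => true                                  // only left hinted
      case (false, true)  => false                                 // only right hinted
      case (false, false) =>                                       // no hints: size and threshold
        left.sizeInBytes <= autoBroadcastThreshold && left.sizeInBytes <= right.sizeInBytes
    }
  }
}
```

Writing the nested `if`/`else` as a match on the pair of hint flags is a presentational choice for this sketch; the branch results follow the comments in the suggestion above.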
Github user cloud-fan commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153679188
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -91,10 +91,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
          * predicates can be evaluated by matching join keys. If found, Join implementations are chosen
          * with the following precedence:
          *
    -     * - Broadcast: if one side of the join has an estimated physical size that is smaller than the
    -     *   user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold
    -     *   or if that side has an explicit broadcast hint (e.g. the user applied the
    -     *   [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side
    +     * - Broadcast: if one side of the join has an explicit broadcast hint (e.g. the user applied the
    --- End diff --
    ```
    Broadcast: We prefer to broadcast the join side with an explicit broadcast hint (e.g. the user applied the [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame). If both sides have the broadcast hint, we prefer to broadcast the side with a smaller estimated physical size. If neither one of the sides has the broadcast hint, we only broadcast the join side if its estimated physical size is smaller than the user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold.
    ```
Github user cloud-fan commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153677941
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1492,6 +1492,61 @@ that these options will be deprecated in future release as more optimizations ar
    +## Broadcast Hint for SQL Queries
    +
    +Broadcast hint is a way for users to manually annotate a query and suggest to the query optimizer the join method.
    +It is very useful when the query optimizer cannot make optimal decision with respect to join methods
    +due to conservativeness or the lack of proper statistics.
    +Spark Broadcast Hint has higher priority than autoBroadcastJoin mechanism, examples:
    +
    +
    +
    +
    +
    +{% highlight scala %}
    +val src = sql("SELECT * FROM src")
    +broadcast(src).join(recordsDF, Seq("key")).show()
    --- End diff --
    a more standard way:
    ```
    import org.apache.spark.sql.functions.broadcast
    broadcast(spark.table("src")).join(spark.table("records"), Seq("key")).show()
    ```
Github user cloud-fan commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153677668
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1492,6 +1492,61 @@ that these options will be deprecated in future release as more optimizations ar
    +## Broadcast Hint for SQL Queries
    +
    +Broadcast hint is a way for users to manually annotate a query and suggest to the query optimizer the join method.
    +It is very useful when the query optimizer cannot make optimal decision with respect to join methods
    +due to conservativeness or the lack of proper statistics.
    +Spark Broadcast Hint has higher priority than autoBroadcastJoin mechanism, examples:
    +
    +
    +
    +
    +
    +{% highlight scala %}
    +val src = sql("SELECT * FROM src")
    +broadcast(src).join(recordsDF, Seq("key")).show()
    +{% endhighlight %}
    +
    +
    +
    +
    +
    +{% highlight java %}
    +Dataset src = sql("SELECT * FROM src");
    +broadcast(src).join(recordsDF, Seq("key")).show();
    +{% endhighlight %}
    +
    +
    +
    +
    +
    +{% highlight python %}
    +src = spark.sql("SELECT * FROM src")
    +recordsDF.join(broadcast(src), "key").show()
    +{% endhighlight %}
    +
    +
    +
    +
    +
    +{% highlight r %}
    +src <- sql("SELECT COUNT(*) FROM src")
    +showDF(join(broadcast(src), recordsDF, src$key == recordsDF$key)))
    +{% endhighlight %}
    +
    +
    +
    +
    +
    +{% highlight sql %}
    +SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
    +{% endhighlight %}
    +
    +
    +
    +(Note that we accept `BROADCAST`, `BROADCASTJOIN` and `MAPJOIN` for broadcast hint)
    --- End diff --
    Shall we treat it as a comment on the SQL example?
    ```
    --we accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
    SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
    ```
Github user gatorsmile commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153591750
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala ---
    @@ -223,4 +223,50 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
         assert(HashJoin.rewriteKeyExpr(l :: ss :: Nil) === l :: ss :: Nil)
         assert(HashJoin.rewriteKeyExpr(i :: ss :: Nil) === i :: ss :: Nil)
       }
    +
    +  test("Shouldn't change broadcast join buildSide if user clearly specified") {
    +    def assertJoinBuildSide(pair: (String, BuildSide)): Any = {
    --- End diff --
    Also pass the expected join method.
Github user gatorsmile commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153591249
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1492,6 +1492,61 @@ that these options will be deprecated in future release as more optimizations ar
    +## Broadcast Hint for SQL Queries
    +
    +Broadcast hint is a way for users to manually annotate a query and suggest to the query optimizer the join method.
    +It is very useful when the query optimizer cannot make optimal decision with respect to join methods
    +due to conservativeness or the lack of proper statistics.
    +Spark Broadcast Hint has higher priority than autoBroadcastJoin mechanism, examples:
    --- End diff --
    > The `BROADCAST` hint guides Spark to broadcast each specified table when joining them with another table or view. When Spark is deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred, even if the statistics are above the configuration `spark.sql.autoBroadcastJoinThreshold`. When both sides of a join are specified, Spark broadcasts the one having the lower statistics. Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. full outer join) support BHJ. When the broadcast nested loop join is selected, we still respect the hint.
Github user gatorsmile commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153584009
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -153,6 +151,27 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           // --- BroadcastHashJoin
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    +        if canBuildRight(joinType) && canBuildLeft(joinType)
    +          && left.stats.hints.broadcast && right.stats.hints.broadcast =>
    +        if (right.stats.sizeInBytes <= left.stats.sizeInBytes) {
    +          Seq(joins.BroadcastHashJoinExec(
    +            leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
    +        } else {
    +          Seq(joins.BroadcastHashJoinExec(
    +            leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
    +        }
    --- End diff --
    Also add a test case?
Github user gatorsmile commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153583900
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -153,6 +151,27 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           // --- BroadcastHashJoin
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    +        if canBuildRight(joinType) && canBuildLeft(joinType)
    +          && left.stats.hints.broadcast && right.stats.hints.broadcast =>
    --- End diff --
    ```Scala
    if canBuildRight(joinType) && canBuildLeft(joinType)
      && (left.stats.hints.broadcast || right.stats.hints.broadcast) =>
    ```
    Can you combine these three cases?
Github user gatorsmile commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153583413
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -153,6 +151,27 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           // --- BroadcastHashJoin
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    +        if canBuildRight(joinType) && canBuildLeft(joinType)
    +          && left.stats.hints.broadcast && right.stats.hints.broadcast =>
    +        if (right.stats.sizeInBytes <= left.stats.sizeInBytes) {
    +          Seq(joins.BroadcastHashJoinExec(
    +            leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
    +        } else {
    +          Seq(joins.BroadcastHashJoinExec(
    +            leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
    +        }
    +
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    +        if canBuildRight(joinType) && right.stats.hints.broadcast =>
    --- End diff --
    Two more spaces before `if`.
Github user gatorsmile commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153583308
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -153,6 +151,27 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           // --- BroadcastHashJoin
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    +        if canBuildRight(joinType) && canBuildLeft(joinType)
    +          && left.stats.hints.broadcast && right.stats.hints.broadcast =>
    +        if (right.stats.sizeInBytes <= left.stats.sizeInBytes) {
    +          Seq(joins.BroadcastHashJoinExec(
    +            leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
    +        } else {
    +          Seq(joins.BroadcastHashJoinExec(
    +            leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
    +        }
    --- End diff --
    Just need to do something like
    ```
    val buildSide = if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
    Seq(joins.BroadcastHashJoinExec(
      leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
    ```
Github user cloud-fan commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153428919
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1492,6 +1492,19 @@ that these options will be deprecated in future release as more optimizations ar
    +## Broadcast Hint for SQL Queries
    --- End diff --
    Let's also mention the behavior change in the migration guide section.
Github user cloud-fan commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153427238
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1492,6 +1492,19 @@ that these options will be deprecated in future release as more optimizations ar
    +## Broadcast Hint for SQL Queries
    +
    +Broadcast hint is a way for users to manually annotate a query and suggest to the query optimizer the join method.
    +It is very useful when the query optimizer cannot make optimal decision with respect to join methods
    +due to conservativeness or the lack of proper statistics. The hint syntax looks like the following
    +(Note that we accept `BROADCAST`, `BROADCASTJOIN` and `MAPJOIN` for broadcast hint):
    +
    +{% highlight sql %}
    +
    +SELECT /*+ MAPJOIN(t1) */ * FROM t1 JOIN t2 ON t1.key = t2.key
    --- End diff --
    Let's use `BROADCAST` instead of `MAPJOIN` in the example.
Github user cloud-fan commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153427121
    --- Diff: docs/sql-programming-guide.md ---
    @@ -1492,6 +1492,19 @@ that these options will be deprecated in future release as more optimizations ar
    +## Broadcast Hint for SQL Queries
    +
    +Broadcast hint is a way for users to manually annotate a query and suggest to the query optimizer the join method.
    +It is very useful when the query optimizer cannot make optimal decision with respect to join methods
    +due to conservativeness or the lack of proper statistics. The hint syntax looks like the following
    +(Note that we accept `BROADCAST`, `BROADCASTJOIN` and `MAPJOIN` for broadcast hint):
    +
    +{% highlight sql %}
    --- End diff --
    Let's also add examples for other languages.
Github user wangyum commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19714#discussion_r153401195
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -153,6 +152,27 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           // --- BroadcastHashJoin
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    --- End diff --
    If users ask to broadcast both join sides in the hint, we pick the smaller side to broadcast according to stats.
[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...
Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19714#discussion_r153401036

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala ---
    @@ -223,4 +223,36 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
         assert(HashJoin.rewriteKeyExpr(l :: ss :: Nil) === l :: ss :: Nil)
         assert(HashJoin.rewriteKeyExpr(i :: ss :: Nil) === i :: ss :: Nil)
       }
    +
    +  test("Shouldn't change broadcast join buildSide if user clearly specified") {
    +    spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("table1")
    +    spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value").createTempView("table2")
    +
    +    def assertJoinBuildSide(pair: (String, BuildSide)): Any = {
    +      val (sqlString, s) = pair
    +      val df = sql(sqlString)
    +      val physical = df.queryExecution.executedPlan
    +      physical match {
    --- End diff --

    A test that only checks the query results would also pass without this patch.
[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19714#discussion_r153098067

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala ---
    @@ -223,4 +223,36 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
         assert(HashJoin.rewriteKeyExpr(l :: ss :: Nil) === l :: ss :: Nil)
         assert(HashJoin.rewriteKeyExpr(i :: ss :: Nil) === i :: ss :: Nil)
       }
    +
    +  test("Shouldn't change broadcast join buildSide if user clearly specified") {
    +    spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("table1")
    +    spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value").createTempView("table2")
    +
    +    def assertJoinBuildSide(pair: (String, BuildSide)): Any = {
    +      val (sqlString, s) = pair
    +      val df = sql(sqlString)
    +      val physical = df.queryExecution.executedPlan
    +      physical match {
    --- End diff --

    Instead of doing `match`, can you just try to call `collect` and assert the result is 1?
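One reading of this suggestion is to gather every matching join node from the plan tree and assert on how many were found, instead of pattern matching on the root node. The sketch below illustrates that idea on a toy plan tree. `Plan`, `Scan`, `Project` and `BroadcastHashJoin` are hypothetical classes defined here, not Spark's, and the `collect` method is a simplified pre-order traversal written for this example.

```scala
// Toy plan tree; the names echo Spark's but these are NOT Spark classes.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

sealed trait Plan {
  def children: Seq[Plan]

  // Pre-order traversal that gathers a result for every node the partial function accepts.
  def collect[B](pf: PartialFunction[Plan, B]): Seq[B] =
    pf.lift(this).toSeq ++ children.flatMap(_.collect(pf))
}

final case class Scan(table: String) extends Plan {
  val children: Seq[Plan] = Nil
}

final case class Project(child: Plan) extends Plan {
  val children: Seq[Plan] = Seq(child)
}

final case class BroadcastHashJoin(left: Plan, right: Plan, buildSide: BuildSide) extends Plan {
  val children: Seq[Plan] = Seq(left, right)
}

object CollectSketch {
  // Finds broadcast hash joins wherever they sit in the tree, so the check
  // does not depend on the join being the root or the first child.
  def broadcastJoins(plan: Plan): Seq[BroadcastHashJoin] =
    plan.collect { case j: BroadcastHashJoin => j }
}
```

A test written this way would assert that `CollectSketch.broadcastJoins(plan).size == 1` and then inspect the `buildSide` of that single join, which keeps working if extra operators are wrapped around the join.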
[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19714#discussion_r150438437

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala ---
    @@ -223,4 +223,19 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
         assert(HashJoin.rewriteKeyExpr(l :: ss :: Nil) === l :: ss :: Nil)
         assert(HashJoin.rewriteKeyExpr(i :: ss :: Nil) === i :: ss :: Nil)
       }
    +
    +  test("Shouldn't change broadcast join buildSide if user clearly specified") {
    +    spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("table1")
    +    spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value").createTempView("table2")
    +
    +    val bl = sql(s"SELECT /*+ MAPJOIN(t1) */ * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key")
    +      .queryExecution
    +      .executedPlan
    +    assert(bl.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide === BuildLeft)
    +
    +    val br = sql(s"SELECT /*+ MAPJOIN(t2) */ * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key")
    +      .queryExecution
    +      .executedPlan
    +    assert(br.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide === BuildRight)
    --- End diff --

    Could you also add test cases for `BroadcastNestedLoopJoinExec`?
[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...
Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19714#discussion_r150368276

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -154,12 +158,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           // --- BroadcastHashJoin
           case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    -        if canBuildRight(joinType) && canBroadcast(right) =>
    +        if canBuildRight(joinType) && canBroadcast(right, left.stats.hints.broadcast) =>
    --- End diff --

    @mgaido91 Thanks for your suggestion.
[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19714#discussion_r150255560

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -154,12 +158,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           // --- BroadcastHashJoin
           case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    -        if canBuildRight(joinType) && canBroadcast(right) =>
    +        if canBuildRight(joinType) && canBroadcast(right, left.stats.hints.broadcast) =>
    --- End diff --

    What about just adding `&& !left.stats.hints.broadcast` here? That way we do not change the `canBroadcast` method, which is not really readable after the change.
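The effect of the proposed extra guard can be seen in a small model. This is a sketch under assumptions: `Side` is a hypothetical stand-in for a plan's statistics, and `canBroadcast` is assumed to accept a side that either carries a broadcast hint or has an estimated size within the auto-broadcast threshold (10 MB is used here, matching the documented default of `spark.sql.autoBroadcastJoinThreshold`).

```scala
// Hypothetical stand-in for one join input: estimated size plus broadcast hint flag.
final case class Side(sizeInBytes: BigInt, broadcastHint: Boolean)

object GuardSketch {
  // Assumed threshold of 10 MB for this sketch.
  val autoBroadcastJoinThreshold: BigInt = BigInt(10L * 1024 * 1024)

  // Assumed shape of canBroadcast: hinted, or small enough by estimated size.
  def canBroadcast(side: Side): Boolean =
    side.broadcastHint ||
      (side.sizeInBytes >= 0 && side.sizeInBytes <= autoBroadcastJoinThreshold)

  // Condition before the change: the right side is chosen whenever it qualifies,
  // even if the user put the hint on the left side.
  def buildRightUnguarded(canBuildRight: Boolean, right: Side): Boolean =
    canBuildRight && canBroadcast(right)

  // Condition with the suggested guard: a hint on the left side blocks this case,
  // letting a later case pick the left side instead.
  def buildRightGuarded(canBuildRight: Boolean, left: Side, right: Side): Boolean =
    canBuildRight && canBroadcast(right) && !left.broadcastHint
}
```

For two tiny tables with the hint on the left one, the unguarded condition still selects the right side, which matches the behaviour reported in the PR description; the guarded condition does not.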
[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...
GitHub user wangyum opened a pull request:

    https://github.com/apache/spark/pull/19714

    [SPARK-22489][SQL] Shouldn't change broadcast join buildSide if user clearly specified

    ## What changes were proposed in this pull request?

    How to reproduce:
    ```scala
    import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

    spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("table1")
    spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value").createTempView("table2")

    val bl = sql(s"SELECT /*+ MAPJOIN(t1) */ * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key").queryExecution.executedPlan

    println(bl.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide)
    ```
    The result is `BuildRight`, but it should be `BuildLeft`. This PR fixes this issue.

    ## How was this patch tested?

    unit tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wangyum/spark SPARK-22489

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19714.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19714

commit 68dfc42d80548c1eeb75275df43d4542146a60d4
Author: Yuming Wang
Date:   2017-11-10T05:55:51Z

    Shouldn't change broadcast join buildSide if user clearly specified