[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19714


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-29 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153969180
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -149,10 +147,47 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   case _ => false
 }
 
+private def broadcastSide(
+canBuildLeft: Boolean,
+canBuildRight: Boolean,
+left: LogicalPlan,
+right: LogicalPlan): BuildSide = {
+
+  def smallerSide =
+if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight 
else BuildLeft
+
+  val buildRight = canBuildRight && right.stats.hints.broadcast
+  val buildLeft = canBuildLeft && left.stats.hints.broadcast
+
+
+  if (buildRight && buildLeft) {
+// Broadcast smaller side base on its estimated physical size
+// if both sides have broadcast hint
+smallerSide
+  } else if (buildRight) {
+BuildRight
+  } else if (buildLeft) {
+BuildLeft
+  } else if (canBuildRight && canBuildLeft) {
+// for the last default broadcast nested loop join
+smallerSide
+  } else {
+throw new AnalysisException(
+  "Can not decide to use which side for BuildSide for this join")
--- End diff --

To be consistent with the method name, how about:
`Can not decide which side to broadcast for this join`





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-29 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153969298
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1492,6 +1492,64 @@ that these options will be deprecated in future 
release as more optimizations ar
   
 
 
+## Broadcast Hint for SQL Queries
+
+The `BROADCAST` hint guides Spark to broadcast each specified table when 
joining them with another table or view.
+When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) 
is preferred, 
+even if the statistics is above the configuration 
`spark.sql.autoBroadcastJoinThreshold`.
+When both sides of a join are specified, Spark broadcasts the one having 
the lower statistics.
+Note Spark does not guaranttee BHJ is always chosen, since not all cases 
(e.g. full outer join) 
--- End diff --

typo: guarantee





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153867013
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1492,6 +1492,19 @@ that these options will be deprecated in future 
release as more optimizations ar
   
 
 
+## Broadcast Hint for SQL Queries
--- End diff --

+1





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153865403
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -189,6 +219,13 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   // --- Without joining keys 

 
   // Pick BroadcastNestedLoopJoin if one side could be broadcasted
+  case j @ logical.Join(left, right, joinType, condition)
+if (canBuildRight(joinType) && right.stats.hints.broadcast)
+  || (canBuildLeft(joinType) && left.stats.hints.broadcast) =>
--- End diff --

The same here





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153865264
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -149,10 +147,42 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   case _ => false
 }
 
+private def broadcastSide(
+canBuildLeft: Boolean,
+canBuildRight: Boolean,
+left: LogicalPlan,
+right: LogicalPlan): BuildSide = {
+
+  def smallerSide =
+if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight 
else BuildLeft
+
+  val buildRight = canBuildRight && right.stats.hints.broadcast
+  val buildLeft = canBuildLeft && left.stats.hints.broadcast
+
+  // Both sides have broadcast hint, broadcast smaller side base on 
its estimated physical size.
+  if (buildRight && buildLeft) {
+smallerSide
+  } else if (buildRight) {
+BuildRight
+  } else if (buildLeft) {
+BuildLeft
+  // This used for `case logical.Join(left, right, joinType, 
condition)`
+  } else {
+smallerSide
+  }
+}
+
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
 
   // --- BroadcastHashJoin 

 
+  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
+if (canBuildRight(joinType) && right.stats.hints.broadcast)
+  || (canBuildLeft(joinType) && left.stats.hints.broadcast) =>
--- End diff --

move `||` to line 180





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153864916
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -149,10 +147,42 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   case _ => false
 }
 
+private def broadcastSide(
+canBuildLeft: Boolean,
+canBuildRight: Boolean,
+left: LogicalPlan,
+right: LogicalPlan): BuildSide = {
+
+  def smallerSide =
+if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight 
else BuildLeft
+
+  val buildRight = canBuildRight && right.stats.hints.broadcast
+  val buildLeft = canBuildLeft && left.stats.hints.broadcast
+
+  // Both sides have broadcast hint, broadcast smaller side base on 
its estimated physical size.
+  if (buildRight && buildLeft) {
+smallerSide
+  } else if (buildRight) {
+BuildRight
+  } else if (buildLeft) {
+BuildLeft
+  // This used for `case logical.Join(left, right, joinType, 
condition)`
+  } else {
+smallerSide
--- End diff --

```
else if (canBuildRight && canBuildLeft) {
  smallerSide
} else {
  // throw exception
}
```
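
The branch structure suggested here can be sketched outside Spark. This is a minimal, self-contained model rather than the code from the pull request: `PlanStats` is a hypothetical stand-in for the size and broadcast-hint fields of a plan's statistics, `BuildSide` is redefined locally, and `IllegalStateException` is only an assumed placeholder for whatever exception the real method throws.

```scala
object BroadcastSideSketch {
  // Local model of the build side; these are not Spark's own types.
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Hypothetical stand-in for the statistics of one side of a join.
  final case class PlanStats(sizeInBytes: BigInt, broadcastHint: Boolean)

  def broadcastSide(
      canBuildLeft: Boolean,
      canBuildRight: Boolean,
      left: PlanStats,
      right: PlanStats): BuildSide = {

    // Ties go to the right side, as in the quoted diff.
    def smallerSide: BuildSide =
      if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft

    val buildRight = canBuildRight && right.broadcastHint
    val buildLeft = canBuildLeft && left.broadcastHint

    if (buildRight && buildLeft) {
      // Both sides are hinted and buildable: pick the smaller one.
      smallerSide
    } else if (buildRight) {
      BuildRight
    } else if (buildLeft) {
      BuildLeft
    } else if (canBuildRight && canBuildLeft) {
      // No usable hint, but either side may be built: fall back to size.
      smallerSide
    } else {
      // Assumption: the real code would raise a Spark-specific exception here.
      throw new IllegalStateException(
        "Can not decide which side to broadcast for this join")
    }
  }
}
```

With the explicit final branch, a call that has no usable hint and only one buildable side fails loudly instead of silently falling back to the size comparison.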





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153864513
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -149,10 +147,42 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   case _ => false
 }
 
+private def broadcastSide(
+canBuildLeft: Boolean,
+canBuildRight: Boolean,
+left: LogicalPlan,
+right: LogicalPlan): BuildSide = {
+
+  def smallerSide =
+if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight 
else BuildLeft
+
+  val buildRight = canBuildRight && right.stats.hints.broadcast
+  val buildLeft = canBuildLeft && left.stats.hints.broadcast
+
+  // Both sides have broadcast hint, broadcast smaller side base on 
its estimated physical size.
+  if (buildRight && buildLeft) {
+smallerSide
+  } else if (buildRight) {
+BuildRight
+  } else if (buildLeft) {
+BuildLeft
+  // This used for `case logical.Join(left, right, joinType, 
condition)`
--- End diff --

remove this line.





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153864221
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -203,12 +240,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 joins.CartesianProductExec(planLater(left), planLater(right), 
condition) :: Nil
 
   case logical.Join(left, right, joinType, condition) =>
-val buildSide =
-  if (right.stats.sizeInBytes <= left.stats.sizeInBytes) {
-BuildRight
-  } else {
-BuildLeft
-  }
+val buildSide = broadcastSide(true, true, left, right)
--- End diff --

`canBuildLeft = true, canBuildRight = true`
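
Naming Boolean arguments at the call site keeps two adjacent flags from being swapped unnoticed. A small stand-alone sketch of the difference follows; `describe` is a hypothetical function invented only for this illustration and is not part of the pull request.

```scala
object NamedArgsSketch {
  // Hypothetical function with two adjacent Boolean parameters.
  def describe(canBuildLeft: Boolean, canBuildRight: Boolean): String =
    s"left=$canBuildLeft, right=$canBuildRight"

  // Positional call: the reader must look up the signature to tell the flags apart.
  val positional: String = describe(true, false)

  // Named call: self-documenting, and the argument order no longer matters.
  val named: String = describe(canBuildRight = false, canBuildLeft = true)
}
```

Both calls pass the same values; only the second makes that visible without opening the method definition.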





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153863330
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -149,10 +147,42 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   case _ => false
 }
 
+private def broadcastSide(
+canBuildLeft: Boolean,
+canBuildRight: Boolean,
+left: LogicalPlan,
+right: LogicalPlan): BuildSide = {
+
+  def smallerSide =
+if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight 
else BuildLeft
+
+  val buildRight = canBuildRight && right.stats.hints.broadcast
+  val buildLeft = canBuildLeft && left.stats.hints.broadcast
+
+  // Both sides have broadcast hint, broadcast smaller side base on 
its estimated physical size.
+  if (buildRight && buildLeft) {
+smallerSide
+  } else if (buildRight) {
+BuildRight
+  } else if (buildLeft) {
+BuildLeft
+  // This used for `case logical.Join(left, right, joinType, 
condition)`
--- End diff --

?





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153771297
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 ---
@@ -223,4 +223,69 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
 assert(HashJoin.rewriteKeyExpr(l :: ss :: Nil) === l :: ss :: Nil)
 assert(HashJoin.rewriteKeyExpr(i :: ss :: Nil) === i :: ss :: Nil)
   }
+
+  test("Shouldn't change broadcast join buildSide if user clearly 
specified") {
+def assertJoinBuildSide(pair: (String, String, BuildSide)): Any = {
+  val (sqlString, joinMethod, buildSide) = pair
+  val executedPlan = sql(sqlString).queryExecution.executedPlan
+  executedPlan match {
+case b: BroadcastNestedLoopJoinExec =>
+  assert(b.getClass.getSimpleName === joinMethod)
+  assert(b.buildSide === buildSide)
+case w: WholeStageCodegenExec =>
+  assert(w.children.head.getClass.getSimpleName === joinMethod)
+  
assert(w.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide === 
buildSide)
+  }
+}
+
+withTempView("t1", "t2") {
+  spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", 
"value").createTempView("t1")
+  spark.createDataFrame(Seq((1, "1"), (2, "12.3"), (2, 
"123"))).toDF("key", "value")
+.createTempView("t2")
+
+  val t1Size = 
spark.table("t1").queryExecution.analyzed.children.head.stats.sizeInBytes
+  val t2Size = 
spark.table("t2").queryExecution.analyzed.children.head.stats.sizeInBytes
+  assert(t1Size < t2Size)
+
+  val bh = BroadcastHashJoinExec.toString
+  val bl = BroadcastNestedLoopJoinExec.toString
+
+  Seq(
+// INNER JOIN && t1Size < t2Size => BuildLeft
+("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2 ON t1.key = 
t2.key", bh, BuildLeft),
+// LEFT JOIN => BuildRight
+("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN t2 ON t1.key = 
t2.key", bh, BuildRight),
+// RIGHT JOIN => BuildLeft
+("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 RIGHT JOIN t2 ON t1.key 
= t2.key", bh, BuildLeft),
+// INNER JOIN && broadcast(t1) => BuildLeft
+("SELECT /*+ MAPJOIN(t1) */ * FROM t1 JOIN t2 ON t1.key = t2.key", 
bh, BuildLeft),
+// INNER JOIN && broadcast(t2) => BuildRight
+("SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2 ON t1.key = t2.key", 
bh, BuildRight)
+  ).foreach(assertJoinBuildSide)
--- End diff --

I think it's more readable to write
```
assertJoinBuildSide(...)
assertJoinBuildSide(...)
...
```
than
```
Seq(
  ...
).foreach(assertJoinBuildSide)
```





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153770711
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -149,10 +147,43 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   case _ => false
 }
 
+private def broadcastBuildSide(
+canBuildLeft: Boolean,
+canBuildRight: Boolean,
+left: LogicalPlan,
+right: LogicalPlan): BuildSide = {
+
+  // Both sides have broadcast hint, broadcast smaller side base on 
its estimated physical size.
+  def buildSideBaseSize =
--- End diff --

nit: `smallerSide`





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153770517
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -149,10 +147,43 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   case _ => false
 }
 
+private def broadcastBuildSide(
--- End diff --

nit: `broadcastSide`





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-29 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153736381
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -153,6 +151,27 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   // --- BroadcastHashJoin 

 
+  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
+if canBuildRight(joinType) && canBuildLeft(joinType)
+  && left.stats.hints.broadcast && right.stats.hints.broadcast =>
--- End diff --

I have an idea: combine these three cases into a `broadcastBuildSide`
function:

https://github.com/apache/spark/blob/a2cf0ced6615e342e84e270cf060ace69eb3ba0f/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L150-L185





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153680715
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -153,6 +151,27 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   // --- BroadcastHashJoin 

 
+  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
+if canBuildRight(joinType) && canBuildLeft(joinType)
+  && left.stats.hints.broadcast && right.stats.hints.broadcast =>
--- End diff --

I think we can create new methods for it
```
def shouldBuildLeft(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean = {
  if (left.stats.hints.broadcast) {
    if (canBuildRight(joinType) && right.stats.hints.broadcast) {
      // if both sides have broadcast hint, only broadcast left side if its
      // estimated physical size is smaller than right side
      left.stats.sizeInBytes <= right.stats.sizeInBytes
    } else {
      // if only left side has the broadcast hint, broadcast the left side.
      true
    }
  } else {
    if (canBuildRight(joinType) && right.stats.hints.broadcast) {
      // if only right side has the broadcast hint, do not broadcast the left side.
      false
    } else {
      // if neither one of the sides has broadcast hint, only broadcast the left side
      // if its estimated physical size is smaller than the threshold and smaller
      // than right side.
      canBroadcast(left) && left.stats.sizeInBytes <= right.stats.sizeInBytes
    }
  }
}

def shouldBuildRight...
```
and use it like
```
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
  if canBuildRight(joinType) && shouldBuildRight(joinType, left, right)
```
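
The decision table behind `shouldBuildLeft` can be checked in isolation. The following is a minimal sketch under stated assumptions, not Spark code: `Side` is a hypothetical stand-in for a plan's statistics, `canBuildRight` is passed in as a plain Boolean instead of being derived from the join type, and `canBroadcast` takes an explicit `threshold` parameter where the real planner would read its configuration.

```scala
object ShouldBuildLeftSketch {
  // Hypothetical stand-in for one join side's statistics.
  final case class Side(sizeInBytes: BigInt, broadcastHint: Boolean)

  // Assumed threshold check; the limit is a parameter here, not a config lookup.
  def canBroadcast(side: Side, threshold: BigInt): Boolean =
    side.sizeInBytes >= 0 && side.sizeInBytes <= threshold

  def shouldBuildLeft(
      canBuildRight: Boolean,
      left: Side,
      right: Side,
      threshold: BigInt): Boolean = {
    // A hint on the right side only counts if the right side may be built.
    val rightHinted = canBuildRight && right.broadcastHint
    (left.broadcastHint, rightHinted) match {
      // Both hinted: broadcast left only if it is not larger than right.
      case (true, true) => left.sizeInBytes <= right.sizeInBytes
      // Only left hinted: broadcast left regardless of size.
      case (true, false) => true
      // Only right hinted: never broadcast left.
      case (false, true) => false
      // No hints: fall back to the threshold and the size comparison.
      case (false, false) =>
        canBroadcast(left, threshold) && left.sizeInBytes <= right.sizeInBytes
    }
  }
}
```

Writing the four hint combinations as one pattern match makes it easy to see that a hint always takes priority over the size threshold, which is the behavior this review thread is about.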





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153679188
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -91,10 +91,10 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
* predicates can be evaluated by matching join keys. If found,  Join 
implementations are chosen
* with the following precedence:
*
-   * - Broadcast: if one side of the join has an estimated physical size 
that is smaller than the
-   * user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] 
threshold
-   * or if that side has an explicit broadcast hint (e.g. the user 
applied the
-   * [[org.apache.spark.sql.functions.broadcast()]] function to a 
DataFrame), then that side
+   * - Broadcast: if one side of the join has an explicit broadcast hint 
(e.g. the user applied the
--- End diff --

```
Broadcast: We prefer to broadcast the join side with an explicit broadcast
hint (e.g. the user applied the [[org.apache.spark.sql.functions.broadcast()]]
function to a DataFrame). If both sides have the broadcast hint, we prefer to
broadcast the side with a smaller estimated physical size. If neither one of
the sides has the broadcast hint, we only broadcast the join side if its
estimated physical size is smaller than the user-configurable
[[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold.
```





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153677941
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1492,6 +1492,61 @@ that these options will be deprecated in future 
release as more optimizations ar
   
 
 
+## Broadcast Hint for SQL Queries
+
+Broadcast hint is a way for users to manually annotate a query and suggest 
to the query optimizer the join method. 
+It is very useful when the query optimizer cannot make optimal decision 
with respect to join methods 
+due to conservativeness or the lack of proper statistics. 
+Spark Broadcast Hint has higher priority than autoBroadcastJoin mechanism, 
examples:
+
+
+
+
+
+{% highlight scala %}
+val src = sql("SELECT * FROM src")
+broadcast(src).join(recordsDF, Seq("key")).show()
--- End diff --

a more standard way:
```
import org.apache.spark.sql.functions.broadcast
broadcast(spark.table("src")).join(spark.table("records"), Seq("key")).show()
```





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153677668
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1492,6 +1492,61 @@ that these options will be deprecated in future 
release as more optimizations ar
   
 
 
+## Broadcast Hint for SQL Queries
+
+Broadcast hint is a way for users to manually annotate a query and suggest 
to the query optimizer the join method. 
+It is very useful when the query optimizer cannot make optimal decision 
with respect to join methods 
+due to conservativeness or the lack of proper statistics. 
+Spark Broadcast Hint has higher priority than autoBroadcastJoin mechanism, 
examples:
+
+
+
+
+
+{% highlight scala %}
+val src = sql("SELECT * FROM src")
+broadcast(src).join(recordsDF, Seq("key")).show()
+{% endhighlight %}
+
+
+
+
+
+{% highlight java %}
+Dataset src = sql("SELECT * FROM src");
+broadcast(src).join(recordsDF, Seq("key")).show();
+{% endhighlight %}
+
+
+
+
+
+{% highlight python %}
+src = spark.sql("SELECT * FROM src")
+recordsDF.join(broadcast(src), "key").show()
+{% endhighlight %}
+
+
+
+
+
+{% highlight r %}
+src <- sql("SELECT COUNT(*) FROM src")
+showDF(join(broadcast(src), recordsDF, src$key == recordsDF$key)))
+{% endhighlight %}
+
+
+
+
+
+{% highlight sql %}
+SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
+{% endhighlight %}
+
+
+
+(Note that we accept `BROADCAST`, `BROADCASTJOIN` and `MAPJOIN` for 
broadcast hint)
--- End diff --

shall we treat it as a comment on the SQL example?
```
--we accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
```





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153591750
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 ---
@@ -223,4 +223,50 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
 assert(HashJoin.rewriteKeyExpr(l :: ss :: Nil) === l :: ss :: Nil)
 assert(HashJoin.rewriteKeyExpr(i :: ss :: Nil) === i :: ss :: Nil)
   }
+
+  test("Shouldn't change broadcast join buildSide if user clearly 
specified") {
+def assertJoinBuildSide(pair: (String, BuildSide)): Any = {
--- End diff --

Also pass the expected join method. 





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153591249
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1492,6 +1492,61 @@ that these options will be deprecated in future 
release as more optimizations ar
   
 
 
+## Broadcast Hint for SQL Queries
+
+Broadcast hint is a way for users to manually annotate a query and suggest 
to the query optimizer the join method. 
+It is very useful when the query optimizer cannot make optimal decision 
with respect to join methods 
+due to conservativeness or the lack of proper statistics. 
+Spark Broadcast Hint has higher priority than autoBroadcastJoin mechanism, 
examples:
--- End diff --

> The `BROADCAST` hint guides Spark to broadcast each specified table when
joining them with another table or view. When Spark is deciding the join method,
the broadcast hash join (i.e., BHJ) is preferred, even if the statistics are
above the configuration `spark.sql.autoBroadcastJoinThreshold`. When both sides
of a join are specified, Spark broadcasts the one having the lower statistics.
Note that Spark does not guarantee BHJ is always chosen, since not all cases (e.g.
full outer join) support BHJ. When the broadcast nested loop join is selected,
we still respect the hint.





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153584009
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -153,6 +151,27 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   // --- BroadcastHashJoin 

 
+  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
+if canBuildRight(joinType) && canBuildLeft(joinType)
+  && left.stats.hints.broadcast && right.stats.hints.broadcast =>
+   if (right.stats.sizeInBytes <= left.stats.sizeInBytes) {
+ Seq(joins.BroadcastHashJoinExec(
+   leftKeys, rightKeys, joinType, BuildRight, condition, 
planLater(left), planLater(right)))
+   } else {
+ Seq(joins.BroadcastHashJoinExec(
+   leftKeys, rightKeys, joinType, BuildLeft, condition, 
planLater(left), planLater(right)))
+   }
--- End diff --

Also add a test case?





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153583900
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -153,6 +151,27 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   // --- BroadcastHashJoin 

 
+  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
+if canBuildRight(joinType) && canBuildLeft(joinType)
+  && left.stats.hints.broadcast && right.stats.hints.broadcast =>
--- End diff --

```Scala
if canBuildRight(joinType) && canBuildLeft(joinType)
  && (left.stats.hints.broadcast || right.stats.hints.broadcast) =>
```

Can you combine these three cases?





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153583413
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -153,6 +151,27 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   // --- BroadcastHashJoin 

 
+  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
+if canBuildRight(joinType) && canBuildLeft(joinType)
+  && left.stats.hints.broadcast && right.stats.hints.broadcast =>
+   if (right.stats.sizeInBytes <= left.stats.sizeInBytes) {
+ Seq(joins.BroadcastHashJoinExec(
+   leftKeys, rightKeys, joinType, BuildRight, condition, 
planLater(left), planLater(right)))
+   } else {
+ Seq(joins.BroadcastHashJoinExec(
+   leftKeys, rightKeys, joinType, BuildLeft, condition, 
planLater(left), planLater(right)))
+   }
+
+  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
+if canBuildRight(joinType) && right.stats.hints.broadcast =>
--- End diff --

two more spaces before `if`





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153583308
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -153,6 +151,27 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   // --- BroadcastHashJoin 

 
+  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
+if canBuildRight(joinType) && canBuildLeft(joinType)
+  && left.stats.hints.broadcast && right.stats.hints.broadcast =>
+   if (right.stats.sizeInBytes <= left.stats.sizeInBytes) {
+ Seq(joins.BroadcastHashJoinExec(
+   leftKeys, rightKeys, joinType, BuildRight, condition, 
planLater(left), planLater(right)))
+   } else {
+ Seq(joins.BroadcastHashJoinExec(
+   leftKeys, rightKeys, joinType, BuildLeft, condition, 
planLater(left), planLater(right)))
+   }
--- End diff --

Just need to do something like 
```
val buildSide = if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
Seq(joins.BroadcastHashJoinExec(
  leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
```





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153428919
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1492,6 +1492,19 @@ that these options will be deprecated in future 
release as more optimizations ar
   
 
 
+## Broadcast Hint for SQL Queries
--- End diff --

Let's also mention the behavior change in the migration guide section.





[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153427238
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1492,6 +1492,19 @@ that these options will be deprecated in future 
release as more optimizations ar
   
 
 
+## Broadcast Hint for SQL Queries
+
+Broadcast hint is a way for users to manually annotate a query and suggest to the query optimizer the join method.
+It is very useful when the query optimizer cannot make optimal decision with respect to join methods
+due to conservativeness or the lack of proper statistics. The hint syntax looks like the following
+(Note that we accept `BROADCAST`, `BROADCASTJOIN` and `MAPJOIN` for broadcast hint):
+
+{% highlight sql %}
+
+SELECT /*+ MAPJOIN(t1) */ * FROM t1 JOIN t2 ON t1.key = t2.key
--- End diff --

Let's use `BROADCAST` instead of `MAPJOIN` in the example.


---




[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153427121
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1492,6 +1492,19 @@ that these options will be deprecated in future 
release as more optimizations ar
   
 
 
+## Broadcast Hint for SQL Queries
+
+Broadcast hint is a way for users to manually annotate a query and suggest to the query optimizer the join method.
+It is very useful when the query optimizer cannot make optimal decision with respect to join methods
+due to conservativeness or the lack of proper statistics. The hint syntax looks like the following
+(Note that we accept `BROADCAST`, `BROADCASTJOIN` and `MAPJOIN` for broadcast hint):
+
+{% highlight sql %}
--- End diff --

Let's also add examples in other languages.
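
For reference, a Scala counterpart of the SQL hint could look like the sketch below. It assumes a local `SparkSession`; the object name `BroadcastHintExample` and the two small tables are made up for illustration. `broadcast` is the function from `org.apache.spark.sql.functions`, and `Dataset.hint("broadcast")` is the equivalent method form.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Hypothetical example object; names and data are illustrative only.
object BroadcastHintExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("broadcast-hint-sketch")
      .getOrCreate()
    import spark.implicits._

    val t1 = Seq((1, "4"), (2, "2")).toDF("key", "value")
    val t2 = Seq((1, "1"), (2, "2")).toDF("key", "value")

    // Function form: mark t1 as the side the planner should broadcast.
    val viaFunction = broadcast(t1).join(t2, "key")

    // Method form: the same hint attached through Dataset.hint.
    val viaHint = t1.hint("broadcast").join(t2, "key")

    // Print the physical plans so the chosen join strategy can be inspected.
    viaFunction.explain()
    viaHint.explain()

    spark.stop()
  }
}
```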


---




[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-27 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153401195
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -153,6 +152,27 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   // --- BroadcastHashJoin 

 
+  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
--- End diff --

If users ask to broadcast both join sides in the hint, we will pick the smaller side to broadcast according to stats.
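
The selection rule described here can be sketched without any Spark dependency. In the model below, `SideStats` is a hypothetical stand-in for a logical plan's statistics, and `IllegalStateException` replaces the `AnalysisException` used in the patch; everything else follows the `broadcastSide` method quoted earlier in this thread.

```scala
object BroadcastSideSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Hypothetical stand-in for plan.stats (size estimate plus broadcast hint).
  final case class SideStats(sizeInBytes: BigInt, broadcastHint: Boolean)

  def broadcastSide(
      canBuildLeft: Boolean,
      canBuildRight: Boolean,
      left: SideStats,
      right: SideStats): BuildSide = {
    // Ties go to the right side, as in the quoted diff.
    def smallerSide: BuildSide =
      if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft

    val buildRight = canBuildRight && right.broadcastHint
    val buildLeft = canBuildLeft && left.broadcastHint

    if (buildRight && buildLeft) smallerSide      // both sides hinted
    else if (buildRight) BuildRight               // only the right side hinted
    else if (buildLeft) BuildLeft                 // only the left side hinted
    else if (canBuildRight && canBuildLeft) smallerSide // no usable hint
    else throw new IllegalStateException(
      "Can not decide which side to broadcast for this join")
  }
}
```

With both sides hinted, `broadcastSide(true, true, SideStats(100, true), SideStats(10, true))` picks the right side because it is smaller; with only the left side hinted, the left side is chosen regardless of size.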


---




[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-27 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153401036
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 ---
@@ -223,4 +223,36 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
 assert(HashJoin.rewriteKeyExpr(l :: ss :: Nil) === l :: ss :: Nil)
 assert(HashJoin.rewriteKeyExpr(i :: ss :: Nil) === i :: ss :: Nil)
   }
+
+  test("Shouldn't change broadcast join buildSide if user clearly specified") {
+spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("table1")
+spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value").createTempView("table2")
+
+def assertJoinBuildSide(pair: (String, BuildSide)): Any = {
+  val (sqlString, s) = pair
+  val df = sql(sqlString)
+  val physical = df.queryExecution.executedPlan
+  physical match {
--- End diff --

Just checking the results would also succeed in the absence of this patch.


---




[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153098067
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 ---
@@ -223,4 +223,36 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
 assert(HashJoin.rewriteKeyExpr(l :: ss :: Nil) === l :: ss :: Nil)
 assert(HashJoin.rewriteKeyExpr(i :: ss :: Nil) === i :: ss :: Nil)
   }
+
+  test("Shouldn't change broadcast join buildSide if user clearly specified") {
+spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("table1")
+spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value").createTempView("table2")
+
+def assertJoinBuildSide(pair: (String, BuildSide)): Any = {
+  val (sqlString, s) = pair
+  val df = sql(sqlString)
+  val physical = df.queryExecution.executedPlan
+  physical match {
--- End diff --

Instead of doing `match`, can you just try to call `collect` and assert the result is 1?
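
One possible reading of this suggestion, as a sketch: collect every `BroadcastHashJoinExec` node from the physical plan and assert that exactly one exists before checking its build side. The object and helper names are hypothetical, and the imports assume the Spark 2.x package layout, where `BuildSide` lives in `org.apache.spark.sql.execution.joins`.

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildSide}

// Hypothetical helper; not part of the patch under review.
object BuildSideAssertions {
  def assertSingleBroadcastHashJoin(physical: SparkPlan, expected: BuildSide): Unit = {
    // TreeNode.collect gathers all nodes matching the partial function,
    // wherever they sit in the plan tree.
    val joins = physical.collect { case j: BroadcastHashJoinExec => j }
    assert(joins.size == 1, s"expected one broadcast hash join, found ${joins.size}")
    assert(joins.head.buildSide == expected)
  }
}
```

Compared with matching on the plan root, this does not depend on which operators happen to wrap the join node.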


---




[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-12 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r150438437
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 ---
@@ -223,4 +223,19 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
 assert(HashJoin.rewriteKeyExpr(l :: ss :: Nil) === l :: ss :: Nil)
 assert(HashJoin.rewriteKeyExpr(i :: ss :: Nil) === i :: ss :: Nil)
   }
+
+  test("Shouldn't change broadcast join buildSide if user clearly specified") {
+spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("table1")
+spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value").createTempView("table2")
+
+val bl = sql(s"SELECT /*+ MAPJOIN(t1) */ * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key")
+  .queryExecution
+  .executedPlan
+assert(bl.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide === BuildLeft)
+
+val br = sql(s"SELECT /*+ MAPJOIN(t2) */ * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key")
+  .queryExecution
+  .executedPlan
+assert(br.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide === BuildRight)
--- End diff --

Could you also add test cases for `BroadcastNestedLoopJoinExec`?


---




[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-10 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r150368276
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -154,12 +158,12 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   // --- BroadcastHashJoin 

 
   case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
-if canBuildRight(joinType) && canBroadcast(right) =>
+if canBuildRight(joinType) && canBroadcast(right, left.stats.hints.broadcast) =>
--- End diff --

@mgaido91 Thanks for your suggestion.


---




[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-10 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r150255560
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -154,12 +158,12 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   // --- BroadcastHashJoin 

 
   case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
-if canBuildRight(joinType) && canBroadcast(right) =>
+if canBuildRight(joinType) && canBroadcast(right, left.stats.hints.broadcast) =>
--- End diff --

What about just adding `&& !left.stats.hints.broadcast` here? That way we don't
change the `canBroadcast` method, which is not really readable after the
change.
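
To illustrate why the extra guard matters, here is a Spark-free sketch. It assumes an inner join (so both sides may be built), a 10 MB threshold standing in for `spark.sql.autoBroadcastJoinThreshold`, and a simplified `canBroadcast`; `SideStats` and `chooseSide` are hypothetical names.

```scala
object HintGuardSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Hypothetical stand-in for plan.stats (size estimate plus broadcast hint).
  final case class SideStats(sizeInBytes: BigInt, broadcastHint: Boolean)

  // Assumed threshold of 10 MB.
  val threshold: BigInt = BigInt(10L * 1024 * 1024)

  // Simplified: a side is broadcastable if hinted or small enough.
  def canBroadcast(s: SideStats): Boolean =
    s.broadcastHint || (s.sizeInBytes >= 0 && s.sizeInBytes <= threshold)

  // The right side is tried first, mirroring the order of the match cases.
  // withGuard toggles the suggested `&& !left.stats.hints.broadcast` condition.
  def chooseSide(left: SideStats, right: SideStats, withGuard: Boolean): Option[BuildSide] = {
    val rightAllowed = canBroadcast(right) && (!withGuard || !left.broadcastHint)
    if (rightAllowed) Some(BuildRight)
    else if (canBroadcast(left)) Some(BuildLeft)
    else None
  }
}
```

With two small tables and a hint on the left one only, the unguarded version returns `Some(BuildRight)` because the right side already fits under the threshold, which matches the behavior reported in this pull request; the guarded version returns `Some(BuildLeft)`.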


---




[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-09 Thread wangyum
GitHub user wangyum opened a pull request:

https://github.com/apache/spark/pull/19714

[SPARK-22489][SQL] Shouldn't change broadcast join buildSide if user 
clearly specified

## What changes were proposed in this pull request?

How to reproduce:
```scala
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("table1")
spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value").createTempView("table2")

// The hint names t1, so the left side is expected to be the build side.
val bl = sql("SELECT /*+ MAPJOIN(t1) */ * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key")
  .queryExecution
  .executedPlan

println(bl.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide)
```
The result is `BuildRight`, but it should be `BuildLeft`. This PR fixes this
issue.

## How was this patch tested?

unit tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangyum/spark SPARK-22489

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19714.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19714


commit 68dfc42d80548c1eeb75275df43d4542146a60d4
Author: Yuming Wang 
Date:   2017-11-10T05:55:51Z

Shouldn't change broadcast join buildSide if user clearly specified




---
