spark git commit: [SPARK-20576][SQL] Support generic hint function in Dataset/DataFrame

2017-05-03 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 b1a732fea -> f0e80aa2d


[SPARK-20576][SQL] Support generic hint function in Dataset/DataFrame

## What changes were proposed in this pull request?
We allow users to specify hints (currently only "broadcast" is supported) in 
SQL and DataFrame. However, while SQL has a standard hint format (/*+ ... */), 
DataFrame doesn't have one, and users are sometimes confused because they can't 
find how to apply a broadcast hint. This ticket adds a generic hint function on 
DataFrame that allows using the same hint on DataFrames as well as in SQL.

As an example, after this patch, the following will apply a broadcast hint on a 
DataFrame using the new hint function:

```
df1.join(df2.hint("broadcast"))
```

## How was this patch tested?
Added a test case in DataFrameJoinSuite.

Author: Reynold Xin 

Closes #17839 from rxin/SPARK-20576.

(cherry picked from commit 527fc5d0c990daaacad4740f62cfe6736609b77b)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0e80aa2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0e80aa2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0e80aa2

Branch: refs/heads/branch-2.2
Commit: f0e80aa2ddee80819ef33ee24eb6a15a73bc02d5
Parents: b1a732f
Author: Reynold Xin 
Authored: Wed May 3 09:22:25 2017 -0700
Committer: Reynold Xin 
Committed: Wed May 3 09:22:41 2017 -0700

--
 .../sql/catalyst/analysis/ResolveHints.scala  |  8 +++-
 .../main/scala/org/apache/spark/sql/Dataset.scala | 16 
 .../org/apache/spark/sql/DataFrameJoinSuite.scala | 18 +-
 3 files changed, 40 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f0e80aa2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index c4827b8..df688fa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -86,7 +86,13 @@ object ResolveHints {
 
 def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
   case h: Hint if 
BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
-applyBroadcastHint(h.child, h.parameters.toSet)
+if (h.parameters.isEmpty) {
+  // If there is no table alias specified, turn the entire subtree 
into a BroadcastHint.
+  BroadcastHint(h.child)
+} else {
+  // Otherwise, find within the subtree query plans that should be 
broadcasted.
+  applyBroadcastHint(h.child, h.parameters.toSet)
+}
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f0e80aa2/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 06dd550..5f602dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1074,6 +1074,22 @@ class Dataset[T] private[sql](
   def apply(colName: String): Column = col(colName)
 
   /**
+   * Specifies some hint on the current Dataset. As an example, the following 
code specifies
+   * that one of the plans can be broadcasted:
+   *
+   * {{{
+   *   df1.join(df2.hint("broadcast"))
+   * }}}
+   *
+   * @group basic
+   * @since 2.2.0
+   */
+  @scala.annotation.varargs
+  def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan {
+Hint(name, parameters, logicalPlan)
+  }
+
+  /**
* Selects column based on the column name and return it as a [[Column]].
*
* @note The column name can also reference to a nested column like `a.b`.

http://git-wip-us.apache.org/repos/asf/spark/blob/f0e80aa2/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 541ffb5..4a52af6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -151,7 +151,7 @@ class DataFrameJoinSuite extends QueryTest with 

spark git commit: [SPARK-20576][SQL] Support generic hint function in Dataset/DataFrame

2017-05-03 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 27f543b15 -> 527fc5d0c


[SPARK-20576][SQL] Support generic hint function in Dataset/DataFrame

## What changes were proposed in this pull request?
We allow users to specify hints (currently only "broadcast" is supported) in 
SQL and DataFrame. However, while SQL has a standard hint format (/*+ ... */), 
DataFrame doesn't have one, and users are sometimes confused because they can't 
find how to apply a broadcast hint. This ticket adds a generic hint function on 
DataFrame that allows using the same hint on DataFrames as well as in SQL.

As an example, after this patch, the following will apply a broadcast hint on a 
DataFrame using the new hint function:

```
df1.join(df2.hint("broadcast"))
```

## How was this patch tested?
Added a test case in DataFrameJoinSuite.

Author: Reynold Xin 

Closes #17839 from rxin/SPARK-20576.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/527fc5d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/527fc5d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/527fc5d0

Branch: refs/heads/master
Commit: 527fc5d0c990daaacad4740f62cfe6736609b77b
Parents: 27f543b
Author: Reynold Xin 
Authored: Wed May 3 09:22:25 2017 -0700
Committer: Reynold Xin 
Committed: Wed May 3 09:22:25 2017 -0700

--
 .../sql/catalyst/analysis/ResolveHints.scala  |  8 +++-
 .../main/scala/org/apache/spark/sql/Dataset.scala | 16 
 .../org/apache/spark/sql/DataFrameJoinSuite.scala | 18 +-
 3 files changed, 40 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/527fc5d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index c4827b8..df688fa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -86,7 +86,13 @@ object ResolveHints {
 
 def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
   case h: Hint if 
BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
-applyBroadcastHint(h.child, h.parameters.toSet)
+if (h.parameters.isEmpty) {
+  // If there is no table alias specified, turn the entire subtree 
into a BroadcastHint.
+  BroadcastHint(h.child)
+} else {
+  // Otherwise, find within the subtree query plans that should be 
broadcasted.
+  applyBroadcastHint(h.child, h.parameters.toSet)
+}
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/527fc5d0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 147e765..620c8bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1161,6 +1161,22 @@ class Dataset[T] private[sql](
   def apply(colName: String): Column = col(colName)
 
   /**
+   * Specifies some hint on the current Dataset. As an example, the following 
code specifies
+   * that one of the plans can be broadcasted:
+   *
+   * {{{
+   *   df1.join(df2.hint("broadcast"))
+   * }}}
+   *
+   * @group basic
+   * @since 2.2.0
+   */
+  @scala.annotation.varargs
+  def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan {
+Hint(name, parameters, logicalPlan)
+  }
+
+  /**
* Selects column based on the column name and return it as a [[Column]].
*
* @note The column name can also reference to a nested column like `a.b`.

http://git-wip-us.apache.org/repos/asf/spark/blob/527fc5d0/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 541ffb5..4a52af6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -151,7 +151,7 @@ class DataFrameJoinSuite extends QueryTest with 
SharedSQLContext {
   Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil)
   }
 
-  test("broadcast join hint") {
+  test("broadcast