spark git commit: [SPARK-20576][SQL] Support generic hint function in Dataset/DataFrame
Repository: spark Updated Branches: refs/heads/branch-2.2 b1a732fea -> f0e80aa2d [SPARK-20576][SQL] Support generic hint function in Dataset/DataFrame ## What changes were proposed in this pull request? We allow users to specify hints (currently only "broadcast" is supported) in SQL and DataFrame. However, while SQL has a standard hint format (/*+ ... */), DataFrame doesn't have one and sometimes users are confused that they can't find how to apply a broadcast hint. This ticket adds a generic hint function on DataFrame that allows using the same hint on DataFrames as well as SQL. As an example, after this patch, the following will apply a broadcast hint on a DataFrame using the new hint function: ``` df1.join(df2.hint("broadcast")) ``` ## How was this patch tested? Added a test case in DataFrameJoinSuite. Author: Reynold Xin Closes #17839 from rxin/SPARK-20576. (cherry picked from commit 527fc5d0c990daaacad4740f62cfe6736609b77b) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0e80aa2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0e80aa2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0e80aa2 Branch: refs/heads/branch-2.2 Commit: f0e80aa2ddee80819ef33ee24eb6a15a73bc02d5 Parents: b1a732f Author: Reynold Xin Authored: Wed May 3 09:22:25 2017 -0700 Committer: Reynold Xin Committed: Wed May 3 09:22:41 2017 -0700 -- .../sql/catalyst/analysis/ResolveHints.scala | 8 +++- .../main/scala/org/apache/spark/sql/Dataset.scala | 16 .../org/apache/spark/sql/DataFrameJoinSuite.scala | 18 +- 3 files changed, 40 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f0e80aa2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index c4827b8..df688fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -86,7 +86,13 @@ object ResolveHints { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => -applyBroadcastHint(h.child, h.parameters.toSet) +if (h.parameters.isEmpty) { + // If there is no table alias specified, turn the entire subtree into a BroadcastHint. + BroadcastHint(h.child) +} else { + // Otherwise, find within the subtree query plans that should be broadcasted. + applyBroadcastHint(h.child, h.parameters.toSet) +} } } http://git-wip-us.apache.org/repos/asf/spark/blob/f0e80aa2/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 06dd550..5f602dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1074,6 +1074,22 @@ class Dataset[T] private[sql]( def apply(colName: String): Column = col(colName) /** + * Specifies some hint on the current Dataset. As an example, the following code specifies + * that one of the plan can be broadcasted: + * + * {{{ + * df1.join(df2.hint("broadcast")) + * }}} + * + * @group basic + * @since 2.2.0 + */ + @scala.annotation.varargs + def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan { +Hint(name, parameters, logicalPlan) + } + + /** * Selects column based on the column name and return it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. 
http://git-wip-us.apache.org/repos/asf/spark/blob/f0e80aa2/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 541ffb5..4a52af6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -151,7 +151,7 @@ class DataFrameJoinSuite extends QueryTest with
spark git commit: [SPARK-20576][SQL] Support generic hint function in Dataset/DataFrame
Repository: spark Updated Branches: refs/heads/master 27f543b15 -> 527fc5d0c [SPARK-20576][SQL] Support generic hint function in Dataset/DataFrame ## What changes were proposed in this pull request? We allow users to specify hints (currently only "broadcast" is supported) in SQL and DataFrame. However, while SQL has a standard hint format (/*+ ... */), DataFrame doesn't have one and sometimes users are confused that they can't find how to apply a broadcast hint. This ticket adds a generic hint function on DataFrame that allows using the same hint on DataFrames as well as SQL. As an example, after this patch, the following will apply a broadcast hint on a DataFrame using the new hint function: ``` df1.join(df2.hint("broadcast")) ``` ## How was this patch tested? Added a test case in DataFrameJoinSuite. Author: Reynold Xin Closes #17839 from rxin/SPARK-20576. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/527fc5d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/527fc5d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/527fc5d0 Branch: refs/heads/master Commit: 527fc5d0c990daaacad4740f62cfe6736609b77b Parents: 27f543b Author: Reynold Xin Authored: Wed May 3 09:22:25 2017 -0700 Committer: Reynold Xin Committed: Wed May 3 09:22:25 2017 -0700 -- .../sql/catalyst/analysis/ResolveHints.scala | 8 +++- .../main/scala/org/apache/spark/sql/Dataset.scala | 16 .../org/apache/spark/sql/DataFrameJoinSuite.scala | 18 +- 3 files changed, 40 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/527fc5d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index c4827b8..df688fa 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -86,7 +86,13 @@ object ResolveHints { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => -applyBroadcastHint(h.child, h.parameters.toSet) +if (h.parameters.isEmpty) { + // If there is no table alias specified, turn the entire subtree into a BroadcastHint. + BroadcastHint(h.child) +} else { + // Otherwise, find within the subtree query plans that should be broadcasted. + applyBroadcastHint(h.child, h.parameters.toSet) +} } } http://git-wip-us.apache.org/repos/asf/spark/blob/527fc5d0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 147e765..620c8bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1161,6 +1161,22 @@ class Dataset[T] private[sql]( def apply(colName: String): Column = col(colName) /** + * Specifies some hint on the current Dataset. As an example, the following code specifies + * that one of the plan can be broadcasted: + * + * {{{ + * df1.join(df2.hint("broadcast")) + * }}} + * + * @group basic + * @since 2.2.0 + */ + @scala.annotation.varargs + def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan { +Hint(name, parameters, logicalPlan) + } + + /** * Selects column based on the column name and return it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. 
http://git-wip-us.apache.org/repos/asf/spark/blob/527fc5d0/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 541ffb5..4a52af6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -151,7 +151,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil) } - test("broadcast join hint") { + test("broadcast