[GitHub] spark issue #21853: [SPARK-23957][SQL] Sorts in subqueries are redundant and...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21853 Thank you very much @gatorsmile and @maropu
[GitHub] spark issue #21863: [SPARK-18874][SQL][FOLLOW-UP] Improvement type mismatche...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21863 @gatorsmile Got it. Thank you.
[GitHub] spark issue #21863: [SPARK-18874][SQL][FOLLOW-UP] Improvement type mismatche...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21863 @gatorsmile Hi Sean, isn't @mgaido91 working in the same area with the IN-subquery PR?
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204891972 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1275,6 +1276,64 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) + * FROM ( + * SELECT c1, cnt, sum_val + * FROM ( + * SELECT c1, sum(vcol) AS sum_val + * FROM ( + * SELECT 1L as vcol, c1 FROM ut1 + * UNION ALL + * SELECT -1L as vcol, c1 FROM ut2 + * ) AS union_all + *GROUP BY union_all.c1 + * ) + *WHERE sum_val > 0 + * ) + * ) + * }}} + */ + +object RewriteExcepAll extends Rule[LogicalPlan] { --- End diff -- @maropu I have added a unit test to check the plan. Please look at it when you get a chance. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21857 @gatorsmile I have the link to the design doc in the description. Is there another way?
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r20408 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -52,7 +52,7 @@ trait CheckAnalysis extends PredicateHelper { } protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match { -case _: Intersect | _: Except | _: Distinct => +case _: Intersect | _: ExceptBase | _: Distinct => --- End diff -- @gatorsmile That's right, Sean. We will not need changes here. However, may I request that you command-B on the Except class? We may need to change the pattern matching in other places, right? Just wanted to make sure you are okay with it before I go ahead and make the changes.
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204777353 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -182,14 +182,16 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation { } } -case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { - +abstract class ExceptBase(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { /** We don't use right.output because those rows get excluded from the set. */ override def output: Seq[Attribute] = left.output override protected def validConstraints: Set[Expression] = leftConstraints } +case class Except(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) +case class ExceptAll(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) --- End diff -- @maropu Some details to aid the decision making. I remember now: this way, I had to change fewer files. I just looked at the usages of Except to double-check.
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204775243 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1275,6 +1276,64 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) + * FROM ( + * SELECT c1, cnt, sum_val + * FROM ( + * SELECT c1, sum(vcol) AS sum_val + * FROM ( + * SELECT 1L as vcol, c1 FROM ut1 + * UNION ALL + * SELECT -1L as vcol, c1 FROM ut2 + * ) AS union_all + *GROUP BY union_all.c1 + * ) + *WHERE sum_val > 0 + * ) + * ) + * }}} + */ + +object RewriteExcepAll extends Rule[LogicalPlan] { --- End diff -- Ah.. ok... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204774985 --- Diff: sql/core/src/test/resources/sql-tests/inputs/except-all.sql --- @@ -0,0 +1,146 @@ +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES +(0),(1),(2),(2),(2),(2),(3),(null),(null) AS tab1(c1) ; --- End diff -- @maropu Will do.
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204774728 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,32 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * This is a internal function solely used by optimizer to rewrite EXCEPT ALL AND + * INTERSECT ALL queries. + */ +case class ReplicateRows(children: Seq[Expression]) extends Generator with CodegenFallback { --- End diff -- @maropu I would like to take this up in a follow-up. I think we have codegen disabled for generators in general, so we would not be able to take advantage of it, right?
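For context, replicate_rows just emits each input row as many times as the count carried in its first argument. A plain-Scala sketch of that semantics (illustrative only, not the catalyst implementation; the tuple shape is an assumption for the example):

```scala
object ReplicateRowsSketch {
  // Each input tuple carries a multiplier plus the payload column(s);
  // the generator expands the tuple into `multiplier` copies of the payload.
  def replicate(rows: Seq[(Long, String)]): Seq[String] =
    rows.flatMap { case (multiplier, c1) => Seq.fill(multiplier.toInt)(c1) }

  def main(args: Array[String]): Unit = {
    // For EXCEPT ALL, the multiplier is the surviving count per group (sum_val > 0).
    println(replicate(Seq((2L, "a"), (1L, "b"))))  // List(a, a, b)
  }
}
```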
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204773666 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,32 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * This is a internal function solely used by optimizer to rewrite EXCEPT ALL AND + * INTERSECT ALL queries. + */ +case class ReplicateRows(children: Seq[Expression]) extends Generator with CodegenFallback { + private lazy val numColumns = children.length - 1 // remove the multiplier value from output. + + override def elementSchema: StructType = +StructType(children.tail.zipWithIndex.map { + case (e, index) => StructField(s"col$index", e.dataType) + }) --- End diff -- @maropu will check and fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204773452 --- Diff: sql/core/src/test/resources/sql-tests/inputs/except-all.sql --- @@ -0,0 +1,146 @@ +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES --- End diff -- @maropu I thought we'd like to keep these SQL files relatively small and not pack too many statements into each one.
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204772193 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -182,14 +182,16 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation { } } -case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { - +abstract class ExceptBase(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { /** We don't use right.output because those rows get excluded from the set. */ override def output: Seq[Attribute] = left.output override protected def validConstraints: Set[Expression] = leftConstraints } +case class Except(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) +case class ExceptAll(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) --- End diff -- @maropu Right. This way, most of the pattern matching happens on the base class where things are common. I went back and forth on this as well. If there is a consensus, I will change it.
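A tiny self-contained sketch of the point about pattern matching (the names mirror the PR, but the types are simplified placeholders, not the real catalyst operators):

```scala
// Simplified stand-ins for the logical operators.
sealed abstract class ExceptBase(left: String, right: String)
case class Except(left: String, right: String) extends ExceptBase(left, right)
case class ExceptAll(left: String, right: String) extends ExceptBase(left, right)

// A call site that only cares about "some flavor of EXCEPT" matches the base class,
// so introducing ExceptAll does not force it to change.
def isExceptLike(plan: Any): Boolean = plan match {
  case _: ExceptBase => true
  case _             => false
}
```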
[GitHub] spark issue #21845: [SPARK-24886][INFRA] Fix the testing script to increase ...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21845 @HyukjinKwon Super. Thanks a lot for fixing.
[GitHub] spark issue #21845: [SPARK-24886][INFRA] Fix the testing script to increase ...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21845 @HyukjinKwon I saw the following test run for 11 minutes on Jenkins for one of my PRs. Not sure if it's a transient problem; just thought I should let you know. On the nightly runs, should we have a test that runs that long? SPARK-22499: Least and greatest should not generate codes beyond 64KB (11 minutes, 38 seconds)
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204679789 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,37 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * This is a internal function solely used by optimizer to rewrite EXCEPT ALL AND + * INTERSECT ALL queries. + */ +@ExpressionDescription( --- End diff -- @HyukjinKwon OK.. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274] Implement EXCEPT ALL clause.
GitHub user dilipbiswal opened a pull request: https://github.com/apache/spark/pull/21857 [SPARK-21274] Implement EXCEPT ALL clause. ## What changes were proposed in this pull request? Implements the EXCEPT ALL clause through query rewrites using existing operators in Spark. In this PR, an internal UDTF (replicate_rows) is added to aid in preserving duplicate rows. Please refer to [Link](https://drive.google.com/open?id=1nyW0T0b_ajUduQoPgZLAsyHK8s3_dko3ulQuxaLpUXE) for the design. **Note** This proposed UDTF is kept as an internal function that is purely used to aid with this particular rewrite, to give us flexibility to change to a more generalized UDTF in the future. Input Query ``` SQL SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 ``` Rewritten Query ```SQL SELECT c1 FROM ( SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) FROM ( SELECT c1, cnt, sum_val FROM ( SELECT c1, sum(vcol) AS sum_val FROM ( SELECT 1L as vcol, c1 FROM ut1 UNION ALL SELECT -1L as vcol, c1 FROM ut2 ) AS union_all GROUP BY union_all.c1 ) WHERE sum_val > 0 ) ) ``` ## How was this patch tested? Added test cases in SQLQueryTestSuite and DataFrameSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dilipbiswal/spark dkb_except_all_final Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21857.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21857 commit 5cf8c4caa8bce874c5336498a6dc805f0bec1681 Author: Dilip Biswal Date: 2018-05-07T08:18:17Z [SPARK-21274] Implement EXCEPT ALL clause.
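For a concrete feel of the semantics, here is a small usage sketch (hypothetical data; it assumes a local SparkSession and a build that includes this patch). Unlike plain EXCEPT, duplicates survive according to their leftover multiplicity:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("except-all-demo").master("local[*]").getOrCreate()
import spark.implicits._

// ut1 = {1, 1, 2, 3}, ut2 = {1, 2, 2}
Seq(1, 1, 2, 3).toDF("c1").createOrReplaceTempView("ut1")
Seq(1, 2, 2).toDF("c1").createOrReplaceTempView("ut2")

// EXCEPT ALL keeps the leftover multiplicity: the expected rows are 1 and 3.
spark.sql("SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2").show()
```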
[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/16677 retest this please
[GitHub] spark pull request #21853: [SPARK-23957][SQL] Sorts in subqueries are redund...
GitHub user dilipbiswal opened a pull request: https://github.com/apache/spark/pull/21853 [SPARK-23957][SQL] Sorts in subqueries are redundant and can be removed ## What changes were proposed in this pull request? Thanks to @henryr for the original idea at https://github.com/apache/spark/pull/21049 Description from the original PR: Subqueries (at least in SQL) have 'bag of tuples' semantics. Ordering them is therefore redundant (unless combined with a limit). This patch removes the top sort operators from the subquery plans. ## How was this patch tested? Added test cases in SubquerySuite to cover IN, EXISTS and scalar subqueries. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dilipbiswal/spark SPARK-23957 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21853.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21853 commit 191c0eb9c12a1ba7ad210e4429feca11df832598 Author: Dilip Biswal Date: 2018-07-23T18:46:24Z [SPARK-23957] Sorts in subqueries are redundant and can be removed
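To illustrate the kind of plan this rule targets (hypothetical tables t1/t2 and an existing SparkSession named spark are assumed):

```scala
// The IN subquery returns a bag of values; its ordering is unobservable from the outer query,
// so the Sort produced by the ORDER BY below can safely be removed (unless paired with a LIMIT).
spark.sql("""
  SELECT *
  FROM   t1
  WHERE  t1.c1 IN (SELECT c2 FROM t2 ORDER BY c2)
""")
```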
[GitHub] spark pull request #21822: [SPARK-24865] Remove AnalysisBarrier - WIP
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21822#discussion_r204169456 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -533,7 +537,8 @@ trait CheckAnalysis extends PredicateHelper { // Simplify the predicates before validating any unsupported correlation patterns // in the plan. -BooleanSimplification(sub).foreachUp { +// TODO(rxin): Why did this need to call BooleanSimplification??? --- End diff -- @hvanhovell Yeah. I agree.
[GitHub] spark pull request #21822: [SPARK-24865] Remove AnalysisBarrier - WIP
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21822#discussion_r204166360 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -533,7 +537,8 @@ trait CheckAnalysis extends PredicateHelper { // Simplify the predicates before validating any unsupported correlation patterns // in the plan. -BooleanSimplification(sub).foreachUp { +// TODO(rxin): Why did this need to call BooleanSimplification??? --- End diff -- @hvanhovell Hi Herman, as you said, we do the actual pulling up of the predicates in the optimizer, in PullupCorrelatedPredicates in subquery.scala. We are also doing a BooleanSimplification first before traversing the plan there. Here, we are doing the error reporting, and I thought it would be better to keep the traversal the same way. Basically, previously we did the error reporting and rewriting in the Analyzer; now we do the error reporting in checkAnalysis and the rewriting in the Optimizer. Just to refresh your memory so you can help make the right call here :-)
[GitHub] spark issue #21813: [SPARK-24424][SQL] Support ANSI-SQL compliant syntax for...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21813 Thank you very much @gatorsmile @viirya
[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21049 @gatorsmile Sure.
[GitHub] spark pull request #21822: [SPARK-24865] Remove AnalysisBarrier - WIP
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21822#discussion_r203924609 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -533,7 +537,8 @@ trait CheckAnalysis extends PredicateHelper { // Simplify the predicates before validating any unsupported correlation patterns // in the plan. -BooleanSimplification(sub).foreachUp { +// TODO(rxin): Why did this need to call BooleanSimplification??? --- End diff -- @rxin From what I remember, Reynold, most of this logic was housed in the Analyzer before, and we moved it to the optimizer. In the old code we used to walk the plan after simplifying the predicates; the comment used to read "Simplify the predicates before pulling them out.". I just retained the same semantics.
[GitHub] spark pull request #21813: [SPARK-24424][SQL] Support ANSI-SQL compliant syn...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21813#discussion_r203917186 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -442,17 +442,35 @@ class Analyzer( child: LogicalPlan): LogicalPlan = { val gid = AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)() + // In case of ANSI-SQL compliant syntax for GROUPING SETS, groupByExprs is optional and + // can be null. In such case, we derive the groupByExprs from the user supplied values for + // grouping sets. + val finalGroupByExpressions = if (groupByExprs == Nil) { +selectedGroupByExprs.flatten.foldLeft(Seq.empty[Expression]) { (result, currentExpr) => --- End diff -- @viirya No. We should be getting an error as we don't have a group by specification. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
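The derivation under discussion is essentially an order-preserving de-duplication of the flattened grouping sets. A minimal sketch of that idea over plain strings (the real rule folds over catalyst Expressions and also has to handle the case where no grouping specification is given at all):

```scala
// GROUP BY GROUPING SETS ((c1), (c1, c2)) with no explicit GROUP BY list:
// derive the group-by expressions from the grouping sets, keeping first-seen order.
val selectedGroupByExprs = Seq(Seq("c1"), Seq("c1", "c2"))

val derivedGroupByExprs = selectedGroupByExprs.flatten.foldLeft(Seq.empty[String]) {
  (result, current) => if (result.contains(current)) result else result :+ current
}
// derivedGroupByExprs == Seq("c1", "c2")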
[GitHub] spark pull request #21813: [SPARK-24424][SQL] Support ANSI-SQL compliant syn...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21813#discussion_r203917039 --- Diff: sql/core/src/test/resources/sql-tests/inputs/grouping_set.sql --- @@ -13,5 +13,39 @@ SELECT a, b, c, count(d) FROM grouping GROUP BY a, b, c GROUPING SETS ((a)); -- SPARK-17849: grouping set throws NPE #3 SELECT a, b, c, count(d) FROM grouping GROUP BY a, b, c GROUPING SETS ((c)); +-- Group sets without explicit group by +SELECT c1, sum(c2) FROM (VALUES ('x', 10, 0), ('y', 20, 0)) AS t (c1, c2, c3) GROUP BY GROUPING SETS (c1); +-- Group sets without group by and with grouping +SELECT c1, sum(c2), grouping(c1) FROM (VALUES ('x', 10, 0), ('y', 20, 0)) AS t (c1, c2, c3) GROUP BY GROUPING SETS (c1); + +-- Mutiple grouping within a grouping set +SELECT c1, c2, Sum(c3), grouping__id +FROM (VALUES ('x', 'a', 10), ('y', 'b', 20) ) AS t (c1, c2, c3) +GROUP BY GROUPING SETS ( ( c1 ), ( c2 ) ) +HAVING GROUPING__ID > 1; + +-- Group sets without explicit group by +SELECT grouping(c1) FROM (VALUES ('x', 'a', 10), ('y', 'b', 20)) AS t (c1, c2, c3) GROUP BY c1,c2 GROUPING SETS (c1,c2); + +-- Mutiple grouping within a grouping set +SELECT -c1 AS c1 FROM (values (1,2), (3,2)) t(c1, c2) GROUP BY GROUPING SETS ((c1), (c1, c2)); + +-- complex expression in grouping sets +SELECT a + b, b, sum(c) FROM (VALUES (1,1,1),(2,2,2)) AS t(a,b,c) GROUP BY GROUPING SETS ( (a + b), (b)); + +-- complex expression in grouping sets +SELECT a + b, b, sum(c) FROM (VALUES (1,1,1),(2,2,2)) AS t(a,b,c) GROUP BY GROUPING SETS ( (a + b), (b + a), (b)); + +-- more query constructs with grouping sets +SELECT c1 AS col1, c2 AS col2 +FROM (VALUES (1, 2), (3, 2)) t(c1, c2) +GROUP BY GROUPING SETS ( ( c1 ), ( c1, c2 ) ) +HAVING col2 IS NOT NULL +ORDER BY -col1; --- End diff -- @viirya Sorry Simon.. do i have to do something for this comment ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21813: [SPARK-24424][SQL] Support ANSI-SQL compliant syn...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21813#discussion_r203882365 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -442,17 +442,32 @@ class Analyzer( child: LogicalPlan): LogicalPlan = { val gid = AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)() + val finalGroupByExpressions = if (groupByExprs == Nil) { --- End diff -- @viirya Sure will do. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21813: [SPARK-24424][SQL] Support ANSI-SQL compliant syn...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21813#discussion_r203861013 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala --- @@ -91,6 +91,40 @@ class ResolveGroupingAnalyticsSuite extends AnalysisTest { assertAnalysisError(originalPlan3, Seq("doesn't show up in the GROUP BY list")) } + test("grouping sets with no explicit group by expressions") { +val originalPlan = GroupingSets(Seq(Seq(), Seq(unresolved_a), Seq(unresolved_a, unresolved_b)), + Nil, r1, + Seq(unresolved_a, unresolved_b, UnresolvedAlias(count(unresolved_c +val expected = Aggregate(Seq(a, b, gid), Seq(a, b, count(c).as("count(c)")), + Expand( +Seq(Seq(a, b, c, nulInt, nulStr, 3), Seq(a, b, c, a, nulStr, 1), Seq(a, b, c, a, b, 0)), +Seq(a, b, c, a, b, gid), +Project(Seq(a, b, c, a.as("a"), b.as("b")), r1))) +checkAnalysis(originalPlan, expected) + +val originalPlan2 = GroupingSets(Seq(Seq(), Seq(unresolved_a), Seq(unresolved_a, unresolved_b)), + Nil, r1, + Seq(unresolved_a, unresolved_b, UnresolvedAlias(count(unresolved_c --- End diff -- @viirya Thanks, you're right. I will remove it.
[GitHub] spark pull request #21813: [SPARK-24424][SQL] Support ANSI-SQL compliant syn...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21813#discussion_r203859940 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -442,17 +442,32 @@ class Analyzer( child: LogicalPlan): LogicalPlan = { val gid = AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)() + val finalGroupByExpressions = if (groupByExprs == Nil) { --- End diff -- @viirya Yeah, for cube and rollup we will always have groupByExprs set up, right? So I felt it's better to keep the code consolidated here in this function. What do you think?
[GitHub] spark issue #21813: [SPARK 24424][SQL] Support ANSI-SQL compliant syntax for...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21813 retest this please
[GitHub] spark pull request #21813: [SPARK 24424] Support ANSI-SQL compliant syntax f...
GitHub user dilipbiswal opened a pull request: https://github.com/apache/spark/pull/21813 [SPARK 24424] Support ANSI-SQL compliant syntax for GROUPING SET ## What changes were proposed in this pull request? Enhances the parser and analyzer to support ANSI-compliant syntax for GROUPING SETS. As part of this change we derive the grouping expressions from the user-supplied groupings in the grouping sets clause. ```SQL SELECT c1, c2, max(c3) FROM t1 GROUP BY GROUPING SETS ((c1), (c1, c2)) ``` ## How was this patch tested? Added tests in SQLQueryTestSuite and ResolveGroupingAnalyticsSuite. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dilipbiswal/spark spark-24424 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21813.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21813 commit b5ada3feb7d243859714c04ec4fb8c225c1781e0 Author: Dilip Biswal Date: 2018-07-19T05:12:33Z [SPARK-24424] Support ANSI-SQL compliant syntax for GROUPING SET
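For comparison, a sketch of the legacy and the ANSI-style spellings (assuming a SparkSession named spark, a table t1(c1, c2, c3), and a build containing this change):

```scala
// Pre-existing (Hive-style) syntax: the grouped columns must be listed explicitly.
spark.sql("SELECT c1, c2, max(c3) FROM t1 GROUP BY c1, c2 GROUPING SETS ((c1), (c1, c2))")

// ANSI-style syntax enabled by this change: the GROUP BY list is derived from the grouping sets.
spark.sql("SELECT c1, c2, max(c3) FROM t1 GROUP BY GROUPING SETS ((c1), (c1, c2))")
```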
[GitHub] spark pull request #20795: [SPARK-23486]cache the function name from the ext...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/20795#discussion_r202177330 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import java.net.URI + +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.internal.SQLConf + +class LookupFunctionsSuite extends PlanTest { + + test("SPARK-23486: the functionExists for the Persistent function check") { +val externalCatalog = new CustomInMemoryCatalog +val conf = new SQLConf() +val catalog = new SessionCatalog(externalCatalog, FunctionRegistry.builtin, conf) +val analyzer = { + catalog.createDatabase( +CatalogDatabase("default", "", new URI("loc"), Map.empty), +ignoreIfExists = false) + new Analyzer(catalog, conf) +} + +def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref)) +val unresolvedPersistentFunc = UnresolvedFunction("func", Seq.empty, false) +val unresolvedRegisteredFunc = UnresolvedFunction("max", Seq.empty, false) +val plan = Project( + Seq(Alias(unresolvedPersistentFunc, "call1")(), Alias(unresolvedPersistentFunc, "call2")(), +Alias(unresolvedPersistentFunc, "call3")(), Alias(unresolvedRegisteredFunc, "call4")(), +Alias(unresolvedRegisteredFunc, "call5")()), + table("TaBlE")) +analyzer.LookupFunctions.apply(plan) + assert(externalCatalog.getFunctionExistsCalledTimes == 1) + +assert(analyzer.LookupFunctions.normalizeFuncName + (unresolvedPersistentFunc.name).database == Some("default")) + } + + test("SPARK-23486: the functionExists for the Registered function check") { + +val externalCatalog = new InMemoryCatalog +val conf = new SQLConf() +val customerFunctionReg = new CustomerFunctionRegistry +val catalog = new SessionCatalog(externalCatalog, customerFunctionReg, conf) +val analyzer = { + catalog.createDatabase( +CatalogDatabase("default", "", new URI("loc"), Map.empty), +ignoreIfExists = false) + new Analyzer(catalog, conf) +} + +def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref)) +val unresolvedRegisteredFunc = UnresolvedFunction("max", Seq.empty, false) +val plan = Project( + Seq(Alias(unresolvedRegisteredFunc, "call1")(), Alias(unresolvedRegisteredFunc, "call2")()), + table("TaBlE")) +analyzer.LookupFunctions.apply(plan) +assert(customerFunctionReg.getIsRegisteredFunctionCalledTimes == 2) + 
+assert(analyzer.LookupFunctions.normalizeFuncName +(unresolvedRegisteredFunc.name).database == Some("default")) + + } +} + +class CustomerFunctionRegistry extends SimpleFunctionRegistry { --- End diff -- @gatorsmile Sure Sean.
[GitHub] spark pull request #20795: [SPARK-23486]cache the function name from the ext...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/20795#discussion_r202129753 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import java.net.URI + +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.internal.SQLConf + +class LookupFunctionsSuite extends PlanTest { + + test("SPARK-23486: the functionExists for the Persistent function check") { +val externalCatalog = new CustomInMemoryCatalog +val conf = new SQLConf() +val catalog = new SessionCatalog(externalCatalog, FunctionRegistry.builtin, conf) +val analyzer = { + catalog.createDatabase( +CatalogDatabase("default", "", new URI("loc"), Map.empty), +ignoreIfExists = false) + new Analyzer(catalog, conf) +} + +def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref)) +val unresolvedPersistentFunc = UnresolvedFunction("func", Seq.empty, false) +val unresolvedRegisteredFunc = UnresolvedFunction("max", Seq.empty, false) +val plan = Project( + Seq(Alias(unresolvedPersistentFunc, "call1")(), Alias(unresolvedPersistentFunc, "call2")(), +Alias(unresolvedPersistentFunc, "call3")(), Alias(unresolvedRegisteredFunc, "call4")(), +Alias(unresolvedRegisteredFunc, "call5")()), + table("TaBlE")) +analyzer.LookupFunctions.apply(plan) + assert(externalCatalog.getFunctionExistsCalledTimes == 1) + +assert(analyzer.LookupFunctions.normalizeFuncName + (unresolvedPersistentFunc.name).database == Some("default")) + } + + test("SPARK-23486: the functionExists for the Registered function check") { + +val externalCatalog = new InMemoryCatalog +val conf = new SQLConf() +val customerFunctionReg = new CustomerFunctionRegistry +val catalog = new SessionCatalog(externalCatalog, customerFunctionReg, conf) +val analyzer = { + catalog.createDatabase( +CatalogDatabase("default", "", new URI("loc"), Map.empty), +ignoreIfExists = false) + new Analyzer(catalog, conf) +} + +def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref)) +val unresolvedRegisteredFunc = UnresolvedFunction("max", Seq.empty, false) +val plan = Project( + Seq(Alias(unresolvedRegisteredFunc, "call1")(), Alias(unresolvedRegisteredFunc, "call2")()), + table("TaBlE")) +analyzer.LookupFunctions.apply(plan) +assert(customerFunctionReg.getIsRegisteredFunctionCalledTimes == 2) + 
+assert(analyzer.LookupFunctions.normalizeFuncName +(unresolvedRegisteredFunc.name).database == Some("default")) + + } +} + +class CustomerFunctionRegistry extends SimpleFunctionRegistry { --- End diff -- @kevinyu98 Instead of extending FunctionRegistry and Catalog, what do you think of extending SessionCatalog and overriding isRegisteredFunction and isPersistentFunction? Then, after an invocation of LookupFunctions, we get a count of how many times isRegisteredFunction was called and how many times isPersistentFunction was called. We could just create an instance of the analyzer with an extended SessionCatalog that we reuse in more than one test. Would that be simpler?
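A rough sketch of that suggestion (illustrative only; the SessionCatalog constructor shape is taken from the quoted test, and the two overridden method signatures are assumed to take a FunctionIdentifier):

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, SessionCatalog}
import org.apache.spark.sql.internal.SQLConf

// Hypothetical test helper: count the catalog lookups made by the LookupFunctions rule.
class CountingSessionCatalog(
    externalCatalog: ExternalCatalog,
    functionRegistry: FunctionRegistry,
    conf: SQLConf)
  extends SessionCatalog(externalCatalog, functionRegistry, conf) {

  var registeredLookups = 0
  var persistentLookups = 0

  override def isRegisteredFunction(name: FunctionIdentifier): Boolean = {
    registeredLookups += 1
    super.isRegisteredFunction(name)
  }

  override def isPersistentFunction(name: FunctionIdentifier): Boolean = {
    persistentLookups += 1
    super.isPersistentFunction(name)
  }
}
```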
[GitHub] spark pull request #20795: [SPARK-23486]cache the function name from the ext...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/20795#discussion_r202128129 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1204,16 +1207,32 @@ class Analyzer( * only performs simple existence check according to the function identifier to quickly identify * undefined functions without triggering relation resolution, which may incur potentially * expensive partition/schema discovery process in some cases. - * + * In order to avoid duplicate external functions lookup, the external function identifier will + * store in the local hash set externalFunctionNameSet. * @see [[ResolveFunctions]] * @see https://issues.apache.org/jira/browse/SPARK-19737 */ object LookupFunctions extends Rule[LogicalPlan] { -override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions { - case f: UnresolvedFunction if !catalog.functionExists(f.name) => -withPosition(f) { - throw new NoSuchFunctionException(f.name.database.getOrElse("default"), f.name.funcName) -} +override def apply(plan: LogicalPlan): LogicalPlan = { + val externalFunctionNameSet = new mutable.HashSet[FunctionIdentifier]() + plan.transformAllExpressions { +case f: UnresolvedFunction + if externalFunctionNameSet.contains(normalizeFuncName(f.name)) => f +case f: UnresolvedFunction if catalog.isRegisteredFunction(f.name) => f +case f: UnresolvedFunction if catalog.isPersistentFunction(f.name) => + externalFunctionNameSet.add(normalizeFuncName(f.name)) + f +case f: UnresolvedFunction => + withPosition(f) { +throw new NoSuchFunctionException(f.name.database.getOrElse(catalog.getCurrentDatabase), + f.name.funcName) + } + } +} + +def normalizeFuncName(name: FunctionIdentifier): FunctionIdentifier = { + FunctionIdentifier(name.funcName.toLowerCase(Locale.ROOT), +name.database.orElse(Some(catalog.getCurrentDatabase))) --- End diff -- @kevinyu98 how about consideration of conf.caseSensitiveAnalysis ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
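One possible way to fold conf.caseSensitiveAnalysis into the normalization, as a hedged sketch (conf and catalog are assumed to be the Analyzer's existing fields; this is a suggestion, not the code in the PR):

```scala
import java.util.Locale

import org.apache.spark.sql.catalyst.FunctionIdentifier

// Only lower-case the function name when analysis is case-insensitive;
// always qualify with the current database so the cache key is unambiguous.
def normalizeFuncName(name: FunctionIdentifier): FunctionIdentifier = {
  val funcName =
    if (conf.caseSensitiveAnalysis) name.funcName
    else name.funcName.toLowerCase(Locale.ROOT)
  FunctionIdentifier(funcName, name.database.orElse(Some(catalog.getCurrentDatabase)))
}
```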
[GitHub] spark issue #20795: [SPARK-23486]cache the function name from the external c...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/20795 retest this please
[GitHub] spark pull request #20795: [SPARK-23486]cache the function name from the ext...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/20795#discussion_r201926692 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1204,16 +1207,32 @@ class Analyzer( * only performs simple existence check according to the function identifier to quickly identify * undefined functions without triggering relation resolution, which may incur potentially * expensive partition/schema discovery process in some cases. - * + * In order to avoid duplicate external functions lookup, the external function identifier will + * store in the local hash set externalFunctionNameSet. * @see [[ResolveFunctions]] * @see https://issues.apache.org/jira/browse/SPARK-19737 */ object LookupFunctions extends Rule[LogicalPlan] { -override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions { - case f: UnresolvedFunction if !catalog.functionExists(f.name) => -withPosition(f) { - throw new NoSuchFunctionException(f.name.database.getOrElse("default"), f.name.funcName) -} +override def apply(plan: LogicalPlan): LogicalPlan = { + val externalFunctionNameSet = new mutable.HashSet[FunctionIdentifier]() + plan.transformAllExpressions { +case f: UnresolvedFunction + if externalFunctionNameSet.contains(normalizeFuncName(f.name)) => f +case f: UnresolvedFunction if catalog.isRegisteredFunction(f.name) => f +case f: UnresolvedFunction if catalog.isPersistentFunction(f.name) => + externalFunctionNameSet.add(normalizeFuncName(f.name)) + f +case f: UnresolvedFunction => + withPosition(f) { +throw new NoSuchFunctionException(f.name.database.getOrElse(catalog.getCurrentDatabase), + f.name.funcName) + } + } +} + +def normalizeFuncName(name: FunctionIdentifier): FunctionIdentifier = { + FunctionIdentifier(name.funcName.toLowerCase(Locale.ROOT), +name.database.orElse(Some(catalog.getCurrentDatabase))) --- End diff -- @kevinyu98 I have a question. So we normalize the funcName here. How about name.database ? Is that normalized already by the time we are here ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21590: [SPARK-24423][SQL] Add a new option for JDBC sources
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21590 Thank you very much @gatorsmile @maropu @viirya @HyukjinKwon @gengliangwang
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r198315709 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -109,6 +134,20 @@ class JDBCOptions( s"When reading JDBC data sources, users need to specify all or none for the following " + s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " + s"and '$JDBC_NUM_PARTITIONS'") + + require(!(query.isDefined && partitionColumn.isDefined), +s""" + |Options '$JDBC_QUERY_STRING' and '$JDBC_PARTITION_COLUMN' can not be specified together. + |Please define the query using `$JDBC_TABLE_NAME` option instead and make sure to qualify + |the partition columns using the supplied subquery alias to resolve any ambiguity. + |Example : + |spark.read.format("jdbc") + |.option("dbtable", "(select c1, c2 from t1) as subq") + |.option("partitionColumn", "subq.c1" + |.load() + """.stripMargin + ) --- End diff -- @maropu Currently we disallow it to be on the safe side. Let's take your example. When using the query option to pass in the query, we basically expect the user to supply ```SQL select c0 p0, c1 p1, c2 p2 from t where c0 > 1 ``` In Spark, we will parenthesize the query and add an alias to conform to the table-subquery syntax. Given that the user supplied the above query, they could decide to qualify the partition column names with the table name, like this: ``` SQL val df = spark.read .format("jdbc") .option("driver", "org.postgresql.Driver") .option("url", "jdbc:postgresql://localhost:5432/postgres?user=maropu") .option("query", "select c0 p0, c1 p1, c2 p2 from t where c0 > 1") .option("partitionColumn", "t.p2") ==> User qualifies the column names. .option("lowerBound", "1") .option("upperBound", "3") .option("numPartitions", "2") .load() ``` In this case we will end up generating a query of the following form: ``` SQL select * from (select c0 p0, c1 p1, c2 p2 from t where c0 > 1) __SPARK_GEN_ALIAS where t.p2 >= 1 and t.p2 <=3 ``` However, this would be an invalid query. With the query option it is possible to specify a complex query involving joins, so we disallow the combination to be on the safe side. With the dbtable option, users are responsible for explicitly specifying the alias and would know how to qualify the partition columns. Let's see if we can improve this in the future. If you have some ideas, please let us know.
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r198022351 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -65,13 +65,32 @@ class JDBCOptions( // Required parameters // require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.") // a JDBC URL val url = parameters(JDBC_URL) - // name of table - val table = parameters(JDBC_TABLE_NAME) + // table name or a table subquery. + val tableOrQuery = (parameters.get(JDBC_TABLE_NAME), parameters.get(JDBC_QUERY_STRING)) match { +case (Some(name), Some(subquery)) => + throw new IllegalArgumentException( +s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified at the same time." + ) +case (None, None) => + throw new IllegalArgumentException( +s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." --- End diff -- @maropu Sure. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r198022370 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -65,13 +65,32 @@ class JDBCOptions( // Required parameters // require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.") // a JDBC URL val url = parameters(JDBC_URL) - // name of table - val table = parameters(JDBC_TABLE_NAME) + // table name or a table subquery. + val tableOrQuery = (parameters.get(JDBC_TABLE_NAME), parameters.get(JDBC_QUERY_STRING)) match { +case (Some(name), Some(subquery)) => + throw new IllegalArgumentException( +s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified at the same time." + ) +case (None, None) => + throw new IllegalArgumentException( +s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." + ) +case (Some(name), None) => + if (name.isEmpty) { +throw new IllegalArgumentException(s"Option '${JDBC_TABLE_NAME}' can not be empty.") --- End diff -- @maropu OK. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r198022388 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -65,13 +65,32 @@ class JDBCOptions( // Required parameters // require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.") // a JDBC URL val url = parameters(JDBC_URL) - // name of table - val table = parameters(JDBC_TABLE_NAME) + // table name or a table subquery. + val tableOrQuery = (parameters.get(JDBC_TABLE_NAME), parameters.get(JDBC_QUERY_STRING)) match { +case (Some(name), Some(subquery)) => + throw new IllegalArgumentException( +s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified at the same time." + ) +case (None, None) => + throw new IllegalArgumentException( +s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." + ) +case (Some(name), None) => + if (name.isEmpty) { +throw new IllegalArgumentException(s"Option '${JDBC_TABLE_NAME}' can not be empty.") + } else { +name.trim + } +case (None, Some(subquery)) => + if (subquery.isEmpty) { +throw new IllegalArgumentException(s"Option `${JDBC_QUERY_STRING}` can not be empty.") --- End diff -- @maropu OK. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r198022295 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -174,3 +209,25 @@ object JDBCOptions { val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel") val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement") } + +class JdbcOptionsInWrite( +@transient override val parameters: CaseInsensitiveMap[String]) + extends JDBCOptions(parameters) { + + import JDBCOptions._ + + def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) + + def this(url: String, table: String, parameters: Map[String, String]) = { +this(CaseInsensitiveMap(parameters ++ Map( + JDBCOptions.JDBC_URL -> url, + JDBCOptions.JDBC_TABLE_NAME -> table))) + } + + require( +parameters.get(JDBC_TABLE_NAME).isDefined, +s"Option '${JDBCOptions.JDBC_TABLE_NAME}' is required. " + + s"Option '${JDBCOptions.JDBC_QUERY_STRING}' is not applicable while writing.") + + val destinationTable = parameters(JDBC_TABLE_NAME) +} --- End diff -- @maropu I had it as table and refactored it just before i pushed :-). I will change it back. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r198022202 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -150,6 +183,7 @@ class JDBCOptions( } object JDBCOptions { --- End diff -- @maropu Sure.
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r198022168 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -65,13 +65,32 @@ class JDBCOptions( // Required parameters // require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.") // a JDBC URL val url = parameters(JDBC_URL) - // name of table - val table = parameters(JDBC_TABLE_NAME) + // table name or a table subquery. + val tableOrQuery = (parameters.get(JDBC_TABLE_NAME), parameters.get(JDBC_QUERY_STRING)) match { +case (Some(name), Some(subquery)) => + throw new IllegalArgumentException( +s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified at the same time." + ) +case (None, None) => + throw new IllegalArgumentException( +s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." + ) +case (Some(name), None) => + if (name.isEmpty) { +throw new IllegalArgumentException(s"Option '${JDBC_TABLE_NAME}' can not be empty.") + } else { +name.trim + } +case (None, Some(subquery)) => + if (subquery.isEmpty) { +throw new IllegalArgumentException(s"Option `${JDBC_QUERY_STRING}` can not be empty.") + } else { +s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}" + } + } - // --- End diff -- @maropu OK. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r198022141 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -174,3 +209,25 @@ object JDBCOptions { val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel") val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement") } + +class JdbcOptionsInWrite( --- End diff -- @maropu Can I take this on as a follow-up? The reason is that I am not fully familiar with all the options; I need to study them a bit more before I refactor them.
[GitHub] spark issue #21590: [SPARK-24423][SQL] Add a new option for JDBC sources
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21590 @gatorsmile @maropu I have hopefully addressed the comments. Please take a look when you get a chance.
[GitHub] spark issue #21590: [SPARK-24423][SQL] Add a new option for JDBC sources
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21590 @gatorsmile Thanks a lot. I will process your comments and get back.
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r197355161 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -65,13 +65,38 @@ class JDBCOptions( // Required parameters // require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.") + // a JDBC URL val url = parameters(JDBC_URL) - // name of table - val table = parameters(JDBC_TABLE_NAME) + val tableName = parameters.get(JDBC_TABLE_NAME) + val query = parameters.get(JDBC_QUERY_STRING) --- End diff -- @maropu Thank you for taking the time to think about this thoroughly. A couple of questions/comments. 1) It looks like in the read path we give precedence to dbtable over query. I feel it's better to explicitly disallow this combination with a clear message, since it is ambiguous. 2) Usage of lazy here (especially to trigger errors) makes me a little nervous. For example, if we wanted to introduce a debug statement to print the variables inside the QueryOptions class, things would no longer work, right? That's the reason I had opted to check for the "invalid query option in write path" in the write function itself (i.e. when I am sure of the calling context). Perhaps that is how it is used everywhere, in which case it may be okay to follow the same approach here. I am okay with this; let's get an opinion from @gatorsmile. Once I have the final set of comments, I will make the changes. Thanks again.
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r196975167 --- Diff: docs/sql-programming-guide.md --- @@ -1302,9 +1302,18 @@ the following case-insensitive options: dbtable - The JDBC table that should be read. Note that anything that is valid in a FROM clause of - a SQL query can be used. For example, instead of a full table you could also use a - subquery in parentheses. + The JDBC table that should be read from or written into. Note that when using it in the read + path anything that is valid in a FROM clause of a SQL query can be used. + For example, instead of a full table you could also use a subquery in parentheses. + + + +query + + A query that will be used to read data into Spark. The specified query will be parenthesized and used + as a subquery in the FROM clause. Spark will also assign a alias to the subquery clause. + As an example, spark will issue a query of the following form to the datasource. + SELECT columns FROM (user_specified_query) spark_generated_alias --- End diff -- @viirya OK, i will add this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
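A short usage sketch of the documented option (connection details and table names are placeholders; assumes a SparkSession named spark):

```scala
// Read the result of an arbitrary query; Spark wraps it as
// SELECT columns FROM (user_specified_query) <generated alias>.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")   // placeholder URL
  .option("query", "SELECT c1, c2 FROM t1 WHERE c2 > 10")
  .option("user", "dbuser")
  .option("password", "secret")
  .load()
```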
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r196897983 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -65,13 +65,38 @@ class JDBCOptions( // Required parameters // require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.") + // a JDBC URL val url = parameters(JDBC_URL) - // name of table - val table = parameters(JDBC_TABLE_NAME) + val tableName = parameters.get(JDBC_TABLE_NAME) + val query = parameters.get(JDBC_QUERY_STRING) + // Following two conditions make sure that : + // 1. One of the option (dbtable or query) must be specified. + // 2. Both of them can not be specified at the same time as they are conflicting in nature. + require( +tableName.isDefined || query.isDefined, +s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." + ) + + require( +!(tableName.isDefined && query.isDefined), +s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified." + ) + + // table name or a table expression. + val tableExpression = tableName.map(_.trim).getOrElse { +// We have ensured in the code above that either dbtable or query is specified. +query.get match { + case subq if subq.nonEmpty => s"(${subq}) spark_gen_${curId.getAndIncrement()}" + case subq => subq +} + } + + require(tableExpression.nonEmpty, --- End diff -- @gengliangwang I see your point. Does this read better to you ? ``` require(tableOrQuery.nonEmpty, s"Empty string is not allowed in either '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' options" ) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r196895138 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -65,13 +65,38 @@ class JDBCOptions( // Required parameters // require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.") + // a JDBC URL val url = parameters(JDBC_URL) - // name of table - val table = parameters(JDBC_TABLE_NAME) + val tableName = parameters.get(JDBC_TABLE_NAME) --- End diff -- @gengliangwang Thanks. Actually I had tried a couple of different ways. Somehow I found this version a little hard to follow once the error messages are embedded in the branches. I prefer to check the preconditions upfront, with the explanatory comments on top, which reads more easily to me. But if others find the version below easier to follow as well, I will change it. ```scala val tableExpression = if (parameters.isDefinedAt(JDBC_TABLE_NAME)) { require(!parameters.isDefinedAt(JDBC_QUERY_STRING), s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified." ) parameters.get(JDBC_TABLE_NAME).get.trim } else { require(parameters.isDefinedAt(JDBC_QUERY_STRING), s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." ) s"(${parameters.get(JDBC_QUERY_STRING)}) ${curId.getAndIncrement()}" } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r196847962 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -65,13 +65,38 @@ class JDBCOptions( // Required parameters // require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.") + // a JDBC URL val url = parameters(JDBC_URL) - // name of table - val table = parameters(JDBC_TABLE_NAME) + val tableName = parameters.get(JDBC_TABLE_NAME) + val query = parameters.get(JDBC_QUERY_STRING) + // Following two conditions make sure that : + // 1. One of the option (dbtable or query) must be specified. + // 2. Both of them can not be specified at the same time as they are conflicting in nature. + require( +tableName.isDefined || query.isDefined, +s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." + ) + + require( +!(tableName.isDefined && query.isDefined), +s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified." + ) + + // table name or a table expression. + val tableExpression = tableName.map(_.trim).getOrElse { --- End diff -- @viirya ok. i will change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r196848042 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -65,13 +65,38 @@ class JDBCOptions( // Required parameters // require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.") + // a JDBC URL val url = parameters(JDBC_URL) - // name of table - val table = parameters(JDBC_TABLE_NAME) + val tableName = parameters.get(JDBC_TABLE_NAME) + val query = parameters.get(JDBC_QUERY_STRING) + // Following two conditions make sure that : + // 1. One of the option (dbtable or query) must be specified. + // 2. Both of them can not be specified at the same time as they are conflicting in nature. + require( +tableName.isDefined || query.isDefined, +s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." + ) + + require( +!(tableName.isDefined && query.isDefined), +s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified." + ) + + // table name or a table expression. + val tableExpression = tableName.map(_.trim).getOrElse { +// We have ensured in the code above that either dbtable or query is specified. +query.get match { + case subq if subq.nonEmpty => s"(${subq}) spark_gen_${curId.getAndIncrement()}" --- End diff -- @viirya Will change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r196847882 --- Diff: docs/sql-programming-guide.md --- @@ -1302,9 +1302,18 @@ the following case-insensitive options: dbtable - The JDBC table that should be read. Note that anything that is valid in a FROM clause of - a SQL query can be used. For example, instead of a full table you could also use a - subquery in parentheses. + The JDBC table that should be read from or written into. Note that when using it in the read + path anything that is valid in a FROM clause of a SQL query can be used. + For example, instead of a full table you could also use a subquery in parentheses. + + + +query + + A query that will be used to read data into Spark. The specified query will be parenthesized and used + as a subquery in the FROM clause. Spark will also assign a alias to the subquery clause. --- End diff -- @viirya I think it's better to let users know how we generate the FROM clause. That way they can choose to qualify the partition columns if needed. However, if you strongly feel otherwise, I will remove it from the doc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r196846549 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -65,13 +65,38 @@ class JDBCOptions( // Required parameters // require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.") + // a JDBC URL val url = parameters(JDBC_URL) - // name of table - val table = parameters(JDBC_TABLE_NAME) + val tableName = parameters.get(JDBC_TABLE_NAME) + val query = parameters.get(JDBC_QUERY_STRING) + // Following two conditions make sure that : + // 1. One of the option (dbtable or query) must be specified. + // 2. Both of them can not be specified at the same time as they are conflicting in nature. + require( +tableName.isDefined || query.isDefined, +s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." + ) + + require( +!(tableName.isDefined && query.isDefined), +s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified." + ) + + // table name or a table expression. + val tableExpression = tableName.map(_.trim).getOrElse { +// We have ensured in the code above that either dbtable or query is specified. +query.get match { + case subq if subq.nonEmpty => s"(${subq}) spark_gen_${curId.getAndIncrement()}" --- End diff -- @maropu Yeah. we need an alias. Systems like postgress require a mandatory table subquery alias. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
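To make the point about the mandatory alias concrete, here is a rough sketch of how a user query would be embedded into the generated FROM clause. The query text and alias value below are illustrative; databases such as PostgreSQL reject a derived table in the FROM clause that carries no alias.

```scala
// Sketch only: embedding the user's query into the statement sent to the database.
// Without the trailing alias, PostgreSQL fails with
// "subquery in FROM must have an alias"; with it, the statement is accepted.
val userQuery = "SELECT c1, c2 FROM t1 WHERE c1 > 10"  // hypothetical user input
val alias     = "spark_gen_alias"                      // constant name discussed in this thread
val pushedSql = s"SELECT * FROM ($userQuery) $alias"
// pushedSql == "SELECT * FROM (SELECT c1, c2 FROM t1 WHERE c1 > 10) spark_gen_alias"
```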
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r196634511 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala --- @@ -1206,4 +1207,92 @@ class JDBCSuite extends SparkFunSuite }.getMessage assert(errMsg.contains("Statement was canceled or the session timed out")) } + + test("query JDBC option - negative tests") { +val query = "SELECT * FROM test.people WHERE theid = 1" +// load path +val e1 = intercept[RuntimeException] { + val df = spark.read.format("jdbc") +.option("Url", urlWithUserAndPass) +.option("query", query) --- End diff -- @HyukjinKwon Thanks.. I will update the doc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21590: [SPARK-24423][SQL] Add a new option for JDBC sources
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21590 @gatorsmile Sorry to be late on this. Please look at this when you have time. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r196490214 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -65,13 +65,38 @@ class JDBCOptions( // Required parameters // require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.") + // a JDBC URL val url = parameters(JDBC_URL) - // name of table - val table = parameters(JDBC_TABLE_NAME) + val tableName = parameters.get(JDBC_TABLE_NAME) + val query = parameters.get(JDBC_QUERY_STRING) + // Following two conditions make sure that : + // 1. One of the option (dbtable or query) must be specified. + // 2. Both of them can not be specified at the same time as they are conflicting in nature. + require( +tableName.isDefined || query.isDefined, +s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." + ) + + require( +!(tableName.isDefined && query.isDefined), +s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified." + ) + + // table name or a table expression. + val tableExpression = tableName.map(_.trim).getOrElse { +// We have ensured in the code above that either dbtable or query is specified. +query.get match { + case subq if subq.nonEmpty => s"(${subq}) spark_gen_${curId.getAndIncrement()}" --- End diff -- @maropu Don't mind using a constant name ? "spark_gen_alias" ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r196489749 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -109,6 +134,20 @@ class JDBCOptions( s"When reading JDBC data sources, users need to specify all or none for the following " + s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " + s"and '$JDBC_NUM_PARTITIONS'") + + require(!(query.isDefined && partitionColumn.isDefined), +s""" + |Options '$JDBC_QUERY_STRING' and '$JDBC_PARTITION_COLUMN' can not be specified together. + |Please define the query using `$JDBC_TABLE_NAME` option instead and make sure to qualify + |the partition columns using the supplied subquery alias to resolve any ambiguity. + |Example : + |spark.read.format("jdbc") + |.option("dbtable", "(select c1, c2 from t1) as subq") + |.option("partitionColumn", "subq.c1" + |.load() + """.stripMargin + ) --- End diff -- @maropu Since we auto-generate the subquery alias here for ease of use, we are disallowing the query option together with the partition columns, because users wouldn't know how to qualify the partition columns when the subquery alias is generated implicitly. In this case, we ask them to use the existing dbtable option to specify the query, where they are in control of the alias themselves. Another option I considered was to introduce "queryAlias" as another option, but I decided to avoid it for simplicity. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
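A sketch of the workaround spelled out in the error message above. The connection details, bounds, and column names are made up; the point is that a partitioned read keeps using `dbtable` with an explicit alias, so the partition column can be qualified if needed.

```scala
// Hypothetical partitioned JDBC read: the query goes through `dbtable` with an
// explicit alias instead of the `query` option, so `partitionColumn` can be
// qualified (e.g. "subq.c1") when the bare name would be ambiguous.
val partitionedDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/sales")  // placeholder URL
  .option("dbtable", "(SELECT c1, c2 FROM t1) AS subq")
  .option("partitionColumn", "c1")
  .option("lowerBound", "0")
  .option("upperBound", "10000")
  .option("numPartitions", "4")
  .load()
```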
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21590#discussion_r196487627 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -65,13 +65,38 @@ class JDBCOptions( // Required parameters // require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.") + // a JDBC URL val url = parameters(JDBC_URL) - // name of table - val table = parameters(JDBC_TABLE_NAME) + val tableName = parameters.get(JDBC_TABLE_NAME) + val query = parameters.get(JDBC_QUERY_STRING) + // Following two conditions make sure that : + // 1. One of the option (dbtable or query) must be specified. + // 2. Both of them can not be specified at the same time as they are conflicting in nature. + require( +tableName.isDefined || query.isDefined, +s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." + ) + + require( +!(tableName.isDefined && query.isDefined), +s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified." + ) --- End diff -- @maropu These two requires are using tableName and query which is computed in lines before. Thats why i have placed these two requires after. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21590: [SPARK-24423][SQL] Add a new option for JDBC sour...
GitHub user dilipbiswal opened a pull request: https://github.com/apache/spark/pull/21590 [SPARK-24423][SQL] Add a new option for JDBC sources ## What changes were proposed in this pull request? Here is the description in the JIRA - Currently, our JDBC connector provides the option `dbtable` for users to specify the to-be-loaded JDBC source table. ```SQL val jdbcDf = spark.read .format("jdbc") .option("dbtable", "dbName.tableName") .options(jdbcCredentials: Map) .load() ``` Normally, users do not fetch the whole JDBC table due to the poor performance/throughput of JDBC. Thus, they normally just fetch a small set of tables. For advanced users, they can pass a subquery as the option. ```SQL val query = """ (select * from tableName limit 10) as tmp """ val jdbcDf = spark.read .format("jdbc") .option("dbtable", query) .options(jdbcCredentials: Map) .load() ``` However, this is not straightforward for end users. We should simply allow users to specify the query by a new option `query`. We will handle the complexity for them. ```SQL val query = """select * from tableName limit 10""" val jdbcDf = spark.read .format("jdbc") .option("query", query) .options(jdbcCredentials: Map) .load() ``` ## How was this patch tested? Added tests in JDBCSuite and JDBCWriterSuite. Also tested against MySQL, Postgres, Oracle, DB2 (using docker infrastructure) to make sure there are no syntax issues. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dilipbiswal/spark SPARK-24423 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21590.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21590 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21240: [SPARK-21274][SQL] Add a new generator function replicat...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21240 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21240: [SPARK-21274][SQL] Add a new generator function r...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21240#discussion_r186279795 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,51 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * {{{ + * SELECT replicate_rows(2, "val1", "val2") -> + * 2 val1 val2 + * 2 val1 val2 + * }}} + */ +@ExpressionDescription( +usage = "_FUNC_(n, expr1, ..., exprk) - Replicates `n`, `expr1`, ..., `exprk` into `n` rows.", +examples = """ +Examples: + > SELECT _FUNC_(2, "val1", "val2"); + 2 val1 val2 + 2 val1 val2 + """) +case class ReplicateRows(children: Seq[Expression]) extends Generator with CodegenFallback { --- End diff -- @viirya If you don't mind, i would like to do it in a follow-up. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21240: [SPARK-21274][SQL] Add a new generator function r...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21240#discussion_r186279765 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -702,6 +703,20 @@ object TypeCoercion { } } + /** + * Coerces first argument in ReplicateRows expression and introduces a cast to Long + * if necessary. + */ + object ReplicateRowsCoercion extends TypeCoercionRule { +private val acceptedTypes = Seq(IntegerType, ShortType, ByteType) +override def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { + case s @ ReplicateRows(children) if s.children.nonEmpty && s.childrenResolved && +s.children.head.dataType != LongType && acceptedTypes.contains(s.children.head.dataType) => --- End diff -- @viirya Thanks. I will fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21240: [SPARK-21274][SQL] Add a new generator function r...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21240#discussion_r186277072 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,51 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * {{{ + * SELECT replicate_rows(2, "val1", "val2") -> + * 2 val1 val2 + * 2 val1 val2 + * }}} + */ +@ExpressionDescription( +usage = "_FUNC_(n, expr1, ..., exprk) - Replicates `n`, `expr1`, ..., `exprk` into `n` rows.", --- End diff -- @viirya I did think about it Simon. But then, i decided to match the output with Hive. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21240: [SPARK-21274][SQL] Add a new generator function r...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21240#discussion_r186277065 --- Diff: sql/core/src/test/resources/sql-tests/inputs/udtf_replicate_rows.sql --- @@ -0,0 +1,38 @@ +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES +(1, 'row1', 1.1), +(2, 'row2', 2.2), +(0, 'row3', 3.3), +(-1,'row4', 4.4), +(null,'row5', 5.5), +(3, 'row6', null) +AS tab1(c1, c2, c3); + +-- Requires 2 arguments at minimum. +SELECT replicate_rows(c1) FROM tab1; --- End diff -- Sure. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21240: [SPARK-21274][SQL] Add a new generator function r...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21240#discussion_r186277062 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -702,6 +703,20 @@ object TypeCoercion { } } + /** + * Coerces first argument in ReplicateRows expression and introduces a cast to Long + * if necessary. + */ + object ReplicateRowsCoercion extends TypeCoercionRule { +private val acceptedTypes = Seq(IntegerType, ShortType, ByteType) +override def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { + case s @ ReplicateRows(children) if s.childrenResolved && +s.children.head.dataType != LongType && acceptedTypes.contains(s.children.head.dataType) => --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21240: [SPARK-21274][SQL] Add a new generator function replicat...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21240 @maropu @viirya Thanks for the comments. I have made the changes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21240: [SPARK-21274][SQL] Add a new generator function r...
GitHub user dilipbiswal opened a pull request: https://github.com/apache/spark/pull/21240 [SPARK-21274][SQL] Add a new generator function replicate_rows to support EXCEPT ALL and INTERSECT ALL ## What changes were proposed in this pull request? Add a new UDTF replicate_rows. This function replicates the values based on the first argument to the function. This will be used in EXCEPT ALL AND INTERSECT ALL transformation (future PR) mainly to preserve "retain duplicates" semantics. Please refer to [Link](https://drive.google.com/open?id=1nyW0T0b_ajUduQoPgZLAsyHK8s3_dko3ulQuxaLpUXE) for design. The transformation code changes are in [Code](https://github.com/dilipbiswal/spark/tree/dkb_except_all_copy) Example ``` SELECT replicate_rows(3, 1, 2) ``` Result ``` spark-sql> SELECT replicate_rows(3, 1, 2); 3 1 2 3 1 2 3 1 2 Time taken: 0.045 seconds, Fetched 3 row(s) ``` Returns 3 rows based on the first parameter value. (Please fill in changes proposed in this fix) ## How was this patch tested? Added tests in GeneratorFunctionSuite, TypeCoercionSuite, SQLQueryTestSuite Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dilipbiswal/spark dkb_setop_replicaterows Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21240.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21240 commit 90efeffb039a4c3458add840f98ea91d01cdc4a8 Author: Dilip Biswal <dbiswal@...> Date: 2018-05-04T22:03:09Z [SPARK-21274] Add a new generator function replicate_rows to support EXCEPT ALL and INTERSECT ALL --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21049 @henryr "I might be a bit of a hardliner on this, but I think it's correct to eliminate the ORDER BY from common table expressions (e.g. MSSQL agrees with me, see this link)." DB>> Yeah, I had seen this. I had checked DB2's behaviour on CTEs and it does not seem to remove the sort. ``` db2 => with x as (select * from t1 order by 1) select * from x C1 C2 --- --- 0 0 1 1 1 1 2 2 2 2 5 record(s) selected. ``` So perhaps the SQL standard does not explicitly clarify this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21049 @henryr Is there any reason to actually use an alias at the root of a plan like this (outside of composing with other plans, where this optimization would apply)? I can't think of a reason :-). Just that the API allows users do that. How about this query ? ``` SQL scala> spark.sql("with abcd as (select * from t1 order by t1.c1) select * from abcd").explain(true) 18/04/29 23:28:45 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException == Parsed Logical Plan == CTE [abcd] : +- 'SubqueryAlias abcd : +- 'Sort ['t1.c1 ASC NULLS FIRST], true :+- 'Project [*] : +- 'UnresolvedRelation `t1` +- 'Project [*] +- 'UnresolvedRelation `abcd` == Analyzed Logical Plan == c1: int, c2: int, c3: int Project [c1#7, c2#8, c3#9] +- SubqueryAlias abcd +- Sort [c1#7 ASC NULLS FIRST], true +- Project [c1#7, c2#8, c3#9] +- SubqueryAlias t1 +- HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#7, c2#8, c3#9] == Optimized Logical Plan == HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#7, c2#8, c3#9] == Physical Plan == HiveTableScan [c1#7, c2#8, c3#9], HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#7, c2#8, c3#9] ``` IMHO, its probably better to correctly detect the real subqueries and apply this optimization in order to be fully sure about it. cc @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21174: [SPARK-24085][SQL] Query returns UnsupportedOperationExc...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21174 @gatorsmile @maropu Thank you very much !! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21049 @henryr Since SubqueryAlias is used as a correlation name and used mostly for resolving attributes, in my understanding its not safe to apply this optimization. I will borrow @gatorsmile 's example here. Please note that the alias is specified after the sort. Below is plan after this optimization that removes sorts under SubqueryAlias->child. ```SQL scala> Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str_sort").orderBy('int.asc).as('df1) res0: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [int: int, int2: int ... 1 more field] scala> res0.explain(true) == Parsed Logical Plan == SubqueryAlias df1 +- AnalysisBarrier +- Sort [int#7 ASC NULLS FIRST], true +- Project [_1#3 AS int#7, _2#4 AS int2#8, _3#5 AS str_sort#9] +- LocalRelation [_1#3, _2#4, _3#5] == Analyzed Logical Plan == int: int, int2: int, str_sort: string SubqueryAlias df1 +- Sort [int#7 ASC NULLS FIRST], true +- Project [_1#3 AS int#7, _2#4 AS int2#8, _3#5 AS str_sort#9] +- LocalRelation [_1#3, _2#4, _3#5] == Optimized Logical Plan == LocalRelation [int#7, int2#8, str_sort#9] == Physical Plan == LocalTableScan [int#7, int2#8, str_sort#9] ``` In this case we should not be removing the top level sort from user's query right ? cc @gatorsmile for his opinion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21174: [SPARK-24085][SQL] Query returns UnsupportedOperationExc...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21174 @gatorsmile Thanks a lot. Addressed the comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21174: [SPARK-24085][SQL] Query returns UnsupportedOpera...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21174#discussion_r184609062 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala --- @@ -55,7 +55,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { partitionSchema, sparkSession.sessionState.analyzer.resolver) val partitionSet = AttributeSet(partitionColumns) val partitionKeyFilters = - ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet))) +ExpressionSet(normalizedFilters. + filterNot(SubqueryExpression.hasSubquery(_)). + filter(_.references.subsetOf(partitionSet))) --- End diff -- @gatorsmile Will make the change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21174: [SPARK-24085][SQL] Query returns UnsupportedOpera...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21174#discussion_r184609073 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala --- @@ -955,4 +955,28 @@ class SubquerySuite extends QueryTest with SharedSQLContext { // before the fix this would throw AnalysisException spark.range(10).where("(id,id) in (select id, null from range(3))").count } + + test("SPARK-24085 scalar subquery in partitioning expression") { +withTempPath { tempDir => + withTable("parquet_part") { +sql( + s""" + |CREATE TABLE parquet_part (id_value string, id_type string) + |USING PARQUET + |OPTIONS ( + | path '${tempDir.toURI}' + |) + |PARTITIONED BY (id_type) --- End diff -- @gatorsmile OK. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21174: [SPARK-24085][SQL] Query returns UnsupportedOpera...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21174#discussion_r184609040 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala --- @@ -76,7 +76,10 @@ object FileSourceStrategy extends Strategy with Logging { fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver) val partitionSet = AttributeSet(partitionColumns) val partitionKeyFilters = - ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet))) +ExpressionSet(normalizedFilters. + filterNot(SubqueryExpression.hasSubquery(_)). + filter(_.references.subsetOf(partitionSet))) --- End diff -- @gatorsmile Will make the change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21174: [SPARK-24085][SQL] Query returns UnsupportedOperationExc...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21174 @maropu So with the fix, if the query predicate contains a scalar subquery expression, then that expression is not considered for partition pruning. For example, if the predicate is part_key1 = (select ...) AND part_key2 = 5, then only the second conjunct is considered for pruning purposes and the first part becomes a regular filter. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
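To illustrate the split described in the comment above, here is a sketch against a made-up table partitioned by two columns: only the conjunct without a subquery feeds partition pruning, while the scalar-subquery conjunct stays behind as an ordinary filter.

```scala
// Hypothetical query against a table partitioned by (part_key1, part_key2).
// After the fix, pruning uses only `part_key2 = 5`; the scalar-subquery
// predicate is evaluated as a regular filter once the subquery is planned.
val pruned = spark.sql("""
  SELECT *
  FROM some_partitioned_table
  WHERE part_key1 = (SELECT max(k) FROM dim_table)  -- regular filter
    AND part_key2 = 5                               -- used for partition pruning
""")
```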
[GitHub] spark issue #21174: [SPARK-24085] Query returns UnsupportedOperationExceptio...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21174 @maropu Thanks for your response. ORC has CONVERT_METASTORE_ORC set to false by default, so it is not converted to a file-based datasource. If we set this to true, we would see the same issue for ORC. I have added a test to cover this case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
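A small sketch of how the ORC path can hit the same code, per the comment above. The table name is hypothetical; the configuration key is spark.sql.hive.convertMetastoreOrc, and its default value is as described in the comment.

```scala
// With conversion enabled, a Hive ORC table is read through the file-based
// datasource path, so the same partition-pruning code (and the same bug) applies.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
spark.sql("SELECT * FROM orc_part_table WHERE id_type = (SELECT 'b')")  // hypothetical table
```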
[GitHub] spark pull request #21174: [SPARK-24085] Query returns UnsupportedOperationE...
GitHub user dilipbiswal opened a pull request: https://github.com/apache/spark/pull/21174 [SPARK-24085] Query returns UnsupportedOperationException when scalar subquery is present in partitioning expression ## What changes were proposed in this pull request? In this case, the partition pruning happens before the planning phase of scalar subquery expressions. For scalar subquery expressions, the planning occurs late in the cycle (after the physical planning) in "PlanSubqueries" just before execution. Currently we try to execute the scalar subquery expression as part of partition pruning and fail as it implements Unevaluable. The fix attempts to exclude subquery expressions from the partition pruning computation. Another option can be to somehow plan the subqueries before the partition pruning. Since this may not be a commonly occurring expression, I am opting for the simpler fix. Repro ``` SQL CREATE TABLE test_prc_bug ( id_value string ) partitioned by (id_type string) location '/tmp/test_prc_bug' stored as parquet; insert into test_prc_bug values ('1','a'); insert into test_prc_bug values ('2','a'); insert into test_prc_bug values ('3','b'); insert into test_prc_bug values ('4','b'); select * from test_prc_bug where id_type = (select 'b'); ``` ## How was this patch tested? Added tests in SubquerySuite and hive/SQLQuerySuite You can merge this pull request into a Git repository by running: $ git pull https://github.com/dilipbiswal/spark spark-24085 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21174.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21174 commit 38c769274fca2931d0b0147e5e666b9cd7c99f59 Author: Dilip Biswal <dbiswal@...> Date: 2018-04-26T00:40:01Z [SPARK-24085] Query returns UnsupportedOperationException when scalar subquery is present in partitioning expression. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/21049 @henryr @gatorsmile I agree with Sean. To the best of my knowledge, Spark does not treat "select ... from (query)" as a subquery; it treats it as an aliased query. Please see the grammar under the "relationPrimary" rule. The subqueries supported in Spark mostly originate in the projection or the predicate of the main query. So basically, we see them as expressions under either the Project or the Filter operators of the main query block. We can look at SubquerySuite to find usage examples. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
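For contrast, a short sketch of the two places where subquery expressions do show up in Spark SQL, versus the aliased-query form discussed above. The table and column names are invented and an active SparkSession `spark` is assumed.

```scala
// Scalar subquery in the projection list.
spark.sql("SELECT c1, (SELECT max(c2) FROM t2) AS max_c2 FROM t1")

// IN subquery in the predicate.
spark.sql("SELECT * FROM t1 WHERE c1 IN (SELECT c1 FROM t2 WHERE c2 > 0)")

// By contrast, this is parsed as an aliased relation (the `relationPrimary`
// grammar rule), not as a subquery expression:
spark.sql("SELECT * FROM (SELECT c1 FROM t1 ORDER BY c1) x")
```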
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/20579 Thanks a lot @cloud-fan @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20795: [SPARK-23486]cache the function name from the cat...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/20795#discussion_r176292845 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1192,11 +1195,23 @@ class Analyzer( * @see https://issues.apache.org/jira/browse/SPARK-19737 */ object LookupFunctions extends Rule[LogicalPlan] { -override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions { - case f: UnresolvedFunction if !catalog.functionExists(f.name) => -withPosition(f) { - throw new NoSuchFunctionException(f.name.database.getOrElse("default"), f.name.funcName) -} +override def apply(plan: LogicalPlan): LogicalPlan = { + val catalogFunctionNameSet = new mutable.HashSet[FunctionIdentifier]() + plan.transformAllExpressions { +case f: UnresolvedFunction if catalogFunctionNameSet.contains(f.name) => f +case f: UnresolvedFunction if catalog.functionExists(f.name) => + catalogFunctionNameSet.add(normalizeFuncName(f.name)) + f +case f: UnresolvedFunction => + withPosition(f) { +throw new NoSuchFunctionException(f.name.database.getOrElse("default"), + f.name.funcName) + } + } +} + +private def normalizeFuncName(name: FunctionIdentifier): FunctionIdentifier = { + FunctionIdentifier(name.funcName.toLowerCase(Locale.ROOT), name.database) --- End diff -- i agree @viirya --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20579: [SPARK-23372][SQL] Writing empty struct in parque...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/20579#discussion_r176232893 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -719,4 +720,27 @@ object DataSource extends Logging { } globPath } + + /** + * Called before writing into a FileFormat based data source to make sure the + * supplied schema is not empty. + * @param schema + */ + private def hasEmptySchema(schema: StructType): Unit = { +def hasEmptySchemaInternal(schema: StructType): Boolean = { --- End diff -- @cloud-fan I have gone ahead and changed the top level function name to validateSchema. I have kept the internal function name to be hasEmptySchema. Hopefully it makes sense now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20579: [SPARK-23372][SQL] Writing empty struct in parque...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/20579#discussion_r176187422 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -719,4 +720,27 @@ object DataSource extends Logging { } globPath } + + /** + * Called before writing into a FileFormat based data source to make sure the + * supplied schema is not empty. + * @param schema + */ + private def hasEmptySchema(schema: StructType): Unit = { +def hasEmptySchemaInternal(schema: StructType): Boolean = { --- End diff -- You are right @cloud-fan. Given we are raising the error from the function itself, should i rename it to validateSchema ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/20579 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/20579 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20868: [SPARK-23750][SQL] Inner Join Elimination based on Infor...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/20868 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20579: [SPARK-23372][SQL] Writing empty struct in parque...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/20579#discussion_r175952404 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala --- @@ -77,7 +77,6 @@ class ParquetFileFormat job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - --- End diff -- @cloud-fan will remove. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20579: [SPARK-23372][SQL] Writing empty struct in parque...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/20579#discussion_r175952408 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -719,4 +720,27 @@ object DataSource extends Logging { } globPath } + + /** + * Called before writing into a FileFormat based data source to make sure the + * supplied schema is not empty. + * @param schema + */ + private def verifySchema(schema: StructType): Unit = { +def verifyInternal(schema: StructType): Boolean = { --- End diff -- @cloud-fan will make the change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/20579 @cloud-fan ok. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/20579 @cloud-fan Thank you. I assumed (wrongly) that we did not want to change the behaviour for external file-based datasources and wanted to scope our check only to Spark's built-in data sources. I have made the change based on your suggestion. I have parked the verifySchema method in DataSource for now. Please let me know if that's the right place, or whether we want to move it to a utility class. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
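A rough, self-contained sketch of the kind of check being discussed here, not the exact code in the PR: reject a write whose schema is empty or whose nested structs bottom out empty. The method name and the exception type below are placeholders.

```scala
import org.apache.spark.sql.types.{StructField, StructType}

// Placeholder version of the validation discussed above: a schema is rejected
// if it has no fields, or if any nested struct field is (recursively) empty.
def validateSchema(schema: StructType): Unit = {
  def hasEmptySchema(s: StructType): Boolean =
    s.isEmpty || s.fields.exists {
      case StructField(_, inner: StructType, _, _) => hasEmptySchema(inner)
      case _ => false
    }

  if (hasEmptySchema(schema)) {
    throw new IllegalArgumentException(
      "Datasource does not support writing empty or nested empty schemas.")
  }
}
```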
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/20579 @gatorsmile When you get a chance, could you please see if the check for internal datasource looks reasonable ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/20579 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/20579 @gatorsmile Thank you Sean. I will follow your suggestion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/20579 @cloud-fan OK, I was thinking of adding this check in each built-in datasource (Text, CSV, Parquet, ORC, JSON, etc.), just like we check it in Parquet with this PR. Would you have any concern with that approach? That would add some duplicate code under each specific format, but it gives us the flexibility to change the behaviour for an individual datasource should we need to. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20795: [SPARK-23486]cache the function name from the cat...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/20795#discussion_r175156300 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1192,11 +1195,23 @@ class Analyzer( * @see https://issues.apache.org/jira/browse/SPARK-19737 */ object LookupFunctions extends Rule[LogicalPlan] { -override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions { - case f: UnresolvedFunction if !catalog.functionExists(f.name) => -withPosition(f) { - throw new NoSuchFunctionException(f.name.database.getOrElse("default"), f.name.funcName) -} +override def apply(plan: LogicalPlan): LogicalPlan = { + val catalogFunctionNameSet = new mutable.HashSet[FunctionIdentifier]() + plan.transformAllExpressions { +case f: UnresolvedFunction if catalogFunctionNameSet.contains(f.name) => f +case f: UnresolvedFunction if catalog.functionExists(f.name) => + catalogFunctionNameSet.add(normalizeFuncName(f.name)) + f +case f: UnresolvedFunction => + withPosition(f) { +throw new NoSuchFunctionException(f.name.database.getOrElse("default"), + f.name.funcName) + } + } +} + +private def normalizeFuncName(name: FunctionIdentifier): FunctionIdentifier = { + FunctionIdentifier(name.funcName.toLowerCase(Locale.ROOT), name.database) --- End diff -- @viirya @kevinyu98 We need to check what happens in the following case . ``` use currentdb; select currentdb.function1(), function1() from ``` In this case, the 2nd function should be resolved from the local cache if this optimization were to work. If we just use name.database instead of defaulting to current database , will it still happen ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
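A standalone sketch of the normalization question raised in this comment, with a `currentDb` value standing in for whatever the session's current database would be (an assumption here): if the unqualified name is normalized by filling in the current database, both the qualified and the unqualified reference land on the same cache key.

```scala
import java.util.Locale
import scala.collection.mutable
import org.apache.spark.sql.catalyst.FunctionIdentifier

// `currentDb` stands in for the session's current database.
def normalize(name: FunctionIdentifier, currentDb: String): FunctionIdentifier =
  FunctionIdentifier(
    name.funcName.toLowerCase(Locale.ROOT),
    Some(name.database.getOrElse(currentDb)))

val seen = new mutable.HashSet[FunctionIdentifier]()
val currentDb = "currentdb"

// First reference: currentdb.function1()
seen += normalize(FunctionIdentifier("function1", Some("currentdb")), currentDb)
// Second reference: bare function1() -- it hits the same cached entry only
// because the missing database is filled in during normalization.
assert(seen.contains(normalize(FunctionIdentifier("function1", None), currentDb)))
```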
[GitHub] spark pull request #20579: [SPARK-23372][SQL] Writing empty struct in parque...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/20579#discussion_r175154988 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -542,6 +542,11 @@ case class DataSource( throw new AnalysisException("Cannot save interval data type into external storage.") } +if (data.schema.size == 0) { --- End diff -- @gatorsmile @cloud-fan OK.. sounds reasonable to me. I will rollback the latest change in this PR and we can discuss if we want to introduce the behaviour change in a future jira/pr. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20795: [SPARK-23486]cache the function name from the cat...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/20795#discussion_r175026674 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1192,11 +1195,24 @@ class Analyzer( * @see https://issues.apache.org/jira/browse/SPARK-19737 */ object LookupFunctions extends Rule[LogicalPlan] { -override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions { - case f: UnresolvedFunction if !catalog.functionExists(f.name) => -withPosition(f) { - throw new NoSuchFunctionException(f.name.database.getOrElse("default"), f.name.funcName) -} +override def apply(plan: LogicalPlan): LogicalPlan = { + val catalogFunctionNameSet = new mutable.HashSet[FunctionIdentifier]() + plan.transformAllExpressions { +case f: UnresolvedFunction + if catalogFunctionNameSet.contains(normalizeFuncName(f.name)) => f +case f: UnresolvedFunction if catalog.functionExists(f.name) => + catalogFunctionNameSet.add(normalizeFuncName(f.name)) + f +case f: UnresolvedFunction => + withPosition(f) { +throw new NoSuchFunctionException(f.name.database.getOrElse("default"), --- End diff -- @viirya Yeah.. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20795: [SPARK-23486]cache the function name from the cat...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/20795#discussion_r175025479 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1192,11 +1195,23 @@ class Analyzer( * @see https://issues.apache.org/jira/browse/SPARK-19737 */ object LookupFunctions extends Rule[LogicalPlan] { -override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions { - case f: UnresolvedFunction if !catalog.functionExists(f.name) => -withPosition(f) { - throw new NoSuchFunctionException(f.name.database.getOrElse("default"), f.name.funcName) -} +override def apply(plan: LogicalPlan): LogicalPlan = { + val catalogFunctionNameSet = new mutable.HashSet[FunctionIdentifier]() + plan.transformAllExpressions { +case f: UnresolvedFunction if catalogFunctionNameSet.contains(f.name) => f +case f: UnresolvedFunction if catalog.functionExists(f.name) => + catalogFunctionNameSet.add(normalizeFuncName(f.name)) + f +case f: UnresolvedFunction => + withPosition(f) { +throw new NoSuchFunctionException(f.name.database.getOrElse("default"), + f.name.funcName) + } + } +} + +private def normalizeFuncName(name: FunctionIdentifier): FunctionIdentifier = { + FunctionIdentifier(name.funcName.toLowerCase(Locale.ROOT), name.database) --- End diff -- @viirya Thank you. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org