[GitHub] spark pull request #14719: [SPARK-17154][SQL] Wrong result can be returned o...
Github user sarutak closed the pull request at: https://github.com/apache/spark/pull/14719 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/14719#discussion_r80852382

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -672,6 +684,21 @@ class Analyzer(
     exprs.exists(_.find(_.isInstanceOf[UnresolvedDeserializer]).isDefined)
   }

+  private[sql] def resolveExpressionFromSpecificLogicalPlan(
+      nameParts: Seq[String],
+      planToSearchFrom: LogicalPlan,
+      targetPlanId: Long): Expression = {
+    lazy val name = UnresolvedAttribute(nameParts).name
+    planToSearchFrom.findByBreadthFirst(_.planId == targetPlanId) match {
+      case Some(foundPlan) =>
+        foundPlan.resolve(nameParts, resolver).getOrElse {
+          failAnalysis(s"Could not find $name in ${foundPlan.output.mkString(", ")}")

--- End diff --

O.K, I'll just return the resolved result.
Github user nsyca commented on a diff in the pull request: https://github.com/apache/spark/pull/14719#discussion_r80714835

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -1580,6 +1583,28 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(df.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
   }

+  test("""SPARK-17154: df("column_name") should return correct result when we do self-join""") {

--- End diff --

I don't think we should support a self-join of the same Dataset/DataFrame under the same name. That is, `df.join(df)` should be blocked. We can ask the user to express it as `df.join(df.as("df2"))`, which is clearer. We certainly must not support `df.join(df, df("col1") === df("col2"))`, which blindly binds `"col1"` and `"col2"` to the first `df`. @sarutak's solution does change this behaviour to an error.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/14719#discussion_r78271331

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -1580,6 +1583,28 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(df.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
   }

+  test("""SPARK-17154: df("column_name") should return correct result when we do self-join""") {

--- End diff --

Although I can't immediately think of an actual use case for a self-join of two identical Datasets, I am still wondering whether we want to disallow it. Conceptually, it should work: even if you can't select columns from it due to the ambiguity, I think you can still save it or apply other operators to it.
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/14719#discussion_r78191196

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -1580,6 +1583,28 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(df.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
   }

+  test("""SPARK-17154: df("column_name") should return correct result when we do self-join""") {

--- End diff --

> I'm also thinking about this. If a plan id matches more than one sub-tree in the logical plan, should we just fail the query instead of using BFS to pick the first one?

If the logical plan on the right side is copied by `dedupRight`, there will be multiple logical plans with the same planId, so it may be better to fail the query in the case of a direct self-join.
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/14719#discussion_r78190424

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -1580,6 +1583,28 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(df.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
   }

+  test("""SPARK-17154: df("column_name") should return correct result when we do self-join""") {

--- End diff --

Yeah, a direct self-join (meaning both child Datasets are the same) is still ambiguous. In this case, `df("column_name")` will refer to the Dataset on the right side in the proposed implementation. I think a direct self-join like df.join(df, , ) is similar to a query like the following:

SELECT ... FROM my_table df JOIN my_table df ON ;

Such queries should not be valid, so I also think we shouldn't allow users to join two identical Datasets, and should warn them to duplicate the Dataset if they intend a direct self-join.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14719#discussion_r78182198

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -1580,6 +1583,28 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(df.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
   }

+  test("""SPARK-17154: df("column_name") should return correct result when we do self-join""") {

--- End diff --

This is a good question! I'm also thinking about this. If a plan id matches more than one sub-tree in the logical plan, should we just fail the query instead of using BFS to pick the first one?
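To make the ambiguity under discussion concrete, here is a small self-contained Scala sketch. It is a toy model, not Spark's actual LogicalPlan API: `Plan` and this standalone `findByBreadthFirst` are hypothetical stand-ins for the helper referenced in the diff. It shows how a breadth-first search by plan id silently picks the first of two matching sub-trees after a direct self-join:

```scala
import scala.collection.mutable

// Toy stand-in for a logical plan node; not Spark's LogicalPlan.
case class Plan(planId: Long, name: String, children: Seq[Plan] = Nil)

// Breadth-first search for the first node satisfying the predicate,
// modeled after the findByBreadthFirst helper referenced in this PR.
def findByBreadthFirst(root: Plan)(p: Plan => Boolean): Option[Plan] = {
  val queue = mutable.Queue(root)
  while (queue.nonEmpty) {
    val node = queue.dequeue()
    if (p(node)) return Some(node)
    queue ++= node.children
  }
  None
}

// Direct self-join: dedupRight copies the right child but keeps its planId,
// so both children of the join share planId 1.
val left  = Plan(1, "df (left)")
val right = Plan(1, "df (deduped right copy)")
val join  = Plan(2, "join", Seq(left, right))

// BFS resolves planId 1 to the left child only because it is visited first;
// the second match is silently ignored, which is the ambiguity in question.
val picked = findByBreadthFirst(join)(_.planId == 1)
```

With two sub-trees matching the same plan id, failing the query, as proposed above, would surface the ambiguity instead of hiding it behind traversal order.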
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/14719#discussion_r78144199

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -1580,6 +1583,28 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(df.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
   }

+  test("""SPARK-17154: df("column_name") should return correct result when we do self-join""") {

--- End diff --

What happens when:

    val joined = df.join(df, "inner") // columns: col1, col2, col3, col1, col2, col3
    val selected = joined.select(df("col1"))

As there are two plans with the same plan id, the breadth-first search will pick one of them, so df("col1") will be resolved. However, I think in this case we should raise an ambiguity error instead.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/14719#discussion_r78143918

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -672,6 +684,21 @@ class Analyzer(
     exprs.exists(_.find(_.isInstanceOf[UnresolvedDeserializer]).isDefined)
   }

+  private[sql] def resolveExpressionFromSpecificLogicalPlan(
+      nameParts: Seq[String],
+      planToSearchFrom: LogicalPlan,
+      targetPlanId: Long): Expression = {
+    lazy val name = UnresolvedAttribute(nameParts).name
+    planToSearchFrom.findByBreadthFirst(_.planId == targetPlanId) match {
+      case Some(foundPlan) =>
+        foundPlan.resolve(nameParts, resolver).getOrElse {
+          failAnalysis(s"Could not find $name in ${foundPlan.output.mkString(", ")}")

--- End diff --

Return `u` as before? CheckAnalysis will detect unresolved attributes.
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/14719#discussion_r78126865

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -683,8 +710,14 @@ class Analyzer(
       try {
         expr transformUp {
           case GetColumnByOrdinal(ordinal, _) => plan.output(ordinal)
-          case u @ UnresolvedAttribute(nameParts) =>
-            withPosition(u) { plan.resolve(nameParts, resolver).getOrElse(u) }
+          case u @ UnresolvedAttribute(nameParts, targetPlanIdOpt) =>
+            withPosition(u) {
+              targetPlanIdOpt match {
+                case Some(targetPlanId) =>
+                  resolveExpressionFromSpecificLogicalPlan(nameParts, plan, targetPlanId)

--- End diff --

Thank you for taking a look. If the resolved attributes are not in the output of a child logical plan even though they are in the output of a sub-tree, `CheckAnalysis` will detect this and raise an `AnalysisException`, just as before this patch.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14719#discussion_r77995059

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -683,8 +710,14 @@ class Analyzer(
       try {
         expr transformUp {
           case GetColumnByOrdinal(ordinal, _) => plan.output(ordinal)
-          case u @ UnresolvedAttribute(nameParts) =>
-            withPosition(u) { plan.resolve(nameParts, resolver).getOrElse(u) }
+          case u @ UnresolvedAttribute(nameParts, targetPlanIdOpt) =>
+            withPosition(u) {
+              targetPlanIdOpt match {
+                case Some(targetPlanId) =>
+                  resolveExpressionFromSpecificLogicalPlan(nameParts, plan, targetPlanId)

--- End diff --

The resolved attribute must be among the output of the children; is that guaranteed here? `resolveExpressionFromSpecificLogicalPlan` will pick a sub-tree, and its output may not be propagated all the way up.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14719#discussion_r77829253 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -524,11 +524,16 @@ class Analyzer( val newRight = right transformUp { case r if r == oldRelation => newRelation } transformUp { -case other => other transformExpressions { - case a: Attribute => - attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier) -} +case other => + val transformed = other transformExpressions { +case a: Attribute => + attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier) + } + + transformed.setPlanId(other.planId) + transformed } + newRight.setPlanId(right.planId) --- End diff -- ah I understand now. Different dataframes must have different logical plans, the problem we are trying to fix is indirect self-join. For indirect self-join, the `left` and `right` here must be different plans and have different plan ids. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14719#discussion_r77820548

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -524,11 +524,16 @@ class Analyzer(
       val newRight = right transformUp {
         case r if r == oldRelation => newRelation
       } transformUp {
-        case other => other transformExpressions {
-          case a: Attribute =>
-            attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier)
-        }
+        case other =>
+          val transformed = other transformExpressions {
+            case a: Attribute =>
+              attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier)
+          }
+          transformed.setPlanId(other.planId)
+          transformed
       }
+      newRight.setPlanId(right.planId)

--- End diff --

I don't quite understand this. If it's a self-join, and `left` and `right` are the same plan (same plan id), then after `dedupRight`, `left` and `right` are no longer the same plan but still have the same plan id, right? How do we resolve an `UnresolvedAttribute` with a plan id? I think it's still ambiguous.
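The distinction between indirect and direct self-joins that these two comments work out can be sketched with a toy model (hypothetical `Node` and `dedupRight`; not Spark code). Per the diff above, dedupRight re-creates the right side's attributes but preserves its planId, so an indirect self-join's two sides carry different plan ids while a direct self-join's sides share one:

```scala
// Hypothetical model of plans carrying a planId; not Spark's API.
case class Node(planId: Long, attrs: Seq[String], children: Seq[Node] = Nil)

// Models dedupRight: re-create the right child's attributes (fresh exprIds,
// shown here by rewriting the "#id" suffix) while keeping the same planId.
def dedupRight(right: Node): Node =
  right.copy(attrs = right.attrs.map(a => a.takeWhile(_ != '#') + "#fresh"))

val base     = Node(1, Seq("col1#1", "col2#2", "col3#3"))   // df
val filtered = Node(2, Seq("col1#1", "col2#2"), Seq(base))  // df.filter(...).select(...)

// Indirect self-join: the two sides are different plans with different
// plan ids, so a lookup by plan id is unambiguous.
val indirect = Node(3, Nil, Seq(filtered, dedupRight(base)))

// Direct self-join: both sides trace back to planId 1, so a lookup by
// plan id still matches both, which is the remaining ambiguity.
val direct = Node(3, Nil, Seq(base, dedupRight(base)))
```

This is why the thread converges on resolving by plan id for the indirect case while failing or blocking the direct `df.join(df)` case.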
GitHub user sarutak opened a pull request: https://github.com/apache/spark/pull/14719

[SPARK-17154][SQL] Wrong result can be returned or AnalysisException can be thrown after self-join or similar operations

## What changes were proposed in this pull request?

When we join two DataFrames which originate from the same DataFrame, operations on the joined DataFrame can fail. One reproducible example is as follows.

{code}
val df = Seq(
  (1, "a", "A"),
  (2, "b", "B"),
  (3, "c", "C"),
  (4, "d", "D"),
  (5, "e", "E")).toDF("col1", "col2", "col3")
val filtered = df.filter("col1 != 3").select("col1", "col2")
val joined = filtered.join(df, filtered("col1") === df("col1"), "inner")
val selected1 = joined.select(df("col3"))
{code}

In this case, an AnalysisException is thrown. Another example is as follows.

{code}
val df = Seq(
  (1, "a", "A"),
  (2, "b", "B"),
  (3, "c", "C"),
  (4, "d", "D"),
  (5, "e", "E")).toDF("col1", "col2", "col3")
val filtered = df.filter("col1 != 3").select("col1", "col2")
val rightOuterJoined = filtered.join(df, filtered("col1") === df("col1"), "right")
val selected2 = rightOuterJoined.select(df("col1"))
selected2.show
{code}

In this case, we expect to get the following answer.

{code}
1
2
3
4
5
{code}

But the actual result is as follows.

{code}
1
2
null
4
5
{code}

The cause of the problems in these examples is that the logical plan for the right-side DataFrame and the expressions of its output are re-created in the analyzer (in the ResolveReferences rule) when a DataFrame has expressions that share an exprId with each other. The re-created expressions are equal to the original ones except for their exprId. This happens when we do a self-join or a similar pattern of operations.

In the first example, df("col3") returns a Column which includes an expression, and that expression has an exprId (say id1 here). After the join, the expression held by the right-side DataFrame (df) is re-created; the old and new expressions are equal but the exprId is renewed (say id2 for the new exprId here). Because of the mismatch between those exprIds, an AnalysisException is thrown.

In the second example, df("col1") returns a Column and the expression contained in the Column is assigned an exprId (say id3). On the other hand, the Column returned by filtered("col1") has an expression with the same exprId (id3). After the join, the expressions in the right-side DataFrame are re-created, and the expression assigned id3 is no longer present on the right side but is present on the left side. So, referring to df("col1") on the joined DataFrame, we get col1 of the left side, which includes null.

To resolve this issue, I have introduced `LazilyDeterminedAttribute`. It is returned when we refer to a column like `df("expr")`, and it lazily determines which expression `df("expr")` should point to.

## How was this patch tested?

I added some test cases.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sarutak/spark SPARK-17154

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14719.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14719

commit 7c40aa8982c6050aa95f6a6fee2997ef9005c380
Author: Kousuke Saruta
Date: 2016-08-19T16:44:35Z

    Fixed the issue that self-join or similar join patterns can cause wrong results
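The exprId mismatch the PR body describes can be sketched with a minimal self-contained model (hypothetical `Attr` and `resolve`; attribute ids and the "+100" renumbering are illustrative, not Spark's actual analyzer): a reference captured before the join either fails to resolve after the right side's ids are renewed, or resolves to the wrong side.

```scala
// Minimal model of attribute identity by exprId; not Spark's API.
case class Attr(name: String, exprId: Long)

val dfOutput = Seq(Attr("col1", 1), Attr("col2", 2), Attr("col3", 3))
// filtered = df.filter(...).select("col1", "col2") reuses df's exprIds.
val filteredOutput = Seq(Attr("col1", 1), Attr("col2", 2))

// df("col3") captures the attribute with exprId 3 before the join.
val captured = Attr("col3", 3)

// The analyzer de-duplicates the right side (df) by renewing its exprIds
// (the +100 offset is just an illustration of "fresh ids").
val dedupedRight = dfOutput.map(a => a.copy(exprId = a.exprId + 100))
val joinOutput = filteredOutput ++ dedupedRight

// Resolution purely by exprId, as the bug report describes.
def resolve(ref: Attr, output: Seq[Attr]): Option[Attr] =
  output.find(_.exprId == ref.exprId)

// The captured exprId 3 no longer matches anything on the right side:
// this models the AnalysisException in the first example.
val col3Lookup = resolve(captured, joinOutput)

// df("col1") (exprId 1) now matches filtered's col1 on the LEFT side:
// this models the wrong (null-containing) result in the second example.
val col1Lookup = resolve(Attr("col1", 1), joinOutput)
```

The proposed `LazilyDeterminedAttribute` (and later the plan-id approach) defers this binding until analysis, when the correct sub-tree can be identified.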